Task 1
A.Data Source
(videocapture)


Simulate the ingestion of a live video stream using Python. You can use libraries like OpenCV to read video frames from a video file. Consider that as a live stream and provide code to capture frames continuously from the source.


2. Frame Processing
Develop a function or class that takes each incoming video frame and performs the
following actions:
a. Frame by frame process and create a json object for each frame.

b. Extract relevant information from the processed frame. The json must contain the
following information:
i. camera_id
ii. frame_id
iii. geo_location
iv. image_path (write the frames as jpg image file)


c. Consider that the streaming is 25 FPS. Hence for every second write any one frame
as an image file and reuse that file for the rest 24 frames. Hence it is enough to write
only one frame per second as an image file.


D. Simultaneously while processing frames, all the frame information must be written in a
json file.

3. Batching
As mentioned earlier, the duration of the video file (in secs) will be mentioned in the
config file. Based on the duration value, perform batching of above processed frame’s
information. Create a dictionary for every batch that consists of following keys:
i. batch_id
ii. starting_frame_id
iii. ending_frame_id
iv. timestamp

Here, In below class, Impelemented Task 1, First Three things, followed by applying conditions.

Every fuction is well commented to know. To end video Stream press "q" key

In [2]:
import cv2
import json
import time
import os
import math
import pyodbc

#Creating video-Streaming class
class VideoStreaming:
    def __init__(self, camera_id, geo_location, output_folder, config_filename):
        self.camera_id = camera_id
        self.geo_location = geo_location
        self.output_folder = output_folder
        self.frame_id = 0
        self.cap = cv2.VideoCapture(camera_id)
        self.config_filename = config_filename
        self.batch_duration = self.read_config_duration()[0]
        self.batch_size = self.read_config_duration()[1]
        
        # Create the output folder
        os.makedirs(output_folder, exist_ok=True)

        #JSON data list and batch list
        self.json_data = []
        self.batches = []

    def read_config_duration(self):
        # Read the duration from the config file
        with open(self.config_filename, "r") as config_file:
            config_data = json.load(config_file)
            duration = config_data.get("duration")
            batch_size = config_data.get("batch_size")
        return duration,batch_size

    def video_capture(self):
        # Check if the video stream was opened successfully
        if not self.cap.isOpened():
            exit()

    def frame_processing(self):
        # Set the desired frames per second (fps) based on condition
        desired_fps = 25
        frame_count = 0
        frame_to_save = None

        while True:
            # Read a frame from the video stream
            ret, frame = self.cap.read()

            # Check if the frame was successfully read
            if not ret:
                break

            frame_count += 1

            # Save one frame per second as an image file
            if frame_count % desired_fps == 0:
                outframe_id = (frame_count // desired_fps)
                frame_to_save = frame  # Save the frame to be reused
                frame_filename = os.path.join(self.output_folder, f"frame_{outframe_id}.jpg")
                cv2.imwrite(frame_filename, frame_to_save)  # Save the frame as an image

                # Append frame info to the JSON data list
                frame_info = {
                    "camera_id": self.camera_id,
                    "frame_id": outframe_id,
                    "geo_location": self.geo_location,
                    "image_path": frame_filename
                }
                self.json_data.append(frame_info)

            # Reuse the saved frame for the next 24 frames within the same second
            if frame_to_save is not None:
                cv2.imshow("Frame", frame_to_save)

            # Exit the loop when the 'q' key is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # Release the video capture object and close any open windows
        self.cap.release()
        cv2.destroyAllWindows()

    def batch_frames(self):     
    # Calculate the number of batches.
        num_batches = math.ceil(self.batch_duration / self.batch_size)

    # Create a dictionary for each batch.
        #batches = {}
        for i in range(num_batches):
            batch_id = i + 1

        # Calculate the starting and ending frame IDs for the batch.
            starting_frame_id = batch_id * self.batch_size - self.batch_size + 1
            ending_frame_id = min(starting_frame_id + self.batch_size - 1, self.batch_duration)

        # Calculate the timestamp of the batch.
            timestamp = starting_frame_id / self.batch_duration
        #timestamp = time.time()

        # Add the batch to the dictionary.
            batch = {
            "batch_id": batch_id,
            "starting_frame_id": starting_frame_id,
            "ending_frame_id": ending_frame_id,
            "timestamp": timestamp
            }
            self.batches.append(batch)
            return self.batches
        
    def save_json_data(self, json_filename):
        # Write JSON data to a JSON file
        with open(json_filename, "w") as json_file:
            json.dump(self.json_data, json_file, indent=4)

    def save_batch_info(self, batch_filename):
        with open(batch_filename, "w") as batch_file:
            json.dump(self.batches, batch_file, indent=4)

if __name__ == "__main__":
    camera_id = 0
    geo_location = "latitude: 123.456, longitude: 789.012"
    output_folder = "output_frames"
    config_filename = "config.json"
    json_filename = "frame_info.json"
    batch_filename = "batch_info.json"

    processor = VideoStreaming(camera_id, geo_location, output_folder, config_filename)
    processor.video_capture()
    processor.frame_processing()
    processor.batch_frames()
    processor.save_json_data(json_filename)
    processor.save_batch_info(batch_filename)


    print("Frame processing and batching complete. JSON data saved to:", json_filename)
    print("Batch information saved to:", batch_filename)


Frame processing and batching complete. JSON data saved to: frame_info.json
Batch information saved to: batch_info.json


3. Data Storage
Use any SQL Database and create necessary tables and columns to store batch
information. Every batch information must be logged in the DB.

here, I used local postgresql as to store data in a database table for every batch infomation of video stream.

In [8]:
import json
import psycopg2

#create a connection to database;
conncetion = psycopg2.connect(dbname="videostream",user="postgres",password="leo@#838",host="localhost",port=5432)


# Create a cursor object for executing SQL queries
cursor = conncetion.cursor()

with open('batch_info.json', 'r') as json_file:
    data = json.load(json_file)

# Define the INSERT SQL statement
sql_insert = "INSERT INTO logfiles (batch_id, starting_frame_id, ending_frame_id,timestamp) VALUES (%s, %s, %s, %s)"

# Insert records from the JSON data
for record in data:
    cursor.execute(sql_insert,(record['batch_id'], record['starting_frame_id'], record['ending_frame_id'],record['timestamp']))

# Commit the transaction
conncetion.commit()


# Close the cursor and connection
cursor.close()
conncetion.close()


Loging and Error Handling

4. Error Handling and Logging
Implement error handling and logging mechanisms in your code to capture and handle
exceptions that may occur during frame processing, data storage, or transmission.
Ensure that the code logs relevant information for debugging.

Here, Logging is done by expecting each function above class "videostreaming"

In [None]:
import cv2
import json
import os
import time
import logging

class FrameProcessor:
    def __init__(self, camera_id, geo_location, output_folder, duration):
        self.camera_id = camera_id
        self.geo_location = geo_location
        self.output_folder = output_folder
        self.duration = duration  
        self.frame_id = 0
        self.current_batch = None

        # Create the output folder if it doesn't exist
        os.makedirs(output_folder, exist_ok=True)

        # Initialize JSON data list and batch list
        self.json_data = []
        self.batches = []

        # Initialize logging
        self.logger = self.setup_logger()

    def setup_logger(self):
        logger = logging.getLogger("frame_processor")
        logger.setLevel(logging.DEBUG)

        # Create a file handler and set the log level
        log_file = os.path.join(self.output_folder, "frame_processor.log")
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.DEBUG)

        # Create a console handler and set the log level
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        # Create a formatter and set it for the handlers
        formatter = logging.Formatter(
            "%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        # Add the handlers to the logger
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)

        return logger

    def process_frame(self, frame, timestamp):
        try:
            # Write one frame per second as an image file
            if self.frame_id % 25 == 0:
                image_filename = os.path.join(self.output_folder, f"frame_{self.frame_id // 25}.jpg")
                cv2.imwrite(image_filename, frame)

            # Create JSON object for the frame
            frame_info = {
                "camera_id": self.camera_id,
                "frame_id": self.frame_id,
                "geo_location": self.geo_location,
                "image_path": os.path.abspath(image_filename),
                "timestamp": timestamp,
            }

            # Append frame info to the JSON data list
            self.json_data.append(frame_info)

            # Check if a new batch needs to be created
            if self.current_batch is None or timestamp - self.current_batch["timestamp"] >= self.duration:
                self.create_new_batch()

            # Increment frame ID
            self.frame_id += 1

        except Exception as e:
            self.logger.error(f"Error processing frame: {str(e)}")

    def create_new_batch(self):
        try:
            # Create a new batch dictionary
            batch_id = len(self.batches) + 1
            starting_frame_id = self.frame_id - 25  # Start of the last second
            ending_frame_id = self.frame_id - 1  # End of the last second
            timestamp = time.time()

            batch_info = {
                "batch_id": batch_id,
                "starting_frame_id": starting_frame_id,
                "ending_frame_id": ending_frame_id,
                "timestamp": timestamp,
            }

            # Append batch info to the batch list
            self.batches.append(batch_info)

            # Update the current batch
            self.current_batch = batch_info

        except Exception as e:
            self.logger.error(f"Error creating new batch: {str(e)}")

    def save_json_data(self, json_filename):
        try:
            # Write JSON data to a JSON file
            with open(json_filename, "w") as json_file:
                json.dump(self.json_data, json_file, indent=4)

        except Exception as e:
            self.logger.error(f"Error saving JSON data: {str(e)}")

    def save_batch_info(self, batch_filename):
        try:
            # Write batch information to a JSON file
            with open(batch_filename, "w") as batch_file:
                json.dump(self.batches, batch_file, indent=4)

        except Exception as e:
            self.logger.error(f"Error saving batch information: {str(e)}")

if __name__ == "__main__":
    # Example usage:
    camera_id = "camera1"
    geo_location = "latitude: 123.456, longitude: 789.012"
    output_folder = "output_frames"
    json_filename = "frame_info.json"
    batch_filename = "batch_info.json"
    duration = 10  # Duration of each batch in seconds

    frame_processor = FrameProcessor(camera_id, geo_location, output_folder, duration)

    # Simulated video capture (you can replace this with your actual video capture logic)
    cap = cv2.VideoCapture("sample_video.mp4")

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        timestamp = time.time()
        frame_processor.process_frame(frame, timestamp)

    frame_processor.save_json_data(json_filename)
    frame_processor.save_batch_info(batch_filename)

    cap.release()
    cv2.destroyAllWindows()


5. Concurrency and Performance

To improve performance, modify your code to handle multiple camera streams
concurrently. Consider there are 2 live streams and enable concurrent processing of
those frames in the application. Explain how you ensure thread safety and avoid race
conditions.

Here, It accepts two cameras from system as same code followed above from above VideoStream class. Some slight changes added to increase code when run with both cameras

In [None]:
import cv2
import json
import time
import os
import math
import pyodbc


class img_stream_process:
    def __init__(self, camera_id, geo_location, output_folder,output_folder1, config_filename):
        self.camera_id = camera_id
        self.geo_location = geo_location
        self.output_folder = output_folder
        self.frame_id = 0
        self.cap1 = cv2.VideoCapture(camera_id[0])
        self.cap2 = cv2.VideoCapture(camera_id[1])
        self.config_filename = config_filename
        self.batch_duration = self.read_config_duration()[0]
        self.batch_size = self.read_config_duration()[1]
        

        # Create the output folder
        os.makedirs(output_folder, exist_ok=True)
        os.makedirs(output_folder1, exist_ok=True)


        #JSON data list and batch list
        self.json_data = []
        self.batches = []

    def read_config_duration(self):
        # Read the duration from the config file
        with open(self.config_filename, "r") as config_file:
            config_data = json.load(config_file)
            duration = config_data.get("duration")
            batch_size = config_data.get("batch_size")
        return duration,batch_size

    def video_capture(self):
        # Check if the video stream was opened successfully
        if not self.cap1.isOpened():
            print("Error: Could not open video source.")
            exit()
        if not self.cap2.isOpened():
            print("Error: Could not open video source.")
            exit()

    def frame_processing(self):
        # Set the desired frames per second (fps)
        desired_fps = 25

        frame_count = 0
        frame_to_save1 = None  
        frame_to_save2 = None

        while True:
            # Read a frame from the video stream
            ret1, frame1 = self.cap1.read()
            ret2,frame2 = self.cap2.read()

            # Check if the frame was successfully read
            if not ret1:
                print("Error: Could not read frame.")
                break
            if not ret2:
                print("Error: Could not read frame.")
                break

            frame_count += 1

            # Save one frame per second as an image file
            if frame_count % desired_fps == 0:
                outframe_id = (frame_count // desired_fps)
                frame_to_save1 = frame1  # Save the frame to be reused
                frame_to_save2 = frame2
                frame_filename1 = os.path.join(self.output_folder, f"frame_{outframe_id}.jpg")
                frame_filename2 = os.path.join(self.output_folder1, f"frame_{outframe_id}.jpg")

                cv2.imwrite(frame_filename1, frame_to_save1)  # Save the frame as an image
                cv2.imwrite(frame_filename2, frame_to_save2)  # Save the frame as an image


                # Append frame info to the JSON data list
                frame_info1 = {
                    "camera_id": self.camera_id[0],
                    "frame_id": outframe_id,
                    "geo_location": self.geo_location,
                    "image_path": frame_filename1
                }
                frame_info2 = {
                    "camera_id": self.camera_id[1],
                    "frame_id": outframe_id,
                    "geo_location": self.geo_location,
                    "image_path": frame_filename2
                }
                self.json_data.append(frame_info1)
                self.json_data.append(frame_info2)


            # Reuse the saved frame for the next 24 frames within the same second
            if frame_to_save1 is not None:
                cv2.imshow("Frame", frame_to_save1)

            # Exit the loop when the 'q' key is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # Release the video capture object and close any open windows
        self.cap.release()
        cv2.destroyAllWindows()

    def batch_frames(self):     
    # Calculate the number of batches.
        num_batches = math.ceil(self.batch_duration / self.batch_size)

    # Create a dictionary for each batch.
        #batches = {}
        for i in range(num_batches):
            batch_id = i + 1

        # Calculate the starting and ending frame IDs for the batch.
            starting_frame_id = batch_id * self.batch_size - self.batch_size + 1
            ending_frame_id = min(starting_frame_id + self.batch_size - 1, self.batch_duration)

        # Calculate the timestamp of the batch.
            timestamp = starting_frame_id / self.batch_duration
        #timestamp = time.time()

        # Add the batch to the dictionary.
            batch = {
            "batch_id": batch_id,
            "starting_frame_id": starting_frame_id,
            "ending_frame_id": ending_frame_id,
            "timestamp": timestamp
            }
            self.batches.append(batch)
            return self.batches
    def sqlserver_db(self):
        conn = psycopg2.connect(dbname="videostream",user="postgres",password="leo@#838",host="localhost",port=5432)
        cursor = conn.cursor()
        print(self.batches[0].values())

        
    def save_json_data(self, json_filename):
        # Write JSON data to a JSON file
        with open(json_filename, "w") as json_file:
            json.dump(self.json_data, json_file, indent=4)

    def save_batch_info(self, batch_filename):
        with open(batch_filename, "w") as batch_file:
            json.dump(self.batches, batch_file, indent=4)

if __name__ == "__main__":
    camera_id = 0
    geo_location = "latitude: 123.456, longitude: 789.012"
    output_folder = "output_frames"
    output_folder1 = "outframes"
    config_filename = "config.json"
    json_filename = "frame_info.json"
    batch_filename = "batch_info.json"

    processor = img_stream_process(camera_id, geo_location, output_folder,output_folder1, config_filename)
    processor.video_capture()
    processor.frame_processing()
    processor.batch_frames()
    processor.save_json_data(json_filename)
    processor.save_batch_info(batch_filename)


    print("Frame processing and batching complete. JSON data saved to:", json_filename)
    print("Batch information saved to:", batch_filename)


Task 2:
1. Write a user driven python program that accepts,

➢ TIMESTAMP
➢ DURATION OF THE VIDEO FILE from the user.
Based on the above information, iterate through the batch information in the Database. Create a
metadata out of it which will be helpful in gathering the frame information from the json file.
Once the necessary frames are gathered convert them to a mp4 file and present them to the
user.


2. Error Handling and Logging
Implement error handling and logging mechanisms in your code to capture and handle
exceptions that may occur during frame processing, data storage, or transmission. Ensure that
the code logs relevant information for debugging.

It is same as task one but timestamp and duration by user can be taken by input() in class. Create respective metadata and generate a video from the frames infomation present.

In [None]:
import cv2
import json
import time
import os
import math
import pyodbc


class img_stream_process:
    def __init__(self, camera_id, geo_location, output_folder, config_filename):
        self.camera_id = camera_id
        self.geo_location = geo_location
        self.output_folder = output_folder
        self.frame_id = 0
        self.cap = cv2.VideoCapture(camera_id)
        self.config_filename = config_filename
        #video duration
        self.batch_duration = int(input())
        #batch size
        self.batch_size = int(input())
        

        # Create the output folder
        os.makedirs(output_folder, exist_ok=True)

        #JSON data list and batch list
        self.json_data = []
        self.batches = []

    def video_capture(self):
        # Check if the video stream was opened successfully
        if not self.cap.isOpened():
            print("Error: Could not open video source.")
            exit()

    def frame_processing(self):
        # Set the desired frames per second (fps)
        desired_fps = 25

        frame_count = 0
        frame_to_save = None

        while True:
            # Read a frame from the video stream
            ret, frame = self.cap.read()

            # Check if the frame was successfully read
            if not ret:
                print("Error: Could not read frame.")
                break

            frame_count += 1

            # Save one frame per second as an image file
            if frame_count % desired_fps == 0:
                outframe_id = (frame_count // desired_fps)
                frame_to_save = frame  # Save the frame to be reused
                frame_filename = os.path.join(self.output_folder, f"frame_{outframe_id}.jpg")
                cv2.imwrite(frame_filename, frame_to_save)  # Save the frame as an image

                # Append frame info to the JSON data list
                frame_info = {
                    "camera_id": self.camera_id,
                    "frame_id": outframe_id,
                    "geo_location": self.geo_location,
                    "image_path": frame_filename
                }
                self.json_data.append(frame_info)

            # Reuse the saved frame for the next 24 frames within the same second
            if frame_to_save is not None:
                cv2.imshow("Frame", frame_to_save)

            # Exit the loop when the 'q' key is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # Release the video capture object and close any open windows
        self.cap.release()
        cv2.destroyAllWindows()

    def batch_frames(self):     
    # Calculate the number of batches.
        num_batches = math.ceil(self.batch_duration / self.batch_size)

    # Create a dictionary for each batch.
        #batches = {}
        for i in range(num_batches):
            batch_id = i + 1

        # Calculate the starting and ending frame IDs for the batch.
            starting_frame_id = batch_id * self.batch_size - self.batch_size + 1
            ending_frame_id = min(starting_frame_id + self.batch_size - 1, self.batch_duration)

        # Calculate the timestamp of the batch.
            timestamp = starting_frame_id / self.batch_duration
        #timestamp = time.time()

        # Add the batch to the dictionary.
            batch = {
            "batch_id": batch_id,
            "starting_frame_id": starting_frame_id,
            "ending_frame_id": ending_frame_id,
            "timestamp": timestamp
            }
            self.batches.append(batch)
            return self.batches
    def sqlserver_db(self):
        conn = psycopg2.connect(dbname="videostream",user="postgres",password="leo@#838",host="localhost",port=5432)
        cursor = conn.cursor()
        print(self.batches[0].values())

        
    def save_json_data(self, json_filename):
        # Write JSON data to a JSON file
        with open(json_filename, "w") as json_file:
            json.dump(self.json_data, json_file, indent=4)

    def save_batch_info(self, batch_filename):
        with open(batch_filename, "w") as batch_file:
            json.dump(self.batches, batch_file, indent=4)

if __name__ == "__main__":
    camera_id = 0
    geo_location = "latitude: 123.456, longitude: 789.012"
    output_folder = "output_frames"
    config_filename = "config.json"
    json_filename = "frame_info.json"
    batch_filename = "batch_info.json"

    processor = img_stream_process(camera_id, geo_location, output_folder, config_filename)
    processor.video_capture()
    processor.frame_processing()
    processor.batch_frames()
    processor.sqlserver_db()
    processor.save_json_data(json_filename)
    processor.save_batch_info(batch_filename)


    print(json_filename)
    print(batch_filename)


In [23]:
import cv2
import json
width = 650
height = 650

# Load the frame metadata from the JSON file
with open("frame_info.json", "r") as json_file:
    frame_metadata = json.load(json_file)

# Create a VideoWriter object to save the compiled video
fourcc = cv2.VideoWriter_fourcc(*'mkvv')  # Codec for MP4 format
output_video = cv2.VideoWriter("output_video.mkv", fourcc, 25, (width, height))  # Adjust width and height

# Loop through the frame metadata and add frames to the video
for frame_info in frame_metadata:
    frame = cv2.imread(frame_info["image_path"])  # Load the frame image
    output_video.write(frame)

# Release the VideoWriter
output_video.release()

In [25]:
import imageio
import json

# Load the frame metadata from the JSON file
with open("frame_info.json", "r") as json_file:
    frame_metadata = json.load(json_file)

# Create a list to store frames
frames = []

# Loop through the frame metadata and add frames to the list
for frame_info in frame_metadata:
    frame = imageio.imread(frame_info["image_path"])  # Load the frame image
    frames.append(frame)

# Define the output video file path
output_video_path = "output_video.mp4"

# Create the MP4 video from the list of frames
imageio.mimsave(output_video_path, frames, fps=25)  # Adjust the frame rate (fps) as needed

print(f"Video saved to {output_video_path}")


  frame = imageio.imread(frame_info["image_path"])  # Load the frame image


Video saved to output_video.mp4
