<a href="https://colab.research.google.com/github/Marshal9158/BDA-ASSIGNMENT/blob/main/Assignment_2(BDA).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q pyspark
!pip install -q opencv-python
from pyspark.sql import SparkSession
import cv2
from google.colab.patches import cv2_imshow
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

# Location of the clip to analyse (replace with your own video file path)
video_file_path = "/content/AdobeStock_360465490_Video_HD_Preview.mov"

# Start the Spark session for this analysis (getOrCreate hands back an
# already-running session if there is one)
spark = (
    SparkSession.builder
    .appName("VideoAnalysis")
    .getOrCreate()
)

# Open the clip with OpenCV so it can be decoded frame by frame
video_capture = cv2.VideoCapture(video_file_path)

# Decode every frame of the video, run Canny edge detection on each frame
# through a Spark UDF, and display the resulting edge maps. The try/finally
# guarantees that the capture handle and the Spark session are released even
# if decoding, the Spark job, or the display step raises.
try:
    # Check if video file opened successfully
    if not video_capture.isOpened():
        print("Error: Could not open video.")
    else:
        frame_list = []
        # Shape of the decoded frames, taken from the first frame. It is used
        # later to undo the flattening, so the pipeline works for any
        # resolution instead of assuming 720p.
        frame_shape = None

        # Read video frame by frame until read() reports that nothing is left
        while True:
            ret, frame = video_capture.read()
            if not ret:
                break
            if frame_shape is None:
                frame_shape = frame.shape
            # Store each frame as a flat list of ints so it fits the
            # ArrayType(IntegerType()) schema used below
            frame_list.append(frame.flatten().tolist())

        if not frame_list:
            # The file opened but produced no frames; the steps below then run
            # on an empty DataFrame and display nothing.
            print("Warning: No frames could be read from the video.")

        # Convert frame list to Spark DataFrame; with a non-struct schema the
        # single column is named "value"
        frame_df = spark.createDataFrame(frame_list, ArrayType(IntegerType()))

        # Define a UDF to process each frame
        def process_frame(frame_array):
            """Return the flattened Canny edge map of one flattened frame.

            Args:
                frame_array: Flat sequence of pixel values for one frame, as
                    produced by ``frame.flatten().tolist()``.

            Returns:
                The edge image computed by ``cv2.Canny`` (thresholds 100 and
                200) on the grayscale frame, flattened to a plain list so it
                can be stored in a Spark array column.
            """
            # Restore the original image layout using the shape recorded
            # while decoding
            frame = np.array(frame_array, dtype=np.uint8).reshape(frame_shape)
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray_frame, 100, 200)
            return edges.flatten().tolist()

        # Register UDF
        process_frame_udf = udf(process_frame, ArrayType(IntegerType()))

        # Apply UDF to process frames
        processed_df = (
            frame_df
            .withColumn("processed_frame", process_frame_udf("value"))
            .select("processed_frame")
        )

        # Collect processed frames for displaying
        processed_frames = processed_df.collect()

        # Display each processed frame; the edge map is single-channel, so
        # only the first two dimensions of the frame shape apply
        for row in processed_frames:
            edge_frame = np.array(row.processed_frame, dtype=np.uint8).reshape(frame_shape[:2])
            cv2_imshow(edge_frame)
finally:
    # Release the video capture and shut down Spark no matter what happened
    video_capture.release()
    spark.stop()
