In [None]:
# imports
import math
import os
from dataclasses import dataclass, field, fields
from typing import List, Optional, Tuple

import cv2
import numpy as np

from utils import *

In [None]:
# TODO: convert_video_to_frames
"""
params : video name 
return : create output directory include the frames of this video 
"""
def convert_video_to_frames(video_name, sampling_rate=60):
    """Save every `sampling_rate`-th frame of a video as a JPEG image.

    The video is read from "../Dataset/" + video_name and the images are written
    to the directory "../Dataset/frames" + video_name, which is created when it
    does not exist yet. Each image is named "frame_<index>.jpg", where <index>
    is the zero-padded index of the frame in the source video.

    params:
        video_name: file name of the video inside "../Dataset/".
        sampling_rate: step, in frames, between two saved frames (default 60).
    return:
        None. Frames that cannot be read are skipped.
    """
    video_path = "../Dataset/"+video_name
    # NOTE(review): there is no path separator between "frames" and the video
    # name, so the directory is e.g. "../Dataset/framesclip.mp4" - confirm intended.
    output_dir = "../Dataset/frames"+video_name
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        for frame_idx in range(0, total_frames, sampling_rate):
            # Seek directly to the sampled frame instead of decoding every frame.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if ret:
                output_path = os.path.join(output_dir, "frame_{:06d}.jpg".format(frame_idx))
                cv2.imwrite(output_path, frame)
    finally:
        # Release the capture even when reading or writing a frame raises.
        cap.release()
    return

In [None]:
# TODO: create classes 
"""
desc: Difference between pixel hue,saturation,luma,edges values of adjacent frames.
"""
from typing import Optional
@dataclass
class delta_feature:
    """One float per frame feature: hue, saturation, luma and edges.

    In this notebook the class is used both for the differences measured
    between two adjacent frames and for the weights those differences are
    multiplied by when they are combined into a single frame score.
    """
    # by default every value is 1.0, i.e. all features are weighted equally
    delta_hue: float = 1.0
    delta_sat: float = 1.0
    delta_lum: float = 1.0
    # TODO: edge deltas have bigger values than the other features; the detection threshold may need to be adjusted
    delta_edges: float = 1.0  

"""
desc: features calculated for each frame
"""
@dataclass
class frame_data:
    """Features calculated for a single frame."""
    hue: np.ndarray  # hue channel of the frame
    sat: np.ndarray  # saturation channel of the frame
    lum: np.ndarray  # luma channel of the frame
    edges: Optional[np.ndarray]  # edge map of the frame, or None when not available
    # TODO: add optical flow

# Module-level default weights, kept for code that refers to this name directly.
default_delta_feature_weights = delta_feature()
@dataclass
class shot_detector:
    """Configuration and running state of the shot boundary detector.

    threshold: a frame whose score is greater than or equal to this value is
        treated as a shot boundary.
    min_scene_len: minimum number of frames since the last cut before a new
        cut may be reported.
    weights: per-feature weights used when combining the feature deltas into
        a frame score.
    kernel_size: presumably the edge dilation kernel size, None meaning
        "derive it automatically" - TODO confirm, it is not read by the code
        visible in this notebook.
    last_frame: features of the most recently scored frame, None before the
        first frame has been scored.
    """
    threshold: float = 27.0
    min_scene_len: int = 15
    # A dataclass instance is unhashable, and Python 3.11+ rejects unhashable
    # field defaults with ValueError; on older versions the single instance
    # would be shared by every detector. Build a fresh one per detector instead.
    weights: 'delta_feature' = field(default_factory=delta_feature)
    kernel_size: Optional[int] = None
    last_frame: Optional[frame_data] = None


In [None]:
"""Detect edges  in the frame 
    params:
        lum:  the luma channel of a frame.
        kernel: kernel size 
    return:
        2D 8-bit image where 255--> edge , 0--> other 
"""
import math
def detect_edges(lum: np.ndarray,kernel:int =None) -> np.ndarray:
    """Detect edges in the luma channel of a frame.

    params:
        lum: the luma channel of a frame (2D 8-bit image).
        kernel: dilation kernel. None derives a square kernel from the
            resolution of `lum`; an int is taken as the side length of a
            square kernel; an array is used as the structuring element as-is.
    return:
        2D 8-bit image where 255--> edge , 0--> other
    raises:
        ValueError: when an int kernel size smaller than 1 is given.
    """
    if kernel is None:
        # Calculate the kernel size depending on the video resolution.
        kernel_size = 4 + round(math.sqrt(lum.shape[1]*lum.shape[0]) / 192)
        # Keep the size odd so the kernel has a centre pixel.
        if kernel_size % 2 == 0:
            kernel_size += 1
        kernel = np.ones((kernel_size, kernel_size), np.uint8)
    elif isinstance(kernel, (int, np.integer)):
        # A bare size must be expanded to a structuring element before it is
        # handed to cv2.dilate.
        if kernel < 1:
            raise ValueError("kernel size must be a positive integer")
        kernel = np.ones((int(kernel), int(kernel)), np.uint8)
    # Estimate levels for thresholding from the median brightness.
    sigma: float = 1.0 / 3.0
    median = np.median(lum)
    low = int(max(0, (1.0 - sigma) * median))
    high = int(min(255, (1.0 + sigma) * median))
    # Calculate edges using Canny algorithm, and reduce noise by dilating the edges.
    # TODO : Implement  Canny
    edges = cv2.Canny(lum, low, high)
    return cv2.dilate(edges,kernel)

In [None]:
"""
Calculate the frame score, which is compared with the threshold to decide whether the frame is a shot boundary or not.
params
 video_param: the shot detector holding the weights and the previous frame's features
 frame_img: frame itself
"""
def calculate_frame_score(video_param: shot_detector, frame_img: np.ndarray) -> float:
    """Score how much `frame_img` differs from the previously scored frame.

    The frame is converted from BGR to HSV, an edge map is computed from the
    third channel, and each of the four features is compared against the
    features stored in `video_param.last_frame` with `mean_pixel_distance`.
    The score is the weighted sum of those distances divided by the sum of the
    absolute weights.

    params:
        video_param: detector whose `weights` are used and whose `last_frame`
            is read and then replaced by the features of `frame_img`.
        frame_img: the frame itself, as a BGR image.
    return:
        0.0 for the first frame (nothing to compare against), otherwise the
        normalised weighted score.
    """
    # TODO: Add option to enable motion estimation
    hsv_img = cv2.cvtColor(frame_img, cv2.COLOR_BGR2HSV)
    hue, sat, lum = cv2.split(hsv_img)
    edges = detect_edges(lum)
    current = frame_data(hue, sat, lum, edges)

    previous = video_param.last_frame
    if previous is None:
        # First frame: only remember its features.
        video_param.last_frame = current
        return 0.0

    hue_delta = mean_pixel_distance(hue, previous.hue)
    sat_delta = mean_pixel_distance(sat, previous.sat)
    lum_delta = mean_pixel_distance(lum, previous.lum)
    if edges is None:
        edge_delta = 0.0
    else:
        edge_delta = mean_pixel_distance(edges, previous.edges)
    deltas = delta_feature(
        delta_hue=hue_delta,
        delta_sat=sat_delta,
        delta_lum=lum_delta,
        delta_edges=edge_delta,
    )

    feature_names = [spec.name for spec in fields(delta_feature)]
    weighted_total = 0
    for name in feature_names:
        weighted_total += getattr(deltas, name) * getattr(video_param.weights, name)
    weight_norm = 0
    for name in feature_names:
        weight_norm += abs(getattr(video_param.weights, name))
    frame_score: float = weighted_total / weight_norm

    # Remember this frame so the next call can be compared against it.
    video_param.last_frame = current
    return frame_score

In [None]:
"""
return: List of frames where scene cuts have been detected
"""
def process_frame(
    video_param: shot_detector, frames_since_last_cut: List[frame_data],
    frames_count_since_last_cut: int, frame_img: np.ndarray, op) -> Tuple[Optional[float], frame_data]:
    """Score `frame_img` and, when it is a shot boundary, reduce the finished shot.

    The frame is always scored, which also updates `video_param.last_frame`.
    It counts as a boundary only when at least `video_param.min_scene_len`
    frames have passed since the last cut and its score reaches
    `video_param.threshold`.

    params:
        video_param: detector configuration and state.
        frames_since_last_cut: frame data collected since the last cut; handed
            to `op` when a boundary is found.
        frames_count_since_last_cut: actual number of video frames since the
            last cut (not the number of sampled frames).
        frame_img: the frame itself.
        op: callable that reduces `frames_since_last_cut` to a shot
            representation.
    return:
        (result of `op`, frame data of `frame_img`) when the frame is a
        boundary, otherwise (None, frame data of `frame_img`).
    """
    score = calculate_frame_score(video_param, frame_img)
    is_boundary = (
        frames_count_since_last_cut >= video_param.min_scene_len
        and score >= video_param.threshold
    )
    if is_boundary:
        shot_repr = op(frames_since_last_cut)
    else:
        # Too close to the previous cut, or the change is below the threshold.
        shot_repr = None
    return (shot_repr, video_param.last_frame)

In [None]:
def avg_frames_features(frames: List[frame_data]) -> frame_data:
    """A reduction operation to perform (averaging) on a list of frame data.
    The reduced data would then be used as a representation of the whole shot.

    params:
        frames: non-empty list of frame data to average.
    return:
        frame data whose every attribute is the element-wise mean (float64) of
        that attribute over `frames`, or None for an attribute that is None in
        at least one of the frames.
    raises:
        AssertionError: when `frames` is empty.
    """
    assert frames, "Operation can not be performed on 0 frames"
    avg_frame_data = frame_data(0, 0, 0, None)
    count = len(frames)
    for attr_name in [f.name for f in fields(frames[0])]:
        values = [getattr(frame, attr_name) for frame in frames]
        if any(value is None for value in values):
            # An optional feature (e.g. edges) that is missing cannot be averaged.
            setattr(avg_frame_data, attr_name, None)
            continue
        # Accumulate in float64: adding 8-bit arrays together wraps around at
        # 256 and would corrupt the average.
        total = np.zeros(np.shape(values[0]), dtype=np.float64)
        for value in values:
            total += value
        setattr(avg_frame_data, attr_name, total / count)
    return avg_frame_data

In [None]:

# try 
import cv2
import os
# Try the detector on a sample video and print the index of every sampled
# frame that is detected as a shot boundary.
video_path = "../tears_of_steel_1080p.mov"
output_dir = "frames"
sampling_rate = 10
os.makedirs(output_dir, exist_ok=True)
video_try = shot_detector()
# Checked before the video is opened so a failing assertion does not leak the capture.
assert video_try.min_scene_len > sampling_rate, "The sampling rate is must be strictly less than the minimum scene length"

cap = cv2.VideoCapture(video_path)
frame_data_since_last_cut = []
try:
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    for frame_idx in range(0, total_frames, sampling_rate):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
        if not ret:
            # The frame could not be read: skip it instead of handing None to
            # the colour conversion.
            continue
        # output_path = os.path.join(output_dir, "frame_{:06d}.jpg".format(frame_idx))
        # cv2.imwrite(output_path, frame)

        # Only every `sampling_rate`-th frame is processed, so the real number
        # of frames since the last cut is the sample count times the rate.
        shot_score, data = process_frame(
            video_try, frame_data_since_last_cut,
            len(frame_data_since_last_cut) * sampling_rate,
            frame, avg_frames_features)

        if shot_score is not None:
            print(frame_idx)
            # Reset the data accumulator.
            frame_data_since_last_cut = []
        frame_data_since_last_cut.append(data)
finally:
    # Release the capture even when processing a frame raises.
    cap.release()

In [None]:
"""
get the average feature vector to represent shot with it's frames features 
params:
 List: array of the shot cut frames number
Return:
 average feature vector represent each frame   
"""
def get_avg_features_shot(shot_cuts:np.ndarray):
    """Placeholder that is not implemented yet.

    params:
        shot_cuts: array of the shot cut frame numbers (currently unused).
    return:
        None
    """
    return None


In [None]:
# # TODO: get_HSV_feature_frame
"""
params : frames's path
return : HSV hist for all frames
"""
def get_HSV_feature_frame(frames_path='Dataset/frames', num_bins=16):
    """Compute a normalised HSV colour histogram for every frame image in a directory.

    params:
        frames_path: directory that contains the frame images.
        num_bins: number of histogram bins per channel.
    return:
        list with one histogram per readable image, in sorted file name order.
        Each histogram is the concatenation of the hue, saturation and value
        histograms, normalised with cv2.normalize. Files that cannot be
        decoded as images are skipped.
    """
    HSV_hists = []
    # Sort the names so the order of the result does not depend on the file system.
    for fram_name in sorted(os.listdir(frames_path)):
        frame = cv2.imread(os.path.join(frames_path, fram_name))
        if frame is None:
            # cv2.imread returns None for files it cannot decode.
            continue
        # Convert the frame to the HSV color space
        hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Compute the color histogram for each channel. The upper bound of a
        # range is exclusive, so 256 is needed to count pixels equal to 255.
        hist_hue = cv2.calcHist([hsv_frame], [0], None, [num_bins], [0, 180])
        hist_saturation = cv2.calcHist([hsv_frame], [1], None, [num_bins], [0, 256])
        hist_value = cv2.calcHist([hsv_frame], [2], None, [num_bins], [0, 256])

        # Concatenate the histograms into a single feature vector
        hist = np.concatenate([hist_hue, hist_saturation, hist_value], axis=0)

        # Normalize the histogram to have unit L2 norm
        hist = cv2.normalize(hist, hist)
        HSV_hists.append(hist)
    return HSV_hists