In [4]:
pip install opencv-python scikit-image scikit-learn networkx

Note: you may need to restart the kernel to use updated packages.


In [5]:
# Import necessary libraries
import os
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import PCA
import networkx as nx
from scipy.sparse.csgraph import connected_components
from scipy.spatial.distance import cdist
import time
import psutil

In [6]:
# Install required libraries
# !pip install opencv-python scikit-image scikit-learn networkx

def calculate_ssim(frame1, frame2):
    # Convert frames to grayscale
    gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # Calculate SSIM between the two frames
    score, _ = ssim(gray1, gray2, full=True)
    return score

def extract_frames(video_path):
    # Create a directory to store extracted frames
    frames_dir = "extracted_frames"
    os.makedirs(frames_dir, exist_ok=True)

    # Read the video and extract frames
    video = cv2.VideoCapture(video_path)
    frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

    for frame_number in range(frame_count):
        ret, frame = video.read()
        if ret:
            frame_path = os.path.join(frames_dir, f"frame_{frame_number}.jpg")
            cv2.imwrite(frame_path, frame)

    video.release()

    return frames_dir

def calculate_ccfv(color_feature, edge_feature, wavelet_feature):
    # Initialization
    X = color_feature
    Y = edge_feature + wavelet_feature

    # Covariance Matrices
    Sxx = np.cov(X, rowvar=False)
    Syy = np.cov(Y, rowvar=False)
    Sxy = np.cov(X, Y, rowvar=False)

    # Compute Transformation Matrices G1 and G2
    G1 = np.dot(np.dot(np.linalg.inv(np.sqrt(Sxx)), Sxy), np.dot(np.linalg.inv(Syy), Sxy.T))
    G2 = np.dot(np.dot(np.linalg.inv(np.sqrt(Syy)), Sxy.T), np.dot(np.linalg.inv(Sxx), Sxy))

    # Compute Eigen Vectors and Rank
    _, u = np.linalg.eigh(G1)
    _, v = np.linalg.eigh(G2)
    r = np.linalg.matrix_rank(Sxy)

    # Choose Canonical Variables
    d = 100  # Set to a suitable value
    Wx = np.dot(np.linalg.inv(np.sqrt(Sxx)), u[:, :d])
    Wy = np.dot(np.linalg.inv(np.sqrt(Syy)), v[:, :d])

    # Compute Canonically Correlated Feature Vector (CCFV)
    CCFV = np.dot(Wx.T, X) + np.dot(Wy.T, Y)

    return CCFV

def graph_modularity_clustering(similarity_graph, edge_set, weight_set, num_frames, scaling_parameter):
    # Iterative Edge Pruning
    for i in range(num_frames):
        for j in range(num_frames):
            # Calculate the difference Δij
            delta_ij = similarity_graph[i, j] - scaling_parameter
            # Prune edges with high Δ values
            if delta_ij > 0:
                similarity_graph[i, j] = 0

    # Edge Pruning and Clustering Enhancement
    while True:
        # Find connected components within the VSG
        n_components, labels = connected_components(similarity_graph, directed=False)
        prev_modularity = calculate_modularity(similarity_graph, labels)

        # Calculate Dev for each edge
        dev = calculate_dev(similarity_graph, edge_set, weight_set, scaling_parameter)
        # Select edges with high Dev values and remove them
        for i, j in edge_set:
            if dev[i, j] > 0:
                similarity_graph[i, j] = 0

        # Calculate modularity after pruning
        n_components, labels = connected_components(similarity_graph, directed=False)
        modularity = calculate_modularity(similarity_graph, labels)

        # Check for improvement in modularity
        if modularity - prev_modularity < 1e-6:
            break

    # Obtain Individual Clusters
    n_components, labels = connected_components(similarity_graph, directed=False)

    return labels

def calculate_modularity(graph, labels):
    num_edges = np.sum(graph) / 2
    modularity = 0
    for i in range(len(labels)):
        for j in range(len(labels)):
            if labels[i] == labels[j]:
                modularity += (graph[i, j] - np.sum(graph[i, :]) * np.sum(graph[:, j]) / (2 * num_edges))

    modularity /= (2 * num_edges)

    return modularity

def calculate_dev(similarity_graph, edge_set, weight_set, scaling_parameter):
    dev = np.zeros_like(similarity_graph)
    for i, j in edge_set:
        dev[i, j] = similarity_graph[i, j] - scaling_parameter

    return dev

def summarize_video_multi_feature(video_path, num_frames, batch_size=100):
    frames_dir = extract_frames(video_path)

    keyframes = []
    prev_frame = None

    for frame_number in range(num_frames):
        frame_path = os.path.join(frames_dir, f"frame_{frame_number}.jpg")
        current_frame = cv2.imread(frame_path)

        if prev_frame is not None:
            # Extract features from Color, Edge, and Wavelet
            color_feature = prev_frame.flatten()
            edge_feature = cv2.Canny(cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY), 50, 150).flatten()
            wavelet_feature = cv2.dft(cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY).astype(float), flags=cv2.DFT_COMPLEX_OUTPUT).flatten()

            # Calculate Canonically Correlated Feature Vector (CCFV)
            ccfv = calculate_ccfv(color_feature, edge_feature, wavelet_feature)

            # Append CCFV to keyframes
            keyframes.append(ccfv)

        prev_frame = current_frame

    # Convert keyframes to numpy array
    keyframes = np.array(keyframes)

    # Perform Graph Modularity Clustering
    similarity_graph = cdist(keyframes, keyframes, metric='cosine')
    np.fill_diagonal(similarity_graph, 0)  # Set diagonal elements to zero
    edge_set = np.column_stack(np.where(similarity_graph > 0))
    weight_set = similarity_graph[edge_set[:, 0], edge_set[:, 1]]
    labels = graph_modularity_clustering(similarity_graph, edge_set, weight_set, num_frames, scaling_parameter=0.1)

    # Create the "process_frames" directory and move keyframes there
    process_frames_dir = os.path.join(frames_dir, "process_frames")
    os.makedirs(process_frames_dir, exist_ok=True)

    for i, label in enumerate(labels):
        frame_path = os.path.join(frames_dir, f"frame_{i}.jpg")
        target_path = os.path.join(process_frames_dir, f"frame_{i}_cluster_{label}.jpg")
        try:
            os.rename(frame_path, target_path)
        except FileNotFoundError:
            continue

    selected_frames = [os.path.join(process_frames_dir, f"frame_{i}_cluster_{label}.jpg") for i, label in enumerate(labels)]

    # Extract keyframes closest to centroids
    keyframes = []
    for cluster_label in np.unique(labels):
        cluster_indices = np.where(labels == cluster_label)[0]
        cluster_centroid = np.mean(keyframes[cluster_indices], axis=0)
        closest_frame_index = np.argmin(np.linalg.norm(keyframes[cluster_indices] - cluster_centroid, axis=1))
        keyframes.append(selected_frames[cluster_indices[closest_frame_index]])

    # Arrange keyframes in temporal order
    keyframes.sort(key=lambda x: int(x.split('_')[1]))

    return keyframes

def create_summarized_video(input_video_path, output_video_path, selected_frames):
    # Get the video properties from the input video
    input_video = cv2.VideoCapture(input_video_path)
    frame_width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(input_video.get(cv2.CAP_PROP_FPS))
    video_codec = cv2.VideoWriter_fourcc(*"mp4v")

    # Create the output video writer
    output_video = cv2.VideoWriter(output_video_path, video_codec, fps, (frame_width, frame_height))

    # Read the selected frames and write them to the output video
    for frame_path in selected_frames:
        frame = cv2.imread(frame_path)
        output_video.write(frame)

    # Release resources
    input_video.release()
    output_video.release()

def get_time_complexity():
    start_time = time.time()
    # Input video path
    input_video_path = r'C:\Users\Sumit\Desktop\Test Results 01\Surveilance_camera_test01.mp4'

    # Specify the desired number of resultant frames
    num_frames = 10

    try:
        # Summarize the video and obtain selected frames
        selected_frames = summarize_video_multi_feature(input_video_path, num_frames)

        # Output video path
        output_video_path = os.path.join(os.path.dirname(input_video_path), 'C:/Users/Sumit/Desktop/Test Results 01/Summarized_Surveilance_camera_test01.mp4')

        # Create the summarized output video
        create_summarized_video(input_video_path, output_video_path, selected_frames)

        print("Video summarization completed successfully!")
    except ValueError as e:
        print(f"Video summarization failed: {str(e)}")

    elapsed_time = time.time() - start_time
    return elapsed_time

def get_space_complexity():
    process = psutil.Process(os.getpid())
    memory_usage = process.memory_info().rss  # in bytes
    memory_usage_mb = memory_usage / (1024 ** 2)  # convert to megabytes
    return memory_usage_mb

In [7]:
def main():
    time_complexity = get_time_complexity()
    space_complexity = get_space_complexity()

    print(f"Overall Time Complexity: {time_complexity} seconds")
    print(f"Overall Space Complexity: {space_complexity} MB")

if __name__ == "__main__":
    main()

Video summarization failed: operands could not be broadcast together with shapes (230400,) (460800,) 
Overall Time Complexity: 13.8375883102417 seconds
Overall Space Complexity: 154.21875 MB
