# Visual Feature Analysis Pipeline

## Overview
This Jupyter notebook implements a comprehensive visual feature extraction pipeline for video analysis. It processes videos by extracting keyframes and analyzing various visual characteristics.

### Key Features
- **Frame Analysis**:
  - Color statistics (warmth, distribution, entropy)
  - HSV color space analysis
  - Texture features (GLCM, wavelets)
  - Image composition metrics
- **Object Detection**:
  - Face detection
  - Person detection
  - Hand tracking
- **Technical Features**:
  - Blur detection
  - Edge detection
  - Rule of thirds analysis
  - Depth of field estimation

### Prerequisites


In [None]:
# Install the third-party packages used by this notebook.
# PyWavelets and mediapipe are not imported directly in the notebook itself;
# presumably they are required by video_analysis_utils -- TODO confirm.
# NOTE(review): tqdm is imported by the notebook but is not installed
# explicitly here.
!pip install opencv-python
!pip install ffmpeg-python
!pip install scipy
!pip install PyWavelets
!pip install scikit-image
!pip install pandas
!pip install matplotlib
!pip install librosa
!pip install mediapipe



### Usage
The pipeline processes videos in batches, extracting keyframes at specified intervals and analyzing various visual features. Results are saved to CSV files for further analysis. The processing can be resumed from previous runs using checkpointing.

### Technical Notes
- Processes videos in blocks of 2,018 files
- Supports 10 processing blocks total (20,180 videos)
- Saves intermediate results to prevent data loss
- Uses multiple computer vision libraries for comprehensive analysis

In [2]:
import cv2
import os
import pandas as pd
import numpy as np
import ffmpeg
import matplotlib.pyplot as plt
from scipy.stats import skew
from copy import deepcopy
from skimage.feature import local_binary_pattern
import librosa
import subprocess
import json
import time
import warnings
from video_analysis_utils import *
from tqdm import tqdm

In [3]:
def _store_flat(target, keys, values):
    """Store ``values[n]`` in ``target`` under ``keys[n]``, in the order of ``keys``."""
    for idx, key in enumerate(keys):
        target[key] = values[idx]


def _store_stat_grid(target, results, items, key_template,
                     statistics=('mean', 'sd', 'vol')):
    """Flatten ``results[stat_index][item_index]`` into ``target``.

    Keys are produced by ``key_template.format(item=..., stat=...)``. Items
    form the outer loop and statistics the inner loop, so all statistics of
    one item end up adjacent in ``target``.
    """
    for item_idx, item in enumerate(items):
        for stat_idx, stat in enumerate(statistics):
            target[key_template.format(item=item, stat=stat)] = results[stat_idx][item_idx]


def analyse_video(video_id, video_dir, keyframe_dir, keyframe_interval=10, measures=None):
    """
    Extract keyframes from one video and collect its visual feature statistics.

    Parameters:
        video_id: file name of the video without the '.mp4' extension.
        video_dir: directory containing '<video_id>.mp4'; a trailing path
            separator is optional.
        keyframe_dir: directory handed to `extract_keyframes` and to every
            statistics function.
        keyframe_interval: forwarded to `extract_keyframes` as `s`.
        measures: optional list of tuples
            (measure_name (str), measure_function (function), output_keys (list of str)).
            Each function is called with `keyframe_dir`; element n of its
            result is stored under "<measure_name>_<output_keys[n]>".

    Returns:
        dict with 'video_id', 'nb_keyframes' and one entry per feature
        statistic (mean, sd and volatility for most measures).

    The insertion order of the keys is kept stable on purpose: the driver
    script appends each result as a CSV row and writes the header only once,
    so the columns are matched purely by position.
    """
    video_measurements = {'video_id': video_id}

    # os.path.join copes with a missing/present trailing separator and does
    # not turn an empty `video_dir` into a path at the filesystem root.
    video_path = os.path.join(video_dir, video_id + '.mp4')

    # Extract keyframes to the keyframe folder; everything below reads from it
    nb_keyframes = extract_keyframes(video_path, keyframe_dir, s=keyframe_interval)
    video_measurements['nb_keyframes'] = nb_keyframes

    # Default to an empty list if no measures are provided
    if measures is None:
        measures = []

    # Simple measures: one function call, results mapped onto the output keys
    for measure_name, measure_function, output_keys in measures:
        measure_values = measure_function(keyframe_dir)
        for idx, key in enumerate(output_keys):
            video_measurements[f"{measure_name}_{key}"] = measure_values[idx]

    # --- Measures with structured results (indexed as results[stat][item]) ---
    hsv_channels = ['hue', 'saturation', 'value']
    statistics = ['mean', 'sd', 'vol']
    detection_keys = ['nb', 'largest_confidence', 'avg_confidence', 'largest_bb', 'sum_of_bb']

    # Colour percentages
    colors = ['black', 'gray', 'white', 'red', 'green', 'blue', 'yellow',
              'orange', 'brown', 'pink', 'purple']
    _store_stat_grid(video_measurements,
                     calculate_color_percentages_statistics(keyframe_dir),
                     colors, "{item}%_{stat}", statistics)

    # HSV stats (flat result, one value per key)
    _store_flat(
        video_measurements,
        ["mean_avg_value", "mean_avg_saturation", "mean_sd_value", "mean_sd_saturation",
         "sd_avg_value", "sd_avg_saturation", "sd_sd_value", "sd_sd_saturation",
         "vol_avg_value", "vol_avg_saturation", "vol_sd_value", "vol_sd_saturation"],
        calculate_hsv_statistics(keyframe_dir),
    )
    _store_flat(
        video_measurements,
        ["mean_circular_var_hue", "sd_circular_var_hue", "vol_circular_var_hue"],
        calculate_hue_circular_variance_statistics(keyframe_dir),
    )

    # Average wavelet value per HSV channel
    _store_stat_grid(video_measurements,
                     calculate_hsv_wavelet_mean_statistics(keyframe_dir),
                     hsv_channels, "{item}_avg_wavelet_{stat}", statistics)

    # HSV (GLCM) statistics: results[stat][channel][feature]
    glcm_results = calculate_glcm_statistics(keyframe_dir)
    glcm_features = ["contrast", "correlation", "energy", "homogeneity"]
    for channel_idx, channel in enumerate(hsv_channels):
        for stat_idx, stat in enumerate(statistics):
            for feature_idx, feature in enumerate(glcm_features):
                video_measurements[f"{channel}_{feature}_{stat}"] = (
                    glcm_results[stat_idx][channel_idx][feature_idx]
                )

    # Low depth of field (DOF)
    _store_stat_grid(video_measurements,
                     calculate_low_dof_statistics(keyframe_dir),
                     hsv_channels, "{item}_low_dof_{stat}", statistics)

    # Rule of thirds
    _store_stat_grid(video_measurements,
                     calculate_rule_of_thirds_statistics(keyframe_dir),
                     ['saturation', 'value'], "avg_inner_{item}_{stat}", statistics)

    # Number of people, confidence and size of bounding boxes
    _store_stat_grid(video_measurements,
                     calculate_nb_persons_statistics(keyframe_dir),
                     detection_keys, "{item}_people_{stat}", statistics)

    # Number of faces, confidence and size of bounding boxes
    _store_stat_grid(video_measurements,
                     calculate_nb_faces_statistics(keyframe_dir),
                     detection_keys, "{item}_faces_{stat}", statistics)

    # Number of hands and size of bounding boxes
    _store_stat_grid(video_measurements,
                     calculate_nb_hands_statistics(keyframe_dir),
                     ['nb', 'largest_bb', 'sum_of_bb'], "{item}_hands_{stat}", statistics)

    return video_measurements

# Change the index, i, here to process a new block of videos:

In [1]:
i = 1 # Block index (integer, 1-10): selects which slice of 2018 videos, out of 20180 in total, is processed

In [5]:
# Batch driver: analyse block `i` of the downloaded videos and append one CSV
# row per video, skipping videos that an earlier run already wrote.
block_size = 2018  # number of videos per block
output_csv = f"visual_features_{i}.csv"

# Try to load previously processed data, if it exists
try:
    # Read the IDs as strings: the IDs compared against them below come from
    # file names, so a numeric-looking ID must not be parsed as a number.
    processed_df = pd.read_csv(output_csv, dtype={"video_id": str})
    processed_video_ids = set(processed_df["video_id"].unique())
    print(f"Resuming from {len(processed_video_ids)} previously processed videos.")
except (FileNotFoundError, pd.errors.EmptyDataError):
    # No file yet, or a zero-byte file left behind by an interrupted run
    processed_video_ids = set()
    print("Starting fresh, no previously processed videos found.")

# List of measures to analyze (measure_name, measure_function, output_keys)
measures = [
    ("color_warmth", calculate_color_warmth, ["mean", "sd", "vol"]),
    ("valence", calculate_valence_statistics, ["mean", "sd", "vol"]),
    ("dominance", calculate_dominance_statistics, ["mean", "sd", "vol"]),
    ("arousal", calculate_arousal_statistics, ["mean", "sd", "vol"]),
    ("colorfulness", calculate_colorfulness_statistics, ["mean", "sd", "vol"]),
    ("clarity", calculate_clarity_statistics, ["mean", "sd", "vol"]),
    ("gray_distribution_entropy", calculate_gray_distribution_entropy_statistics, ["mean", "sd", "vol"]),
    ("blurriness", calculate_blurriness_statistics, ["mean", "median", "sd", "vol"]),
    ("edge_points", calculate_edge_points_statistics, ["mean", "sd", "vol"]),
]

keyframe_interval = 10

video_folder = '../../YouTube_Downloader/Complete_Downloads'
keyframe_folder = f'Keyframes/{i}'
files = os.listdir(video_folder)
# os.listdir returns entries in arbitrary order; sort so that block `i`
# always refers to the same slice of files across runs and machines.
# NOTE(review): CSVs produced before this change may hold a different set of
# videos per block if the directory listing was not already sorted.
mp4_files = sorted(file for file in files if file.endswith('.mp4'))

# The i'th block of .mp4 files; the last block may be shorter than block_size
block_files = mp4_files[block_size * (i - 1):block_size * i]

# tqdm takes its total from len(block_files), so the bar is right for short blocks
for filename in tqdm(block_files):
    video_id = os.path.splitext(filename)[0]  # Get filename without extension

    # Skip if video has already been processed
    if video_id in processed_video_ids:
        continue

    # Analyze video
    video_analysis = analyse_video(video_id, video_folder, keyframe_folder,
                                   keyframe_interval=keyframe_interval, measures=measures)

    # Append to the CSV. The header is written only while the file is missing
    # or empty, so a file holding just a header row never gets a second one.
    write_header = not os.path.isfile(output_csv) or os.path.getsize(output_csv) == 0
    df = pd.DataFrame([video_analysis])
    df.to_csv(output_csv, mode="a", header=write_header, index=False)

    # Add the processed video ID to the set
    processed_video_ids.add(video_id)

Starting fresh, no previously processed videos found.


100%|██████████| 100/100 [07:58<00:00,  4.79s/it]
