In [1]:
import numpy as np
import cv2
from scipy.spatial.distance import correlation
from scipy.stats import entropy
import heapq
import time
from scipy.stats import wasserstein_distance
from sklearn.metrics import precision_recall_fscore_support
import scipy.io as sio

# Hyper Parameter

In [2]:
dictionary = []
dictionary_correlation_scores = []
SIZE_SUPERFRAME = 30
UNIFORM_COLOR_HISTOGRAM = np.ones(64) / 64
SIZE_DICTIONARY = 10
SUMMARY_RATIO = 0.15

# Helper Function

In [3]:
def extract_color_histogram(superframe):
    """
    Extract color histogram from a list of frames using HSV color space.
    
    Args:
        frames (list): List of frames (numpy arrays in BGR format).
        
    Returns:
        numpy.ndarray: Histogram (64 bins for Hue).
    """
    if not superframe:
        return np.zeros(64)
    
    hists = []
    for frame in superframe:
        if frame is None or not isinstance(frame, np.ndarray):
            continue
        hist = cv2.calcHist([frame], [0, 1, 2], None, [4, 4, 4], [0, 256, 0, 256, 0, 256])
        hist = hist.flatten()
        hists.append(hist)
    
    if not hists:
        return np.zeros(64)
    
    avg_hist = np.mean(hists, axis=0)
    avg_hist = cv2.normalize(avg_hist, avg_hist).flatten()
    return avg_hist

In [4]:
def bhattacharyya_distance(hist1, hist2):
        """
        Calculate Bhattacharyya distance between two histograms
        
        Args:
            hist1: First histogram
            hist2: Second histogram
            
        Returns:
            Bhattacharyya distance
        """
        # Ensure histograms are normalized
        if np.sum(hist1) != 0:
            hist1 = hist1 / np.sum(hist1)
        if np.sum(hist2) != 0:
            hist2 = hist2 / np.sum(hist2)
        
        # Calculate Bhattacharyya coefficient
        bc = np.sum(np.sqrt(hist1 * hist2))
        
        # Convert to distance
        distance = -np.log(bc + 1e-10)  # Add small epsilon to avoid log(0)
        
        return distance

In [5]:
def update_dictionary(index, feature):
    """
    Update the dictionary using correlation-based replacement
    
    Args:
        index: Index of the super-frame
        feature: Feature vector of the super-frame
    """
    # Calculate correlation score for new feature
    correlation_scores = []
    for i, (dict_index, dict_feature) in enumerate(dictionary):
        if dict_index != index:
            distance = bhattacharyya_distance(feature, dict_feature)
            correlation_scores.append((i, -distance))
    
    # Calculate new feature's correlation score (negative sum of distances)
    new_correlation = sum(score for _, score in correlation_scores)
    
    # Find element with highest correlation in dictionary
    max_corr_index = -1
    max_corr_value = float('-inf')
    for i, (dict_index, dict_feature) in enumerate(dictionary):
        # Calculate correlation for this dictionary element
        corr = 0
        for j, (other_index, other_feature) in enumerate(dictionary):
            if dict_index != other_index:
                distance = bhattacharyya_distance(dict_feature, other_feature)
                corr -= distance
        
        if corr > max_corr_value:
            max_corr_value = corr
            max_corr_index = i
    
    # Replace element with highest correlation if new correlation is lower
    if new_correlation < max_corr_value and max_corr_index >= 0:
        dictionary[max_corr_index] = (index, feature)

# Interestingness Score

## Global Camera Motion Score

In [6]:
def compute_global_camera_motion(superframe):
    """
    Compute the interesness of a superframe with camera motion
    S_gcm = S_a * S_s * S_t
    
    Args:
        superframe: List of frames in the super-frame
    Returns:
        Global camera motion score
    """

    if len(superframe) < 3:
        return 1 # Default score cuz cannot to summarize a very short superframe

    # Select frame
    first_frame = superframe[0]
    middle_frame = superframe[len(superframe) // 2]

    # Convert Gray-scale - need optical, translate, not color
    first_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    middle_gray = cv2.cvtColor(middle_frame, cv2.COLOR_BGR2GRAY)

    # Calculate optical flow using Lucas-Kanade
    # Parameters for ShiTomasi corner detection

    # feature_params = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
    
    # # Parameters for Lucas-Kanade optical flow
    # lk_params = dict(winSize=(15, 15), maxLevel=2,
    #                  criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    flow = cv2.calcOpticalFlowFarneback(
        first_gray, middle_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    
    ## Compute S_a - Orientation Consistency score
    w_alpha = 15
    orientation_change = np.mean(np.abs(ang))
    S_a = np.exp(-w_alpha * orientation_change / 180)

    ## Compute S_s - Scale Consistency score
    w_s = 10
    scale_change = np.mean(mag)
    S_s = 0.5 * (np.exp(-w_s * max(1 - scale_change, scale_change - 1)) + 
             np.exp(-w_s * max(1 - scale_change, scale_change - 1)))

    ## Compute S_t - Translation Consistency score
    w_t = 10
    translation_change = np.std(mag)
    S_t = np.exp(-w_t * translation_change / first_frame.shape[0])

    return S_a * S_s * S_t

## Dictionary Learning Score

In [7]:
def compute_dictionary_learning_score(super_frame, index):
    """
    Calculate the Dictionary Learning score
    
    Args:
        super_frame: List of frames in the super-frame
        index: Index of the super-frame
        
    Returns:
        Dictionary Learning score
    """
    # Extract color histogram feature
    feature = extract_color_histogram(super_frame)
    
    # Initialize dictionary with first super-frames
    if len(dictionary) == 0:
        dictionary.append((index, feature))
        return 0.5  # Maximum score for initial dictionary elements
    
    # Calculate distance to closest element in dictionary
    min_distance = float('inf')
    for _, dict_feature in dictionary:
        distance = bhattacharyya_distance(feature, dict_feature)
        min_distance = min(min_distance, distance)
    
    # Calculate distinctiveness score
    s_dl = min_distance  # Higher distance means more distinct
    
    # Update dictionary if needed
    update_dictionary(index, feature)
    
    return s_dl

## Colorfulness Score

In [8]:
def compute_colorfulness_score(super_frame):
    """
    Calculate the Colorfulness score using Earth Mover's Distance (EMD)
    
    Args:
        super_frame: List of frames in the super-frame
        
    Returns:
        Colorfulness score based on EMD
    """
    # Sample frames from the super-frame
    sample_indices = [0, len(super_frame) // 2, -1]
    sampled_frames = [super_frame[i] for i in sample_indices if i < len(super_frame)]
    
    if not sampled_frames:
        return 0.5  # Default score if no frames
        
    colorfulness_scores = []
    
    # Reference histogram - using grayscale as reference (low colorfulness)
    # Create a grayscale reference frame
    reference_frame = cv2.cvtColor(sampled_frames[0], cv2.COLOR_BGR2GRAY)
    reference_frame = cv2.cvtColor(reference_frame, cv2.COLOR_GRAY2BGR)  # Convert back to BGR
    reference_hsv = cv2.cvtColor(reference_frame, cv2.COLOR_BGR2HSV)
    
    # Reference histogram
    hist_reference = cv2.calcHist([reference_hsv], [0, 1], None, [8, 8], [0, 180, 0, 256])
    hist_reference = cv2.normalize(hist_reference, hist_reference).flatten()
    
    # Calculate EMD for each frame
    for frame in sampled_frames:
        try:
            # Convert to HSV color space
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            
            # Calculate histogram
            hist = cv2.calcHist([hsv], [0, 1], None, [8, 8], [0, 180, 0, 256])
            hist = cv2.normalize(hist, hist).flatten()
            
            # Prepare histograms for EMD
            hist1_emd = np.zeros((hist.shape[0], 3), dtype=np.float32)
            hist2_emd = np.zeros((hist_reference.shape[0], 3), dtype=np.float32)
            
            # Fill first two columns with bin coordinates (i//8, i%8)
            for i in range(hist.shape[0]):
                hist1_emd[i, 0] = i // 8  # H coordinate
                hist1_emd[i, 1] = i % 8   # S coordinate
                hist1_emd[i, 2] = hist[i] # Weight
                
                hist2_emd[i, 0] = i // 8
                hist2_emd[i, 1] = i % 8
                hist2_emd[i, 2] = hist_reference[i]
            
            # Calculate EMD
            emd_distance = cv2.EMD(hist1_emd, hist2_emd, cv2.DIST_L2)[0]
            
            # Normalize EMD to [0, 1] range
            # Higher EMD means more different from grayscale (more colorful)
            max_possible_emd = np.sqrt(8**2 + 8**2)  # Maximum distance in 8x8 grid
            normalized_emd = min(emd_distance / max_possible_emd, 1.0)
            
            colorfulness_scores.append(normalized_emd)
            
        except Exception as e:
            print(f"Error in colorfulness calculation: {e}")
            colorfulness_scores.append(0.5)  # Default score if calculation fails
    
    # Return maximum colorfulness score
    return max(colorfulness_scores) if colorfulness_scores else 0.5

## Final

In [17]:
def compute_interesness_score(superframe, index):
    """
    Compute total interesnes score for a superframe

    Args:
        superframe: [ndarray]

    Return:
        Interness Score: scalar
    """
    dictionary = []
    dictionary_correlation_scores = []
    
    w_gcm, w_dl, w_clr = 0.15, 0.4, 0.45
    S_gcm = compute_global_camera_motion(superframe)
    S_dl = compute_dictionary_learning_score(superframe, index)
    S_clr = compute_colorfulness_score(superframe)
    print(f"S_gcm = {S_gcm:.3f} - S_dl = {S_dl:.3f} - S_clr = {S_clr:.3f}")
    return w_gcm * S_gcm + w_dl * S_dl + w_clr * S_clr

# Process Video

In [10]:
!wget https://data.vision.ee.ethz.ch/cvl/SumMe/SumMe.zip
!unzip SumMe.zip

--2025-04-13 07:36:50--  https://data.vision.ee.ethz.ch/cvl/SumMe/SumMe.zip
Resolving data.vision.ee.ethz.ch (data.vision.ee.ethz.ch)... 129.132.52.178, 2001:67c:10ec:36c2::178
Connecting to data.vision.ee.ethz.ch (data.vision.ee.ethz.ch)|129.132.52.178|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2396688224 (2.2G) [application/zip]
Saving to: ‘SumMe.zip’


2025-04-13 07:38:46 (19.9 MB/s) - ‘SumMe.zip’ saved [2396688224/2396688224]

Archive:  SumMe.zip
   creating: GT/
  inflating: GT/Air_Force_One.mat    
  inflating: GT/Base jumping.mat     
  inflating: GT/Bearpark_climbing.mat  
  inflating: GT/Bike Polo.mat        
  inflating: GT/Bus_in_Rock_Tunnel.mat  
  inflating: GT/Car_railcrossing.mat  
  inflating: GT/Cockpit_Landing.mat  
  inflating: GT/Cooking.mat          
  inflating: GT/Eiffel Tower.mat     
  inflating: GT/Excavators river crossing.mat  
  inflating: GT/Fire Domino.mat      
  inflating: GT/Jumps.mat            
  inflating: GT/Kids_play

## Function to Evaluate

In [11]:
# Evaluation Functions
def binarize_ground_truth(gt_scores, summary_length_ratio=0.15):
    num_frames = len(gt_scores)
    num_summary_frames = int(num_frames * summary_length_ratio)
    threshold = np.sort(gt_scores)[::-1][num_summary_frames]
    binary_labels = (gt_scores >= threshold).astype(int)
    return binary_labels

def get_summary_labels(selected_indices, superframes, total_frames, size_superframe):
    summary_labels = np.zeros(total_frames, dtype=int)
    for idx in selected_indices:
        start_frame = idx * size_superframe
        end_frame = start_frame + len(superframes[idx])
        summary_labels[start_frame:end_frame] = 1
    return summary_labels

def compute_f_measure(summary_labels, gt_labels):
    precision, recall, f_measure, _ = precision_recall_fscore_support(
        gt_labels, summary_labels, average='binary', pos_label=1, zero_division=0
    )
    return f_measure

In [14]:
def process_video(input_path: str, gt_path: str, output_path: str = None):
    """
    Process a video to create a summarized version based on interestingness scores.
    
    Args:
        input_path (str): Path to the input video file.
        output_path (str): Path to save the summarized video.
    """
    start_time = time.time()
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Cannot open video file.")
        return

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    T_video = total_frames / fps if fps > 0 else 0
    print(f"Video has resolution: {width} x {height} with {fps} FPS and {total_frames} frames")

    superframes = []
    current_superframe = []
    frame_count = 0
    scores = []

    T1_start = time.time()
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        current_superframe.append(frame)
        frame_count += 1

        if len(current_superframe) >= SIZE_SUPERFRAME:
            superframes.append(current_superframe)
            
            current_superframe = []

    if current_superframe:
        superframes.append(current_superframe)
    T1 = time.time() - T1_start
    cap.release()
    
    print(f"Total super-frames created: {len(superframes)}")
    
    T2_start = time.time()
    for i, superframe in enumerate(superframes):
        score = compute_interesness_score(superframe, i)
        scores.append(score)

    # Chọn super-frames có điểm cao nhất
    max_summary_frames = int(SUMMARY_RATIO * total_frames)  
    selected_indices = []
    total_selected_frames = 0

    # Sắp xếp theo điểm giảm dần để chọn các super-frame tốt nhất
    sorted_indices = np.argsort(scores)[::-1]
    for idx in sorted_indices:
        superframe_length = len(superframes[idx])
        if total_selected_frames + superframe_length <= max_summary_frames:
            selected_indices.append(idx)
            total_selected_frames += superframe_length

    T2 = time.time() - T2_start
    T_total = time.time() - start_time
    I_ratio = T_total / T_video if T_video > 0 else 0

    print(f"Timing Metrics:")
    print(f"T1 (Feature Extraction): {T1:.2f} s (T1/T_video: {T1/T_video:.4f})")
    print(f"T2 (Interestingness + Dictionary Update): {T2:.2f} s (T2/T_video: {T2/T_video:.4f})")
    print(f"T_total: {T_total:.2f} s")
    print(f"T_video: {T_video:.2f} s")
    print(f"I_ratio: {I_ratio:.4f}")

    gt_data = sio.loadmat(gt_path)
    gt_scores = gt_data.get('gt_score', None)
    if gt_scores is None:
        print("Error: 'gt_score' not found in ground truth file.")
        return None
    gt_scores = gt_scores.flatten()
    if len(gt_scores) != total_frames:
        print(f"Warning: Ground truth length ({len(gt_scores)}) does not match video length ({total_frames}).")
        return None

    gt_labels = binarize_ground_truth(gt_scores, summary_length_ratio=SUMMARY_RATIO)
    summary_labels = get_summary_labels(selected_indices, superframes, total_frames, SIZE_SUPERFRAME)
    f_measure = compute_f_measure(summary_labels, gt_labels)
    print(f"F-measure (at {SUMMARY_RATIO * 100}% summary length): {f_measure:.3f}\n")
    
    # Generate video result if needed
    if output_path:
        generate_video_summary(selected_indices, output_path)
        
    ## Return index of selected superframe
    return f_measure, I_ratio

In [15]:
def generate_video_summary(selected_indices, output_path):
    if selected_indices:
        final_output_path = output_path
        
        # Khởi tạo VideoWriter
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(final_output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Cannot create output video file '{final_output_path}'. "
                  f"Check if OpenCV is built with FFmpeg support or try a different codec.")
            return
    
        # Ghi các super-frames theo thứ tự thời gian
        for idx in sorted(selected_indices):  # Sắp xếp theo chỉ số để giữ thứ tự thời gian
            for frame in superframes[idx]:
                if frame is not None and frame.size > 0:
                    out.write(frame)
                else:
                    print(f"Warning: Invalid frame in super-frame {idx}, skipping.")
    
        out.release()
        print(f"Summary video saved at '{final_output_path}' with {total_selected_frames} frames "
              f"({total_selected_frames/fps:.2f} seconds).")
    else:
        print("No super-frames selected for summary.")
    

In [18]:
f_measure, i_ratio = process_video('/kaggle/working/videos/Scuba.mp4', '/kaggle/input/summe-dataset/Scuba.mat')

Video has resolution: 1280 x 720 with 30.0 FPS and 2221 frames
Total super-frames created: 75
S_gcm = 0.000 - S_dl = 0.396 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.004 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.001 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.001 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.014 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.008 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.006 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.007 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.011 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.014 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.013 - S_clr = 0.004
S_gcm = 0.000 - S_dl = 0.016 - S_clr = 0.004
S_gcm = 0.000 - S_dl = 0.004 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.008 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.010 - S_clr = 0.004
S_gcm = 0.000 - S_dl = 0.003 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.004 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.011 - S_clr = 0.003
S_gcm 

In [20]:
## Visualize Test
f_scores = []
i_ratios = []
sizes = [i for i in  range(20, 110, 10)]

for size in sizes:
    SIZE_SUPERFRAME = size
    
    print(f"Size of superframe: {SIZE_SUPERFRAME}")
    f_score, i_ratio = process_video('/kaggle/working/videos/Scuba.mp4', '/kaggle/input/summe-dataset/Scuba.mat')
    f_scores.append(f_score)
    i_ratios.append(i_ratio)

Size of superframe: 20
Video has resolution: 1280 x 720 with 30.0 FPS and 2221 frames
Total super-frames created: 112
S_gcm = 0.000 - S_dl = 0.042 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.003 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.001 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.010 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.007 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.004 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.004 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.004 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.002 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.003 - S_clr = 0.004
S_gcm = 0.000 - S_dl = 0.004 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.018 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.006 - S_clr = 0.003
S_gcm = 0.000 - S_dl = 0.006 - S_clr = 0.004
S_gcm = 0.000 - S_dl = 0.01

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(Sizes, f_scores)
plt.xlabel('Super-frame Size (frames)')
plt.ylabel('F-Score')
plt.legend()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(Sizes, i_ratios)
plt.xlabel('Super-frame Size (frames)')
plt.ylabel('I_Ratio')
plt.legend()