Кольцов Кирилл Евгеньевич, ВМК МГУ, группа 209, 2025
Эвристический детектор смены сцен в видео на основе ансамбля трёх метрик

Весь блок ниже - функции, скопированные из предоставленного шаблона задания, обеспечивающие чтение и оценку точности детектора

In [41]:
import numpy as np
import cv2 # Для установки opencv воспользуйтесь командой в терминале conda install -c conda-forge opencv
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import os

%matplotlib inline

import json
def load_json_from_file(filename):
    with open(filename, "r") as f:
        return json.load(f, strict=False)


def dump_json_to_file(obj, filename, **kwargs):
    with open(filename, "w") as f:
        json.dump(obj, f, **kwargs)

def read_video(video_path):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while(cap.isOpened()):
        ret, frame = cap.read()
        if ret==False:
            break
        yield frame
    cap.release()

def visualize_metric_error(frame, prev_frame, value):
    fig = plt.figure(figsize=(16,4))
    plt.suptitle('Значение метрики на текущем кадре: {:.4f}'.format(value), fontsize=24)
    ax = fig.add_subplot(1, 2, 1)
    ax.imshow(prev_frame[:,:,::-1])
    ax.set_title("Предыдущий кадр", fontsize=18)
    ax.set_xticks([])
    ax.set_yticks([])
    ax = fig.add_subplot(1, 2, 2)
    ax.imshow(frame[:,:,::-1])
    ax.set_title("Текущий кадр", fontsize=18)
    ax.set_xticks([])
    ax.set_yticks([])
    plt.subplots_adjust(top=0.80)

def visualize_metric_values(metric_values, threshold, cuts = None):
    sns.set()
    plt.figure(figsize=(16, 8))
    plt.plot(metric_values, label='Значение метрики на кадрах')
    plt.xlabel('Номер кадра')
    plt.ylabel('Значение метрики')
    plt.hlines(y=threshold, xmin=0, xmax=len(metric_values), linewidth=2, color='r', label='Пороговое значение')
    
    if cuts is not None:
        for cut in cuts:
            plt.axvline(x=cut, color='k', linestyle=':', linewidth=0.5, label='Смена сцены')

    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    plt.legend(by_label.values(), by_label.keys())
    plt.show()

def calculate_matrix(true_scd, predicted_scd, scene_len, not_to_use_frames=set()):
    predicted_scd = set(predicted_scd)
    tp, fp, tn, fn = 0, 0, 0, 0
    scene_len = scene_len
    for scd in predicted_scd:
        if scd in true_scd:
            tp += 1
        elif scd not in not_to_use_frames:
            fp += 1
    for scd in true_scd:
        if scd not in predicted_scd:
            fn += 1
    tn = scene_len - len(not_to_use_frames) - tp - fp - fn
    return tp, fp, tn, fn

def calculate_precision(tp, fp, tn, fn):
    return tp / max(1, (tp + fp))

def calculate_recall(tp, fp, tn, fn):
    return tp / max(1, (tp + fn))

def f1_score(true_scd, predicted_scd, scene_len, not_to_use_frames=set()):
    tp, fp, tn, fn = calculate_matrix(true_scd, predicted_scd, scene_len, not_to_use_frames)
    precision_score = calculate_precision(tp, fp, tn, fn)
    recall_score = calculate_recall(tp, fp, tn, fn)
    if precision_score + recall_score == 0:
        return 0
    else:
        return 2 * precision_score * recall_score / (precision_score + recall_score)
    
def f1_score_matrix(tp, fp, tn, fn):
    precision_score = calculate_precision(tp, fp, tn, fn)
    recall_score = calculate_recall(tp, fp, tn, fn)
    if precision_score + recall_score == 0:
        return 0
    else:
        return 2 * precision_score * recall_score / (precision_score + recall_score)
    
def run_scene_change_detector_all_video(scene_change_detector, dataset_path):
    video_dataset = load_json_from_file(os.path.join(dataset_path, 'info.json'))
    param_log = {
        '_mean_f1_score': []
    }
    for video_info in tqdm(video_dataset, leave=False):
        # Загружаем видео, его длину и смены сцен
        frames = read_video(os.path.join(dataset_path, video_info['source']))
        video_len = video_info['len']
        true_scene_changes = load_json_from_file(os.path.join(dataset_path, video_info['scene_change']))
        
        # Составляем список сцен, которые не будут тестироваться
        not_use_frames = set()
        for type_scene_change in ['trash', 'fade', 'dissolve']:
            for bad_scene_range in true_scene_changes.get(type_scene_change, []):
                not_use_frames.update(list(range(bad_scene_range[0], bad_scene_range[1] + 1)))
        
        predicted_scene_changes, _, _ = scene_change_detector(frames)
        
        param_log['f1_score_{}'.format(video_info['source'])] = f1_score(
            true_scene_changes['cut'],
            predicted_scene_changes,
            video_len,
            not_use_frames
        )
        video_tp, video_fp, video_tn, video_fn = calculate_matrix(
            true_scene_changes['cut'],
            predicted_scene_changes,
            video_len,
            not_use_frames
        )
        
        param_log['tp_{}'.format(video_info['source'])] = video_tp
        param_log['fp_{}'.format(video_info['source'])] = video_fp
        param_log['tn_{}'.format(video_info['source'])] = video_tn
        param_log['fn_{}'.format(video_info['source'])] = video_fn 
        param_log['_mean_f1_score'].append(param_log['f1_score_{}'.format(video_info['source'])])
    param_log['_mean_f1_score'] = np.mean(param_log['_mean_f1_score'])
    return param_log

Следующая ячейка содержит вспомогательную функцию view_as_blocks, которая делит поданный в неё кадр на количество квадратных блоков, равное второму аргументу number_of_blocks в квадрате.
Во всех трех метриках используется подобное разбиение, так как оно позволят повысить устойчивость к незначительным изменениям и к движению объектов в кадре. Все дальнейшие расчеты метрик производятся между соответствующими блоками двух соседних кадров

In [None]:
def view_as_blocks(frame, number_of_blocks):
    H, W, C = frame.shape
    block_h = H // number_of_blocks   
    block_w = W // number_of_blocks 

    H_cropped = block_h * number_of_blocks
    W_cropped = block_w * number_of_blocks
    frame = frame[:H_cropped, :W_cropped, :]

    blocks = frame.reshape(
        number_of_blocks, block_h,
        number_of_blocks, block_w,
        C
    ).transpose(0, 2, 1, 3, 4)

    return blocks

Далее представлено описание трёх метрик. 
Названия главных функций, возвращающих вектор значений, по которым можно судить о смене сцены, имеют структуру calculate_<metric>_vector. Значения в векторе есть ни что иное, как значения метрики для каждого отдельного блока кадра

Первая из них - DSSIM (structural dissimilarity index measure). Она ориентируется на общие структурные признаки в соседних блоках, считая среднее значение, дисперсию и ковариацию. Из всех трёх на тестовых данных имеет наибольшую точность. (1)

In [None]:
def compute_ssim_for_blocks(prev_block, block, k1=0.01, k2=0.03, L=255):
    block = cv2.cvtColor(block, cv2.COLOR_BGR2GRAY).astype(np.float64)
    prev_block = cv2.cvtColor(prev_block, cv2.COLOR_BGR2GRAY).astype(np.float64)

    C1 = (k1 * L) ** 2
    C2 = (k2 * L) ** 2

    mu1 = np.mean(block, axis=(0, 1))
    mu2 = np.mean(prev_block, axis=(0, 1))
    mu1_sq = mu1 ** 2
    mu2_sq = mu2 ** 2
    mu1_mu2 = mu1 * mu2
    
    sigma1_sq = np.var(block, axis=(0, 1))
    sigma2_sq = np.var(prev_block, axis=(0, 1)) 
    sigma12 = np.mean(block * prev_block, axis=(0, 1)) - mu1_mu2
    
    numerator = (2 * mu1_mu2 + C1) * (2 * sigma12 + C2)
    denominator = (mu1_sq + mu2_sq + C1) * (sigma1_sq + sigma2_sq + C2)
    
    if denominator == 0:
        ssim = 1.0 if numerator == 0 else 0.0
    else:
        ssim = numerator / denominator

    return ssim


def calculate_dssim_vector(prev_frame, frame, number_of_blocks=10):
    frame = view_as_blocks(frame, number_of_blocks)
    prev_frame = view_as_blocks(prev_frame, number_of_blocks)

    dssim_vector = []
    
    for i in range(0, number_of_blocks):
        for j in range(0, number_of_blocks):
            block = frame[i, j]
            prev_block = prev_frame[i, j]
            
            block_ssim = compute_ssim_for_blocks(prev_block, block)
            
            dssim_vector.append((1 - block_ssim) / 2)
    
    return dssim_vector


Вторая - на основе преобразования Собеля. Она стремится выявить границы объектов на видео и определить, насколько они изменились между кадрами. Нормализация способствовала приросту точности метрики. 

Исключена из итогового решения ради увеличения производительности. (2)

In [None]:
def normalize_manual(frame):
    min_val = np.min(frame)
    max_val = np.max(frame)
    if max_val - min_val == 0:
        return np.zeros_like(frame, dtype=np.uint8)
    
    norm = (frame - min_val) * 255.0 / (max_val - min_val)
    return norm.astype(np.uint8)

def compute_sobel_euclidean(block, ksize=3):
    if len(block.shape) == 3:
        gray = cv2.cvtColor(block, cv2.COLOR_BGR2GRAY)
    else:
        gray = block.copy()
    
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=ksize)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=ksize)
    
    sobel = np.sqrt(grad_x**2 + grad_y**2)
    sobel_norm = normalize_manual(sobel)

    return sobel_norm


def calculate_sobel_vector(prev_frame, frame, number_of_blocks=10, ksize=3):
    blocks_prev = view_as_blocks(prev_frame, number_of_blocks)
    blocks_curr = view_as_blocks(frame, number_of_blocks)
    
    diff_vector = []
    
    for i in range(number_of_blocks):
        for j in range(number_of_blocks):
            block_prev = blocks_prev[i, j]
            block_curr = blocks_curr[i, j]
            
            sobel_prev = compute_sobel_euclidean(block_prev, ksize)
            sobel_curr = compute_sobel_euclidean(block_curr, ksize)
            
            diff = np.abs(sobel_prev.astype(np.int16) - sobel_curr.astype(np.int16))
            diff_vector.append(np.mean(diff))
    
    return diff_vector


И третья работает с цветовыми гистограммами в пространстве HSV. Она выявляет зависимости в насыщенности, тоне и яркости, а также обладает большей устойчивостью к шумам, в отличие от той же метрики в пространстве RGB. Данный факт описывается в литературе и был подтвержден на практике. 
Сравнение двух гистограмм происходит по так называемой чи-квадрат метрике. 
В процессе тестирования было обнаружено, что метрика заметно хуже показывает себя на черно-белых видео. Соответственно проводится одна проверка на цветовую гамму и при необходимости корректируются параметры. (3)

In [None]:
# Сравнивает две гистограммы по чи-квадрат метрике
def compare_hist_chi_square(hist1, hist2, eps=1e-10):
    h1 = hist1.flatten()
    h2 = hist2.flatten()

    chi_square = np.sum(((h1 - h2) ** 2) / (h1 + h2 + eps))
    return chi_square

def normalize_hist_manual(hist, alpha=0, beta=1):
    min_val = np.min(hist)
    max_val = np.max(hist)
    
    if max_val - min_val == 0:
        return np.full(hist.shape, alpha, dtype=hist.dtype)
    
    norm_hist = (hist - min_val) * (beta - alpha) / (max_val - min_val) + alpha
    return norm_hist

def is_grayscale_block(block, tol=1e-5):
    return np.allclose(block[:,:,0], block[:,:,1], atol=tol) and np.allclose(block[:,:,1], block[:,:,2], atol=tol)

def is_grayscale_frame(frame, number_of_blocks=10):
    frame_blocks = view_as_blocks(frame, number_of_blocks)

    for i in range(number_of_blocks):
        for j in range(number_of_blocks):
          if is_grayscale_block(frame_blocks[i][j]):
              return True

    return False

def calculate_hist_vector(prev_frame, frame, hist_gray_frame_flag=False, h_bins=50, s_bins=60, blocks=10):
    if not hist_gray_frame_flag:
        prev_frame_blocks = view_as_blocks(cv2.cvtColor(prev_frame, cv2.COLOR_BGR2HSV), blocks)
        frame_blocks = view_as_blocks(cv2.cvtColor(frame, cv2.COLOR_BGR2HSV), blocks)
    else:
        prev_frame_blocks = view_as_blocks(prev_frame, blocks)
        frame_blocks = view_as_blocks(frame, blocks)

    hist_vector = []

    for i in range(blocks):
        for j in range(blocks):
            prev_frame_block = prev_frame_blocks[i, j]
            frame_block = frame_blocks[i, j]
            
            if hist_gray_frame_flag:
                hist1 = cv2.calcHist([prev_frame_block], [0], None, [256], [0, 256])
                hist2 = cv2.calcHist([frame_block], [0], None, [256], [0, 256])
            else:
                hist1 = cv2.calcHist([prev_frame_block], [0, 1], None, [h_bins, s_bins], [0, 180, 0, 256])
                hist2 = cv2.calcHist([frame_block], [0, 1], None, [h_bins, s_bins], [0, 180, 0, 256])
            
            hist1_norm = normalize_hist_manual(hist1, alpha=0, beta=1)
            hist2_norm = normalize_hist_manual(hist2, alpha=0, beta=1)
            
            corr = compare_hist_chi_square(hist1_norm, hist2_norm)
            hist_vector.append(corr)
    
    return hist_vector

Далее идёт основной алгоритм.

Главная идея заключается в динамическом вычислении порога по формуле в зависимости от кадров в окне. Под окном подразумеватся группа подряд идущих N кадров, в данном случае N = 10. Сама формула имеет вид: 
threshold = mean + k * std (*),
где mean - это среднее значение последовательности средних значений каждого кадра, посчитанных на основе вектора значений метрики в блоках кадра, std - стандартное отклонение той же последовательности, k - некоторый коэффициент. 
Далее каждый блок нового кадра сравнивается с посчитанным порогом threshold. Затем считается процент прошедших порог блоков percentage, и это число сравнивается с некоторым фиксированным значением, после чего делается вывод о наличии смены сцены. 
В случае ансамбля используется метод мягкого голосования soft_vote, где все три метрики вносят свой вклад на основе предварительно посчитанной точности accuracy и процента блоков percentage:
soft_vote = (accuracy1 * percentage1 + accuracy2 * percentage2 + accuracy3 * percentage3) / (accuracy1 + accuracy2 + accuracy3). Далее soft_vote сравнивается с установленным порогом и делается вывод о наличии смены сцены.

Данный подход эмпирически показал наилучший результат, в отличие от остальных, где: формула (*) считается отдельно для каждой последовательности соответствующих блоков; проверку на преодоление порога проходит не каждый блок нового кадра, а среднее значение посчитанных для его блоков выходов метрик.

Кэффициент k = 3 (и k = 13 в случае черно-белого видео для гстограммной метрики) был подобран эмпирически. Данная формула показала хорошие результаты по сравнению с фиксированным глобальным порогом.

Сам алгоритм прохождения по видео окном выглядит так: существует переменная remained_frames, которая показывает, сколько кадров не хватает в окне для полной загруженности. Она изначально равно размеру окна, и при первых итерациях уменьшается, тем самым позволяя окну заполняться. Само окно с каждой итерацией пополняется вектором значений метрики, которые были посчитаны между двумя соседними кадрами. Когда remained_frames становится равной нулю, начинаюся расчеты порога, а также голосование, по которому заключается присутствие или отсутствие смены сцены в новом кадре, по сравнению с предыдущими в окне. Если смены не было, то происходит обычное обновление окна. В обратном случае remained_frames сбрасывается до нуля и окно заполняется заново.

Ниже приведены две вспомогательные функции

In [None]:
# Подсчет порога по формуле и вычисление вектора блоков, которые порог прошли
def get_mask(window, current, k):
    mean = np.mean(window, axis=0)
    std = np.std(window, axis=0)
    threshold = mean + k * std

    mask = current > threshold
    return mask

# Обновление окна
def update_window(window, current, number_of_max_elements_in_window):
    window.append(current)

    if len(window) > number_of_max_elements_in_window:
        window.pop(0)

In [None]:
# GRADED CELL: scene_change_detector

def scene_change_detector(frames, threshold=None, with_vis=False):
    scene_changes = []
    vis = []
    metric_values = []
    
    ### START CODE HERE ###
    
    # Общие параметры
    prev_frame = None
    number_of_frames_in_window = 10 
    remained_frames = number_of_frames_in_window
    global_threshold = 0.6
    
    # (1)
    dssim_window = []
    dssim_k = 3
    dssim_accuracy = 0.89  # Значения <metric>_accuracy были посчитаны отдельно для каждой метрики 

    # (2)
    sobel_k = 2
    sobel_window = []
    sobel_accuracy = 0.76

    # (3)
    hist_window = []
    hist_k = 3
    hist_grey_k = 13
    hist_gray_frame_flag = False
    hist_accuracy = 0.80

    ###  END CODE HERE  ###
    
    for idx, frame in tqdm(enumerate(frames), leave=False):

        ### START CODE HERE ###
        
        if idx == 0:
            prev_frame = frame
            continue

        # (1)
        dssim_current = calculate_dssim_vector(prev_frame, frame)

        # (2)
        sobel_current = calculate_sobel_vector(prev_frame, frame)

        # (3)
        if hist_gray_frame_flag or is_grayscale_frame(frame):
            hist_gray_frame_flag = True
            hist_k = hist_grey_k

        hist_current = calculate_hist_vector(prev_frame, frame, hist_gray_frame_flag)

        if remained_frames == 0:
            dssim_percentage = np.mean(get_mask(dssim_window, dssim_current, dssim_k))
            sobel_percentage = np.mean(get_mask(sobel_window, sobel_current, sobel_k))
            hist_percentage = np.mean(get_mask(hist_window, hist_current, hist_k))
            
            soft_vote = 0
            soft_vote += dssim_accuracy * dssim_percentage 
            soft_vote += sobel_accuracy * sobel_percentage 
            soft_vote += hist_accuracy * hist_percentage 
            soft_vote /= dssim_accuracy + hist_accuracy

            if (soft_vote > global_threshold):
                scene_changes.append(idx)
            
                if with_vis:
                    if len(vis) < 100:
                        vis.append([prev_frame, frame])

                remained_frames = number_of_frames_in_window

            metric_values.append([dssim_percentage, sobel_percentage, hist_percentage])
            metric_values.append([dssim_percentage, hist_percentage])
        else:
            remained_frames -= 1

        update_window(dssim_window, np.mean(dssim_current), number_of_frames_in_window)
        update_window(sobel_window, np.mean(sobel_current), number_of_frames_in_window)
        update_window(hist_window, np.mean(hist_current), number_of_frames_in_window)
            
        prev_frame = frame

        ###  END CODE HERE  ###
        pass

    return scene_changes, vis, metric_values