In [1]:
import os
import cv2
import numpy as np
import scipy.fftpack
import scipy.fft as fft
from skimage.metrics import structural_similarity as ssim

## Applying a temporal filtering to an input video
The following function applies a temporal filtering to a video. this filtering can be a `lowpass`, `highpass`, or a `bandpass` filter, and we need to provide the low and high cutting frequencies of these filters

In [2]:
def apply_temporal_filter(input_video, output_dir, format, low_freq, high_freq, mode):
    # Create results directory
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Open video
    cap = cv2.VideoCapture(input_video)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    print(f"Processing {input_video} | FPS: {fps}, Frames: {frame_count}, Resolution: {width}x{height}")

    # Create a memory-mapped file to store frames
    mmap_file = "video_frames.dat"
    frames = np.memmap(mmap_file, dtype=np.float16, mode="w+", shape=(frame_count, height, width, 3))

    for i in range(frame_count):
        ret, frame = cap.read()
        if not ret:
            break
        frames[i] = frame.astype(np.float16)

    cap.release()

    # FFT + Filtering
    print("Applying FFT (rFFT)...")
    frames_fft = scipy.fft.rfft(frames, axis=0) 
    freqs = scipy.fft.rfftfreq(frame_count, d=1/fps)

    # Creating filters
    print("Filtering...")
    filtered_fft = np.zeros_like(frames_fft)
    lowpass_mask = (freqs <= low_freq)
    highpass_mask = (freqs >= high_freq)

    # Apply filter: Zero out frequencies outside the selected range
    filtered_fft[:] = 0
    if mode == "lowpass":
        np.multiply(frames_fft, lowpass_mask[:, None, None, None], out=filtered_fft)
    elif mode == "highpass":
        np.multiply(frames_fft, highpass_mask[:, None, None, None], out=filtered_fft)
    elif mode == "bandpass":
        bandpass_mask = (freqs >= low_freq) & (freqs <= high_freq)
        np.multiply(frames_fft, bandpass_mask[:, None, None, None], out=filtered_fft)

    # Inverse FFT to reconstruct filtered motion
    print("Applying Inverse FFT...")
    filtered_frames = np.real(scipy.fft.irfft(filtered_fft, axis=0)) 

    # Normalize back to 8-bit
    filtered_frames = np.clip(filtered_frames, 0, 255).astype(np.uint8)

    # Save filtered video
    output_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(input_video))[0]}_filtered{format}")
    fourcc = cv2.VideoWriter_fourcc(*'DIVX')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    print(f"Saving filtered video to {output_path}")
    for frame in filtered_frames:
        out.write(frame)
    out.release()

    print("Processing complete!")  

    # Clean up memory-mapped file
    del frames
    os.remove(mmap_file)

In [4]:
sequences = {"sea_view": ".mov", "grass": ".mp4", "custom": ".mp4"}
amps = ["10", "20", "50", "100"]

In [None]:
# Config
seq = "sea_view" # "grass" "custom"
output_dir = "filtered_results"
video_format = ".mp4"

# Filter settings
low_pass = 2
high_pass = 100

for i in range(4):
    video_name = "res-videos" + "/" + seq + "/" + seq + f"_amp{amps[i]+sequences[seq]}"
    apply_temporal_filter(video_name, output_dir, video_format, low_pass, high_pass, "lowpass")

## Computing SSIM
We want to evaluate the quality of the magnified, and filtered magnified videos compared to the original ones. The goal is to assess the structural fidelity of the magnified videos to the original ones.

In [None]:
def compute_video_ssim(video1_path, video2_path, debug=True):
    cap1 = cv2.VideoCapture(video1_path)
    cap2 = cv2.VideoCapture(video2_path)
    
    h1 = int(cap1.get(cv2.CAP_PROP_FRAME_WIDTH))
    h2 = int(cap2.get(cv2.CAP_PROP_FRAME_WIDTH))
    
    if debug:
        print(f"Comparing videos: {video1_path} and {video2_path}")
        print(f"1st video shape: {h1, int(cap1.get(cv2.CAP_PROP_FRAME_HEIGHT))}, 2nd: {h2, int(cap2.get(cv2.CAP_PROP_FRAME_HEIGHT))}")

    if not cap1.isOpened() or not cap2.isOpened():
        print("Error: Could not open one or both videos.")
        return None

    ssim_scores = []
    
    print("Computing SSIM...")
    while True:
        ret1, frame1 = cap1.read()
        ret2, frame2 = cap2.read()

        # Break if either video ends
        if not ret1 or not ret2:
            break
        
        # Rotate the second frame if both videos have rotated dimensions
        if h1 != h2:
            frame2 = cv2.rotate(frame2, cv2.ROTATE_90_CLOCKWISE)

        # SSIM is computed on luminance space => convert to Grayscale
        gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
        gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
        score, _ = ssim(gray1, gray2, full=True)
        ssim_scores.append(score)

    cap1.release()
    cap2.release()
    
    print(f"Finished computing SSIM for {video2_path}")

    # Average SSIMs
    avg_ssim = np.mean(ssim_scores)
    return avg_ssim

In [None]:
seq = "custom" # "grass" "custom"
ssims = []

for i in range(4):
    video_1 = "res-videos" + "/" + seq + "/" + seq + f"_amp{amps[i]+sequences[seq]}"
    # video_1 = "filtered_results" + "/" + seq + f"_amp{amps[i]}_filtered.mp4"
    video_2 = "videos/" + seq + f"{sequences[seq]}"
    ssims.append(compute_video_ssim(video_1, video_2, debug=True))
    
print(ssims)