In [6]:
# % pip install opencv-python
# % pip install scipy

In [7]:
import cv2
import os

import subprocess

In [8]:
def extract_frames(video_path, output_folder, interval=10):
    """Save one JPEG frame from a video every ``interval`` seconds.

    The first frame is always saved; files are named ``frame_0000.jpg``,
    ``frame_0001.jpg``, ... inside ``output_folder``.

    Args:
        video_path (str): Path to the input video file.
        output_folder (str): Directory that receives the JPEG files. It is
            created if it does not exist.
        interval (float): Time between saved frames, in seconds. Must be
            positive.

    Raises:
        ValueError: If ``interval`` is not positive, or if the video opens
            but does not report a usable frame rate.
    """
    if not interval > 0:
        raise ValueError(f"interval must be positive, got {interval!r}")

    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video file {video_path}")
            return

        # Keep the fractional frame rate (e.g. 29.97). Truncating it to an int
        # before multiplying makes the sampling drift from the requested
        # interval.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            raise ValueError(
                f"Could not determine the frame rate of {video_path} (got {fps!r})"
            )

        # Never below 1, so the modulo test cannot divide by zero when
        # interval * fps rounds down to 0.
        frame_interval = max(1, round(interval * fps))

        frame_count = 0
        saved_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                frame_filename = os.path.join(
                    output_folder, f"frame_{saved_count:04d}.jpg"
                )
                # imwrite reports failure through its return value, not an
                # exception, so surface it instead of silently losing a frame.
                if not cv2.imwrite(frame_filename, frame):
                    print(f"Warning: could not write {frame_filename}")
                saved_count += 1

            frame_count += 1
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()


def extract_video_segment(input_file, output_file, start_time, duration):
    """
    Extracts a segment from a video file by running the ``ffmpeg`` CLI.

    The streams are copied (``-c copy``), not re-encoded. Failures are
    reported by printing a message; no exception is propagated for a missing
    ``ffmpeg`` executable or a non-zero ``ffmpeg`` exit status.

    Args:
        input_file (str): Path to the input video file.
        output_file (str): Path to the output video file.
        start_time (str): Start time in the format "HH:MM:SS".
        duration (str): Duration of the segment in the format "HH:MM:SS".
    """
    command = [
        "ffmpeg",
        "-i",
        input_file,
        "-ss",
        start_time,
        "-t",
        duration,
        "-c",
        "copy",
        output_file,
    ]

    try:
        subprocess.run(command, check=True)
    except FileNotFoundError:
        # Raised when the ffmpeg binary itself cannot be found; report it the
        # same way as an ffmpeg failure instead of crashing the cell.
        print("An error occurred: the ffmpeg executable was not found on PATH")
    except subprocess.CalledProcessError as e:
        print(f"An error occurred: {e}")
    else:
        print(f"Extracted segment saved to {output_file}")

In [12]:
# Frame dumping is disabled; uncomment the next line to save one JPEG for
# every 10 seconds of the source video into ./data/frames.
# extract_frames("./data/video_for_motion_mag.mp4", "./data/frames", interval=10)
# Cut a two-minute clip starting at 00:02:00. The output path is the file that
# amplify_motion opens.
extract_video_segment(
    "./data/video_for_motion_mag.mp4",
    "./data/segment_1.avi",
    start_time="00:02:00",
    duration="00:02:00",
)

Extracted segment saved to ./data/segment_1.avi


In [16]:
import cv2
import numpy as np
import scipy.fftpack as fftpack


def build_gaussian_pyramid(src, levels=3):
    """Return a Gaussian pyramid of ``src`` as a list of images.

    Element 0 is a copy of ``src``; each following element is the previous
    one passed through ``cv2.pyrDown``. The list therefore holds
    ``levels + 1`` images.

    Args:
        src: Input image array.
        levels (int): Number of ``pyrDown`` steps to apply.

    Returns:
        list: The pyramid, from the full-size copy to the coarsest level.
    """
    stack = [src.copy()]
    for _ in range(levels):
        # Each new level is derived from the most recently added one.
        stack.append(cv2.pyrDown(stack[-1]))
    return stack


def reconstruct_from_gaussian_pyramid(pyramid):
    """Collapse a pyramid into one float32 image, coarsest level first.

    Starting from the last (coarsest) level, the running result is upsampled
    with ``cv2.pyrUp``, resized when its shape does not match the next finer
    level, and added to that level.

    Args:
        pyramid (list): Pyramid levels ordered from finest to coarsest.

    Returns:
        The accumulated float32 image at the size of ``pyramid[0]``.
    """
    accumulated = pyramid[-1].astype(np.float32)
    # Walk the remaining levels from second-coarsest up to the finest.
    for finer in reversed(pyramid[:-1]):
        upsampled = cv2.pyrUp(accumulated)
        if upsampled.shape != finer.shape:
            # cv2.resize takes (width, height), i.e. (cols, rows).
            target_size = (finer.shape[1], finer.shape[0])
            upsampled = cv2.resize(upsampled, target_size)
        accumulated = cv2.add(upsampled, finer.astype(np.float32))
    return accumulated


def temporal_bandpass_filter(video_frames, low_cutoff, high_cutoff, fps):
    """Apply an ideal band-pass filter along axis 0 of ``video_frames``.

    The data is transformed with an FFT over axis 0, every frequency bin
    whose absolute frequency lies outside ``[low_cutoff, high_cutoff]`` is
    zeroed, and the real part of the inverse transform is returned.

    Args:
        video_frames: Array whose first axis is sampled at ``fps``.
        low_cutoff (float): Lower edge of the pass band, inclusive.
        high_cutoff (float): Upper edge of the pass band, inclusive.
        fps (float): Sampling rate along axis 0.

    Returns:
        Real-valued array with the same shape as ``video_frames``.
    """
    sample_count = video_frames.shape[0]
    spectrum = fftpack.fft(video_frames, axis=0)

    # Absolute value keeps the positive and negative frequency bins symmetric.
    magnitude = np.abs(fftpack.fftfreq(sample_count, d=1.0 / fps))
    in_band = (magnitude >= low_cutoff) & (magnitude <= high_cutoff)

    spectrum[np.logical_not(in_band)] = 0
    restored = fftpack.ifft(spectrum, axis=0)
    return np.real(restored)


def amplify_motion(
    amplification_factor=2,
    low_cutoff=0.4,
    high_cutoff=3,
    fps=30,
    pyramid_levels=3,
    input_path="./data/segment_1.avi",
    output_path="output.avi",
    duration_seconds=5,
):
    """Magnify subtle temporal changes in a video clip and write the result.

    Linear Eulerian magnification on the coarsest Gaussian-pyramid level:

    1. Read up to ``fps * duration_seconds`` frames from ``input_path``.
    2. Reduce every frame to the coarsest level of its Gaussian pyramid
       (``build_gaussian_pyramid``), which acts as a spatial low-pass.
    3. Band-pass that stack over time (``temporal_bandpass_filter``).
    4. Scale the band-passed signal by ``amplification_factor``, upsample it
       back to frame size and add it to the *matching* original frame.

    Each output frame depends only on its own input frame and the filtered
    signal; no state is carried from one output frame to the next.

    Args:
        amplification_factor (float): Gain applied to the band-passed signal.
        low_cutoff (float): Lower edge of the temporal pass band, in Hz.
        high_cutoff (float): Upper edge of the temporal pass band, in Hz.
        fps (float): Frame rate assumed for the input and used for the
            output file. It is not read from the video.
        pyramid_levels (int): Number of ``pyrDown`` steps used for the
            spatial low-pass.
        input_path (str): Video file to read.
        output_path (str): Video file to write (XVID codec).
        duration_seconds (float): Length of the processed clip, in seconds.

    Raises:
        ValueError: If ``fps * duration_seconds`` covers less than one frame.
    """
    max_frames = int(fps * duration_seconds)
    if max_frames < 1:
        raise ValueError("fps * duration_seconds must cover at least one frame")

    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        frames = []
        while len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)

        if not frames:
            print("Error: Could not read the video file.")
            return

        # Keep only the coarsest level per frame: the temporal FFT then runs
        # on a small array instead of the full-resolution clip.
        level_shapes = None
        coarse_levels = []
        for frame in frames:
            # Convert before downsampling so sub-gray-level variations are
            # not rounded away at each pyramid step.
            pyramid = build_gaussian_pyramid(frame.astype(np.float32), pyramid_levels)
            if level_shapes is None:
                level_shapes = [level.shape for level in pyramid]
            coarse_levels.append(pyramid[-1])

        band = temporal_bandpass_filter(
            np.stack(coarse_levels), low_cutoff, high_cutoff, fps
        )

        height, width = frames[0].shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open {output_path} for writing.")
            return

        for frame, band_frame in zip(frames, band):
            delta = (amplification_factor * band_frame).astype(np.float32)
            # Undo the pyramid one level at a time, resizing when an odd
            # dimension makes pyrUp overshoot the recorded level size.
            for shape in reversed(level_shapes[:-1]):
                delta = cv2.pyrUp(delta)
                if delta.shape != shape:
                    delta = cv2.resize(delta, (shape[1], shape[0]))

            # Base weight is 1: the band-passed signal has no DC component
            # when low_cutoff > 0, so it is added on top of the frame.
            magnified_frame = np.clip(frame.astype(np.float32) + delta, 0, 255)
            out.write(np.uint8(magnified_frame))
    finally:
        # Release both handles on every exit path, including errors.
        cap.release()
        if out is not None:
            out.release()

In [17]:
# Run the magnification with its default parameters.
# NOTE(review): the saved TypeError output for this cell mentions a missing
# `src` argument, but the amplify_motion definition in this notebook takes no
# such parameter. The output looks stale; re-run from a fresh kernel.
amplify_motion()

TypeError: amplify_motion() missing 1 required positional argument: 'src'