###constants

In [1]:
import numpy as np


gaussian_kernel = (
    np.array(
        [
            [1,  4,  6,  4, 1],
            [4, 16, 24, 16, 4],
            [6, 24, 36, 24, 6],
            [4, 16, 24, 16, 4],
            [1,  4,  6,  4, 1]
        ]
    )
    / 256
)


yiq_from_rgb = (
    np.array(
            [
                [0.29900000,  0.58700000,  0.11400000],
                [0.59590059, -0.27455667, -0.32134392],
                [0.21153661, -0.52273617,  0.31119955]
            ]
        )
    ).astype(np.float32)


rgb_from_yiq = np.linalg.inv(yiq_from_rgb)

### processing

In [2]:
import cv2
import numpy as np
import tqdm

# from constants import rgb_from_yiq, yiq_from_rgb


def loadVideo(video_path):
    image_sequence = []
    video = cv2.VideoCapture(video_path)
    fps = video.get(cv2.CAP_PROP_FPS)

    while video.isOpened():
        ret, frame = video.read()

        if ret is False:
            break

        image_sequence.append(frame[:, :, ::-1])

    video.release()

    return np.asarray(image_sequence), fps


def rgb2yiq(rgb_image):
    image = rgb_image.astype(np.float32)
    return image @ yiq_from_rgb.T


def yiq2rgb(yiq_image):
    image = yiq_image.astype(np.float32)
    return image @ rgb_from_yiq.T


def pyrDown(image, kernel):
    return cv2.filter2D(image, -1, kernel)[::2, ::2]


def pyrUp(image, kernel, dst_shape=None):
    dst_height = image.shape[0] + 1
    dst_width = image.shape[1] + 1

    if dst_shape is not None:
        dst_height -= (dst_shape[0] % image.shape[0] != 0)
        dst_width -= (dst_shape[1] % image.shape[1] != 0)

    height_indexes = np.arange(1, dst_height)
    width_indexes = np.arange(1, dst_width)

    upsampled_image = np.insert(image, height_indexes, 0, axis=0)
    upsampled_image = np.insert(upsampled_image, width_indexes, 0, axis=1)

    return cv2.filter2D(upsampled_image, -1, 4 * kernel)


def idealTemporalBandpassFilter(images,
                                fps,
                                freq_range,
                                axis=0):

    fft = np.fft.fft(images, axis=axis)
    frequencies = np.fft.fftfreq(images.shape[0], d=1.0/fps)

    low = (np.abs(frequencies - freq_range[0])).argmin()
    high = (np.abs(frequencies - freq_range[1])).argmin()

    fft[:low] = 0
    fft[high:] = 0

    return np.fft.ifft(fft, axis=0).real


def reconstructGaussianImage(image, pyramid):
    reconstructed_image = rgb2yiq(image) + pyramid
    reconstructed_image = yiq2rgb(reconstructed_image)
    reconstructed_image = np.clip(reconstructed_image, 0, 255)

    return reconstructed_image.astype(np.uint8)


def reconstructLaplacianImage(image, pyramid, kernel):
    reconstructed_image = rgb2yiq(image)

    for level in range(1, pyramid.shape[0] - 1):
        tmp = pyramid[level]
        for curr_level in range(level):
            tmp = pyrUp(tmp, kernel, pyramid[level - curr_level - 1].shape[:2])
        reconstructed_image += tmp.astype(np.float32)

    reconstructed_image = yiq2rgb(reconstructed_image)
    reconstructed_image = np.clip(reconstructed_image, 0, 255)

    return reconstructed_image.astype(np.uint8)


def getGaussianOutputVideo(original_images, filtered_images):
    video = np.zeros_like(original_images)

    for i in tqdm.tqdm(range(filtered_images.shape[0]),
                       ascii=True,
                       desc="Video Reconstruction"):

        video[i] = reconstructGaussianImage(
                    image=original_images[i],
                    pyramid=filtered_images[i]
                )

    return video


def getLaplacianOutputVideo(original_images, filtered_images, kernel):
    video = np.zeros_like(original_images)

    for i in tqdm.tqdm(range(original_images.shape[0]),
                       ascii=True,
                       desc="Video Reconstruction"):

        video[i] = reconstructLaplacianImage(
                    image=original_images[i],
                    pyramid=filtered_images[i],
                    kernel=kernel
                )

    return video


def saveVideo(video, saving_path, fps):
    (height, width) = video[0].shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    writer = cv2.VideoWriter(saving_path, fourcc, fps, (width, height))

    for i in tqdm.tqdm(range(len(video)), ascii=True, desc="Saving Video"):
        writer.write(video[i][:, :, ::-1])

    writer.release()

### gaussian pyramid

In [3]:
import numpy as np
import tqdm

# from processing import idealTemporalBandpassFilter, pyrDown, pyrUp, rgb2yiq


def generateGaussianPyramid(image, kernel, level):
    image_shape = [image.shape[:2]]
    downsampled_image = image.copy()

    for _ in range(level):
        downsampled_image = pyrDown(image=downsampled_image, kernel=kernel)
        image_shape.append(downsampled_image.shape[:2])

    gaussian_pyramid = downsampled_image
    for curr_level in range(level):
        gaussian_pyramid = pyrUp(
                            image=gaussian_pyramid,
                            kernel=kernel,
                            dst_shape=image_shape[level - curr_level - 1]
                        )

    return gaussian_pyramid


def getGaussianPyramids(images, kernel, level):
    gaussian_pyramids = np.zeros_like(images, dtype=np.float32)

    for i in tqdm.tqdm(range(images.shape[0]),
                       ascii=True,
                       desc='Gaussian Pyramids Generation'):

        gaussian_pyramids[i] = generateGaussianPyramid(
                                    image=rgb2yiq(images[i]),
                                    kernel=kernel,
                                    level=level
                        )

    return gaussian_pyramids


def filterGaussianPyramids(pyramids,
                           fps,
                           freq_range,
                           alpha,
                           attenuation):

    filtered_pyramids = idealTemporalBandpassFilter(
                            images=pyramids,
                            fps=fps,
                            freq_range=freq_range
                        ).astype(np.float32)

    filtered_pyramids *= alpha
    filtered_pyramids[:, :, :, 1:] *= attenuation

    return filtered_pyramids

### laplacian pyramid

In [4]:
import numpy as np
import tqdm
from scipy.signal import butter

# from processing import pyrDown, pyrUp, rgb2yiq


def generateLaplacianPyramid(image, kernel, level):
    laplacian_pyramid = []
    prev_image = image.copy()

    for _ in range(level):
        downsampled_image = pyrDown(image=prev_image, kernel=kernel)
        upsampled_image = pyrUp(image=downsampled_image,
                                kernel=kernel,
                                dst_shape=prev_image.shape[:2])
        laplacian_pyramid.append(prev_image - upsampled_image)
        prev_image = downsampled_image

    return laplacian_pyramid


def getLaplacianPyramids(images, kernel, level):
    laplacian_pyramids = []

    for image in tqdm.tqdm(images,
                           ascii=True,
                           desc="Laplacian Pyramids Generation"):

        laplacian_pyramid = generateLaplacianPyramid(
                                    image=rgb2yiq(image),
                                    kernel=kernel,
                                    level=level
                        )
        laplacian_pyramids.append(laplacian_pyramid)

    return np.asarray(laplacian_pyramids, dtype='object')


def filterLaplacianPyramids(pyramids,
                            level,
                            fps,
                            freq_range,
                            alpha,
                            lambda_cutoff,
                            attenuation):

    filtered_pyramids = np.zeros_like(pyramids)
    delta = lambda_cutoff / (8 * (1 + alpha))
    b_low, a_low = butter(1, freq_range[0], btype='low', output='ba', fs=fps)
    b_high, a_high = butter(1, freq_range[1], btype='low', output='ba', fs=fps)

    lowpass = pyramids[0]
    highpass = pyramids[0]
    filtered_pyramids[0] = pyramids[0]

    for i in tqdm.tqdm(range(1, pyramids.shape[0]),
                       ascii=True,
                       desc="Laplacian Pyramids Filtering"):

        lowpass = (-a_low[1] * lowpass
                   + b_low[0] * pyramids[i]
                   + b_low[1] * pyramids[i - 1]) / a_low[0]
        highpass = (-a_high[1] * highpass
                    + b_high[0] * pyramids[i]
                    + b_high[1] * pyramids[i - 1]) / a_high[0]

        filtered_pyramids[i] = highpass - lowpass

        for lvl in range(1, level - 1):
            (height, width, _) = filtered_pyramids[i, lvl].shape
            lambd = ((height ** 2) + (width ** 2)) ** 0.5
            new_alpha = (lambd / (8 * delta)) - 1

            filtered_pyramids[i, lvl] *= min(alpha, new_alpha)
            filtered_pyramids[i, lvl][:, :, 1:] *= attenuation

    return filtered_pyramids

### evm

In [6]:
import os

def gaussian_evm(video_path, saving_path, level=4, alpha=100, low_omega=0.833, high_omega=1, attenuation=1):
    images, fps = loadVideo(video_path)

    gaussian_pyramids = getGaussianPyramids(images=images, kernel=gaussian_kernel, level=level)
    filtered_pyramids = filterGaussianPyramids(pyramids=gaussian_pyramids, fps=fps, freq_range=[low_omega, high_omega], alpha=alpha, attenuation=attenuation)
    output_video = getGaussianOutputVideo(original_images=images, filtered_images=filtered_pyramids)

    saveVideo(video=output_video, saving_path=saving_path, fps=fps)

def laplacian_evm(video_path, saving_path, level=4, alpha=100, lambda_cutoff=1000, low_omega=0.833, high_omega=1, attenuation=1):
    images, fps = loadVideo(video_path)

    laplacian_pyramids = getLaplacianPyramids(images=images, kernel=gaussian_kernel, level=level)
    filtered_pyramids = filterLaplacianPyramids(pyramids=laplacian_pyramids, fps=fps, freq_range=[low_omega, high_omega], alpha=alpha, attenuation=attenuation, lambda_cutoff=lambda_cutoff, level=level)
    output_video = getLaplacianOutputVideo(original_images=images, filtered_images=filtered_pyramids, kernel=gaussian_kernel)

    saveVideo(video=output_video, saving_path=saving_path, fps=fps)

# Define your Colab-specific input parameters
video_path = "/home/maharathy1/MTP/CV_Project/data/birds.mp4"
saving_path = "/home/maharathy1/MTP/CV_Project/results/magnified_birds_optimal.mp4"
level = 6
alpha = 30
low_omega = 1
high_omega = 5
attenuation = 0.1
lambda_cutoff = 16

# Choose between Gaussian or Laplacian mode
mode = 'laplacian'

# Perform EVM based on the chosen mode
if mode == 'gaussian':
    gaussian_evm(video_path, level=level, alpha=alpha, low_omega=low_omega, high_omega=high_omega, saving_path=saving_path, attenuation=attenuation)
else:
    lambda_cutoff = 1000
    laplacian_evm(video_path, level=level, alpha=alpha, lambda_cutoff=lambda_cutoff, low_omega=low_omega, high_omega=high_omega, saving_path=saving_path, attenuation=attenuation)


Laplacian Pyramids Generation: 100%|##########| 405/405 [00:11<00:00, 35.10it/s]
Laplacian Pyramids Filtering: 100%|##########| 404/404 [00:09<00:00, 44.11it/s]
Video Reconstruction: 100%|##########| 405/405 [00:25<00:00, 15.78it/s]
OpenCV: FFMPEG: tag 0x47504a4d/'MJPG' is not supported with codec id 7 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'
Saving Video: 100%|##########| 405/405 [00:05<00:00, 68.83it/s]


##Motion Extraction

In [8]:
import cv2
import numpy as np
import tqdm
def load_video(video_path):
    """Loads a video from the given path and converts each frame to grayscale."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return []
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frames.append(gray_frame)
    cap.release()
    return frames

def motion_extraction(frames, shift=1):
    """Extracts motion between frames separated by a given shift."""
    if not frames:
        print("No frames available for motion extraction.")
        return []
    motion_frames = []
    for i in range(len(frames) - shift):
        diff = cv2.absdiff(frames[i + shift], frames[i])
        motion_frames.append(diff)
    return motion_frames

def enhance_motion(frames, kernel_size=5, apply_glow=False):
    """Applies a Gaussian blur to enhance the motion frames."""
    if not frames:
        print("No motion frames available for enhancement.")
        return []
    enhanced_frames = []
    for frame in frames:
        if apply_glow:
            glow = cv2.GaussianBlur(frame, (kernel_size, kernel_size), 0)
            frame = cv2.addWeighted(frame, 0.5, glow, 0.5, 0)
        else:
            frame = cv2.GaussianBlur(frame, (kernel_size, kernel_size), 0)
        enhanced_frames.append(frame)
    return enhanced_frames

def display_frames(frames,path):

    height, width = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    writer = cv2.VideoWriter(path, fourcc, 60.0, (width, height))

    for i in tqdm.tqdm(range(len(frames)), ascii=True, desc="Saving Video"):
        # Convert grayscale frame to 3-channel (BGR) image
        frame_bgr = cv2.cvtColor(frames[i], cv2.COLOR_GRAY2BGR)
        writer.write(frame_bgr)

    writer.release()

# Main execution block
video_path = "/home/maharathy1/MTP/CV_Project/results/magnified_birds_optimal.mp4"
saving_path = "/home/maharathy1/MTP/CV_Project/me_results/birds_final.mp4"
frames = load_video(video_path)
if frames:
    motion_frames = motion_extraction(frames, shift=5)
    enhanced_frames = enhance_motion(motion_frames, apply_glow=True)
    display_frames(enhanced_frames,saving_path)
else:
    print("Failed to process video.")


OpenCV: FFMPEG: tag 0x47504a4d/'MJPG' is not supported with codec id 7 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'
Saving Video: 100%|##########| 400/400 [00:03<00:00, 105.45it/s]
