## Imports

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
import time
import numpy as np
from torchvision.transforms import Compose, Resize, ToTensor
from torch.utils.data import Dataset, DataLoader
import os
from PIL import Image
import matplotlib.pyplot as plt
import sys
from torchvision.transforms import functional as TF
import torchvision.transforms as transforms
import random
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
import scipy.io as sio
import cv2
import cudacanvas
import torch.nn.functional as F

## Bicubic Plus Plus Model

In [2]:
class Bicubic_plus_plus(nn.Module):
    """
    Bicubic++ super-resolution network, adapted from the Aselsan Research group.

    - Pretrained weights are available from their GitHub repository:
      https://github.com/aselsan-research-imaging-team/bicubic-plusplus
    - The input is halved by a stride-2 convolution, refined by two 3x3
      convolutions with a skip connection, and expanded by a pixel shuffle of
      factor ``2 * sr_rate``. For even input height/width the output is
      therefore ``sr_rate`` times larger than the input.

    Attribute names are kept as-is because they are the keys of the
    checkpoint's state dict.
    """
    def __init__(self, sr_rate=3):
        super().__init__()
        shuffle_factor = 2 * sr_rate
        out_channels = 3 * shuffle_factor ** 2
        # Stride-2 stem: all later convolutions run at half resolution.
        self.conv0 = nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1, bias=False)
        self.conv1 = nn.Conv2d(32, 32, kernel_size=3, padding=1, bias=False)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1, bias=False)
        # Produces shuffle_factor**2 sub-pixel values for each of the 3 output channels.
        self.conv_out = nn.Conv2d(32, out_channels, kernel_size=3, padding=1, bias=False)
        self.Depth2Space = nn.PixelShuffle(shuffle_factor)
        self.act = nn.LeakyReLU(negative_slope=0.1, inplace=True)

    def forward(self, x):
        """Return the upscaled image batch for the NCHW input ``x``."""
        stem = self.act(self.conv0(x))
        features = self.act(self.conv1(stem))
        features = self.act(self.conv2(features))
        # Skip connection from the stem output to the end of the body.
        merged = features + stem
        return self.Depth2Space(self.conv_out(merged))

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Bicubic_plus_plus().to(device)
# map_location remaps tensors that were saved on a CUDA device, so the
# checkpoint also loads when the fallback above selected the CPU.
model.load_state_dict(torch.load('weights/bicubic_pp_x3.pth', map_location=device))
# eval() returns the module itself, so as the last expression of the cell it
# also displays the architecture.
model.eval()

Bicubic_plus_plus(
  (conv0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
  (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (conv_out): Conv2d(32, 108, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (Depth2Space): PixelShuffle(upscale_factor=6)
  (act): LeakyReLU(negative_slope=0.1, inplace=True)
)

## Handle Data

In [3]:
# Preprocessing applied to every video frame before it is fed to the model.
# For now this is just transforms.ToTensor().
transform = transforms.Compose([transforms.ToTensor()])

def upscale_video(video_path, model, transform = None, out_video_path = None, evaluate_mode = False):
    """
    Upscale a video with the Bicubic++ model and either render it live or write it to disk.

    Args:
        video_path: path to the input video.
        model: Bicubic++ model. It is called on tensors moved to CUDA when
            available (CPU otherwise), so it must already live on that device.
        transform: callable applied to each RGB frame before the model.
            Defaults to ``transforms.ToTensor()`` when None.
        out_video_path: path of the output video. Required when
            ``evaluate_mode`` is True, ignored otherwise.
        evaluate_mode: if True, upscaled frames are written to
            ``out_video_path`` so that they can be evaluated later. This is
            slow due to the GPU -> CPU transfer of every frame. If False,
            frames are shown in a cudacanvas window, paced to the source FPS.

    Raises:
        ValueError: if ``evaluate_mode`` is True but ``out_video_path`` is None.
        IOError: if the video cannot be opened or reports a non-positive FPS.

    Output: upscaled video feed, or a video file when in evaluate mode. A short
    timing summary is printed at the end; the rendering FPS is only reported
    in live mode, where per-frame render times are recorded.
    """
    if evaluate_mode and out_video_path is None:
        raise ValueError("out_video_path is required when evaluate_mode is True")
    if transform is None:
        # Same fallback conversion that evaluate_upscaling_performance uses.
        transform = transforms.ToTensor()

    # Set CUDA device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load video file
    cap = cv2.VideoCapture(video_path)
    output_video = None  # created lazily, only in evaluate mode
    times = []  # per-frame render + pacing-sleep duration (live mode only)

    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")

        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            raise IOError(f"Video reports an invalid FPS ({fps}): {video_path}")
        frame_time = 1.0 / fps
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)  # Total number of frames in the video

        if not evaluate_mode:
            # Set up cudacanvas window for renders
            white_screen = torch.ones((3, frame_height*3, frame_width*3)).to(device)
            cudacanvas.set_image(white_screen)
            cudacanvas.create_window()

        prev_time = time.time()
        # Exponential moving average of the render time, used to shorten the pacing sleep
        moving_avg_duration = 0
        alpha = 0.3  # Smoothing factor for moving average, adjust as needed

        # Process each frame
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert frame to RGB and apply transform
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_tensor = transform(frame_rgb).unsqueeze(0).to(device)

            # Perform super-resolution on the frame
            with torch.no_grad():
                upscaled_frame_tensor = model(frame_tensor)
                upscaled_frame_tensor = torch.clamp(upscaled_frame_tensor, 0, 1)

            if evaluate_mode:
                # Convert tensor back to a BGR uint8 array for OpenCV
                upscaled_frame = (upscaled_frame_tensor.squeeze(0).cpu().numpy().transpose(1, 2, 0) * 255).astype(np.uint8)
                upscaled_frame = cv2.cvtColor(upscaled_frame, cv2.COLOR_RGB2BGR)
                if output_video is None:
                    # Size the writer from the real model output: it is not exactly
                    # 3x the input for odd frame dimensions, and the writer size
                    # must match the frames it is given.
                    out_height, out_width = upscaled_frame.shape[:2]
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    output_video = cv2.VideoWriter(out_video_path, fourcc, fps, (out_width, out_height))
                output_video.write(upscaled_frame)
            else:
                start_time = time.time()
                # NOTE(review): render() runs before set_image(), as in the original;
                # this may show each frame one iteration late - confirm against cudacanvas.
                cudacanvas.render()
                cudacanvas.set_image(upscaled_frame_tensor.squeeze(0))
                render_time = time.time() - start_time
                # Update moving average of frame render time
                moving_avg_duration = (alpha * render_time) + ((1 - alpha) * moving_avg_duration)
                # Sleep for whatever is left of the frame budget, minus the expected render cost
                time_since_last_frame = time.time() - prev_time
                time_to_wait = frame_time - time_since_last_frame
                time.sleep(max(time_to_wait - moving_avg_duration, 0))
                prev_time = time.time()
                times.append(prev_time - start_time)
                if cudacanvas.should_close():
                    break
    finally:
        # Release resources even if decoding, inference or rendering failed
        cap.release()
        if output_video is not None:
            output_video.release()
        cv2.destroyAllWindows()

    # Rendering FPS is based on the frames actually rendered, so it stays correct
    # when the window is closed early; nothing is timed in evaluate mode.
    total_time = sum(times)
    if total_time > 0:
        rendering_fps = len(times) / total_time
        print(f"Rendering FPS: {rendering_fps}")
    print(f"Original FPS: {fps}")

    # Calculate and print the video length in seconds
    video_length = total_frames / fps  # Original video's FPS used for length calculation
    print(f"Video Length: {video_length} seconds")
    print(f"Original Video Length: {total_frames / fps} seconds")

# testing
# To write the upscaled video to disk instead of rendering it live, use:
# upscale_video('test_videos/rabbit.mp4', model, transform = transform, out_video_path="output_video.mp4", evaluate_mode = True)
upscale_video(
    'test_videos/rabbit.mp4',
    model,
    transform=transform,
    evaluate_mode=False,
)

Rendering FPS: 22.389583153137497
Original FPS: 25.0
Video Length: 15.16 seconds
Original Video Length: 15.16 seconds


## Performance Metrics

In [None]:
import torch
from torchvision.transforms.functional import to_tensor, to_pil_image
import cv2
import numpy as np
from skimage.metrics import peak_signal_noise_ratio as compare_psnr
from skimage.metrics import structural_similarity as compare_ssim
from PIL import Image

def evaluate_upscaling_performance(input_video_path, ground_truth_video_path, model, transform=None, device=torch.device('cuda'), win_size=3):
    """
    Compare the model's upscaled frames against a ground-truth video using PSNR and SSIM.

    Frames are read in lockstep from both videos until either one ends. Each
    input frame is upscaled by ``model``, resized (bicubic) to the ground-truth
    frame size, and scored against the ground-truth frame. The averages over
    all compared frames are printed.

    Args:
        input_video_path: path to the low-resolution input video.
        ground_truth_video_path: path to the high-resolution reference video.
        model: upscaling model; must already live on ``device``.
        transform: callable applied to each RGB input frame; ``to_tensor`` is
            used when None.
        device: device the input tensors are moved to.
        win_size: side length of the SSIM window.

    Raises:
        IOError: if either video cannot be opened.
    """
    input_cap = cv2.VideoCapture(input_video_path)
    gt_cap = cv2.VideoCapture(ground_truth_video_path)

    psnr_values = []
    ssim_values = []

    try:
        if not input_cap.isOpened():
            raise IOError(f"Could not open video: {input_video_path}")
        if not gt_cap.isOpened():
            raise IOError(f"Could not open video: {ground_truth_video_path}")

        while True:
            ret_input, input_frame = input_cap.read()
            ret_gt, gt_frame = gt_cap.read()

            if not ret_input or not ret_gt:
                break

            input_frame_rgb = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)
            gt_frame_rgb = cv2.cvtColor(gt_frame, cv2.COLOR_BGR2RGB)

            if transform:
                input_frame_tensor = transform(input_frame_rgb).unsqueeze(0).to(device)
            else:
                input_frame_tensor = to_tensor(input_frame_rgb).unsqueeze(0).to(device)

            with torch.no_grad():
                upscaled_frame = model(input_frame_tensor)
                # Clamp to [0, 1] as upscale_video does: out-of-range values would
                # otherwise wrap around in the uint8 conversion below.
                upscaled_frame = torch.clamp(upscaled_frame, 0, 1).squeeze(0).cpu()

            upscaled_frame_pil = to_pil_image(upscaled_frame)
            upscaled_frame_resized_pil = upscaled_frame_pil.resize((gt_frame_rgb.shape[1], gt_frame_rgb.shape[0]), Image.BICUBIC)
            upscaled_frame_resized = np.array(upscaled_frame_resized_pil)

            frame_psnr = compare_psnr(gt_frame_rgb, upscaled_frame_resized, data_range=255)

            # channel_axis=-1 marks the last axis as colour channels, so the SSIM
            # window only slides over the two spatial axes.
            frame_ssim = compare_ssim(gt_frame_rgb, upscaled_frame_resized, channel_axis=-1, data_range=255, win_size=win_size)

            psnr_values.append(frame_psnr)
            ssim_values.append(frame_ssim)
    finally:
        # Release both captures even if inference or a metric raised
        input_cap.release()
        gt_cap.release()

    if not psnr_values:
        # Avoid averaging an empty list, which would only yield nan and a warning
        print("No frame pairs were evaluated.")
        return

    avg_psnr = np.mean(psnr_values)
    avg_ssim = np.mean(ssim_values)
    print(f"Average PSNR: {avg_psnr:.2f} dB")
    print(f"Average SSIM: {avg_ssim:.4f}")

In [None]:
# evaluate_upscaling_performance('path_to_low_res_video.mp4', 'path_to_high_res_video.mp4', model, transform=None, device=torch.device('cuda'))
# Forward slashes work on every platform and avoid the invalid "\p" escape sequence.
# `device` is the one the model was moved to when it was loaded.
evaluate_upscaling_performance('test_videos/production720.mp4', 'test_videos/production2160.mp4', model, transform=None, device=device)

## Calculate average FPS for 1000 frames

In [4]:
# Benchmark pure model inference on a random 720p frame.
n_warmup = 10    # untimed iterations, so one-off startup costs stay out of the average
n_frames = 1000  # timed iterations
# torch.cuda.synchronize() is only valid (and only needed) when running on a GPU.
use_cuda = device.type == 'cuda'

noise = torch.randn(1, 3, 720, 1280).to(device)
times = []
with torch.no_grad():
    for _ in range(n_warmup):
        model(noise)
    for i in range(n_frames):
        # Synchronize around the timed region: CUDA kernels launch asynchronously,
        # so without this only the launch overhead would be measured.
        if use_cuda:
            torch.cuda.synchronize()
        start = time.perf_counter()
        pred = model(noise)
        pred = torch.clamp(pred, 0, 1)
        if use_cuda:
            torch.cuda.synchronize()
        times.append(time.perf_counter() - start)

# plt.imshow(pred.squeeze().cpu().numpy().transpose(1, 2, 0))
avg_time = np.mean(times)

print("Input frame size =", noise.shape)
print("Output frame size =", pred.shape)
print("Average Time per frame =", 1000*avg_time, "ms")
print("Average FPS =", 1/avg_time, "FPS")

Input frame size = torch.Size([1, 3, 720, 1280])
Output frame size = torch.Size([1, 3, 2160, 3840])
Average Time per frame = 4.914951801300049 ms
Average FPS = 203.46079482111932 FPS
