In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import time
import numpy as np

import cv2

# ---------------------------
# My Models: SRModel (with Residual Blocks)
# ---------------------------
class ResidualBlock(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU(inplace=True)
        
    def forward(self, x):
        residual = x
        x = self.relu(self.conv1(x))
        x = self.conv2(x)
        return x + residual

class SRModel(nn.Module):
    def __init__(self, in_channels, out_channels, features, num_res_blocks, upscale_factor):
        super().__init__()
        # Initial feature extraction
        self.conv1 = nn.Conv2d(in_channels, features, kernel_size=3, padding=1)
        # Residual blocks
        self.res_blocks = nn.Sequential(*[ResidualBlock(features) for _ in range(num_res_blocks)])
        # Final upscaling (using pixel shuffle)
        self.conv2 = nn.Conv2d(features, out_channels * (upscale_factor ** 2), kernel_size=3, padding=1)
        self.pixel_shuffle = nn.PixelShuffle(upscale_factor)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.res_blocks(x)
        x = self.conv2(x)
        x = self.pixel_shuffle(x)
        return x

# ---------------------------
# Dummy SRCNN model (based on [17])
# ---------------------------
class DummySRCNN(nn.Module):
    def __init__(self, upscale_factor=4):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv2 = nn.Conv2d(64, 32, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv2d(32, 1, kernel_size=5, stride=1, padding=2)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.conv3(x)
        return x

# ---------------------------
# Optical Flow based Motion Compensation Module
# ---------------------------
class OpticalFlowMotionCompensation(nn.Module):
    def __init__(self, k=0.125):
        """
        k: constant used in the adaptive weighting, as in r = exp(-k * error)
        """
        super().__init__()
        self.k = k

    def forward(self, neighbor, center):
        """
        neighbor, center: torch tensors of shape (B, 1, H, W) with type float32.
        For each image in the batch, compute the optical flow from the neighbor to center using Farneback,
        warp the neighbor frame, compute the per-pixel error and then apply adaptive motion compensation:
        
          y_amc(i,j) = (1 - r(i,j)) * y_center(i,j) + r(i,j) * y_warped(i,j),
          
        where r(i,j) = exp(-k * |y_center(i,j) - y_warped(i,j)|).
        """
        B, C, H, W = neighbor.shape
        compensated_list = []
        # Process each image in the batch individually.
        for i in range(B):
            # Convert tensors to NumPy arrays.
            neigh_np = neighbor[i, 0].detach().cpu().numpy().astype(np.float32)
            cent_np = center[i, 0].detach().cpu().numpy().astype(np.float32)
            
            # Compute optical flow using Farneback.
            # Note: In a full implementation you might tune these parameters further.
            flow = cv2.calcOpticalFlowFarneback(neigh_np, cent_np, None,
                                                pyr_scale=0.5, levels=3, winsize=15,
                                                iterations=3, poly_n=5, poly_sigma=1.2, flags=0)
            # Create remap coordinates.
            grid_x, grid_y = np.meshgrid(np.arange(W), np.arange(H))
            map_x = (grid_x + flow[..., 0]).astype(np.float32)
            map_y = (grid_y + flow[..., 1]).astype(np.float32)
            
            # Warp the neighbor frame using the flow field.
            warped = cv2.remap(neigh_np, map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
            
            # Compute per-pixel error.
            error = np.abs(cent_np - warped)
            r = np.exp(-self.k * error)
            
            # Adaptive motion compensation: weighted average between center and warped neighbor.
            compensated = (1 - r) * cent_np + r * warped
            
            # Convert back to torch tensor.
            compensated_tensor = torch.from_numpy(compensated).unsqueeze(0).unsqueeze(0)
            compensated_list.append(compensated_tensor)
        compensated_tensor = torch.cat(compensated_list, dim=0).to(neighbor.device)
        return compensated_tensor

# ---------------------------
# VSRnet models
# ---------------------------
# VSRnet without motion compensation (as before)
class DummyVSRNet(nn.Module):
    def __init__(self, upscale_factor=4):
        super().__init__()
        # Separate convolutional layers for each frame.
        self.conv1_f0 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f1 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f2 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        # For simplicity re-use conv1_f1 and conv1_f0 for frames 3 and 4.
        self.conv2 = nn.Conv2d(320, 32, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv2d(32, 1, kernel_size=5, stride=1, padding=2)
        self.upscale_factor = upscale_factor
    
    def forward(self, x):
        # x shape: (batch, 5, H, W)
        h0 = self.conv1_f0(x[:, 0:1, :, :])
        h1 = self.conv1_f1(x[:, 1:2, :, :])
        h2 = self.conv1_f2(x[:, 2:3, :, :])
        h3 = self.conv1_f1(x[:, 3:4, :, :])
        h4 = self.conv1_f0(x[:, 4:5, :, :])
        x_cat = torch.cat((h0, h1, h2, h3, h4), dim=1)
        x_cat = F.relu(x_cat)
        x_cat = F.relu(self.conv2(x_cat))
        x_cat = self.conv3(x_cat)
        return x_cat

# VSRnet with motion compensation using optical flow.
class DummyVSRNetMCOptFlow(nn.Module):
    def __init__(self, upscale_factor=4):
        super().__init__()
        # Instantiate the optical flow based motion compensation module.
        self.mc = OpticalFlowMotionCompensation(k=0.125)
        # Convolution layers for each frame.
        self.conv1_f0 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f1 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f2 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv2 = nn.Conv2d(320, 32, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv2d(32, 1, kernel_size=5, stride=1, padding=2)
        self.upscale_factor = upscale_factor

    def forward(self, x):
        # x shape: (batch, 5, H, W)
        # Use the center frame (index 2) as reference.
        center = x[:, 2:3, :, :]
        # Apply optical flow based motion compensation to each neighboring frame.
        x0 = self.mc(x[:, 0:1, :, :], center)
        x1 = self.mc(x[:, 1:2, :, :], center)
        # The center frame remains unchanged.
        x2 = center
        x3 = self.mc(x[:, 3:4, :, :], center)
        x4 = self.mc(x[:, 4:5, :, :], center)
        
        h0 = self.conv1_f0(x0)
        h1 = self.conv1_f1(x1)
        h2 = self.conv1_f2(x2)
        h3 = self.conv1_f1(x3)  # re-use conv1_f1 for simplicity
        h4 = self.conv1_f0(x4)  # re-use conv1_f0 for simplicity
        
        x_cat = torch.cat((h0, h1, h2, h3, h4), dim=1)
        x_cat = F.relu(x_cat)
        x_cat = F.relu(self.conv2(x_cat))
        x_cat = self.conv3(x_cat)
        return x_cat

# ---------------------------
# Timing utility function
# ---------------------------
def measure_time(model, input_tensor, num_runs=10):
    # Warm-up runs (without logging time)
    with torch.no_grad():
        for _ in range(3):
            _ = model(input_tensor)
    # Measure forward pass time
    start = time.time()
    with torch.no_grad():
        for _ in range(num_runs):
            _ = model(input_tensor)
    total_time = time.time() - start
    avg_time = total_time / num_runs
    return avg_time

# ---------------------------
# Main routine
# ---------------------------
def main():
    # set to cuda if available, else use cpu
    if torch.cuda.is_available():
        device = torch.device('cpu')
    else:
        device = torch.device('cuda')
    print(f"Using device: {device}")
    
    # Instantiate models (using random weights)
    model_small = SRModel(in_channels=9, out_channels=3, features=64, num_res_blocks=5, upscale_factor=4).to(device)
    model_large = SRModel(in_channels=9, out_channels=3, features=128, num_res_blocks=10, upscale_factor=4).to(device)
    srcnn = DummySRCNN(upscale_factor=4).to(device)
    vsrnet_no_mc = DummyVSRNet(upscale_factor=4).to(device)
    vsrnet_mc = DummyVSRNetMCOptFlow(upscale_factor=4).to(device)
    
    # Create dummy inputs:
    # For SR models: input with 9 channels (3 frames × 3 channels) at 120×214 resolution.
    input_sr = torch.randn(1, 9, 120, 214, device=device)
    # For SRCNN: single-channel input (e.g., luminance)
    input_srcnn = torch.randn(1, 1, 120, 214, device=device)
    # For VSRnet models: 5 frames (single channel each)
    input_vsr = torch.randn(1, 5, 120, 214, device=device)
    
    # Number of forward passes to average (reduced here since optical flow is more expensive)
    num_runs = 10
    time_small = measure_time(model_small, input_sr, num_runs)
    time_large = measure_time(model_large, input_sr, num_runs)
    time_srcnn = measure_time(srcnn, input_srcnn, num_runs)
    time_vsr_no_mc = measure_time(vsrnet_no_mc, input_vsr, num_runs)
    time_vsr_mc = measure_time(vsrnet_mc, input_vsr, num_runs)
    
    # Log the average forward pass times (in milliseconds)
    print(f"Average forward pass time on CPU over {num_runs} runs:")
    print(f"  SR Model (5 blocks, 64 features): {time_small * 1000:.3f} ms")
    print(f"  SR Model (10 blocks, 128 features): {time_large * 1000:.3f} ms")
    print(f"  SRCNN: {time_srcnn * 1000:.3f} ms")
    print(f"  VSRnet without MC: {time_vsr_no_mc * 1000:.3f} ms")
    print(f"  VSRnet with optical flow MC: {time_vsr_mc * 1000:.3f} ms")

if __name__ == '__main__':
    main()


Using device: cpu
Average forward pass time on CPU over 10 runs:
  SR Model (5 blocks, 64 features): 144.865 ms
  SR Model (10 blocks, 128 features): 907.161 ms
  SRCNN: 25.320 ms
  VSRnet without MC: 99.414 ms
  VSRnet with optical flow MC: 135.218 ms


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import time
import numpy as np
import cv2

# ---------------------------
# Residual Block Definition
# ---------------------------
class ResidualBlock(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU(inplace=True)
        
    def forward(self, x):
        residual = x
        x = self.relu(self.conv1(x))
        x = self.conv2(x)
        return x + residual

# ---------------------------
# SRModel (with Residual Blocks)
# ---------------------------
class SRModel(nn.Module):
    def __init__(self, in_channels, out_channels, features, num_res_blocks, upscale_factor):
        super().__init__()
        # Initial feature extraction
        self.conv1 = nn.Conv2d(in_channels, features, kernel_size=3, padding=1)
        # Residual blocks
        self.res_blocks = nn.Sequential(*[ResidualBlock(features) for _ in range(num_res_blocks)])
        # Final upscaling (using pixel shuffle)
        self.conv2 = nn.Conv2d(features, out_channels * (upscale_factor ** 2), kernel_size=3, padding=1)
        self.pixel_shuffle = nn.PixelShuffle(upscale_factor)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.res_blocks(x)
        x = self.conv2(x)
        x = self.pixel_shuffle(x)
        return x

# ---------------------------
# Dummy SRCNN model (based on [17])
# ---------------------------
class DummySRCNN(nn.Module):
    def __init__(self, upscale_factor=4):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv2 = nn.Conv2d(64, 32, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv2d(32, 1, kernel_size=5, stride=1, padding=2)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.conv3(x)
        return x

# ---------------------------
# Optical Flow based Motion Compensation Module
# ---------------------------
class OpticalFlowMotionCompensation(nn.Module):
    def __init__(self, k=0.125):
        """
        k: constant used in the adaptive weighting, as in r = exp(-k * error)
        """
        super().__init__()
        self.k = k

    def forward(self, neighbor, center):
        """
        neighbor, center: torch tensors of shape (B, 1, H, W) with type float32.
        For each image in the batch, compute the optical flow from the neighbor to center using Farneback,
        warp the neighbor frame, compute the per-pixel error and then apply adaptive motion compensation:
        
          y_amc(i,j) = (1 - r(i,j)) * y_center(i,j) + r(i,j) * y_warped(i,j),
          
        where r(i,j) = exp(-k * |y_center(i,j) - y_warped(i,j)|).
        """
        B, C, H, W = neighbor.shape
        compensated_list = []
        # Process each image in the batch individually.
        for i in range(B):
            # Convert tensors to NumPy arrays.
            neigh_np = neighbor[i, 0].detach().cpu().numpy().astype(np.float32)
            cent_np = center[i, 0].detach().cpu().numpy().astype(np.float32)
            
            # Compute optical flow using Farneback.
            flow = cv2.calcOpticalFlowFarneback(neigh_np, cent_np, None,
                                                pyr_scale=0.5, levels=3, winsize=15,
                                                iterations=3, poly_n=5, poly_sigma=1.2, flags=0)
            # Create remap coordinates.
            grid_x, grid_y = np.meshgrid(np.arange(W), np.arange(H))
            map_x = (grid_x + flow[..., 0]).astype(np.float32)
            map_y = (grid_y + flow[..., 1]).astype(np.float32)
            
            # Warp the neighbor frame using the flow field.
            warped = cv2.remap(neigh_np, map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
            
            # Compute per-pixel error.
            error = np.abs(cent_np - warped)
            r = np.exp(-self.k * error)
            
            # Adaptive motion compensation: weighted average between center and warped neighbor.
            compensated = (1 - r) * cent_np + r * warped
            
            # Convert back to torch tensor.
            compensated_tensor = torch.from_numpy(compensated).unsqueeze(0).unsqueeze(0)
            compensated_list.append(compensated_tensor)
        compensated_tensor = torch.cat(compensated_list, dim=0).to(neighbor.device)
        return compensated_tensor

# ---------------------------
# VSRnet models
# ---------------------------
# VSRnet without motion compensation
class DummyVSRNet(nn.Module):
    def __init__(self, upscale_factor=4):
        super().__init__()
        # Separate convolutional layers for each frame.
        self.conv1_f0 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f1 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f2 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        # For simplicity re-use conv1_f1 and conv1_f0 for frames 3 and 4.
        self.conv2 = nn.Conv2d(320, 32, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv2d(32, 1, kernel_size=5, stride=1, padding=2)
        self.upscale_factor = upscale_factor
    
    def forward(self, x):
        # x shape: (batch, 5, H, W)
        h0 = self.conv1_f0(x[:, 0:1, :, :])
        h1 = self.conv1_f1(x[:, 1:2, :, :])
        h2 = self.conv1_f2(x[:, 2:3, :, :])
        h3 = self.conv1_f1(x[:, 3:4, :, :])
        h4 = self.conv1_f0(x[:, 4:5, :, :])
        x_cat = torch.cat((h0, h1, h2, h3, h4), dim=1)
        x_cat = F.relu(x_cat)
        x_cat = F.relu(self.conv2(x_cat))
        x_cat = self.conv3(x_cat)
        return x_cat

# VSRnet with motion compensation using optical flow.
class DummyVSRNetMCOptFlow(nn.Module):
    def __init__(self, upscale_factor=4):
        super().__init__()
        # Instantiate the optical flow based motion compensation module.
        self.mc = OpticalFlowMotionCompensation(k=0.125)
        # Convolution layers for each frame.
        self.conv1_f0 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f1 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv1_f2 = nn.Conv2d(1, 64, kernel_size=9, stride=1, padding=4)
        self.conv2 = nn.Conv2d(320, 32, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv2d(32, 1, kernel_size=5, stride=1, padding=2)
        self.upscale_factor = upscale_factor

    def forward(self, x):
        # x shape: (batch, 5, H, W)
        # Use the center frame (index 2) as reference.
        center = x[:, 2:3, :, :]
        # Apply optical flow based motion compensation to each neighboring frame.
        x0 = self.mc(x[:, 0:1, :, :], center)
        x1 = self.mc(x[:, 1:2, :, :], center)
        # The center frame remains unchanged.
        x2 = center
        x3 = self.mc(x[:, 3:4, :, :], center)
        x4 = self.mc(x[:, 4:5, :, :], center)
        
        h0 = self.conv1_f0(x0)
        h1 = self.conv1_f1(x1)
        h2 = self.conv1_f2(x2)
        h3 = self.conv1_f1(x3)  # re-use conv1_f1 for simplicity
        h4 = self.conv1_f0(x4)  # re-use conv1_f0 for simplicity
        
        x_cat = torch.cat((h0, h1, h2, h3, h4), dim=1)
        x_cat = F.relu(x_cat)
        x_cat = F.relu(self.conv2(x_cat))
        x_cat = self.conv3(x_cat)
        return x_cat

# ---------------------------
# New Two-Stage SR Model
# ---------------------------
# The base model (first stage) is the same as SRModel above.
# New refinement network (second stage) to boost high-frequency details.
class RefinementNet(nn.Module):
    def __init__(self, channels=3, features=64, num_res_blocks=3):
        super(RefinementNet, self).__init__()
        self.conv_in = nn.Conv2d(channels, features, kernel_size=3, padding=1)
        # A few residual blocks for texture refinement
        res_blocks = [ResidualBlock(features) for _ in range(num_res_blocks)]
        self.res_blocks = nn.Sequential(*res_blocks)
        self.conv_out = nn.Conv2d(features, channels, kernel_size=3, padding=1)
        
    def forward(self, x):
        out = F.relu(self.conv_in(x))
        out = self.res_blocks(out)
        out = self.conv_out(out)
        # Residual connection: refine rather than re-predict entirely.
        return x + out

# Composite model that feeds the output of the base model into the refinement network.
class TwoStageSRModel(nn.Module):
    def __init__(self, base_model, refinement_model):
        super(TwoStageSRModel, self).__init__()
        self.base_model = base_model
        self.refinement_model = refinement_model
        
    def forward(self, x):
        base_output = self.base_model(x)
        refined_output = self.refinement_model(base_output)
        return refined_output

# ---------------------------
# Timing utility function
# ---------------------------
def measure_time(model, input_tensor, num_runs=10):
    # Warm-up runs (without logging time)
    with torch.no_grad():
        for _ in range(3):
            _ = model(input_tensor)
    # Measure forward pass time
    start = time.time()
    with torch.no_grad():
        for _ in range(num_runs):
            _ = model(input_tensor)
    total_time = time.time() - start
    avg_time = total_time / num_runs
    return avg_time

# ---------------------------
# Main routine
# ---------------------------
def main():
    # set to cuda if available, else use cpu (note: this example uses cpu if cuda is available)
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    print(f"Using device: {device}")
    
    # Instantiate models (using random weights)
    model_small = SRModel(in_channels=9, out_channels=3, features=64, num_res_blocks=5, upscale_factor=4).to(device)
    model_large = SRModel(in_channels=9, out_channels=3, features=128, num_res_blocks=10, upscale_factor=4).to(device)
    srcnn = DummySRCNN(upscale_factor=4).to(device)
    vsrnet_no_mc = DummyVSRNet(upscale_factor=4).to(device)
    vsrnet_mc = DummyVSRNetMCOptFlow(upscale_factor=4).to(device)
    
    # Instantiate the new two-stage model:
    # Base model: same as SRModel (5 blocks, 64 features)
    base_model = SRModel(in_channels=9, out_channels=3, features=64, num_res_blocks=5, upscale_factor=4).to(device)
    # Refinement network: 3 residual blocks for texture refinement.
    refinement_net = RefinementNet(channels=3, features=64, num_res_blocks=3).to(device)
    two_stage = TwoStageSRModel(base_model, refinement_net).to(device)
    
    # Create dummy inputs:
    # For SR models and two-stage model: input with 9 channels (3 frames × 3 channels) at 120×214 resolution.
    input_sr = torch.randn(1, 9, 120, 214, device=device)
    # For SRCNN: single-channel input (e.g., luminance)
    input_srcnn = torch.randn(1, 1, 120, 214, device=device)
    # For VSRnet models: 5 frames (single channel each)
    input_vsr = torch.randn(1, 5, 120, 214, device=device)
    
    # Number of forward passes to average
    num_runs = 50
    time_small = measure_time(model_small, input_sr, num_runs)
    time_large = measure_time(model_large, input_sr, num_runs)
    time_srcnn = measure_time(srcnn, input_srcnn, num_runs)
    time_vsr_no_mc = measure_time(vsrnet_no_mc, input_vsr, num_runs)
    time_vsr_mc = measure_time(vsrnet_mc, input_vsr, num_runs)
    time_two_stage = measure_time(two_stage, input_sr, num_runs)
    
    # Log the average forward pass times (in milliseconds)
    print(f"Average forward pass time on CPU over {num_runs} runs:")
    print(f"  SR Model (5 blocks, 64 features): {time_small * 1000:.3f} ms")
    print(f"  SR Model (10 blocks, 128 features): {time_large * 1000:.3f} ms")
    print(f"  SRCNN: {time_srcnn * 1000:.3f} ms")
    print(f"  VSRnet without MC: {time_vsr_no_mc * 1000:.3f} ms")
    print(f"  VSRnet with optical flow MC: {time_vsr_mc * 1000:.3f} ms")
    print(f"  Two-stage SR Model (Base + Refinement): {time_two_stage * 1000:.3f} ms")

if __name__ == '__main__':
    main()


Using device: cuda
Average forward pass time on CPU over 50 runs:
  SR Model (5 blocks, 64 features): 8.879 ms
  SR Model (10 blocks, 128 features): 41.924 ms
  SRCNN: 5.209 ms
  VSRnet without MC: 486.388 ms
  VSRnet with optical flow MC: 1412.317 ms
  Two-stage SR Model (Base + Refinement): 94.388 ms
