# Project 1 Design Tradeoffs in Digital Systems

##### Author: @BrianQJN qujianning0401@163.com brian.qu@mail.utoronto.ca 
##### Date: 2023-10-14 
##### Version: 1.0 

### E1 Chroma up-sampling, YUV pixel manipulation, YUV-RGB CSC

#### 1.a Read the YUV 4:2:0 video sequence(s), and upscale it to 4:4:4

In [1]:
import numpy as np

def read_yuv420_video(yuv_file_path: str, width: int, height: int, num_frames: int) -> (list, list, list):
    """
    Read YUV420 video file, and return pixel data of the three components Y, U, and V.
    Args:
        yuv_file_path (str): Path of the YUV420 video file.
        width (int): Width of the video.
        height (int): Height of the video.
        num_frames (int): Number of frames to read.
    Returns:
        y_data (list): A list of pixel data for the brightness (Y) component
        cb_data (list): The list of pixel data for the chroma (Cb) component
        cr_data (list): The list of pixel data for the chroma (Cr) component
    """
    # initialize the three lists of pixel data
    y_data = []
    cb_data = []
    cr_data = []

    with open(yuv_file_path, 'rb') as file:
        for _ in range(num_frames):
            # read the Y component
            y_frame = np.fromfile(file, dtype=np.uint8, count=width * height).reshape((height, width))
            y_data.append(y_frame)

            # read the U component
            cb_frame = np.fromfile(file, dtype=np.uint8, count=(width // 2) * (height // 2)).reshape((height // 2, width // 2))
            cb_data.append(cb_frame)

            # read the V component
            cr_frame = np.fromfile(file, dtype=np.uint8, count=(width // 2) * (height // 2)).reshape((height // 2, width // 2))
            cr_data.append(cr_frame)

    file.close()

    return y_data, cb_data, cr_data


def upscale_420_to_444(y_data: list, cb_data: list, cr_data: list) -> list:
    """
    Scale 4:2:0 pixel data to 4:4:4 pixel data.
    Args:
        y_data (list): A list of pixel data for the brightness (Y) component
        cb_data (list): The list of pixel data for the chroma (Cb) component
        cr_data (list): The list of pixel data for the chroma (Cr) component
    Returns:
        yuv444_data (list): A list of pixel data for the YUV444 video.
    """
    # initialize the list of pixel data for the YUV444 video
    yuv444_data = []

    for y_frame, cb_frame, cr_frame in zip(y_data, cb_data, cr_data):
        # upscale the U(cb) and V(cr) component, copy to fill
        cb_upsampled = np.repeat(np.repeat(cb_frame, 2, axis=0), 2, axis=1)
        cr_upsampled = np.repeat(np.repeat(cr_frame, 2, axis=0), 2, axis=1)

        # combine the Y, U, and V component
        yuv444_frame = np.dstack((y_frame, cb_upsampled, cr_upsampled))
        yuv444_data.append(yuv444_frame)

    return yuv444_data


def save_yuv444_video(yuv444_data: list, output_file: str) -> None:
    """
    Save the YUV444 video to a file.
    Args:
        yuv444_data (list): A list of pixel data for the YUV444 video.
        output_file (str): Path of the output file.
    """
    with open(output_file, 'wb') as file:
        for frame in yuv444_data:
            frame.tofile(file)

    file.close()

#### Test for 1.a

In [2]:
input_file = "videos/420/foreman_cif.yuv"
output_file = "videos/444/foreman_cif.yuv"
width = 352
height = 288
num_frames = 300

# read the YUV420 video
y_data, u_data, v_data = read_yuv420_video(input_file, width, height, num_frames)

# upscale the YUV420 video to YUV444
yuv444_data = upscale_420_to_444(y_data, u_data, v_data)

# save the YUV444 video
save_yuv444_video(yuv444_data, output_file)

#### 1.b Convert the YUV 4:4:4 video sequence(s) to RGB

In [11]:
from PIL import Image
import numpy as np

def yuv444_to_rgb(yuv444_data: list, csc_matrix: list) -> list:
    """
    Convert YUV 4:4:4 pixel data to RGB pixel data.
    Args:
        yuv444_data (list): 444yuv pixel data list
        csc_matrix (list): convert matrix
    Returns:
        rgb_data (list): RGB pixel data list
    """
    rgb_data = []

    for yuv_frame in yuv444_data:
        # convert 444yuv pixel data to matrix
        yuv_matrix = np.array(yuv_frame).reshape(-1, 3)

        # convert 444yuv pixel data to 444rgb pixel data
        rgb_matrix = np.dot(yuv_matrix - [16, 128, 128], np.array(csc_matrix).reshape(3, 3).T)

        # convert 444rgb pixel data to 444rgb frame data
        rgb_frame = np.clip(rgb_matrix, 0, 255).astype(np.uint8).reshape(yuv_frame.shape)

        # add rgb frame data to rgb data list
        rgb_data.append(rgb_frame)

    return rgb_data

def save_rgb_images(rgb_data: list, output_prefix: str, stop_frame=300) -> None:
    """
    Convert RGB pixel data to RGB images and save them as .png files.
    Args:
        rgb_data (list): rgb pixel data list
        output_prefix (str): png file prefix
    """
    for i, rgb_frame in enumerate(rgb_data):
        # create pillow image object
        img = Image.fromarray(rgb_frame)

        # save image
        img.save(f"images/{output_prefix}_{i:03d}.png")
        if i == stop_frame:
            break

#### Test for 1.b

In [12]:
csc_matrix = [1.164, 0, 1.596, 1.164, -0.392, -0.813, 1.164, 2.017, 0]

rgb_data = yuv444_to_rgb(yuv444_data, csc_matrix)

save_rgb_images(rgb_data, "images", 4)

### E2 Basic block-based operations on video content and quality assessment metrics

#### 2.a Read the Y-component of 4:2:0 video sequences, and dump it into corresponding Y-only files of each sequence

In [16]:
import numpy as np

def extract_y_component(yuv420_data: list) -> list:
    """
    Extract the Y component from the YUV420 pixel data.
    Args:
        yuv420_data (list): A list of pixel data for the YUV420 video.
    Returns:
        y_data (list): A list of pixel data for the brightness (Y) component
    """
    y_data = []

    for yuv_frame in yuv420_data:
        # extract the Y component
        y_frame = yuv_frame[:, :, 0]
        y_data.append(y_frame)

    return y_data


def save_y_only_files(y_data: list, output_prefix: str, stop_frames=300) -> None:    
    """
    Save the Y component to files.
    Args:
        y_data (list): A list of pixel data for the brightness (Y) component
        output_prefix (str): Path of the output file.
        stop_frames (int): Number of frames to save.
    """
    for i, y_frame in enumerate(y_data):
        # save the Y component to files
        output_file = f"y_only_files/{output_prefix}_{i:03d}.y"

        y_frame.tofile(output_file)

        if i == stop_frames:
            break

#### Test for 2.a

In [17]:
y_data = read_yuv420_video(input_file, width, height, num_frames)[0]
save_y_only_files(y_data, "foreman_cif", 4)

#### 2.b Read every Y-only file, and apply to it the following operation(s):
i. Split each frame into (𝑖 × 𝑖) blocks (where (𝑖) takes the values 2, 8, and 64)
ii. Use “padding” if the width and/or height of the frame is not divisible by (𝑖). Pad with gray (128).
If padding is necessary, padding pixels should be placed at the right and/or bottom of the frame.

In [18]:
import numpy as np

def read_y_only_file(file_path: str, width: int, height: int) -> np.array:
    """
    Read the Y component from a file. 
    Args:
        file_path (str): Path of the file.
        width (int): Width of the video.
        height (int): Height of the video.å
    Returns:
        y_frame (np.array): A numpy array of pixel data for the brightness (Y) component
    """   
    with open(file_path, 'rb') as file:
        # read the Y component
        y_frame = np.fromfile(file, dtype=np.uint8, count=width * height).reshape((height, width))
    
    file.close()
    
    return y_frame

def split_frame_into_blocks(y_frame: np.array, block_size: int) -> list:
    """
    Split a frame into blocks.
    Args:
        y_frame (np.array): A numpy array of pixel data for the brightness (Y) component
        block_size (int): Size of the block.
    Returns:
        blocks (list): A list of blocks.
    """
    height, width = y_frame.shape
    padding_needed = False

    # calculate the width and height to see if padding is needed
    if width % block_size != 0:
        padding_width = block_size - (width % block_size)
        padding_needed = True
    else:
        padding_width = 0

    if height % block_size != 0:
        padding_height = block_size - (height % block_size)
        padding_needed = True
    else:
        padding_height = 0

    # if padding is needed, pad the frame with gray(128)
    if padding_needed:
        y_frame = np.pad(y_frame, ((0, padding_height), (0, padding_width)), 'constant', constant_values=128)

    # split the frame into blocks
    blocks = []

    for i in range(0, height, block_size):
        for j in range(0, width, block_size):
            block = y_frame[i:i + block_size, j:j + block_size]
            blocks.append(block)
    
    return blocks

#### Test for 2.b

In [20]:
y_frame = read_y_only_file("y_only_files/foreman_cif_000.y", 352, 288)
blocks = split_frame_into_blocks(y_frame, 8)
print(f"Number of blocks: {len(blocks)}")

Number of blocks: 1584


#### 2.c Calculate the average* of the sample values within each (𝑖 × 𝑖) block *Use efficient rounded division by (𝑖 × 𝑖) while calculating the approximated average

In [21]:
import numpy as np

def calculate_approximate_average(block: np.array) -> int:
    """
    Calculate the approximate average value of a block.
    Args:
        block (np.array): A numpy array of pixel data for a block.
    Returns:
        avg (int): The approximate average value of the block.
    """
    avg = np.sum(block) // block.size
    return avg

def calculate_average_for_blocks(blocks: list) -> list:
    """
    Calculate the average value for each block.
    Args:
        blocks (list): A list of blocks.
    Returns:
        avgs (list): A list of average values for the blocks.
    """
    avgs = []

    for block in blocks:
        avg = calculate_approximate_average(block)
        avgs.append(avg)
    
    return avgs

#### Test for 2.c

In [22]:
avgs = calculate_average_for_blocks(blocks)

for i, avg in enumerate(avgs):
    print(f"Block {i+1}: {avg}")
    if i == 9:
        break

Block 1: 156.0
Block 2: 200.0
Block 3: 200.0
Block 4: 193.0
Block 5: 199.0
Block 6: 211.0
Block 7: 224.0
Block 8: 220.0
Block 9: 194.0
Block 10: 188.0


#### 2.d Replace every (𝑖 × 𝑖) block with another (𝑖 × 𝑖) block of identical elements of this average value to generate Y-only-block-averaged file(s)

In [44]:
import numpy as np

def replace_block_with_average(block: np.array, avg: int) -> np.array:
    """
    Replace a block with its average value.
    Args:
        block (np.array): A numpy array of pixel data for a block.
        avg (int): The average value of the block.
    Returns:
        replaced_block (np.array): A numpy array of pixel data for a block.
    """
    replaced_block = np.full_like(block, avg)
    return replaced_block

def replace_blocks_with_average(blocks: list, avgs: list) -> list:
    """
    Replace blocks with their average values.
    Args:
        blocks (list): A list of blocks.
        avgs (list): A list of average values for the blocks.
    Returns:
        replaced_blocks (list): A list of blocks.
    """
    replaced_blocks = []

    for block, avg in zip(blocks, avgs):
        replaced_block = replace_block_with_average(block, avg)
        replaced_blocks.append(replaced_block)
    
    return replaced_blocks

def save_y_only_averaged_file(output_file_path: str, y_avg_only_frame: np.array) -> None:
    """
    Save y only averaged file.
    Args:
        y_avg_only_frame (np.array): A numpy array of pixel data for the brightness (Y) component
        output_file_path (str): Path of the output file.
    """
    with open(output_file_path, 'wb') as file:
        y_avg_only_frame.tofile(file)

#### Test for 2.d

In [46]:
replaced_blocks = replace_blocks_with_average(blocks, avgs)

avg_frame = np.block(replaced_blocks)

output_file_path = "y_only_avg_files/foreman_cif_000.y"
save_y_only_averaged_file(output_file_path, avg_frame)

#### 2.e Subjectively compare* every original Y-only file with its corresponding Y-only-block-averaged one *In addition to the frame to frame comparison, you can use simple frame differencing (multiplied by an arbitrary factor to magnify the deltas)

In [51]:
import numpy as np

def subjective_comparison(original_file_path: str, average_file_path: str, factor=1.0):
    """
    Compare the original and the average file.
    Args:
        original_file_path (str): Path of the original file.
        average_file_path (str): Path of the average file.
        factor (float): The factor used to amplify the frame difference
    Returns:
        diff_frames (list): A list of frame differences.
    """
    # read the original and the average file
    original_frame = read_y_only_file(original_file_path, 352, 288)
    average_frame = read_y_only_file(average_file_path, 352, 288)

    # calculate the frame difference
    diff_frame = np.abs(original_frame.astype(float) - average_frame.astype(float)) * factor

    return diff_frame

#### Test for 2.e

In [52]:
magnification_factor = 10.0

diff_frames = subjective_comparison("y_only_files/foreman_cif_000.y", "y_only_avg_files/foreman_cif_000.y", magnification_factor)

#### 2.f Using average PSNR and SSIM among frames, compare every original Y-only file with its corresponding Y-only-block-averaged one

In [53]:
import numpy as np
from skimage.metrics import peak_signal_noise_ratio, structural_similarity

def compare_y_files(original_file_path: str, average_file_path: str) -> (float, float):
    """
    Compare the original and the average file using PSNR and SSIM.
    Args:
        original_file_path (str): Path of the original file.
        average_file_path (str): Path of the average file.
    Returns:
        psnr (float): PSNR value.
        ssim (float): SSIM value.
    """
    # read the original and the average file
    original_frame = read_y_only_file(original_file_path, 352, 288)
    average_frame = read_y_only_file(average_file_path, 352, 288)

    # calculate PSNR and SSIM
    psnr = peak_signal_noise_ratio(original_frame, average_frame)
    ssim = structural_similarity(original_frame, average_frame)

    return psnr, ssim

#### Test for 2.f

In [54]:
psnr, ssim = compare_y_files("y_only_files/foreman_cif_000.y", "y_only_avg_files/foreman_cif_000.y")

print(f"PSNR: {psnr:.2f} dB")
print(f"SSIM: {ssim:.2f}")

PSNR: 11.36 dB
SSIM: 0.12


### E3 Basic Motion Estimation/Compensation

#### 3.a Repeat parts a and b of E2

#### 3.b Using integer pixel full search, find the best predicted block for every (𝑖 × 𝑖) current block

In [55]:
import numpy as np

def create_virtual_reference_frame(frame_shape: tuple, value=128) -> np.array:
    """
    Create a virtual reference frame.
    Args:
        frame_shape (tuple): Shape of the frame, i.e. (height, width).
        value (int): Value of the frame.
    Returns:
        virtual_reference_frame (np.array): A numpy array of pixel data for the virtual reference frame.
    """
    virtual_reference_frame = np.full(frame_shape, value, dtype=np.uint8)
    return virtual_reference_frame

def integer_pixel_full_search(current_block: np.array, reference_frame: np.array, search_range: int) -> (np.array, tuple):
    """
    Using integer pixel full search to find the best matching block.
    Args:
        current_block (np.array): A numpy array of pixel data for the current block.
        reference_frame (np.array): A numpy array of pixel data for the reference frame.
        search_range (int): Search range. (+/- r pixels)
    Returns:
        best_matching_block (np.array): A numpy array of pixel data for the best matching block.
        motion_vector (tuple): Motion vector, i.e. (dy, dx).
    """
    best_mae = float('inf')
    best_matching_block = None
    motion_vector = (0, 0)

    height, width = current_block.shape

    for dx in range(-search_range, search_range+1):
        for dy in range(-search_range, search_range+1):
            x_start = max(0,dx)
            x_end = min(width, width+dx)
            y_start = max(0,dy)
            y_end = min(height, height+dy)

            reference_block = reference_frame[y_start:y_end, x_start:x_end]

            if reference_block.shape == current_block.shape:
                mae = np.abs(current_block - reference_block).mean()

                if mae < best_mae:
                    best_mae = mae
                    best_matching_block = reference_block
                    motion_vector = (dx, dy)
    
    return best_matching_block, motion_vector

#### 3.c Dump the x and y components of every successful MV into a text (and/or binary) file using an arbitrary format that preserves the coordinates of the block

In [56]:
def dump_motion_vectors_to_file(motion_vectors: list, file_path: str) -> None:
    """
    Save motion vectors to a file.
    Args:
        motion_vectors (list): A list of motion vectors.
        file_path (str): Path of the output file.
    """
    with open(file_path, 'w') as file:
        for mv in motion_vectors:
            x, y = mv
            file.write(f"{x} {y}\n")

    file.close()

#### 3.d An (𝑖 × 𝑖) residual block will be generated by subtracting the predicted block from the current block

In [57]:
import numpy as np

def generate_residual_block(current_block: np.array, predicted_block: np.array) -> np.array:
    """
    Generate (i x i) residual block.
    Args:
        current_block (np.array): A numpy array of pixel data for the current block.
        predicted_block (np.array): A numpy array of pixel data for the predicted block.
    Returns:
        residual_block (np.array): A numpy array of pixel data for the residual block.
    """
    # check if the current block and the predicted block have the same shape
    assert current_block.shape == predicted_block.shape

    # calculate the residual block
    residual_block = current_block - predicted_block

    return residual_block

#### 3.e Generate an approximated residual block by rounding every element in the (𝑖 × 𝑖) residual block to the nearest multiple of 2n (for 𝑛 = 1, 2, 3)

In [58]:
import numpy as np

def approximate_residual_block(residual_block: np.array, n: int) -> np.array:
    """
    Generate approximate residual block. Rounding each element in the residual block to the nearest multiple of 2^n.
    Args:
        residual_block (np.array): A numpy array of pixel data for the residual block.
        n (int): Round to the nearest multiple of 2^n
    Returns:
        approximated_residual_block (np.array): A numpy array of pixel data for the approximated residual block.
    """
    factor = 2 ** n

    approximated_residual_block = np.round(residual_block / factor) * factor

    return approximated_residual_block

#### 3.f Dump the (𝑖 × 𝑖) approximated residual values into a text (and/or binary) file using an arbitrary format that preserves the coordinates of the block

In [None]:
import numpy as np

def dump_approximated_residuals_to_file():
    