In [None]:
import os
import rawpy
from PIL import Image, ImageOps
import matplotlib.pyplot as plt
from datasets import load_dataset

def process_raw_image(raw_folder: str, image_name: str, target_resolution: tuple[int, int], processed_folder: str) -> None:

    # separate image name from image format
    image_name, image_format = os.path.splitext(image_name)

    # import input image
    with rawpy.imread(f'{raw_folder}/{image_name}.{image_format}').postprocess(use_camera_wb=True) as raw_image:

        # Convert the raw image to a Pillow Image
        input_image = Image.fromarray(raw_image)

        # Resize the image to fit within the target size while preserving aspect ratio
        input_image = ImageOps.fit(input_image, target_resolution)

        # Save image
        input_image.save(f'{processed_folder}/{image_name}.png')

def extract_images_from_dataset(dataset, image_resolution, n_images_to_save, output_folder) -> None:
    
    # find first 10 images with target resolution
    for image in dataset:
        if image['image'].size == image_resolution:

            # Save image to the specified folder
            image['image'].save(f'{output_folder}/image_{n_images_to_save}.png')
            n_images_to_save -= 1
            if n_images_to_save == 0:
                break

def convert_

raw_folder = '/home/itec/emanuele/inpainting/raw_images'
input_folder = '/home/itec/emanuele/inpainting/inputs'
image_resolution = (512, 512)

imagenet_hard = load_dataset('taesiri/imagenet-hard', split='validation', streaming=True)
n_images_to_save = 100
extract_images_from_dataset(imagenet_hard, image_resolution, n_images_to_save, input_folder)



# raw_image = rawpy.imread(raw_folder + 'Nikon-D750-Shotkit-6.NEF')

# for image_name in os.listdir(raw_folder):
#     print(image_name)
#     process_raw_image(raw_folder, image_name, image_resolution, input_folder)

# process_raw_image(raw_folder, 'Nikon-D750-Shotkit-6.NEF', image_size, input_folder)

In [None]:

import numpy as np
import torch
from diffusers import StableDiffusionInpaintPipeline


output_folder = '/home/itec/emanuele/inpainting/outputs/'
masks_folder = '/home/itec/emanuele/inpainting/masks/'
block_size = 32 # stable diffusion cannot work with smaller blocks
blocks_per_row = int(input_image.size[0] / block_size)
blocks_per_column = int(input_image.size[1] / block_size)

# empty input checkers folder
for file in os.listdir(input_folder):
    os.remove(input_folder + file)

# empty output checkers folder
for file in os.listdir(output_folder):
    os.remove(output_folder + file)

def create_checkerboard_mask(image_size, block_size):
    height, width = image_size
    mask = np.zeros((height, width), dtype=np.uint8)
    num_blocks_height = height // block_size
    num_blocks_width = width // block_size

    for i in range(num_blocks_height):
        for j in range(num_blocks_width):
            if (i + j) % 2 != 0:
                mask[i * block_size: (i + 1) * block_size,
                     j * block_size: (j + 1) * block_size] = 255

    return mask

mask = create_checkerboard_mask([input_image.size[0], input_image.size[1]], block_size)

# save mask
mask_image = Image.fromarray(mask, mode='L')
mask_image.save(masks_folder + 'checkerboard_mask.png')

pipe = StableDiffusionInpaintPipeline.from_pretrained(
    "runwayml/stable-diffusion-inpainting",
    revision="fp16",
    torch_dtype=torch.float16,
)
pipe = pipe.to("cuda")
prompt = ''
# image and mask_image should be PIL images.
# The mask structure is white for inpainting and black for keeping as is
output_image = pipe(prompt=prompt, image=input_image, mask_image=mask_image).images[0]
output_image.save(output_folder + 'checkerboard_output.yuv')

def plot_images(images_dict):
    num_images = len(images_dict)

    fig, axes = plt.subplots(1, num_images, figsize=(5 * num_images, 5))

    for i, (title, image) in enumerate(images_dict.items()):
        axes[i].imshow(image, cmap='gray')
        axes[i].set_title(title)
        axes[i].axis('off')

    plt.show()



In [None]:
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
import subprocess

# Load images
img1 = np.array(input_image)
img2 = np.array(output_image)

def psnr(img1, img2):
    mse = np.mean((img1 - img2) ** 2)
    if mse == 0:
        return "Identical images"
    max_pixel = 255.0
    psnr_value = 20 * np.log10(max_pixel / np.sqrt(mse))
    return psnr_value

# Convert images to YUV using FFmpeg
def convert_to_yuv(input_path: str, output_path: str) -> None:
    cmd = f'ffmpeg -i {input_path} -pix_fmt yuv420p {output_path}'
    subprocess.run(cmd, shell=True)

convert_to_yuv(input_folder + 'input_image.png', )

# TODO: change into lossless codec
def image_into_video(image, output_video_path):
    # Create a video writer object
    video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'I420'), 1, (image.shape[1], image.shape[0]), False)

    # Convert frame to YUV format
    image = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420)

    # Write images to video
    video_writer.write(image)

    # Release video writer
    video_writer.release()
    cv2.destroyAllWindows()

# TODO: VMAF is broken because we have a very old old version on the server, I cannot install a newer one
def calculate_vmaf(video_file1, video_file2, vmaf_model, csv_path):

    cmd = f'vmafossexec yuv420p {input_image.size[0]} {input_image.size[1]} {video_file1} {video_file2} {vmaf_model} --log {csv_path}'
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return result



# Calculate PSNR
psnr_value = psnr(img1, img2)
print("PSNR:", psnr_value)

# Calculate SSIM
# Convert images to grayscale
img1_gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
img2_gray = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
ssim_value = ssim(img1_gray, img2_gray)
print("SSIM:", ssim_value)

# Replace 'video_file1' and 'video_file2' with your video file paths
vid1 = 'video1.yuv'
vid2 = 'video2.yuv'
image_into_video(img1, vid1)
image_into_video(img2, vid2)
vmaf_score = calculate_vmaf(vid1, vid2, '/home/shared/athena/vmaf/model/vmaf_v0.6.1.pkl', 'vmaf_results.csv') # TODO: score is the result of cmd operation, not vmap, fix it
print("VMAF Score:", vmaf_score)

In [None]:
# TODO: this image seems complex and blurry, run this process for a batch of several different images, and check general results