# INTRODUCTION

Competition: https://codalab.lisn.upsaclay.fr/competitions/21112

This is a challenge associated to the PRIN 2022 project "LICAM - AI-powered LiDAR fusion for next-generation smartphone cameras". The challenge task is to deblur real low-light images taken by the iPhone 15 Pro, using both the blurred image and the co-registered depth map produced by the Lidar sensor. The deblurred images will be compared to registered ground truth sharp images by means of the LPIPS perceptual quality metric.

Training and validation data are provided from a novel dataset of low-light iPhone images affected by noise and motion blur, with a registered Lidar map and a sharp ground truth image. These images are the most similar to the test images. Participants may also use the ARKitScenes dataset to pretrain their models by simulating motion blur.

The LICAM -“AI-powered LIDAR fusion for next-generation smartphone cameras (LICAM)” project is funded by European Union – Next Generation EU within the PRIN 2022 program (D.D. 104 - 02/02/2022 Ministero dell’Università e della Ricerca). The contents of this website reflect only the authors' views and opinions and the Ministry cannot be considered responsible for them.

In [None]:
!pip install keras-unet-collection

In [None]:
import glob
from PIL import Image
import numpy as np
import re
import tensorflow as tf
import torch
import random
from sklearn.model_selection import train_test_split
from keras_unet_collection import models

import matplotlib.pyplot as plt

from typing import Optional

In [None]:
PATH_TRAIN: str = "/kaggle/input/lidar-challenge/LICAM_deblur_challenge_dataset/train_val"
PATH_TEST: str = "/kaggle/input/lidar-challenge/LICAM_deblur_challenge_dataset/test"
PATH_DATA: str = "/kaggle/input/lidar-challenge/data"

# Depth map preprocessing

In [None]:
MASK_VALUE: int = -1

In [None]:
files_train = glob.glob(f"{PATH_TRAIN}/images45/*/depth/*.png")
files_data = glob.glob(f"{PATH_DATA}/*/*/highres_depth/*.png")

In [None]:
for file in files_train:
    img = np.array(Image.open(file), dtype=np.int32)
    img = (img - np.min(img)) / (np.max(img) - np.min(img))
    np.save(f'./0_depth_{re.findall(r".*/images45/.*/depth/(.*).png", file)[0]}.npy', img)

In [None]:
for file in files_data:
    img = np.array(Image.open(file).resize(size=(512, 512), resample=Image.NEAREST), dtype=np.int32)
    img[img == 0] = MASK_VALUE
    img[img != MASK_VALUE] = ((img[img != MASK_VALUE] - np.min(img[img != MASK_VALUE])) 
                              / (np.max(img[img != MASK_VALUE]) - np.min(img[img != MASK_VALUE])))
    np.save(f'./1_depth_{re.findall(r".*/.*/.*/highres_depth/(.*).png", file)[0]}.npy', img)

# Image preprocessing

In [None]:
files_train = glob.glob(f"{PATH_TRAIN}/images45/*/rgb/*.png")
files_data = glob.glob(f"{PATH_DATA}/*/*/wide/*.png")

In [None]:
for file in files_train:
    img = np.array(Image.open(file), dtype=np.float32)
    img = img / 255.
    np.save(f'./0_rgb_{re.findall(r".*/images45/.*/rgb/(.*).png", file)[0]}.npy', img)

In [None]:
for file in files_data:
    img = np.array(Image.open(file).resize(size=(512, 512), resample=Image.LANCZOS), dtype=np.int32)
    img = img / 255.
    np.save(f'./1_rgb_{re.findall(r".*/.*/.*/wide/(.*).png", file)[0]}.npy', img)

# Ground truth

In [None]:
files_train = glob.glob(f"{PATH_TRAIN}/images45/*/gt/*.png")

In [None]:
for file in files_train:
    img = np.array(Image.open(file), dtype=np.float32)
    img = img / 255.
    np.save(f'./0_gt_{re.findall(r".*/images45/.*/gt/(.*).png", file)[0]}.npy', img)

# Sequence generator

In [None]:
idx = glob.glob(f"./*_rgb_*.npy")
idx = np.asarray([re.findall(r"./(.*)_rgb_(.*).npy", file)[0] for file in idx])

In [None]:
def random_motion(steps: Optional[int] = 16,
                  initial_vector: Optional[torch.Tensor] = None,
                  alpha: Optional[float] = 0.2):
    motion = [torch.zeros_like(initial_vector)]
    for s in range(steps):
        change = torch.randn(initial_vector.shape[0], dtype=torch.cfloat)
        initial_vector = initial_vector + change * alpha
        initial_vector /= initial_vector.abs().add(1e-8)
        motion.append(motion[-1] + initial_vector)
    motion = torch.stack(motion, -1)
    motion -= motion.mean(-1, keepdim=True)
    xrange = max(motion.real.max().ceil().long(), -motion.real.min().floor().long()) * 2
    yrange = max(motion.imag.max().ceil().long(), -motion.imag.min().floor().long()) * 2
    
    kernel = torch.zeros(initial_vector.shape[0], 1, yrange.item()+1, xrange.item()+1)
    for s in range(steps):
        v = motion[:,s] + kernel.shape[-1] // 2 + (kernel.shape[-2] // 2)*1j
        ixs = v.real.long() 
        iys = v.imag.long()
        vxs = v.real - ixs
        vys = v.imag - iys

        for i, (iy, ix, vy, vx) in enumerate(zip(iys, ixs, vys, vxs)): 
            kernel[i,0,iy,ix] += (1-vx) * (1-vy) / steps
            kernel[i,0,iy,ix+1] += vx * (1-vy) / steps
            kernel[i,0,iy+1,ix] += (1-vx) * vy / steps
            kernel[i,0,iy+1,ix+1] += vx * vy / steps
        
    return kernel

In [None]:
class RandomMotionBlur(torch.nn.Module):
    def __init__(self,
                 steps: Optional[int] = 17,
                 alpha: Optional[float] = 0.2):
        super().__init__()
        self.steps = steps
        self.alpha = alpha
        
    def forward(self,
                x: torch.Tensor):
        x = x.swapaxes(1, 3)
        vector = torch.randn(x.shape[0], dtype=torch.cfloat) / 3
        vector.real /= 2
        m = random_motion(self.steps, vector, alpha=self.alpha)
        xpad = [m.shape[-1]//2+1] * 2 + [m.shape[-2]//2+1] * 2
        x = torch.nn.functional.pad(x, xpad)
        mpad = [0, x.shape[-1]-m.shape[-1], 0, x.shape[-2]-m.shape[-2]]
        mp = torch.nn.functional.pad(m, mpad)
        fx = torch.fft.fft2(x)
        fm = torch.fft.fft2(mp)
        fy = fx * fm
        y = torch.fft.ifft2(fy).real
        y = y[...,xpad[2]:-xpad[3], xpad[0]:-xpad[1]].swapaxes(1, 3)
        return y

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, 
                 idx: np.ndarray,
                 batch_size: Optional[int] = 8,
                 shuffle: Optional[bool] = True,
                 **kwargs):
        super(tf.keras.utils.Sequence, self).__init__(**kwargs)
        self.idx = idx
        self.batch_size = batch_size
        self.shuffle = shuffle

    def __len__(self):
        return int(np.ceil(len(self.idx) / self.batch_size))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.idx)

    def __getitem__(self,
                    index: int):
        idx = self.idx[index * self.batch_size : (index + 1) * self.batch_size]
        X_rgb = np.zeros((len(idx), 512, 512, 3), dtype=np.float32)
        X_depth = np.zeros((len(idx), 512, 512), dtype=np.float32)
        y = np.zeros((len(idx), 512, 512, 3), dtype=np.float32)
        for i, (dtype, file) in enumerate(idx):
            X_depth[i] = np.load(f"./{dtype}_depth_{file}.npy")
            X_rgb[i] = np.load(f"./{dtype}_rgb_{file}.npy")
            y[i] = np.load(f"./{dtype}_gt_{file}.npy") if dtype == "0" else np.load(f"./{dtype}_rgb_{file}.npy")
            if dtype == "1":
                blur = RandomMotionBlur(steps=random.randint(5, 50))
                X_rgb[i] = blur(torch.Tensor(X_rgb[i][np.newaxis, ...]))[0]
        return (X_rgb, X_depth[..., np.newaxis]), y

In [None]:
idx_train, idx_test = train_test_split(range(len(idx)), test_size=0.3)

In [None]:
train_generator = DataGenerator(idx=idx[idx_train])
test_generator = DataGenerator(idx=idx[idx_test])

# Model

In [None]:
unet = models.unet_2d((512, 512, 4),
                      [16, 32, 64],
                      n_labels=3,
                      stack_num_down=1,
                      stack_num_up=1,
                      activation='Snake',
                      output_activation=None, 
                      batch_norm=True,
                      pool='max',
                      unpool='nearest',
                      name='unet')
unet.summary()

In [None]:
input_rgb = tf.keras.Input(shape=(512, 512, 3), dtype=tf.float32)
input_depth = tf.keras.Input(shape=(512, 512, 1), dtype=tf.float32)
x = tf.keras.layers.concatenate([input_rgb, input_depth], axis=3)
outputs = unet(x)
model = tf.keras.models.Model(inputs=[input_rgb, input_depth],
                              outputs=outputs)
model.summary()

In [None]:
class VGGFeatureMatchingLoss(tf.keras.losses.Loss):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.encoder_layers = ["block1_conv1",
                               "block2_conv1",
                               "block3_conv1",
                               "block4_conv1",
                               "block5_conv1"]
        self.weights = [1.0 / 32, 1.0 / 16, 1.0 / 8, 1.0 / 4, 1.0]
        vgg = tf.keras.applications.VGG19(include_top=False, weights="imagenet")
        layer_outputs = [vgg.get_layer(x).output for x in self.encoder_layers]
        self.vgg_model = tf.keras.Model(vgg.input, layer_outputs, name="VGG")
        self.mae = tf.keras.losses.MeanAbsoluteError()

    def call(self, y_true, y_pred):
        y_true = tf.keras.applications.vgg19.preprocess_input(255. * y_true)
        y_pred = tf.keras.applications.vgg19.preprocess_input(255. * y_pred)
        real_features = self.vgg_model(y_true)
        fake_features = self.vgg_model(y_pred)
        loss = 0
        for i in range(len(real_features)):
            loss += self.weights[i] * self.mae(real_features[i], fake_features[i])
        return loss

In [None]:
class PerceptualLoss(tf.keras.losses.Loss):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.mae = tf.keras.losses.MeanAbsoluteError()
        self.mse = tf.keras.losses.MeanSquaredError()
        self.vggloss = VGGFeatureMatchingLoss()

    def call(self, y_true, y_pred):
        loss = (self.vggloss(y_true, y_pred) * 1.5
                + self.mae(y_true, y_pred) * 0.5
                + self.mse(y_true, y_pred) * 0.5
                + (1 - tf.reduce_mean(tf.image.ssim(tf.clip_by_value(y_true, clip_value_max=1.0, clip_value_min=0.0), 
                                                    tf.clip_by_value(y_pred, clip_value_max=1.0, clip_value_min=0.0), 
                                                    1.0))) * 0.7)
        return loss

In [None]:
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
              loss=PerceptualLoss(),
              metrics=["mae", "mse"])

In [None]:
model.fit(train_generator,
          validation_data=test_generator,
          epochs=10,
          verbose=1)

# Predict

In [None]:
X_rgb = np.asarray(Image.open(f"{PATH_TEST}/1/rgb/1.png"))[np.newaxis, ...]
X_depth = np.asarray(Image.open(f"{PATH_TEST}/1/depth/1.png"))[np.newaxis, ...]

In [None]:
y_pred = model.predict([X_rgb, X_depth])

In [None]:
y_pred = np.clip(y_pred, a_max=1., a_min=0.) * 255.

In [None]:
plt.imshow(y_pred[0])
plt.show()