# Install important dependencies




In [2]:
!pip install kornia
!pip install einops
!pip install pytorch-lightning==1.9.5
!pip install tensorboard

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kornia
  Downloading kornia-0.6.12-py2.py3-none-any.whl (653 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m653.4/653.4 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: kornia
Successfully installed kornia-0.6.12
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting einops
  Downloading einops-0.6.1-py3-none-any.whl (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.6.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pytorch-lightning==1.9.5
  Downloading pytorch_lightning-1.9.5-py3-none-any.whl (829 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32

# Import the Important libraries




In [3]:
import torch
from torch.utils.data import Dataset
import imageio
import numpy as np
from kornia import create_meshgrid
from einops import rearrange
import cv2
from typing import Tuple
from torch import nn
import torch

# datasets
from torch.utils.data import DataLoader


# optimizer
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR
from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.callbacks import TQDMProgressBar
from pytorch_lightning.loggers import TensorBoardLogger



# Dataset loader

In [4]:
class ImageDataset(Dataset):
    def __init__(self, image_path: str, img_wh: Tuple[int, int], split: str):
        image = imageio.imread(image_path)[..., :3]/255.
        image = cv2.resize(image, img_wh)
        # image = np.load('images/data_2d_text.npz')['test_data'][6]/255.

        self.uv = create_meshgrid(*image.shape[:2], True)[0]
        self.rgb = torch.FloatTensor(image)

        if split == 'train':
            self.uv = self.uv[::2, ::2]
            self.rgb = self.rgb[::2, ::2]
        elif split == 'val':
            self.uv = self.uv[1::2, 1::2]
            self.rgb = self.rgb[1::2, 1::2]

        self.uv = rearrange(self.uv, 'h w c -> (h w) c')
        self.rgb = rearrange(self.rgb, 'h w c -> (h w) c')

    def __len__(self):
        return len(self.uv)

    def __getitem__(self, idx: int):
        return {"uv": self.uv[idx], "rgb": self.rgb[idx]}

# MLP Model with ReLu and PE

In [5]:
class MLP(nn.Module):
    def __init__(self, n_in,
                 n_layers=4, n_hidden_units=256,
                 act='relu', act_trainable=False):
        super().__init__()

        layers = []
        for i in range(n_layers):

            if i == 0:
                l = nn.Linear(n_in, n_hidden_units)
            elif 0 < i < n_layers-1:
                l = nn.Linear(n_hidden_units, n_hidden_units)

            if act == 'relu':
                act_ = nn.ReLU(True)
            
            if i < n_layers-1:
                layers += [l, act_]
            else:
                layers += [nn.Linear(n_hidden_units, 3), nn.Sigmoid()]

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """
        x: (B, 2) # pixel uv (normalized)
        """
        return self.net(x) # (B, 3) rgb


class PE(nn.Module):
    """
    perform positional encoding
    """
    def __init__(self, P):
        """
        P: (2, F) encoding matrix
        """
        super().__init__()
        self.register_buffer("P", P)

    @property
    def out_dim(self):
        return self.P.shape[1]*2

    def forward(self, x):
        """
        x: (B, 2)
        """
        x_ = 2*np.pi*x @ self.P # (B, F)
        return torch.cat([torch.sin(x_), torch.cos(x_)], 1) # (B, 2*F)

In [6]:
mlp = MLP(2, n_layers=4, n_hidden_units=256,act='relu')

In [7]:
print(mlp)

MLP(
  (net): Sequential(
    (0): Linear(in_features=2, out_features=256, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=256, out_features=256, bias=True)
    (3): ReLU(inplace=True)
    (4): Linear(in_features=256, out_features=256, bias=True)
    (5): ReLU(inplace=True)
    (6): Linear(in_features=256, out_features=3, bias=True)
    (7): Sigmoid()
  )
)


# metrics

In [8]:
def mse(image_pred, image_gt, valid_mask=None, reduction='mean'):
    value = (image_pred-image_gt)**2
    if valid_mask is not None:
        value = value[valid_mask]
    if reduction == 'mean':
        return torch.mean(value)
    return value

@torch.no_grad()
def psnr(image_pred, image_gt, valid_mask=None, reduction='mean'):
    return -10*torch.log10(mse(image_pred, image_gt, valid_mask, reduction))

# Train

In [9]:
# hyper parameters
image_path = '/content/1.jpeg'
batch_size = 1024
lr = 1e-4
img_wh = [800, 800]
num_epochs = 1000

In [12]:

def get_learning_rate(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']


class CoordMLPSystem(LightningModule):
    # hyper parameters
    image_path = '/content/1.jpeg'
    batch_size = 1024
    lr = 1e-4
    img_wh = [800, 800]
    num_epochs = 1000

    def __init__(self):
        super().__init__()
      
        P = torch.cat([torch.eye(2)*2**i for i in range(10)], 1) # (2, 2*10)
        self.pe = PE(P)


        act ='relu'

        n_in = self.pe.out_dim

        self.mlp = MLP(n_in=n_in, act=act)

       

    def forward(self, x):
        x = self.pe(x)
        return self.mlp(x)
        
    def setup(self, stage=None):
        self.train_dataset = ImageDataset(image_path,img_wh,'train')
        self.val_dataset = ImageDataset(image_path,img_wh,'val')

    def train_dataloader(self):
        return DataLoader(self.train_dataset,
                          shuffle=True,
                          num_workers=4,
                          batch_size=self.batch_size,
                          pin_memory=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset,
                          shuffle=False,
                          num_workers=4,
                          batch_size=self.batch_size,
                          pin_memory=True)

    def configure_optimizers(self):
        self.opt = Adam(self.mlp.parameters(), lr=self.lr)
        scheduler = CosineAnnealingLR(self.opt, num_epochs, lr/1e2)

        return [self.opt], [scheduler]

    def training_step(self, batch, batch_idx):
        rgb_pred = self(batch['uv'])

        loss = mse(rgb_pred, batch['rgb'])
        psnr_ = psnr(rgb_pred, batch['rgb'])

        self.log('lr', self.opt.param_groups[0]['lr'])
        self.log('train/loss', loss)
        self.log('train/psnr', psnr_, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        rgb_pred = self(batch['uv'])

        loss = mse(rgb_pred, batch['rgb'], reduction='none')

        log = {'val_loss': loss,
               'rgb_gt': batch['rgb']}

        log['rgb_pred'] = rgb_pred

        return log

    def validation_epoch_end(self, outputs):
        mean_loss = torch.cat([x['val_loss'] for x in outputs]).mean()
        mean_psnr = -10*torch.log10(mean_loss)
        rgb_gt = torch.cat([x['rgb_gt'] for x in outputs])
        rgb_gt = rearrange(rgb_gt, '(h w) c -> c h w',
                           h=img_wh[1]//2,
                           w=img_wh[0]//2)
        rgb_pred = torch.cat([x['rgb_pred'] for x in outputs])
        rgb_pred = rearrange(rgb_pred, '(h w) c -> c h w',
                             h=img_wh[1]//2,
                             w=img_wh[0]//2)

        self.logger.experiment.add_images('val/gt_pred',
                                          torch.stack([rgb_gt, rgb_pred]),
                                          self.global_step)

        self.log('val/loss', mean_loss, prog_bar=True)
        self.log('val/psnr', mean_psnr, prog_bar=True)



# Start the training

In [None]:

if __name__ == '__main__':
    system = CoordMLPSystem()

    pbar = TQDMProgressBar(refresh_rate=1)
    callbacks = [pbar]

    logger = TensorBoardLogger(save_dir="logs",
                               name='MLP_With_PE',
                               default_hp_metric=False)

    trainer = Trainer(max_epochs=num_epochs,
                      callbacks=callbacks,
                      logger=logger,
                      enable_model_summary=True,
                      accelerator='auto',
                      devices=1,
                      num_sanity_val_steps=0,
                      log_every_n_steps=1,
                      check_val_every_n_epoch=20,
                      benchmark=True)

    trainer.fit(system)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: False, used: False
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
  image = imageio.imread(image_path)[..., :3]/255.
INFO:pytorch_lightning.callbacks.model_summary:
  | Name | Type | Params
------------------------------
0 | pe   | PE   | 0     
1 | mlp  | MLP  | 142 K 
------------------------------
142 K     Trainable params
0         Non-trainable params
142 K     Total params
0.571     Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

# Plot the resulta

In [None]:
# Make this dir and move the log to it
!mkdir log_directory


In [None]:
%load_ext tensorboard
log_directory = '/content/log_directory'
%tensorboard --logdir log_directory