In [None]:
%cd thesis
import os
import sys

# Make the current working directory (the repo entered by `%cd` above) importable
# so that project packages resolve. `os` must be imported here: this is the first
# cell, so nothing has brought it into scope yet.
_repo_root = os.getcwd()
if _repo_root not in sys.path:  # keep the cell idempotent when re-run
    sys.path.append(_repo_root)

In [None]:
%pip install -r requirements.conda.txt

In [None]:
import torch
import os
from torchsummary import summary
from realesrgan import RealESRGANer
from basicsr.archs.rrdbnet_arch import RRDBNet

# Tell the OpenMP runtime to tolerate more than one copy of itself in the process.
# NOTE(review): this is set after `torch` has already been imported above; if the
# goal is to avoid a duplicate-OpenMP abort at import time, it presumably has to be
# set before those imports — confirm.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


In [None]:
torch.cuda.is_available()

In [None]:
# Upsampler: pretrained Real-ESRGAN x4 network wrapped in the library's inference helper.

# RRDBNet backbone; every architecture argument not listed here uses the library default.
model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64)

upsampler = RealESRGANer(
    scale=4,
    model_path='./realesrgan/experiments/pretrained_models/RealESRGAN_x4plus.pth',
    model=model,
    # NOTE(review): `tile` looks like it expects an integer tile size (0 = no tiling);
    # False presumably behaves like 0 — confirm against the RealESRGANer signature.
    tile=False,
    tile_pad=10,
    pre_pad=0,
    half=False)

In [None]:
from src.helpers import utils
from compress import make_deterministic
from src.loss.perceptual_similarity import perceptual_loss as ps
from default_config import ModelModes

In [None]:
# Compression: load the pretrained HiFiC checkpoint in evaluation mode.

# Reproducibility
make_deterministic()
# NOTE(review): `use_gpu` follows CUDA availability while the model below is pinned
# to CPU — confirm the mismatch is intended. `perceptual_loss_fn` is not used in the
# visible cells.
perceptual_loss_fn = ps.PerceptualLoss(model='net-lin', net='alex', use_gpu=torch.cuda.is_available())

# Load model
# The device is hard-coded to CPU; the automatic selection is left commented out.
device = torch.device('cpu')#utils.get_device()
print(device)
logger = utils.logger_setup(logpath=os.path.join('images', 'logs'), filepath=os.path.abspath('1'))
# Only the loaded arguments and the model are kept; the third return value is discarded.
loaded_args, compression, _ = utils.load_model('experiments/hific_low.pt', logger, device, model_mode=ModelModes.EVALUATION,
    current_args_d=None, prediction=True, strict=False)
print('logger done')

#compression.Hyperprior.hyperprior_entropy_model.build_tables()

In [None]:
# Generator latent input used below: a batch of 1 with shape [1, 220, 8, 8]

# NOTE: `input` shadows the Python builtin of the same name for the rest of the kernel session.
input = torch.rand(1, 220, 8, 8)

# Run the pretrained generator once on a random latent, then print its layer summary.
result = compression.Generator(input)
# Positional arguments are presumably (model, per-sample input size, batch size) — see torchsummary.
summary(compression.Generator, (220, 8, 8), 1, device='cpu')


In [None]:
summary(compression, (3, 128, 128), 2, device='cpu')

In [None]:
summary(upsampler.model, (3, 128, 128), 1, device='cpu')

In [None]:
# Switch the whole compression model to evaluation mode, then run one forward
# pass on a random batch of 16 three-channel 128x128 inputs as a smoke test.
compression.train(False)
compression(torch.rand((16, 3, 128, 128)))

In [None]:
class Model(torch.nn.Module):
    """Compression model followed by an upsampling network.

    `forward` feeds the input through `compression`, keeps the first element of
    the pair it returns, and passes that through `upsampler`.

    Train/eval handling is customised: the compression model is always held in
    evaluation mode except for its `Generator` sub-module, which follows the
    requested mode together with `upsampler`.
    """

    def __init__(self, compression, upsampler):
        """
        compression: module whose forward returns a 2-tuple and which exposes a
                     `Generator` sub-module.
        upsampler:   module applied to the first element of that tuple.
        """
        super().__init__()
        self.compression = compression
        self.upsampler = upsampler

    def forward(self, x):
        # The compression model returns a pair; only the first element is upsampled.
        y, _ = self.compression(x)
        return self.upsampler(y)

    def train(self, train=True):
        """Set the training mode and return `self`, as `nn.Module.train` does.

        The base implementation is called first so that `self.training` is
        updated; it also switches every child to `train`, so the compression
        model is forced back to evaluation mode afterwards and only its
        Generator (plus the upsampler) is left following `train`.
        """
        super().train(train)
        self.compression.train(False)
        self.compression.Generator.train(train)
        self.upsampler.train(train)
        # nn.Module.train()/eval() return the module so calls can be chained.
        return self

In [None]:
# Chain the compression model with the raw network held by the RealESRGANer (`upsampler.model`).
# NOTE: `upsampler` must still be the RealESRGANer built earlier; a later cell rebinds
# the name to the custom `Upsampler` class instance, which defines no `.model` attribute.
m = Model(compression, upsampler.model)

In [None]:
# End-to-end smoke test: push one random 3x128x128 input through the combined
# model and show the shape of what comes out.
x = torch.rand(1, 3, 128, 128)
result = m(x)

print(result.shape)

### Remarks
- A Conv2D with 2x2 padding is equivalent to a Conv2DTranspose with no padding.
- Checkerboard artifacts can start to become an issue when using strides (even after stacking multiple layers).

### Things to try: 

1. To avoid checkerboard artifacts, an alternative upsampling method that’s gaining popularity is to apply classical upsampling followed by a regular convolution (that preserves the spatial dimensions).

In [None]:
import torch
import torch.nn.functional as F
from src.normalisation import channel, instance

class ResidualBlock(torch.nn.Module):
    """Residual unit: (pad -> conv -> norm) twice with an activation in between,
    added back onto the unmodified input."""

    def __init__(self, input_dims, kernel_size=3, stride=1,
                 channel_norm=True, activation='relu'):
        """
        input_dims:   Dimension of input tensor (B,C,H,W); only the channel
                      entry (index 1) is read.
        kernel_size:  Kernel size of both convolutions; also determines the
                      reflection padding, int((kernel_size-1)/2).
        stride:       Stride of both convolutions.
        channel_norm: True selects ChannelNorm2D_wrap, anything else
                      InstanceNorm2D_wrap.
        activation:   Name of a function in torch.nn.functional.
        """
        super().__init__()

        # Functional activation looked up by name, e.g. F.relu.
        self.activation = getattr(F, activation)
        n_channels = input_dims[1]

        # Normalisation factory shared by both layers.
        self.interlayer_norm = (
            channel.ChannelNorm2D_wrap
            if channel_norm is True
            else instance.InstanceNorm2D_wrap
        )
        norm_options = dict(momentum=0.1, affine=True, track_running_stats=False)

        # One padding module is reused in front of both convolutions.
        self.pad = torch.nn.ReflectionPad2d(int((kernel_size - 1) / 2))
        self.conv1 = torch.nn.Conv2d(n_channels, n_channels, kernel_size, stride=stride)
        self.conv2 = torch.nn.Conv2d(n_channels, n_channels, kernel_size, stride=stride)
        self.norm1 = self.interlayer_norm(n_channels, **norm_options)
        self.norm2 = self.interlayer_norm(n_channels, **norm_options)

    def forward(self, x):
        # First half: pad, convolve, normalise, activate.
        out = self.activation(self.norm1(self.conv1(self.pad(x))))
        # Second half: pad, convolve, normalise (no activation).
        out = self.norm2(self.conv2(self.pad(out)))
        # Identity skip connection.
        return torch.add(out, x)

class Upsampler(torch.nn.Module):
    """Decoder turning a latent feature map into a 3-channel image.

    Stages, in order:
      1. normalisation + reflection-padded 3x3 convolution to 960 features;
      2. optional concatenation of Gaussian noise channels;
      3. `n_residual_blocks` residual blocks with a long skip connection
         around the whole stack;
      4. three stride-2 transposed convolutions, one nearest-neighbour x2
         interpolation, and a fourth stride-2 transposed convolution;
      5. reflection-padded 7x7 convolution to 3 output channels.

    Every step in stage 4 doubles H and W, so the output is 32 times larger
    than the input in each spatial dimension (an 8x8 latent gives 256x256).
    """

    def __init__(self, input_dims, batch_size, C=220, activation='relu',
                 n_residual_blocks=8, channel_norm=True, sample_noise=False,
                 noise_dim=32, silent=True):
        """
        input_dims:        (C, H, W) of the latent. Only H and W are read, and
                           they are merely forwarded to the residual blocks.
        batch_size:        Forwarded to the residual blocks as part of their
                           `input_dims`.
        C:                 Number of channels of the tensor given to `forward`.
        activation:        One of 'relu', 'elu', 'leaky_relu'.
        n_residual_blocks: Number of residual blocks in the stack.
        channel_norm:      True selects ChannelNorm2D_wrap, anything else
                           InstanceNorm2D_wrap.
        sample_noise:      If True, `noise_dim` channels of standard normal
                           noise are concatenated to the features before the
                           residual stack.
        noise_dim:         Number of noise channels used when `sample_noise`.
        silent:            If False, `forward` prints the tensor shape after
                           every stage.
        """
        super(Upsampler, self).__init__()
        self.silent = silent

        kernel_dim = 3
        filters = [960, 480, 240, 120, 60]
        self.n_residual_blocks = n_residual_blocks
        self.sample_noise = sample_noise
        self.noise_dim = noise_dim

        # Layer / normalization options
        # k=3, stride=2, padding=1, output_padding=1 makes each transposed
        # convolution exactly double the spatial size.
        cnn_kwargs = dict(stride=2, padding=1, output_padding=1)
        norm_kwargs = dict(momentum=0.1, affine=True, track_running_stats=False)
        activation_d = dict(relu='ReLU', elu='ELU', leaky_relu='LeakyReLU')
        self.activation = getattr(torch.nn, activation_d[activation])  # (leaky_relu, relu, elu)
        self.n_upsampling_layers = 4  # not read inside this class; kept as a public attribute

        if channel_norm is True:
            self.interlayer_norm = channel.ChannelNorm2D_wrap
        else:
            self.interlayer_norm = instance.InstanceNorm2D_wrap

        self.pre_pad = torch.nn.ReflectionPad2d(1)
        # Not used in forward(); kept so the set of registered sub-modules is unchanged.
        self.asymmetric_pad = torch.nn.ReflectionPad2d((0,1,1,0))
        self.post_pad = torch.nn.ReflectionPad2d(3)

        H0, W0 = input_dims[1:]

        # Spatial size is preserved: 1-pixel reflection padding + 3x3 convolution.
        self.conv_block_init = torch.nn.Sequential(
            self.interlayer_norm(C, **norm_kwargs),
            self.pre_pad,
            torch.nn.Conv2d(C, filters[0], kernel_size=(3,3), stride=1),
            self.interlayer_norm(filters[0], **norm_kwargs),
        )

        if sample_noise is True:
            # Concat noise with latent representation: everything after the
            # initial convolution sees `noise_dim` extra channels.
            filters[0] += self.noise_dim

        for m in range(n_residual_blocks):
            resblock_m = ResidualBlock(input_dims=(batch_size, filters[0], H0, W0),
                channel_norm=channel_norm, activation=activation)
            self.add_module(f'resblock_{m}', resblock_m)

        self.upconv_block1 = self._upconv_block(filters[0], filters[1], kernel_dim, cnn_kwargs, norm_kwargs)
        self.upconv_block2 = self._upconv_block(filters[1], filters[2], kernel_dim, cnn_kwargs, norm_kwargs)
        self.upconv_block3 = self._upconv_block(filters[2], filters[3], kernel_dim, cnn_kwargs, norm_kwargs)
        self.upconv_block4 = self._upconv_block(filters[3], filters[4], kernel_dim, cnn_kwargs, norm_kwargs)

        self.conv_block_out = torch.nn.Sequential(
            self.post_pad,
            torch.nn.Conv2d(filters[-1], 3, kernel_size=(7,7), stride=1),
        )

    def _upconv_block(self, in_channels, out_channels, kernel_dim, cnn_kwargs, norm_kwargs):
        """Build one transposed-convolution -> normalisation -> activation stage."""
        return torch.nn.Sequential(
            torch.nn.ConvTranspose2d(in_channels, out_channels, kernel_dim, **cnn_kwargs),
            self.interlayer_norm(out_channels, **norm_kwargs),
            self.activation(),
        )

    def _trace(self, label, tensor):
        """Print `label` (left-aligned to 15 columns) and the tensor shape unless silent."""
        if not self.silent:
            print(f'{label: <15}', tensor.shape)

    def forward(self, x):
        self._trace('INPUT : ', x)
        head = self.conv_block_init(x)
        self._trace('INITIAL_CONV : ', head)

        if self.sample_noise is True:
            B, C, H, W = tuple(head.size())
            # `.to(head)` matches the noise to the features' device and dtype.
            z = torch.randn((B, self.noise_dim, H, W)).to(head)
            head = torch.cat((head,z), dim=1)

        # Start from `head` so that an empty residual stack is handled too
        # (the skip connection then simply adds `head` to itself).
        x = head
        for m in range(self.n_residual_blocks):
            x = getattr(self, f'resblock_{m}')(x)
            self._trace(f'RESIDUAL_{m}', x)

        # Long skip connection around the residual stack. Out-of-place so that
        # neither `head` nor a block output is modified.
        x = x + head

        x = self.upconv_block1(x)
        self._trace('UPCONV_1 : ', x)
        x = self.upconv_block2(x)
        self._trace('UPCONV_2 : ', x)
        x = self.upconv_block3(x)
        self._trace('UPCONV_3 : ', x)
        x = torch.nn.functional.interpolate(x, scale_factor=2, mode='nearest')
        # Label matches the interpolation mode actually used above.
        self._trace('NEAREST : ', x)
        x = self.upconv_block4(x)
        self._trace('UPCONV_4 : ', x)
        out = self.conv_block_out(x)
        self._trace('UPCONV_5 : ', out)

        return out

In [None]:
# Build the custom upsampler for (C, H, W) = (220, 8, 8) latents with batch size 2
# and check the output shape on a random latent.
# NOTE: this rebinds `upsampler`, which previously held the RealESRGANer instance.
upsampler = Upsampler([220, 8, 8], 2)

input = torch.rand(2, 220, 8, 8)
output = upsampler(input)

print(output.shape)

In [None]:
summary(upsampler, (220, 8, 8), 2, device='cpu')

Try using interpolation to upscale feature maps. Need to match shapes.

In [None]:
import torch
from torch.nn import functional as F

# Nearest-neighbour interpolation with scale factor 2 on a random
# (batch, channels, H, W) tensor; prints the shapes before and after.
shape = (2, 220, 8, 8)
input = torch.rand(*shape)

output1 = F.interpolate(
    input,
    scale_factor=2,
    mode='nearest',
)

print(input.shape, output1.shape)

### Load model from checkpoint and modify structure of Generator

In [None]:
# Compression
import torch
from src.helpers import utils
from compress import make_deterministic
from src.loss.perceptual_similarity import perceptual_loss as ps
from default_config import ModelModes

# Reproducibility
make_deterministic()
# NOTE(review): `use_gpu` follows CUDA availability while the model below is pinned
# to CPU — confirm the mismatch is intended.
perceptual_loss_fn = ps.PerceptualLoss(model='net-lin', net='alex', use_gpu=torch.cuda.is_available())

# Load model
# Same checkpoint as loaded earlier in the notebook, reloaded to get a fresh copy
# before its Generator is replaced. `os` comes from an earlier import cell.
device = torch.device('cpu')#utils.get_device()
logger = utils.logger_setup(logpath=os.path.join('images', 'logs'), filepath=os.path.abspath('1'))
loaded_args, compression, _ = utils.load_model('experiments/hific_low.pt', logger, device, model_mode=ModelModes.EVALUATION,
    current_args_d=None, prediction=True, strict=False, silent=True)

#compression.Hyperprior.hyperprior_entropy_model.build_tables()

In [None]:
upsampler = Upsampler((220, 8, 8), 1)

In [None]:
# Switch the whole model to evaluation mode, replace its Generator with the
# custom upsampler, and put only that replacement back into training mode.
compression.train(False)
compression.Generator = upsampler
compression.Generator.train(True)

In [None]:
# Forward one random 3x128x128 input through the modified model; the shape of
# the first element of the returned value is printed.
input = torch.rand((1, 3, 128, 128))
output = compression(input)

print(f'OUTPUT SHAPE : {output[0].shape}')

### Now we need to train the network

For this purpose we need to:

1. Set up a dataset
2. Run the training script from the HiFiC repo

In [None]:
%pip install awscli

In [None]:
%pip install boto3

In [140]:
#pragma dataset init OPEN_IMAGES --size 128Gb

# Downloads one Open Images training shard from the public S3 bucket with an
# unsigned (anonymous) client and unpacks it into the dataset directory.
import tarfile

import boto3
import tqdm
from botocore import UNSIGNED
from botocore.config import Config

BUCKET = 'open-images-dataset'
KEY = 'tar/train_0.tar.gz'

# Defined in this cell so it does not rely on another cell having been run first.
# Dataset will be created in /home/jupyter/mnt/datasets/OPEN_IMAGES
DATASET_PATH = '/home/jupyter/mnt/datasets/OPEN_IMAGES'
ARCHIVE_PATH = f'{DATASET_PATH}/train_0.tar.gz'

s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
# The object size is fetched up front so the progress bar has a total.
metadata = s3.head_object(Bucket=BUCKET, Key=KEY)

with tqdm.tqdm(total=metadata['ContentLength'], unit="B", unit_scale=True) as pbar:
    # The callback receives a byte count, which is handed straight to the bar.
    s3.download_file(BUCKET, KEY, ARCHIVE_PATH, Callback=pbar.update)

print('Unpacking...')

# The context manager closes the archive even if extraction fails part-way.
# NOTE(review): extractall() trusts the member paths stored in the archive;
# consider an extraction filter if the source is not fully trusted.
with tarfile.open(ARCHIVE_PATH) as tar:
    tar.extractall(DATASET_PATH)

print('Done.')

It's init dataset task. State result won't be merged.


100%|██████████| 49.3G/49.3G [15:59<00:00, 51.4MB/s]  


Unpacking...
Done.




Mounting dataset OPEN_IMAGES... /home/jupyter/mnt/datasets/OPEN_IMAGES


In [None]:
# Absolute locations on the notebook host: the mounted OPEN_IMAGES dataset and the project root.
# NOTE(review): DATASET_PATH is also used by the dataset-download cell earlier in the
# notebook; keep the two in sync, or move this configuration near the top.
DATASET_PATH = '/home/jupyter/mnt/datasets/OPEN_IMAGES'
HOME_PATH = '/home/jupyter/work/resources/thesis'