<a href="https://colab.research.google.com/github/Redcxx/ucl-master-project/blob/master/pix2pix.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [30]:
#@title pip installs

%pip install pydrive2 > /dev/null
%pip install torchinfo > /dev/null

In [126]:
#@title imports

import sys
import os
import random
import functools
from datetime import datetime
from pathlib import Path
from pprint import pprint

import numpy as np
import torch
from torch import nn
from torchinfo import summary

In [32]:
#@title constants

# Timestamp label for this run, formatted like 2022-05-25-Wednesday-13h-38m-05s.
RUN_ID = datetime.now().strftime('%Y-%m-%d-%A-%Hh-%Mm-%Ss')
# Seed fed to every random number generator in the "misc" cell.
RANDOM_SEED = 42
WORKING_FOLDER_NAME = 'MasterProject'  # will be created on google drive
# Settings file handed to pydrive2's GoogleAuth when running outside Colab.
PYDRIVE2_SETTING_FILE = 'settings.yaml'


# Echo the run configuration so it is recorded in the cell output.
print(
    f'RUN_ID: {RUN_ID}',
    f'RANDOM_SEED: {RANDOM_SEED}',
    f'WORKING_FOLDER_NAME: {WORKING_FOLDER_NAME}',
    sep='\n',
)

RUN_ID: 2022-05-25-Wednesday-13h-38m-05s
RANDOM_SEED: 42
WORKING_FOLDER_NAME: MasterProject


In [33]:
#@title colab & jupyter notebook settings for saving file

# Defines `save_file(file_name)` for the current environment:
#   * in Colab, Google Drive is mounted and files are copied into the working folder;
#   * elsewhere, files are uploaded to Google Drive through pydrive2.
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    from google.colab import files, drive

    drive_dir = '/content/drive'
    drive.mount(drive_dir)

    working_dir = os.path.join(drive_dir, 'My Drive', WORKING_FOLDER_NAME)
    Path(working_dir).mkdir(parents=True, exist_ok=True)  # create directory if not exists on google drive

    def save_file(file_name):
        """Download `file_name` through the browser and copy it into the Drive working folder."""
        files.download(file_name)  # save locally

        # save on google drive
        with open(file_name, 'rb') as src_file:
            with open(os.path.join(working_dir, file_name), 'wb') as dest_file:
                dest_file.write(src_file.read())

else:
    from pydrive2.auth import GoogleAuth
    from pydrive2.drive import GoogleDrive

    def ensure_folder_on_drive(drive, folder_name):
        """Return the Drive folder titled `folder_name`, creating it when it does not exist.

        Args:
            drive: object exposing `ListFile` and `CreateFile` (a pydrive2
                `GoogleDrive` in this notebook).
            folder_name: exact title of the folder to look up.

        Returns:
            The single matching folder entry, or the newly created folder.

        Raises:
            AssertionError: if more than one folder carries that title.
        """
        candidates = drive.ListFile({
            # see https://developers.google.com/drive/api/guides/search-files
            # `trashed = false` keeps folders sitting in the bin from being picked as
            # the upload target.
            'q': "mimeType = 'application/vnd.google-apps.folder' and trashed = false"
        }).GetList()

        # Build a real list: filter() yields a lazy iterator in Python 3, which supports
        # neither len() nor indexing.
        folders = [candidate for candidate in candidates if candidate['title'] == folder_name]

        if len(folders) == 1:
            return folders[0]

        if len(folders) > 1:
            pprint(folders)
            raise AssertionError('Multiple Folders of the same name detected')

        # folder not found, create a new one at root
        print(f'Folder: {folder_name} not found, creating at root')

        folder = drive.CreateFile({
            'title': folder_name,
            # "parents": [{
            #     "kind": "drive#fileLink",
            #     "id": parent_folder_id
            # }],
            "mimeType": "application/vnd.google-apps.folder"
        })
        folder.Upload()
        return folder


    g_auth = GoogleAuth(settings_file=PYDRIVE2_SETTING_FILE, http_timeout=None)
    g_auth.LocalWebserverAuth(host_name="localhost", port_numbers=None, launch_browser=True)
    drive = GoogleDrive(g_auth)

    folder = ensure_folder_on_drive(drive, WORKING_FOLDER_NAME)

    def save_file(file_name):
        """Upload the local file `file_name` into the working folder on Google Drive."""
        file = drive.CreateFile({
            'title': file_name,
            'parents': [{
                'id': folder['id']
            }]
        })
        file.SetContentFile(file_name)
        file.Upload()  # save to google drive
        # The file is already on the local disk, so nothing is downloaded back:
        # re-fetching it over its own source path could only truncate it if the
        # transfer failed part-way.

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [34]:
#@title misc

# Seed Python's, NumPy's and PyTorch's generators with the same value so runs
# can be repeated.
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Pick the training device; the CUDA generator is seeded only when a GPU is usable.
if torch.cuda.is_available():
    torch.cuda.manual_seed(RANDOM_SEED)
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('Using device:', device)

Using device: cpu


In [35]:
# Show the GPU status of this runtime. On a runtime without a working NVIDIA driver
# the command only prints a driver-communication error.
!nvidia-smi

NVIDIA-SMI has failed because it couldn't communicate with the NVIDIA driver. Make sure that the latest NVIDIA driver is installed and running.



In [129]:
#@title Unet Block
class UnetBlock(nn.Module):
    """One down/up level of a U-Net that can wrap an inner level.

    The layers are chained in this order, skipping any optional piece whose
    argument is falsy:

        Conv2d(k=4, s=2, p=1) -> in_norm -> in_act -> submodule
        -> ConvTranspose2d(k=4, s=2, p=1) -> out_norm -> dropout -> out_act

    Args:
        in_filters: channels of the tensor fed to this block.
        out_filters: channels produced by the transposed convolution.
        submodule: inner module placed between encoder and decoder. When it is
            ``None`` the block is treated as innermost: both ``sub_*_filters``
            fall back to ``in_filters`` and ``sub_skip_connection`` to ``False``.
        sub_in_filters: channels the encoder convolution emits for the submodule.
        sub_out_filters: channels the submodule emits, before any concatenation.
        sub_skip_connection: whether the submodule concatenates its input to its
            output; if so the decoder reads ``sub_out_filters * 2`` channels.
        skip_connection: whether ``forward`` concatenates the block input with
            the block output along the channel axis.
        dropout: zero-argument factory for the dropout layer, or ``None``.
        in_norm, out_norm: factories called with a channel count, or ``None``.
        in_act, out_act: zero-argument activation factories, or ``None``.
    """

    def __init__(
        self,
        in_filters, out_filters,
        submodule=None, sub_in_filters=None, sub_out_filters=None, sub_skip_connection=False,
        skip_connection=True,
        dropout=nn.Dropout,
        in_norm=nn.BatchNorm2d, out_norm=nn.BatchNorm2d,
        in_act=nn.LeakyReLU, out_act=nn.ReLU
    ):
        super().__init__()

        # Innermost level: there is nothing to wrap, so the bottleneck keeps the
        # input width and no inner concatenation has to be accounted for.
        if submodule is None:
            sub_in_filters = sub_out_filters = in_filters
            sub_skip_connection = False

        # Both convolutions share the same geometry (halve / double the resolution).
        conv_kwargs = dict(kernel_size=4, stride=2, padding=1)

        # encoder
        stack = [nn.Conv2d(in_channels=in_filters, out_channels=sub_in_filters, **conv_kwargs)]
        if in_norm:
            stack.append(in_norm(sub_in_filters))
        if in_act:
            stack.append(in_act())

        # submodule
        if submodule:
            stack.append(submodule)

        # decoder: a concatenating submodule doubles the channels arriving here
        decoder_in = sub_out_filters * 2 if sub_skip_connection else sub_out_filters
        stack.append(nn.ConvTranspose2d(in_channels=decoder_in, out_channels=out_filters, **conv_kwargs))
        for factory, factory_args in ((out_norm, (out_filters,)), (dropout, ()), (out_act, ())):
            if factory:
                stack.append(factory(*factory_args))

        self.model = nn.Sequential(*stack)
        self.skip_connection = skip_connection

    def forward(self, x):
        """Run the block; with ``skip_connection`` the input is concatenated in front of the result on dim 1."""
        if not self.skip_connection:
            return self.model(x)
        return torch.cat([x, self.model(x)], dim=1)

In [130]:
# Sanity check: summarise a stand-alone (innermost-style) block on a 16x64x16x16 batch.
summary(
    UnetBlock(64, 64),
    col_names=['output_size', 'num_params', 'mult_adds'],
    input_size=(16, 64, 16, 16),
)

Layer (type:depth-idx)                   Output Shape              Param #                   Mult-Adds
UnetBlock                                --                        --                        --
├─Sequential: 1-1                        [16, 64, 16, 16]          --                        --
│    └─Conv2d: 2-1                       [16, 64, 8, 8]            65,600                    67,174,400
│    └─BatchNorm2d: 2-2                  [16, 64, 8, 8]            128                       2,048
│    └─LeakyReLU: 2-3                    [16, 64, 8, 8]            --                        --
│    └─ConvTranspose2d: 2-4              [16, 64, 16, 16]          65,600                    268,697,600
│    └─BatchNorm2d: 2-5                  [16, 64, 16, 16]          128                       2,048
│    └─Dropout: 2-6                      [16, 64, 16, 16]          --                        --
│    └─ReLU: 2-7                         [16, 64, 16, 16]          --                        --
Total para

In [135]:
#@title Generator

class Generator(nn.Module):
    """U-Net generator assembled from nested ``UnetBlock`` levels.

    Keys read from ``config``:
        'in_channels':  channels of the input image.
        'out_channels': channels of the generated image.
        'blocks':       list of dicts ordered from the outermost hidden level to
                        the innermost one (may be empty). Each dict provides
                        'filters' (channel width of that level), 'dropout'
                        (bool, add ``Dropout(p=0.5)`` on the way up) and
                        'skip_connection' (bool, whether that level concatenates
                        its input to its output).

    An extra outermost level maps 'in_channels' to the first block's width and
    back to 'out_channels', finishing with ``Tanh``.
    """

    def __init__(self, config):
        super().__init__()

        # dependency injection
        batch_norm = functools.partial(nn.BatchNorm2d, affine=True, track_running_stats=True)
        relu = functools.partial(nn.ReLU, inplace=True)
        leaky_relu = functools.partial(nn.LeakyReLU, inplace=True, negative_slope=0.2)
        dropout = functools.partial(nn.Dropout, p=0.5)
        tanh = nn.Tanh

        # build model recursively inside-out
        blocks = config['blocks'][::-1]

        self.model = None
        inner = None  # config of the level built in the previous iteration (None for the innermost level)
        for layer in blocks:
            innermost = inner is None
            self.model = UnetBlock(
                in_filters=layer['filters'],
                out_filters=layer['filters'],

                # the inner module
                submodule=self.model,
                sub_in_filters=None if innermost else inner['filters'],
                sub_out_filters=None if innermost else inner['filters'],
                sub_skip_connection=False if innermost else inner['skip_connection'],

                # A level's skip flag must come from its OWN config entry: the
                # enclosing level sizes its decoder from this very flag (passed as
                # `sub_skip_connection`), so both sides have to agree.
                skip_connection=layer['skip_connection'],
                dropout=dropout if layer['dropout'] else None,
                in_norm=None if innermost else batch_norm,
                out_norm=batch_norm,
                in_act=relu if innermost else leaky_relu,
                out_act=relu
            )
            inner = layer

        # build outermost block (`inner` is now the outermost configured level, or
        # None when config['blocks'] is empty)
        self.model = UnetBlock(
            in_filters=config['in_channels'],
            out_filters=config['out_channels'],

            submodule=self.model,
            sub_in_filters=inner['filters'] if inner is not None else None,
            sub_out_filters=inner['filters'] if inner is not None else None,
            sub_skip_connection=inner['skip_connection'] if inner is not None else False,

            skip_connection=False,
            dropout=None,
            in_norm=None, out_norm=None,
            in_act=leaky_relu, out_act=tanh
        )

    def forward(self, x):
        """Map an input batch through the nested U-Net."""
        return self.model(x)

In [136]:
# U-Net layout: one (filters, dropout, skip_connection) row per hidden level,
# listed from the outermost level to the innermost one.
unet_config = {
    'in_channels': 3,
    'out_channels': 3,
    'blocks': [
        {'filters': n_filters, 'dropout': use_dropout, 'skip_connection': use_skip}
        for n_filters, use_dropout, use_skip in (
            (64, False, True),
            (128, False, True),
            (256, False, True),
            (512, False, True),
            (512, True, True),
            (512, True, True),
            (512, True, False),
        )
    ]
}

# Summarise the full generator on a batch of 16 RGB images of 256x256 pixels.
summary(
    Generator(unet_config),
    depth=24,
    col_names=['output_size', 'num_params', 'mult_adds'],
    input_size=(16, 3, 256, 256),
)

Layer (type:depth-idx)                                                                                         Output Shape              Param #                   Mult-Adds
Generator                                                                                                      --                        --                        --
├─UnetBlock: 1-1                                                                                               [16, 3, 256, 256]         --                        --
│    └─Sequential: 2-1                                                                                         [16, 3, 256, 256]         --                        --
│    │    └─Conv2d: 3-1                                                                                        [16, 64, 128, 128]        3,136                     822,083,584
│    │    └─LeakyReLU: 3-2                                                                                     [16, 64, 128, 128]        --               

In [None]:
class Discriminator(nn.Module):
    """Discriminator network (unfinished stub).

    NOTE(review): only the ``nn.Module`` base initialiser runs at the moment. None
    of the constructor arguments is used yet, and no layers or ``forward`` method
    are defined.
    TODO: implement the network; the meaning of ``norm``, ``init_type`` and
    ``init_gain`` is not established by the code so far.
    """
    def __init__(self, in_channels, n_filters, n_layers=3, norm='batch', init_type='normal', init_gain=0.0):
        super().__init__()

        


