In [9]:
import itertools

import ml_collections
import torch
import torchvision.datasets as datasets
from torch import nn
from tqdm import tqdm

# Download the Flowers102 dataset through torchvision; root='./' stores the
# files under the current working directory.
dataset = datasets.Flowers102(root='./', download=True)

### Model Implementations:

In [369]:
class Pretrains:
    """Names of the pretrained backbones this notebook accepts.

    Both attributes are plain lists of model-name strings: one for the
    ResNet family and one for the VGG family (with and without batch norm).
    """

    # ResNet variants
    resnet_versions = [
        'resnet18', 'resnet34', 'resnet50', 'resnet101', 'resnet152',
    ]

    # VGG variants; the '_bn' suffix marks the batch-normalised flavour
    vgg_versions = [
        'vgg11', 'vgg11_bn',
        'vgg13', 'vgg13_bn',
        'vgg16', 'vgg16_bn',
        'vgg19', 'vgg19_bn',
    ]

class PretrainBackbone(nn.Module):
    """Feature extractor built from a pretrained torchvision ResNet/VGG.

    The model named by ``config.pretrain`` is fetched through ``torch.hub``
    and every child module except the last one (presumably the
    classification head -- confirm per architecture) is chained into an
    ``nn.Sequential``.

    Args:
        config: Object exposing a ``pretrain`` attribute holding one of the
            names in ``Pretrains.resnet_versions`` or
            ``Pretrains.vgg_versions``.

    Raises:
        ValueError: If ``config.pretrain`` is not a supported model name.
    """

    def __init__(self, config):
        # The original called super() with a non-existent class name
        # (ResNetBackbone), which raised NameError on construction.
        super(PretrainBackbone, self).__init__()

        # Validate against both families using the same config field; the
        # original checked an unrelated `config.resnet_version` for VGG.
        supported_versions = Pretrains.resnet_versions + Pretrains.vgg_versions
        if config.pretrain not in supported_versions:
            raise ValueError('Invalid ResNet/VGG Version. Please select from: '
                             + ', '.join(supported_versions))

        # Load pretrained ResNet/VGG backbone
        model = torch.hub.load('pytorch/vision:v0.10.0', config.pretrain, pretrained=True)

        # Keep every child except the last one and chain them sequentially
        backbone_layers = list(model.children())[:-1]
        self.backbone = nn.Sequential(*backbone_layers)

    def forward(self, x):
        """Run ``x`` through the truncated pretrained network."""
        return self.backbone(x)

class ActivationFunction(nn.Module):
    def __init__(self, config):
        super(ActivationFunction, self).__init__()
        match config.type:
            case 'LeakyReLU':
                self.activation_func = nn.LeakyReLU(
                    config.negative_slope,
                    inplace = True
                )
            case 'ReLU':
                self.activation_func = nn.ReLU(inplace = True)
            case 'Softmax':
                self.activation_func = nn.Softmax(dim = self.dim)
            case _:
                raise ValueError('Invalid activation function or not implemented')
    def forward(self, x):
        return self.activation_func(x)

class Conv2dBlock(nn.Module):
    """Stack of 3x3, padding-1 convolutions with optional norm/activation.

    Each of the ``config.layer_num`` layers is a ``Conv2d`` (the first maps
    ``in_channels -> out_channels``, the rest ``out_channels ->
    out_channels``), optionally followed by ``BatchNorm2d`` and then by an
    ``ActivationFunction``. When ``config.skip_last_activation`` is truthy
    the final layer gets no activation.

    Args:
        config: Object providing ``layer_num``, ``in_channels``,
            ``out_channels``, ``use_batchnorm``, ``skip_last_activation``
            and ``activation_func``.

    Raises:
        ValueError: If ``config.layer_num`` is less than 1.
    """

    def __init__(self, config):
        super(Conv2dBlock, self).__init__()
        if config.layer_num < 1:
            raise ValueError('Number of layers cannot be less than 1')

        final_idx = config.layer_num - 1
        layers = []
        for layer_idx in range(config.layer_num):
            # Only the first convolution sees the block's input channels
            if layer_idx == 0:
                conv_in = config.in_channels
            else:
                conv_in = config.out_channels
            layers.append(
                nn.Conv2d(conv_in, config.out_channels, kernel_size=3, padding=1)
            )

            if config.use_batchnorm:
                layers.append(nn.BatchNorm2d(config.out_channels))

            # The trailing activation is dropped only on the final layer
            drop_activation = config.skip_last_activation and layer_idx == final_idx
            if not drop_activation:
                layers.append(ActivationFunction(config.activation_func))

        self.sequential = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the stacked layers to ``x``."""
        return self.sequential(x)

# Creates a mirrored Conv2dBlock
class RevConv2dBlock(nn.Module):
    """Mirror image of a ``Conv2dBlock`` built from transposed convolutions.

    The source block's modules are walked back to front:

    * every ``Conv2d`` becomes a ``ConvTranspose2d`` with input/output
      channels swapped and the same kernel size, stride and padding;
    * if the source convolution was followed by a ``BatchNorm2d``, a fresh
      ``BatchNorm2d`` sized for the transposed convolution's output is
      placed after it;
    * every other module (the activations) is reused as-is, i.e. the same
      instance as in the source block.

    If the mirrored list starts with an ``ActivationFunction`` it is moved
    to the end so the block begins with a convolution.

    Args:
        conv2d_block: Object whose ``sequential`` attribute iterates over
            the modules to mirror (a ``Conv2dBlock``).
    """

    def __init__(self, conv2d_block):
        super(RevConv2dBlock, self).__init__()

        modules = []
        # In reversed order a BatchNorm2d shows up *before* the convolution
        # it used to follow, so remember it until that convolution arrives.
        pending_batchnorm = False
        for module in reversed(list(conv2d_block.sequential)):
            if isinstance(module, nn.BatchNorm2d):
                pending_batchnorm = True
            elif isinstance(module, nn.Conv2d):
                # Convolutions are mirrored whether or not batch norm is in
                # use; the original only did so when it met a BatchNorm2d,
                # leaving channel-mismatched Conv2d layers otherwise.
                modules.append(nn.ConvTranspose2d(
                    module.out_channels,
                    module.in_channels,
                    kernel_size=module.kernel_size,
                    stride=module.stride,
                    padding=module.padding
                ))
                if pending_batchnorm:
                    modules.append(nn.BatchNorm2d(module.in_channels))
                    pending_batchnorm = False
            else:
                modules.append(module)

        # A leading activation (source block ended with one) goes last
        if modules and isinstance(modules[0], ActivationFunction):
            modules.append(modules.pop(0))

        self.sequential = nn.Sequential(*modules)

    def forward(self, x):
        """Apply the mirrored layers to ``x``."""
        return self.sequential(x)
    
class VGGBackboneBlock(nn.Module):
    """A ``Conv2dBlock`` followed by max pooling.

    Note that construction writes ``skip_last_activation = False`` onto the
    ``config`` object that was passed in.

    Args:
        config: Configuration forwarded to ``Conv2dBlock``; its
            ``compression_ratio`` is used as both kernel size and stride of
            the pooling layer.
    """

    def __init__(self, config):
        super(VGGBackboneBlock, self).__init__()
        # Every convolution in this block keeps its activation
        config.skip_last_activation = False
        self.conv2d_block = Conv2dBlock(config)

        # Pooling window and stride are both the compression ratio
        pool_size = config.compression_ratio
        self.maxpool = nn.MaxPool2d(kernel_size=pool_size, stride=pool_size)

    def forward(self, x):
        """Convolve ``x`` and then downsample it with max pooling."""
        return self.maxpool(self.conv2d_block(x))

    def get_reverse(self):
        """Return the mirrored counterpart of this block."""
        return RevVGGBackconeBlock(self)
    
class RevVGGBackconeBlock(nn.Module):
    """Placeholder for the mirrored ``VGGBackboneBlock``.

    The reverse path is not implemented yet: the constructor ignores its
    argument and ``forward`` returns its input unchanged.

    Args:
        vgg_backbone_block: The block to mirror (currently unused).
    """

    def __init__(self, vgg_backbone_block):
        # The original wrote `super(VGGBackboneBlock).__init__()`, which never
        # ran nn.Module.__init__ and left the instance unusable as a module.
        super(RevVGGBackconeBlock, self).__init__()
        # To be implemented

    def forward(self, x):
        """Identity pass-through until the reverse block is implemented."""
        # To be implemented
        return x
    
class ResidualBlock(nn.Module):
    """Residual unit: main conv path plus shortcut, activation, optional pool.

    The passed ``config`` object itself is modified during construction:
    ``layer_num`` and ``skip_last_activation`` are written onto it before
    each ``Conv2dBlock`` is built.

    Args:
        config: Object providing ``main_layer_num``, ``shortcut_layer_num``,
            ``activation_func`` and the fields ``Conv2dBlock`` reads. If it
            has a ``compression_ratio`` attribute a max-pooling layer with
            that kernel size and stride is added.
    """

    def __init__(self, config):
        super(ResidualBlock, self).__init__()

        # Main path: its last convolution has no activation so that the
        # activation can be applied after the shortcut is added.
        config.layer_num = config.main_layer_num
        config.skip_last_activation = True
        self.main_block = Conv2dBlock(config)

        # Shortcut path; the attribute is deliberately left undefined when
        # the shortcut depth is 0 (forward then adds the raw input instead).
        if config.shortcut_layer_num:
            config.layer_num = config.shortcut_layer_num
            config.skip_last_activation = True
            self.shortcut_block = Conv2dBlock(config)

        self.activation_func = ActivationFunction(config.activation_func)

        # Pooling exists only when a compression ratio was configured
        if hasattr(config, 'compression_ratio'):
            pool_size = config.compression_ratio
            self.maxpool = nn.MaxPool2d(kernel_size=pool_size, stride=pool_size)

    def forward(self, x):
        """Return ``activation(main(x) + shortcut(x))``, pooled if configured."""
        out = self.main_block(x)

        # Without a shortcut block the input itself is the residual
        residual = self.shortcut_block(x) if hasattr(self, 'shortcut_block') else x
        out += residual

        out = self.activation_func(out)
        return self.maxpool(out) if hasattr(self, 'maxpool') else out

    def get_reverse(self):
        """Return the mirrored counterpart of this block."""
        return RevResidualBlock(self)

class RevResidualBlock(nn.Module):
    """Mirror of a ``ResidualBlock``: optional upsampling, then main + shortcut.

    Args:
        residual_block: The block to mirror. Its ``main_block`` (and
            ``shortcut_block``, when present) are wrapped in
            ``RevConv2dBlock``; when it has a ``maxpool`` an ``nn.Upsample``
            using the pooling stride as scale factor is added.
    """

    def __init__(self, residual_block):
        super(RevResidualBlock, self).__init__()
        self.main_block = RevConv2dBlock(residual_block.main_block)

        if hasattr(residual_block, 'shortcut_block'):
            self.shortcut_block = RevConv2dBlock(residual_block.shortcut_block)

        # Undo the pooling step with an upsample of the same factor
        if hasattr(residual_block, 'maxpool'):
            pool_stride = residual_block.maxpool.stride
            self.upsample = nn.Upsample(scale_factor=pool_stride)

    def forward(self, x):
        """Upsample ``x`` if configured, then return ``main(x) + shortcut(x)``."""
        if hasattr(self, 'upsample'):
            x = self.upsample(x)

        out = self.main_block(x)

        # Without a shortcut block the (upsampled) input is added directly
        skip = self.shortcut_block(x) if hasattr(self, 'shortcut_block') else x
        out += skip
        return out
    
class Encoder(nn.Module):
    def __init__(self, config):
        super(Encoder, self).__init__()
        modules = []
        match config.type:
            case 'residual_blocks':
                for idx, block_feature in enumerate(config.features):
                    if not idx:
                        in_channels = config.in_channels
                        out_channels = block_feature
                    else:
                        in_channels = config.features[idx - 1]
                        out_channels = block_feature

                    block_config = ml_collections.ConfigDict({
                        'main_layer_num': config.main_layer_num,
                        'shortcut_layer_num': config.shortcut_layer_num,
                        'in_channels': in_channels,
                        'out_channels': out_channels,
                        'use_batchnorm': config.use_batchnorm,
                        'activation_func': config.activation_func,
                    })
                    if hasattr(config, 'compression_ratio'):
                        block_config.compression_ratio = config.compression_ratio

                    modules.append(ResidualBlock(block_config))
            case 'vgg_backbone_blocks':
                # To be implemented
                raise NotImplementedError('To be implemented')
                
        self.sequential = nn.Sequential(*modules)
    def forward(self, x):
        return self.sequential(x)
        
class Decoder(nn.Module):
    """Decoder obtained by mirroring an ``Encoder``.

    The encoder's blocks are visited last to first and each is replaced by
    the result of its ``get_reverse()`` method.

    Args:
        arg: The ``Encoder`` instance to mirror.

    Raises:
        NotImplementedError: If ``arg`` is not an ``Encoder`` (building a
            decoder from a config is not supported).
    """

    def __init__(self, arg):
        super(Decoder, self).__init__()
        # Only the mirror-an-encoder construction path exists
        if not isinstance(arg, Encoder):
            raise NotImplementedError('This decoder class is only implemented to be initialized by mirroring an encoder class')

        mirrored_blocks = [
            block.get_reverse() for block in reversed(list(arg.sequential))
        ]
        self.sequential = nn.Sequential(*mirrored_blocks)

    def forward(self, x):
        """Run ``x`` through the mirrored blocks in order."""
        return self.sequential(x)
    
class AutoEncoder(nn.Module):
    """Encoder -> linear bottleneck -> mirrored decoder.

    Note that construction writes ``in_channels`` onto
    ``config.encoder_config``.

    Args:
        config: Object providing ``encoder_config``, ``in_channels``,
            ``in_dimension`` (indexable; entries 0 and 1 are the two spatial
            sizes of the probe input) and ``bottleneck_width``.
    """

    def __init__(self, config):
        super(AutoEncoder, self).__init__()
        # Encoder
        encoder_config = config.encoder_config
        encoder_config.in_channels = config.in_channels
        self.encoder = Encoder(encoder_config)

        # Discover the flattened encoder output size with a dummy forward
        # pass. The probe runs in eval mode and without autograd so that it
        # neither updates BatchNorm running statistics with random data nor
        # fails on BatchNorm's train-mode requirement of more than one value
        # per channel.
        dummy_input = torch.randn(1, config.in_channels, config.in_dimension[0], config.in_dimension[1])
        was_training = self.encoder.training
        self.encoder.eval()
        with torch.no_grad():
            encoded = self.encoder(dummy_input)
        self.encoder.train(was_training)
        in_bottleneck = encoded[0].numel()

        # Bottleneck
        self.bottleneck = nn.Linear(in_bottleneck, config.bottleneck_width)

        # Decoder
        self.decoder = Decoder(self.encoder)

    def forward(self, x):
        """Encode ``x``, project it through the bottleneck and decode it.

        The bottleneck vector is reshaped to ``(batch, bottleneck_width, 1,
        1)`` before it is handed to the decoder.
        """
        out = self.encoder(x)
        flatten = out.view(out.size(0), -1)
        out = self.bottleneck(flatten)
        # NOTE(review): the decoder's first mirrored block expects as many
        # channels as the last encoder block produces, so this appears to
        # work only when bottleneck_width equals that count, and the encoder's
        # spatial size is not restored here -- confirm this is intended.
        reshaped = out.view(out.size(0), out.size(1), 1, 1)
        return self.decoder(reshaped)

### Test:

In [370]:
# Smoke test: build an autoencoder from a six-stage residual encoder and push
# one random image-sized tensor through it.
config_dict = {
    'in_dimension': (224, 224),
    'in_channels': 3,
    'encoder_config': {
        'type': 'residual_blocks',
        'compression_ratio': 2,
        'features': [64, 128, 256, 512, 512, 512],
        'main_layer_num': 3,
        'shortcut_layer_num': 1,
        'use_batchnorm': True,
        'activation_func': {
            'type': 'LeakyReLU',
            'negative_slope': 0.1
        },
    },
    'decoder_config': {
        'mirror_encoder': True
    },
    'bottleneck_width': 512
}
test_config = ml_collections.ConfigDict(config_dict)

autoencoder = AutoEncoder(test_config)
# Call the module itself rather than .forward() so registered hooks run
autoencoder(torch.randn(1, 3, 224, 224))

tensor([[[[ 0.5516, -0.2068, -0.4680,  ..., -0.8531, -0.4450,  0.1232],
          [ 0.8301,  0.7162,  0.5984,  ..., -1.5186, -0.2426, -0.2329],
          [ 0.8712,  1.0302,  1.0262,  ..., -0.5684,  0.7226,  0.5514],
          ...,
          [ 1.0588,  0.4694,  0.0622,  ...,  1.3015,  1.8702,  1.1376],
          [ 1.1240,  0.0624, -0.1162,  ...,  1.1003,  1.2729,  1.0246],
          [ 1.2998,  0.0777,  0.1123,  ...,  0.6273,  0.9946,  0.2985]],

         [[ 0.0466,  0.8038,  0.8825,  ...,  0.8605,  0.5045,  0.8763],
          [ 0.4250, -0.4446, -0.6750,  ...,  0.5111, -0.1032,  0.2369],
          [-0.1207, -0.9937, -0.6350,  ...,  0.9737, -0.2861,  0.0639],
          ...,
          [ 1.1805,  1.4365,  1.4348,  ..., -0.0631, -0.1165,  0.6880],
          [ 0.2076,  0.7495,  0.7422,  ...,  0.1660, -0.2923,  0.7185],
          [-0.1095,  0.4830,  0.3474,  ..., -0.5860, -0.7834,  0.4724]],

         [[-0.7530, -0.1011, -0.5831,  ..., -1.1542, -1.1337, -0.6581],
          [ 0.0807,  0.0728, -

### Work in Progress

In [None]:
def get_all_configs(training_config):
    """Expand a hyperparameter grid into one config per combination.

    Args:
        training_config: Config exposing the grid axes ``pretrains``,
            ``input_sizes``, ``autoencoder.{depths, widths, activations,
            bottlenecks}`` and ``ffn.{depths, widths, activations,
            dropouts}`` (each a sized iterable), plus the scalars
            ``autoencoder.max_epoch``, ``num_classes`` and ``max_epoch``
            that are copied into every generated config.

    Returns:
        list: One ``ml_collections.ConfigDict`` per point of the Cartesian
        product of the axes, ordered as nested loops over the axes in the
        order listed above would produce them (last axis varies fastest).
    """
    # Outermost axis first, so itertools.product reproduces the ordering of
    # the equivalent nested loops.
    axes = [
        # Pretrain / input hyperparameters
        training_config.pretrains,
        training_config.input_sizes,
        # Encoder hyperparameters
        training_config.autoencoder.depths,
        training_config.autoencoder.widths,
        training_config.autoencoder.activations,
        training_config.autoencoder.bottlenecks,
        # FFN hyperparameters
        training_config.ffn.depths,
        training_config.ffn.widths,
        training_config.ffn.activations,
        training_config.ffn.dropouts,
    ]

    configs_num = 1
    for axis in axes:
        configs_num *= len(axis)

    config_list = []
    # The label must be passed as `desc`; tqdm's first positional argument is
    # the iterable to wrap. The context manager closes the bar when done.
    with tqdm(desc='Generating Configs', total=configs_num, position=0, leave=True) as pbar:
        for (pretrain, input_size,
             autoencoder_depth, autoencoder_width,
             autoencoder_activation, autoencoder_bottleneck,
             ffn_depth, ffn_width,
             ffn_activation, ffn_dropout) in itertools.product(*axes):
            # Initialize config dict
            config = ml_collections.ConfigDict()

            # Input
            config.input_size = input_size

            # Pretrain Network
            config.pretrain = pretrain

            # Encoder/Decoder Network (both use the same structure)
            config.autoencoder = ml_collections.ConfigDict()
            config.autoencoder.depth = autoencoder_depth
            config.autoencoder.width = autoencoder_width
            config.autoencoder.activation = autoencoder_activation
            config.autoencoder.bottleneck = autoencoder_bottleneck
            config.autoencoder.output_size = input_size

            config.autoencoder.max_epoch = training_config.autoencoder.max_epoch

            # FFN
            config.ffn = ml_collections.ConfigDict()
            config.ffn.depth = ffn_depth
            config.ffn.width = ffn_width
            config.ffn.activation = ffn_activation
            config.ffn.dropout = ffn_dropout

            # Output
            config.num_classes = training_config.num_classes

            config.max_epoch = training_config.max_epoch

            config_list.append(config)
            pbar.update(1)
    return config_list

# Hyperparameter grid for autoencoder-only experiments (no pretrained
# backbone, FFN axes left as single None placeholders).
training_config = ml_collections.ConfigDict()
# Every input size is a 2-tuple; the original `(128)` was the bare int 128
# because parentheses alone do not create a tuple.
training_config.input_sizes = [(128, 128), (224, 224), (360, 360)]
training_config.pretrains = [None]

training_config.autoencoder = ml_collections.ConfigDict()
training_config.autoencoder.depths = [4, 8, 16, 32]
training_config.autoencoder.widths = [64, 128, 256, 512]
training_config.autoencoder.activations = ['ReLU', 'LeakyReLU']
training_config.autoencoder.bottlenecks = [8, 16, 32, 64, 128]
training_config.autoencoder.max_epoch = 100

training_config.ffn = ml_collections.ConfigDict()
training_config.ffn.depths = [None]
training_config.ffn.widths = [None]
training_config.ffn.activations = [None]
training_config.ffn.dropouts = [None]

training_config.num_classes = None
training_config.max_epoch = 100

training_configs = get_all_configs(training_config)
training_configs[0]

# Candidate autoencoder layouts. Each inner list is one candidate and each
# tuple describes one stage -- the entries look like (height, width,
# channels); TODO confirm. Nothing else in the visible notebook reads this
# structure yet.
autoencoder_configs = ml_collections.ConfigDict()
autoencoder_configs.autoencoder = ml_collections.ConfigDict()
autoencoder_configs.autoencoder.layer_sizes = [
    [(224, 224, 64), (112, 112, 128), (56, 56, 256), (28, 28, 512), (14, 14, 512), (7, 7, 512)],
    [(224, 224, 64), (112, 112, 128), (56, 56, 256), (28, 28, 512), (14, 14, 512)],
    [(224, 224, 32), (112, 112, 64), (56, 56, 128), (28, 28, 256), (14, 14, 512), (7, 7, 512)],
    [(224, 224, 32), (112, 112, 64), (56, 56, 128), (28, 28, 256), (14, 14, 512)],
    [(224, 224, 16), (112, 112, 32), (56, 56, 64), (28, 28, 128), (14, 14, 256), (7, 7, 512), (3, 3, 512)],
    [(224, 224, 16), (112, 112, 32), (56, 56, 64), (28, 28, 128), (14, 14, 256), (7, 7, 512)],
    [(224, 224, 16), (112, 112, 32), (56, 56, 64), (28, 28, 128), (14, 14, 256)],
]

In [5]:
# Hyperparameter grid that sweeps every supported pretrained backbone.
training_config = ml_collections.ConfigDict()
training_config.input_sizes = [(224, 224)]
training_config.pretrains = Pretrains.resnet_versions + Pretrains.vgg_versions

# get_all_configs reads these axes from `training_config.autoencoder`; the
# earlier `encoder_decoder` key made this cell fail with AttributeError when
# re-run against the current function.
training_config.autoencoder = ml_collections.ConfigDict()
training_config.autoencoder.depths = [4, 8, 16]
training_config.autoencoder.widths = [64, 256, 512]
training_config.autoencoder.activations = ['ReLU']
training_config.autoencoder.bottlenecks = [16, 32, 64, 128]
training_config.autoencoder.max_epoch = 100

training_config.ffn = ml_collections.ConfigDict()
training_config.ffn.depths = [None]
training_config.ffn.widths = [None]
training_config.ffn.activations = [None]
training_config.ffn.dropouts = [None]

training_config.num_classes = None
training_config.max_epoch = 100

training_configs = get_all_configs(training_config)
training_configs[0]
import torchvision.models as models
resnet = models.resnet50(pretrained=True)

100%|██████████████████████████████████████████████████████████████████████████████| 468/468 [00:00<00:00, 7173.71it/s]
