In [2]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.checkpoint as cp
from collections import OrderedDict

In [3]:
import torch
import torch.nn as nn


class EnhancedInceptionModule(nn.Module):
    def __init__(self, input_data_depth, output_data_depth, number_of_convolution_filters=32, max_kernel_size=7,
                 dimensions_of_convolution=2):
        super().__init__()
        # The Member values are functions used in the inception, the convolution, max pool, and 1x1 convolution
        # each made with three versions to handle one, two, and three dimensional data,

        self.input_data_depth = input_data_depth
        self.output_data_depth = output_data_depth
        self.number_of_convolution_filters = number_of_convolution_filters
        self.max_kernel_size = max_kernel_size
        self.dimensions_of_convolution = dimensions_of_convolution

        # Convolution layers,

        self.convolution_1d = nn.Conv1d(in_channels=number_of_convolution_filters,
                                        out_channels=number_of_convolution_filters, kernel_size=2, padding='same',
                                        bias=False)
        self.convolution_2d = nn.Conv2d(in_channels=number_of_convolution_filters,
                                        out_channels=number_of_convolution_filters, kernel_size=2, padding='same',
                                        bias=False)
        self.convolution_3d = nn.Conv3d(in_channels=number_of_convolution_filters,
                                        out_channels=number_of_convolution_filters, kernel_size=2, padding='same',
                                        bias=False)

        # 1x1 Convolution layers,

        self.convolution_1d_1x1 = nn.Conv1d(in_channels=input_data_depth,
                                            out_channels=number_of_convolution_filters, kernel_size=1, padding='same',
                                             bias=False)
        self.convolution_2d_1x1 = nn.Conv2d(in_channels=input_data_depth,
                                            out_channels=number_of_convolution_filters, kernel_size=1, padding='same',
                                             bias=False)
        self.convolution_3d_1x1 = nn.Conv3d(in_channels=input_data_depth,
                                            out_channels=number_of_convolution_filters, kernel_size=1, padding='same',
                                             bias=False)

        # Max pooling layers,

        self.max_pool_1d = nn.MaxPool1d(kernel_size=3, stride=1, padding=1)
        self.max_pool_2d = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.max_pool_3d = nn.MaxPool3d(kernel_size=3, stride=1, padding=1)

        self.max_kernel_size = max_kernel_size
        self.dimensions_of_convolution = dimensions_of_convolution

    def forward(self, input_data):

        if self.dimensions_of_convolution == 1:
            convolution = self.convolution_1d
            max_pool = self.max_pool_1d
            convolution_1x1 = self.convolution_1d_1x1

        elif self.dimensions_of_convolution == 2:
            convolution = self.convolution_2d
            max_pool = self.max_pool_2d
            convolution_1x1 = self.convolution_2d_1x1

        elif self.dimensions_of_convolution == 3:
            convolution = self.convolution_3d
            max_pool = self.max_pool_3d
            convolution_1x1 = self.convolution_3d_1x1       
        

        # an else statement to handle invalid dimensions, if the variable dimensions_of_convolution
        # is not equal to 1 or 2 or 3.

        else:
            convolution_dimensional_error = "Invalid convolution dimensions."
            return convolution_dimensional_error

        convolution_1x1.in_channels = input_data.size(dim=1)

        # The Max pool branch from the inception module (future reminder: put a link to the enhanced inception here)

        convolution_input = convolution_1x1(input_data)
        collective_data = convolution_1x1(input_data)

        # Preparing the data for the iteration of convolution operation.

        max_pool_output = max_pool(input_data)
        convolution_1x1_output = convolution_1x1(max_pool_output)
        collective_data = torch.cat((collective_data, convolution_1x1_output), 1)

        # Iterating the convolution operation over the data (max_kernel_size - 1) times,
        # Explanation:
        #
        # Tensor height/width after convolution = (height/width before - kernel height/width)/ stride + 1
        # more concisely H/W_out = (H/W_in - H/W_kernel)/s +1
        # at H/W_kernel = 2, s = 1
        #
        # H/W_out = ((H/W_in - 2)/ 1 + 1
        #
        # H/W_out = H/W_in - 1
        #
        # Therefore, to do a 7x7 convolution, where a 7x7 partition of the tensor is reduced
        # to a single 1x1 square, requires 6 2x2 convolutions, and a 5x5 requires 4,
        # a 3x3 requires 2.
        #
        # and a result, it is self-evident from the general pattern beforehand established,
        # that a convolution of Kernel size K, requires K-1 (2x2) convolutions.
        # ( or 1x2 or 2x2x2 for one-dimensional and three-dimensional convolutions.

        for i in range(self.max_kernel_size - 1):
          convolution.in_channels = collective_data.size(dim=1)
          convolution_output = convolution(convolution_input)
          collective_data = torch.cat((collective_data, convolution_output), 1)
          convolution_input = convolution_output

        # Applying 1x1 convolution to change the depth of the collective_data as to match the
        # desired output depth.

        if self.dimensions_of_convolution == 1:
            output_1x1_convolution = nn.Conv1d(in_channels=collective_data.size(dim=2),
                                               out_channels=self.output_data_depth, kernel_size=1, padding='same',
                                               bias=False)
            return output_1x1_convolution(collective_data)

        elif self.dimensions_of_convolution == 2:
            output_1x1_convolution = nn.Conv2d(in_channels=collective_data.size(dim=2),
                                               out_channels=self.output_data_depth, kernel_size=1, padding='same',
                                               bias=False)
            return output_1x1_convolution(collective_data)

        else:
            output_1x1_convolution = nn.Conv3d(in_channels=collective_data.size(dim=2),
                                               out_channels=self.output_data_depth, kernel_size=1, padding='same',
                                               bias=False)
            return output_1x1_convolution(collective_data)


In [4]:
def _batch_norm_function_factory(batch_norm, relu, convolution):
    def batch_norm_function(*inputs):
        concatenated_features = torch.cat(inputs, 1)
        bottleneck_output = convolution(relu(batch_norm(concatenated_features)))
        return bottleneck_output

    return batch_norm_function

In [5]:
class DenseLayer(nn.Module):
    def __init__(self, number_of_input_features, growth_rate, batch_norm_size, drop_rate,
                 model_dimensions_of_convolution, efficient=False):
        super(DenseLayer, self).__init__()
        self.number_of_input_features = number_of_input_features
        self.growth_rate = growth_rate
        self.batch_norm_size = batch_norm_size
        self.drop_rate = drop_rate
        self.model_dimensions_of_convolution = model_dimensions_of_convolution
        self.efficient = efficient
        
        self.add_module('batch_norm_1_1d', nn.BatchNorm1d(number_of_input_features)),
        self.add_module('batch_norm_1_2d', nn.BatchNorm2d(number_of_input_features)),
        self.add_module('batch_norm_1_3d', nn.BatchNorm3d(number_of_input_features)),

        self.add_module('relu1', nn.ReLU(inplace=True)),

        self.add_module('convolution_1d_1x1', nn.Conv1d(number_of_input_features, batch_norm_size * growth_rate,
                                                        kernel_size=1, stride=1, bias=False)),
        self.add_module('convolution_2d_1x1', nn.Conv2d(number_of_input_features, batch_norm_size * growth_rate,
                                                        kernel_size=1, stride=1, bias=False)),
        self.add_module('convolution_3d_1x1', nn.Conv3d(number_of_input_features, batch_norm_size * growth_rate,
                                                        kernel_size=1, stride=1, bias=False)),

        self.add_module('batch_norm_2_1d', nn.BatchNorm1d(batch_norm_size * growth_rate)),
        self.add_module('batch_norm_2_2d', nn.BatchNorm2d(batch_norm_size * growth_rate)),
        self.add_module('batch_norm_2_3d', nn.BatchNorm3d(batch_norm_size * growth_rate)),

        self.add_module('relu2', nn.ReLU(inplace=True)),

        self.add_module('inception',
                        EnhancedInceptionModule(
                            input_data_depth=batch_norm_size * growth_rate,
                            output_data_depth=growth_rate,
                            number_of_convolution_filters=16,
                            max_kernel_size=11,
                            dimensions_of_convolution=model_dimensions_of_convolution)),
        self.drop_rate = drop_rate
        self.efficient = efficient
        self.model_dimensions_of_convolution = model_dimensions_of_convolution

    def forward(self, *previous_features):

        batch_norm_function = _batch_norm_function_factory(batch_norm=self.batch_norm_1_2d,
                                                           relu=self.relu1,
                                                           convolution=self.convolution_2d_1x1)

        if self.model_dimensions_of_convolution == 1:
            batch_norm_function = _batch_norm_function_factory(batch_norm=self.batch_norm_1_1d,
                                                               relu=self.relu1,
                                                               convolution=self.convolution_1d_1x1)
        elif self.model_dimensions_of_convolution == 3:
            batch_norm_function = _batch_norm_function_factory(batch_norm=self.batch_norm_1_3d,
                                                               relu=self.relu1,
                                                               convolution=self.convolution_3d_1x1)

        if self.efficient and any(previous_feature.requires_grad for previous_feature in previous_features):
            bottleneck_output = cp.checkpoint(batch_norm_function, *previous_features)
        else:
            bottleneck_output = batch_norm_function(*previous_features)

        new_features = self.inception(self.relu2(self.batch_norm_2_2d(bottleneck_output)))
        
        if self.model_dimensions_of_convolution == 1:
            new_features = self.inception(self.relu2(self.batch_norm_2_1d(bottleneck_output)))
            
        elif self.model_dimensions_of_convolution == 3:
            new_features = self.inception(self.relu2(self.batch_norm_2_3d(bottleneck_output)))
           
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate, training=self.training)
        return new_features

In [6]:
class _Transition(nn.Module):
    def __init__(self, number_of_input_features, number_of_output_features, model_dimensions_of_convolution):
        super(_Transition, self).__init__()
        self.number_of_input_features = number_of_input_features
        self.number_of_output_features = number_of_output_features
        self.model_dimensions_of_convolution = model_dimensions_of_convolution

        self.add_module('relu', nn.ReLU(inplace=True))

        self.add_module('convolution_1x1_1d', nn.Conv1d(number_of_input_features, number_of_output_features,
                                                        kernel_size=1, stride=1, bias=False))
        self.add_module('convolution_1x1_2d', nn.Conv2d(number_of_input_features, number_of_output_features,
                                                        kernel_size=1, stride=1, bias=False))
        self.add_module('convolution_1x1_3d', nn.Conv3d(number_of_input_features, number_of_output_features,
                                                        kernel_size=1, stride=1, bias=False))

        self.add_module('average_pool_1d', nn.AvgPool1d(kernel_size=2, stride=2))
        self.add_module('average_pool_2d', nn.AvgPool2d(kernel_size=2, stride=2))
        self.add_module('average_pool_3d', nn.AvgPool3d(kernel_size=2, stride=2))

    def forward(self, input_data):
        if self.model_dimensions_of_convolution == 1:
          transition_batch_norm = nn.BatchNorm1d(input_data.size(dim=1))
          return self.average_pool_1d(self.convolution_1x1_1d(self.relu(transition_batch_norm(input_data))))
        elif self.model_dimensions_of_convolution == 2:
          transition_batch_norm = nn.BatchNorm1d(input_data.size(dim=1))
          return self.average_pool_2d(self.convolution_1x1_2d(self.relu(transition_batch_norm(input_data))))
        elif self.model_dimensions_of_convolution == 3:
          transition_batch_norm = nn.BatchNorm1d(input_data.size(dim=1))
          return self.average_pool_3d(self.convolution_1x1_3d(self.relu(transition_batch_norm(input_data))))
        # an else statement to handle invalid dimensions, if the variable dimensions_of_convolution
        # is not equal to 1 or 2 or 3.

        else:
            convolution_dimensional_error = "Invalid convolution dimensions."
            return convolution_dimensional_error

In [7]:
class _DenseBlock(nn.Module):
    def __init__(self, number_of_layers, number_of_input_features, batch_norm_size, growth_rate,
                 drop_rate, model_dimensions_of_convolution, efficient=False):
        super(_DenseBlock, self).__init__()
        self.number_of_layers = number_of_layers
        self.number_of_input_features = number_of_input_features
        self.batch_norm_size = batch_norm_size
        self.growth_rate = growth_rate
        self.drop_rate = drop_rate
        self.model_dimensions_of_convolution = model_dimensions_of_convolution
        self.efficient = efficient
        self.dense_layer = DenseLayer(number_of_input_features=number_of_input_features,
                                       growth_rate=growth_rate, batch_norm_size=batch_norm_size,
                                       model_dimensions_of_convolution=model_dimensions_of_convolution,
                                       drop_rate=drop_rate, efficient=efficient
                                       )
        self.number_of_layers = number_of_layers

    def forward(self, initial_features):
        features = [initial_features]
        for name, layer in self.named_children():
            new_features = layer(*features)
            features.append(new_features)
        return torch.cat(features, 1)

    """
    def forward(self, initial_features):
        features = torch.tensor(initial_features)
        for i in range(self.number_of_layers):
            new_features = self.dense_layer(features)
            features = torch.cat((features, new_features), dim=1)
            self.dense_layer.number_of_input_features += i * self.growth_rate
        return features
    """

In [8]:
class ConvolutionalClassifier(nn.Module):
    def __init__(self, classifier_input_channels, number_of_classes,
                 model_dimensions_of_convolution, activation_function=nn.ReLU()):
        super(ConvolutionalClassifier, self).__init__()

        self.classifier_input_channels = classifier_input_channels
        self.number_of_classes = number_of_classes
        self.model_dimensions_of_convolution = model_dimensions_of_convolution
        self.activation_function = activation_function

        assert 1 <= model_dimensions_of_convolution <= 3, 'model_dimensions_of_convolution should be between 1 and 3'

        self.convolutional_classifier_1d = nn.Sequential(
            nn.Conv1d(self.classifier_input_channels, 1024, kernel_size=1), activation_function,
            nn.Conv1d(1024, 512, kernel_size=1), activation_function,
            nn.Conv1d(512, 256, kernel_size=1), activation_function,
            nn.Conv1d(256, 128, kernel_size=1), activation_function,
            nn.Conv1d(128, number_of_classes, kernel_size=1), activation_function,
        )
        self.convolutional_classifier_2d = nn.Sequential(
            nn.Conv2d(self.classifier_input_channels, 1024, kernel_size=1), activation_function,
            nn.Conv2d(1024, 512, kernel_size=1), activation_function,
            nn.Conv2d(512, 256, kernel_size=1), activation_function,
            nn.Conv2d(256, 128, kernel_size=1), activation_function,
            nn.Conv2d(128, number_of_classes, kernel_size=1), activation_function,
        )
        self.convolutional_classifier_3d = nn.Sequential(
            nn.Conv3d(self.classifier_input_channels, 1024, kernel_size=1), activation_function,
            nn.Conv3d(1024, 512, kernel_size=1), activation_function,
            nn.Conv3d(512, 256, kernel_size=1), activation_function,
            nn.Conv3d(256, 128, kernel_size=1), activation_function,
            nn.Conv3d(128, number_of_classes, kernel_size=1), activation_function,
        )

    def forward(self, input_data):
        if self.model_dimensions_of_convolution == 1:
            return self.convolutional_classifier_1d(input_data)
        elif self.model_dimensions_of_convolution == 2:
            return self.convolutional_classifier_2d(input_data)
        else:
            return self.convolutional_classifier_3d(input_data)


class DenseNeXt(nn.Module):
    r"""DenseNet-BC model class, based on
    `"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`
    Args:
        growth_rate (int) - how many filters to add each layer (`k` in paper)
        dense_block_configuration (list of 3 or 4 ints) - how many layers in each pooling block
        number_of_initial_features (int) - the number of filters to learn in the first convolution layer
        batch_norm_size (int) - multiplicative factor for number of bottle neck layers
            (i.e. batch_norm_size * k features in the bottleneck layer)
        drop_rate (float) - dropout rate after each dense layer
        number_of_classes (int) - number of classification classes
        small_inputs (bool) - set to True if images are 32x32. Otherwise assumes images are larger.
        efficient (bool) - set to True to use checkpointing. Much more memory efficient, but slower.
    """

    def __init__(self, input_data_depth=3, growth_rate=12, dense_block_configuration=(16, 16, 16), compression=0.5,
                 number_of_initial_features=82, batch_norm_size=4, drop_rate=0, number_of_classes=10,
                 model_dimensions_of_convolution=2, small_inputs=True, efficient=False, inception_convolution_filters=16
                 ):
        super(DenseNeXt, self).__init__()
        assert 0 < compression <= 1, 'compression of DenseNeXt should be between 0 and 1'
        assert 1 <= model_dimensions_of_convolution <= 3, 'model_dimensions_of_convolution should be between 1 and 3'

        self.input_data_depth = input_data_depth
        self.growth_rate = growth_rate
        self.dense_block_configuration = dense_block_configuration
        self.compression = compression
        self.number_of_initial_features = number_of_initial_features
        self.batch_norm_size = batch_norm_size
        self.drop_rate = drop_rate
        self.number_of_classes = number_of_classes
        self.model_dimensions_of_convolution = model_dimensions_of_convolution
        self.small_inputs = small_inputs
        self.inception_convolution_filters = inception_convolution_filters
        self.classifier_input_channels = 0

        self.initial_Inception = EnhancedInceptionModule(input_data_depth=input_data_depth,
                                           output_data_depth=number_of_initial_features,
                                           number_of_convolution_filters=inception_convolution_filters,
                                           dimensions_of_convolution=model_dimensions_of_convolution,
                                           max_kernel_size=7
                                           )
        if model_dimensions_of_convolution == 1:
            self.initial_batch_norm = nn.BatchNorm1d(number_of_initial_features)
            self.initial_max_pool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1, ceil_mode=False)
            self.final_batch_norm = nn.BatchNorm1d(number_of_initial_features)

        elif model_dimensions_of_convolution == 2:
            self.initial_batch_norm = nn.BatchNorm2d(number_of_initial_features)
            self.initial_max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1, ceil_mode=False)
            self.final_batch_norm = nn.BatchNorm2d(number_of_initial_features)
        elif model_dimensions_of_convolution == 3:
            self.initial_batch_norm = nn.BatchNorm3d(number_of_initial_features)
            self.initial_max_pool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1, ceil_mode=False)
            self.final_batch_norm = nn.BatchNorm3d(number_of_initial_features)

        self.initial_ReLU = nn.ReLU(inplace=True)

        self.dense_block = _DenseBlock(
            number_of_layers=dense_block_configuration[0],
            number_of_input_features=number_of_initial_features,
            batch_norm_size=batch_norm_size,
            growth_rate=growth_rate,
            drop_rate=drop_rate,
            efficient=efficient,
            model_dimensions_of_convolution=model_dimensions_of_convolution
        )

        self.transition = _Transition(
            number_of_input_features=number_of_initial_features,
            number_of_output_features=int(number_of_initial_features * self.compression),
            model_dimensions_of_convolution=model_dimensions_of_convolution
        )

        if model_dimensions_of_convolution == 1:
            self.final_batch_norm = nn.BatchNorm1d(number_of_initial_features)
            self.classifier = ConvolutionalClassifier(0, number_of_classes=self.number_of_classes,
                                                      model_dimensions_of_convolution=self.model_dimensions_of_convolution)
        elif model_dimensions_of_convolution == 2:
            self.final_batch_norm = nn.BatchNorm2d(number_of_initial_features)
            self.classifier = ConvolutionalClassifier(0, number_of_classes=self.number_of_classes,
                                                      model_dimensions_of_convolution=self.model_dimensions_of_convolution)
        elif model_dimensions_of_convolution == 3:
            self.final_batch_norm = nn.BatchNorm3d(number_of_initial_features)
            self.classifier = ConvolutionalClassifier(0, number_of_classes=self.number_of_classes,
                                                      model_dimensions_of_convolution=self.model_dimensions_of_convolution)

        # TODO: try convolution as a classifier

        for name, parameter in self.named_parameters():
            if 'conv' in name and 'weight' in name:
                dimensions = [parameter.size]
                n = parameter.size(0)
                for i in range(2, len(dimensions)):
                    n *= parameter.size(i)
                parameter.data.normal_().mul_(math.sqrt(2. / n))
            elif 'norm' in name and 'weight' in name:
                parameter.data.fill_(1)
            elif 'norm' in name and 'bias' in name:
                parameter.data.fill_(0)
            elif 'classifier' in name and 'bias' in name:
                parameter.data.fill_(0)

    def forward(self, x):
        features = self.initial_Inception(x)

        if not self.small_inputs:
            features = self.initial_max_pool(self.initial_batch_norm(features))

        for i in range(len(self.dense_block_configuration)):
            self.dense_block.number_of_layers = self.dense_block_configuration[i]
            self.dense_block.number_of_input_features = features.size(dim=1)
            features = self.dense_block(features)
            
            if i != len(self.dense_block_configuration) - 1: 
              self.transition.number_of_input_features = features.size(dim=1)
              print(features.size(dim=1))
              print(self.transition.number_of_input_features)
              features = self.transition(features)
        self.final_batch_norm.in_channels = features.size(dim=1)      
        features = self.final_batch_norm(features)

        out = F.relu(features, inplace=True)
        out = F.adaptive_avg_pool2d(out, (1, 1))
        self.classifier.classifier_input_channels = out.size(dim=1)
        out = self.classifier(out)
        out = torch.flatten(out, 1)
        return out

        # for i in dense_block_configuration:

        # input_data_depth, output_data_depth, number_of_convolution_filters=32, max_kernel_size=7,
        # dimensions_of_convolution=2

    # def forward(self, x):


In [10]:
net = DenseNeXt()
input_data = torch.randn(1, 3, 19, 11)
print(net(input_data))
# input_data_depth=3, growth_rate=12, dense_block_configuration=(16, 16, 16), compression=0.5,
# number_of_initial_features=24, batch_norm_size=4, drop_rate=0, number_of_classes=10,
# model_dimensions_of_convolution=2, small_inputs=True, efficient=False, inception_convolution_filters=16

# number_of_input_features, growth_rate, batch_norm_size, drop_rate,
# model_dimensions_of_convolution, efficient=False

RuntimeError: ignored