In [1]:
import math
import torch
import random
import numpy as np
import torchvision.datasets as dsets
import torchvision.transforms as transforms

from torch import nn
from tqdm import tqdm
from torch.nn import functional as F
from torch.utils.data import Dataset, random_split


In [18]:
device = 'cpu'
epochs = 10
lr = 0.00001
bs = 32

In [19]:
layers_set = {
                # increase channels with kernel 3
                'conv=channel_factor:2,kernel_size:3,stride:1,padding:0-',
                'conv=channel_factor:3,kernel_size:3,stride:1,padding:0-',
                'conv=channel_factor:4,kernel_size:3,stride:1,padding:0-',

                # decrease channels with kernel 3
                'conv=channel_factor:0.4,kernel_size:3,stride:1,padding:0-',
                'conv=channel_factor:0.6,kernel_size:3,stride:1,padding:0-',
                'conv=channel_factor:0.8,kernel_size:3,stride:1,padding:0-',


                 'batchnorm=eps:0.00001-',
                 'batchnorm=eps:0.0001-',

                 'avgpool=kernel_size:2,stride:2,padding:0-',
                 'avgpool=kernel_size:4,stride:4,padding:0-',

                 'maxpool=kernel_size:2,stride:2,padding:0-',
                 'maxpool=kernel_size:4,stride:4,padding:0-',

                 'dropout=p:0.2-',
                 'dropout=p:0.4-',
  }

In [20]:
def create_layers_string(min_len=2, max_len=10):
    if min_len < 1:
      print('min_len < 0,\n Please, choose min_len >= 1')
      return None
    length = random.randint(min_len, max_len)
    text = random.sample(layers_set, length)
    text = ''.join((layer for layer in text))
    return text

In [21]:
class NN(nn.Module):

    def __init__(self, in_channels):
      super().__init__()

      self.layers = nn.Sequential()

    def forward(self, x):
      x = self.layers(x)
      return x

    def __call__(self, x):
      return self.forward(x)

In [22]:
class nnGenerator():
    def __init__(self):
      self.text_layers_dict = dict({})
      self.nn_len = -1

    def parseTextNet(self, text_net):
      self.text_layers_dict = dict({})
      self.nn_len = -1
      print(text_net)
      if text_net[-1] == '-':
        text_net = text_net[:-1]
      text_layers = text_net.split('-')
      id = 0
      for text_layer in text_layers:
        tmp = text_layer.split('=')
        layer_name, layer_params = tmp[0], tmp[1].split(',')
        layer_params_dict = dict({})
        for param in layer_params:
          param = param.split(':')
          param_name, param_value = param[0], param[1]
          layer_params_dict[param_name] = param_value
        self.text_layers_dict[layer_name + str(id)] = (id, layer_params_dict)
        id += 1
      print(self.text_layers_dict)

    def get_text_layers_dict(self):
      return self.text_layers_dict

    def get_nn_len(self):
      return self.nn_len

    def conv_output_shape(self, h, w, kernel_size=1, stride=1, pad=0, dilation=1):
      h = math.floor( ((h + (2 * pad) - ( dilation * (kernel_size - 1) ) - 1 )/ stride) + 1)
      w = math.floor( ((w + (2 * pad) - ( dilation * (kernel_size - 1) ) - 1 )/ stride) + 1)
      return h, w

    def generateNN(self, n_classes, test_batch):
      success_state = False

      backbone = nn.Sequential()
      classifier = nn.Sequential()
      optimizer = None
      try:
        data_shape = np.array(test_batch).shape # [B, C, H, W]
        last_shape = data_shape
        for layer_name in self.text_layers_dict.keys():
            layer = None
            layer_params= self.text_layers_dict[layer_name][1]
            if layer_name.find('conv') >= 0:
              kernel_size = int(layer_params['kernel_size'])
              channel_factor = float(layer_params['channel_factor'])
              stride = int(layer_params['stride'])
              padding = int(layer_params['padding'])
              dilation = 1

              activation = nn.ReLU(inplace=True)

              in_chan = last_shape[1]
              assert(in_chan <= last_shape[2] and in_chan <= last_shape[3])
              out_chan = math.floor(in_chan * channel_factor)
              assert(out_chan > 0)

              backbone.append(nn.Conv2d(in_chan, out_chan, kernel_size, stride, padding, dilation))
              backbone.append(activation)

              h, w = self.conv_output_shape(last_shape[2], last_shape[3], kernel_size, stride, padding, dilation)
              last_shape = (last_shape[0], out_chan, h, w)

            elif layer_name.find('batchnorm') >= 0:
              eps = float(layer_params['eps'])
              in_chan = last_shape[1]
              backbone.append(nn.BatchNorm2d(in_chan, eps))

            elif layer_name.find('avgpool') >= 0:
              kernel_size = int(layer_params['kernel_size'])
              stride = int(layer_params['stride'])
              padding = int(layer_params['padding'])
              dilation = 1
              out_chan = last_shape[1]
              backbone.append(nn.AvgPool2d(kernel_size, stride, padding))

              h, w = self.conv_output_shape(last_shape[2], last_shape[3], kernel_size, stride, padding, dilation)
              last_shape = (last_shape[0], out_chan, h, w)


            elif layer_name.find('maxpool') >= 0:
              kernel_size = int(layer_params['kernel_size'])
              stride = int(layer_params['stride'])
              padding = int(layer_params['padding'])
              dilation = 1

              backbone.append(nn.MaxPool2d(kernel_size, stride, padding))

              h, w = self.conv_output_shape(last_shape[2], last_shape[3], kernel_size, stride, padding, dilation)
              last_shape = (last_shape[0], last_shape[1], h, w)

            elif layer_name.find('dropout') >= 0:
              p = float(layer_params['p'])
              backbone.append(nn.Dropout2d(p))

        linear_in_shape = last_shape[1] * last_shape[2] * last_shape[3]
        classifier = nn.Linear(linear_in_shape, n_classes)
        success_state = True
        print('NN build successfull!')

      except Exception as e:
        print('NN build failed!')
        print(str(e))

      net = nn.Sequential()
      net.append(backbone)
      net.append(nn.Flatten(start_dim=1))
      net.append(classifier)
      self.text_layers_dict = dict({})
      return success_state, net

In [23]:

class ds(Dataset):
    def __init__(self, X, y):

      self.X = X
      self.y = y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        x_ = self.X[idx]
        y_ = self.y[idx]
        return x_, y_


In [24]:
train_data = dsets.MNIST(root = './data', train = True,
                        transform = transforms.ToTensor(), download = True)

test_data = dsets.MNIST(root = './data', train = False,
                       transform = transforms.ToTensor())


train_samples = np.expand_dims(np.array(train_data.data), axis=1)[:5000]
train_labels = np.array(train_data.targets)[:5000]


dataset = ds(X=train_samples, y=train_labels)

train_set, valid_set = random_split(dataset, [0.8, 0.2], generator=torch.Generator().manual_seed(42))

train_dataloader = torch.utils.data.DataLoader(
  train_set,
  batch_size=bs,
  shuffle=True,
  drop_last=True)

valid_dataloader = torch.utils.data.DataLoader(
  valid_set,
  batch_size=bs,
  drop_last=True,
  shuffle=True)

In [25]:
test_batch = None
for b, _ in train_dataloader:
  test_batch = b
  break

In [26]:
generator = nnGenerator()

In [33]:
success_state = False
while not success_state:
  text_layers = create_layers_string()
  generator.parseTextNet(text_layers)
  success_state, seq = generator.generateNN(n_classes=10, test_batch=test_batch)


conv=channel_factor:0.4,kernel_size:3,stride:1,padding:0-batchnorm=eps:0.00001-conv=channel_factor:4,kernel_size:3,stride:1,padding:0-conv=channel_factor:0.8,kernel_size:3,stride:1,padding:0-maxpool=kernel_size:4,stride:4,padding:0-
{'conv0': (0, {'channel_factor': '0.4', 'kernel_size': '3', 'stride': '1', 'padding': '0'}), 'batchnorm1': (1, {'eps': '0.00001'}), 'conv2': (2, {'channel_factor': '4', 'kernel_size': '3', 'stride': '1', 'padding': '0'}), 'conv3': (3, {'channel_factor': '0.8', 'kernel_size': '3', 'stride': '1', 'padding': '0'}), 'maxpool4': (4, {'kernel_size': '4', 'stride': '4', 'padding': '0'})}
NN build failed!

batchnorm=eps:0.00001-conv=channel_factor:3,kernel_size:3,stride:1,padding:0-avgpool=kernel_size:4,stride:4,padding:0-conv=channel_factor:0.6,kernel_size:3,stride:1,padding:0-maxpool=kernel_size:4,stride:4,padding:0-dropout=p:0.2-conv=channel_factor:0.8,kernel_size:3,stride:1,padding:0-dropout=p:0.4-
{'batchnorm0': (0, {'eps': '0.00001'}), 'conv1': (1, {'channel_

since Python 3.9 and will be removed in a subsequent version.
  text = random.sample(layers_set, length)


In [34]:
model = NN(in_channels=1)
model.layers = seq

In [39]:
display(model)

NN(
  (layers): Sequential(
    (0): Sequential(
      (0): Dropout2d(p=0.2, inplace=False)
      (1): Conv2d(1, 3, kernel_size=(3, 3), stride=(1, 1))
      (2): ReLU(inplace=True)
      (3): Dropout2d(p=0.4, inplace=False)
    )
    (1): Flatten(start_dim=1, end_dim=-1)
    (2): Linear(in_features=2028, out_features=10, bias=True)
  )
)

In [36]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

In [37]:
def train():
    train_losses = []
    valid_losses = []
    # TODO calculate metrics and return them after train
    def CalcValLoss():
        with torch.no_grad():
            losses = []
            for X, Y in valid_dataloader:
                X = X.float().to(device)
                Y = Y.float().to(device)
                preds = model(X)
                preds, _ = torch.max(preds,1)
                loss = criterion(preds,Y)
                losses.append(loss.item())
            print("Valid Loss : {:.6f}".format(torch.tensor(losses).mean()))
            valid_losses.append(torch.tensor(losses).mean())

    for i in range(1, epochs):
        losses = []
        for X, Y in tqdm(train_dataloader):
            X = X.float().to(device)
            Y = Y.float().to(device)
            preds = model(X)
            preds, _ = torch.max(preds,1)
            loss = criterion(preds, Y)
            losses.append(loss.item())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print("Train Loss : {:.6f}".format(torch.tensor(losses).mean()))
        train_losses.append(torch.tensor(losses).mean())
    return train_losses, valid_losses

In [38]:
train_losses, valid_losses = train()

100%|██████████| 125/125 [00:00<00:00, 213.39it/s]


Train Loss : 8579.939453


100%|██████████| 125/125 [00:00<00:00, 219.21it/s]


Train Loss : 493.860870


100%|██████████| 125/125 [00:00<00:00, 224.44it/s]


Train Loss : 493.228210


100%|██████████| 125/125 [00:00<00:00, 347.04it/s]


Train Loss : 493.066498


100%|██████████| 125/125 [00:00<00:00, 359.57it/s]


Train Loss : 492.938690


100%|██████████| 125/125 [00:00<00:00, 340.28it/s]


Train Loss : 492.947876


100%|██████████| 125/125 [00:00<00:00, 339.37it/s]


Train Loss : 492.833740


100%|██████████| 125/125 [00:00<00:00, 335.57it/s]


Train Loss : 492.833679


100%|██████████| 125/125 [00:00<00:00, 350.72it/s]

Train Loss : 492.908295



