In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchsummary import summary
import numpy as np
import random
import math


In [2]:
# Configuration for the evolutionary architecture search

population_size = 20 # number of individuals kept in the population
numbers_of_generation = 10 # number of generations
init_epoches = 10 # number of epochs to train each candidate model during the evolution
fully_epoches = 100 # number of epochs used to train the best model found
part1_min = 7 # lower bound of the random draw for the size of part1
part1_max = 15 # upper bound of the random draw for the size of part1
kernel_max = 256 # upper bound on the out_channels drawn for a conv layer
add_conv_probability = 0.5 # probability of adding a conv (instead of a pool) during initialization
add_max_pool_probability = 0.8 # probability of choosing MaxPool over AvgPool when a pool is added
mutation_size = population_size // 2 # number of new individuals produced by one mutation step


In [3]:
# Hyper-parameters used by the training loop
batch_size = 64
learning_rate = 1e-2

# Per-channel (mean, std) passed to transforms.Normalize for both splits
cifar10_mean = (0.4914, 0.4822, 0.4465)
cifar10_std = (0.2023, 0.1994, 0.2010)

# Training images get random-crop / horizontal-flip augmentation before normalisation
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(cifar10_mean, cifar10_std),
])

# Test images are only converted to tensors and normalised
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar10_mean, cifar10_std),
])

# CIFAR-10 datasets (downloaded into ./data when missing)
train_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train,
)
test_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test,
)

# Only the training split is shuffled
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False,
)

# Use the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')


Files already downloaded and verified
Files already downloaded and verified


In [4]:
def train_model(epochs, net):
  """Train `net` on `train_loader` and measure its accuracy on `test_loader`.

  The network is optimised with plain SGD (learning rate `learning_rate`) and
  cross-entropy loss. After every epoch the accuracy on the test loader is
  computed and printed.

  Args:
    epochs: number of passes over `train_loader`.
    net: the nn.Module to train; it is moved to `device`.

  Returns:
    The test accuracy (a fraction between 0 and 1) measured after the last
    epoch, or 0.0 when `epochs` is not positive.
  """
  criterion = nn.CrossEntropyLoss()
  # Move the model first so the optimizer is built from the on-device parameters.
  net = net.to(device)
  optimizer = torch.optim.SGD(net.parameters(), lr=learning_rate)

  # Defined up front so that epochs <= 0 returns a value instead of raising NameError.
  accuracy = 0.0

  for epoch in range(epochs):
    # Training mode: layers such as Dropout must be active while fitting.
    net.train()
    for images, labels in train_loader:
      images, labels = images.to(device), labels.to(device)
      outputs = net(images)
      loss = criterion(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    # Evaluation mode: otherwise Dropout keeps zeroing activations and the
    # reported test accuracy is noisy and biased downwards.
    net.eval()
    correct = 0
    total = 0
    with torch.no_grad():
      for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = net(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print('Epoch: {}/{}, Test Accuracy: {:.2f}'.format(epoch+1, epochs, accuracy))

  return accuracy


def count_parameters(model):
    """Return the total number of elements in the trainable parameters of `model`.

    Parameters whose `requires_grad` flag is False are not counted.
    """
    trainable_total = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            trainable_total += parameter.numel()
    return trainable_total



In [5]:
'''
0 - Conv2d
1 - ReLU
2 - MaxPool2d
3 - AvgPool2d
4 - Linear
5 - Dropout
'''

class Individual:
  """One candidate CNN architecture of the evolutionary search.

  A network is encoded as two lists of layer tuples whose first element is
  the layer code:
    (0, in_channels, out_channels, kernel_size) -> Conv2d
    (1,)                                        -> ReLU
    (2,)                                        -> MaxPool2d(2, 2)
    (3,)                                        -> AvgPool2d(2, 2)
    (4, in_features, out_features)              -> Linear
    (5,)                                        -> Dropout
  """

  # Padding that keeps the spatial size unchanged for each supported kernel size.
  _SAME_PADDING = {
      1: 0, # 1x1 kernel size -> padding = 0
      3: 1, # 3x3 kernel size -> padding = 1
      5: 2, # 5x5 kernel size -> padding = 2
      7: 3, # 7x7 kernel size -> padding = 3
  }

  def __init__(self, part1, part2):
    """Store the encoded layers and start with an empty score.

    Args:
      part1: layer tuples of the feature extractor (Conv, Pool and ReLU).
      part2: layer tuples of the classifier head (Linear, Dropout and ReLU).
    """
    # Part1 only contains Conv, Pool, and ReLU
    self.part1 = part1
    # Part2 only contains Linear, Dropout, and ReLU
    self.part2 = part2
    self.score = {
        "accuracy": 0,
        "parameters": 0
    }

  def fill_score(self):
    """Build the network, train it briefly and record its fitness.

    Stores the number of trainable parameters in score['parameters'] and the
    test accuracy returned by train_model after `init_epoches` epochs in
    score['accuracy'].
    """
    torch.cuda.empty_cache()
    model = self.decodeCNN()
    self.score['parameters'] = count_parameters(model)
    # Use the real short-training accuracy as fitness; a random placeholder
    # would make selection meaningless and would not be on the 0..1 scale
    # that the `1 - accuracy` error computation expects.
    self.score['accuracy'] = train_model(init_epoches, model)
    # Drop the model and release cached GPU memory before the next candidate.
    del model
    torch.cuda.empty_cache()

  def transferToLayer(self, args):
    """Turn one encoded layer tuple into the matching torch.nn module.

    Args:
      args: a layer tuple as described in the class docstring.

    Returns:
      The nn.Module for that layer.

    Raises:
      KeyError: if a conv layer uses a kernel size other than 1, 3, 5 or 7.
      ValueError: if the layer code is not one of 0..5.
    """
    layer_type = args[0]
    if layer_type == 0:
      # After conv, the image size remains the same
      return nn.Conv2d(
          in_channels=args[1],
          out_channels=args[2],
          kernel_size=args[3],
          padding=self._SAME_PADDING[args[3]],
      )
    elif layer_type == 1:
      return nn.ReLU()
    elif layer_type == 2:
      return nn.MaxPool2d(2, 2)
    elif layer_type == 3:
      return nn.AvgPool2d(2, 2)
    elif layer_type == 4:
      return nn.Linear(args[1], args[2])
    elif layer_type == 5:
      return nn.Dropout()
    # Fail loudly here instead of handing None to nn.Sequential.
    raise ValueError("Unknown layer code: {!r}".format(layer_type))

  def decodeCNN(self):
    """Return an nn.Sequential made of part1, a Flatten layer, then part2."""
    return nn.Sequential(
        *map(self.transferToLayer, self.part1),
        nn.Flatten(),
        *map(self.transferToLayer, self.part2),
    )


In [6]:
def get_random_kernel_size():
  """Pick one of the three supported square kernel sizes (1x1, 3x3 or 5x5) at random."""
  kernel_sizes = (1, 3, 5)
  return random.choice(kernel_sizes)


In [7]:
def initialization(size):
  """Create `size` random individuals and score each of them.

  Every individual starts with a Conv + ReLU pair on the 3 input channels and
  then grows by random steps, each adding either a Conv + ReLU pair or a
  pooling layer. The classifier head (part2) is always
  Dropout -> Linear -> ReLU -> Dropout -> Linear -> ReLU -> Linear(…, 10).

  Args:
    size: number of individuals to generate.

  Returns:
    The list of generated individuals, after fill_score() was called on each.
  """
  population = []

  for _ in range(size):
    image_size = 32 # CIFAR10: 32x32
    target_length = random.randint(part1_min, part1_max)

    # The first layer is always a conv reading the 3 input channels
    channels = random.randint(5, 12)
    conv_part = [(0, 3, channels, get_random_kernel_size()), (1,)]

    # One step was consumed by the first conv; perform the remaining ones
    steps_left = target_length - 1
    while steps_left > 0:
      steps_left -= 1

      if random.random() >= add_conv_probability:
        # Pooling step: each pool halves the image (32 -> 16 -> 8 -> 4 -> 2),
        # so at most 4 pools fit; once the image is 2x2 the step adds nothing
        if image_size == 2:
          continue
        # MaxPool is chosen with probability add_max_pool_probability, AvgPool otherwise
        pool_code = 2 if random.random() < add_max_pool_probability else 3
        conv_part.append((pool_code,))
        image_size //= 2
      else:
        # Conv step: the channel count never shrinks and is capped by kernel_max
        next_channels = random.randint(channels, kernel_max)
        conv_part += [
            (0, channels, next_channels, get_random_kernel_size()),
            (1,),
        ]
        channels = next_channels

    # Classifier head: widths are a half and a quarter of the flattened size, never below 10
    flat_features = channels * image_size * image_size
    hidden_wide = max(10, flat_features // 2)
    hidden_narrow = max(10, flat_features // 4)
    dense_part = [
        (5, ),
        (4, flat_features, hidden_wide),
        (1, ),
        (5, ),
        (4, hidden_wide, hidden_narrow),
        (1, ),
        (4, hidden_narrow, 10),
    ]

    population.append(Individual(conv_part, dense_part))

  # Score the individuals only after the whole population has been generated
  for candidate in population:
    candidate.fill_score()

  return population
  

In [8]:
from itertools import zip_longest 


def mutation(population):
  """Create `mutation_size` new, scored individuals by differential mutation.

  For every child, a base individual is drawn from the first `best` entries
  of `population` (callers are expected to pass the list sorted best-first),
  two further distinct individuals r1 and r2 are drawn from the whole
  population, and differentiate(r1, r2) is merged into the base. A child
  whose part1 duplicates one already produced in this call is discarded and
  drawn again.

  Args:
    population: list of individuals to mutate from.

  Returns:
    The list of new individuals, after fill_score() was called on each.

  Raises:
    ValueError: if children are requested but the population holds fewer
      than 3 individuals (a base and two distinct donors are required).
  """
  if mutation_size > 0 and len(population) < 3:
    # With fewer than 3 individuals the donor-selection loops below can never terminate.
    raise ValueError("mutation needs a population of at least 3 individuals")

  # Never index past the end when the population is smaller than the elite size.
  best = min(5, len(population))
  last_index = len(population) - 1
  next_generation = []
  part1Set = set()

  while len(next_generation) < mutation_size:
    best_idx = random.randint(0, best - 1)
    best_individual = population[best_idx]

    # Draw two donors that differ from the base and from each other.
    r1 = random.randint(0, last_index)
    while r1 == best_idx:
      r1 = random.randint(0, last_index)

    r2 = random.randint(0, last_index)
    while r2 in (r1, best_idx):
      r2 = random.randint(0, last_index)

    r1_minus_r2 = differentiate(population[r1], population[r2])
    new_individual = merge(best_individual, r1_minus_r2)

    # Skip architectures already produced in this call.
    signature = tuple(new_individual.part1)
    if signature in part1Set:
      continue
    part1Set.add(signature)
    next_generation.append(new_individual)

  for individual in next_generation:
    individual.fill_score()

  return next_generation
    
    
def differentiate(ind1, ind2):
  """Compute the position-wise difference between the part1 of two individuals.

  ReLU entries are ignored. The remaining layers are compared position by
  position:
    * same layer code at a position -> (-1,), meaning "remain the same";
    * different layer codes         -> the layer of `ind1`;
    * only one sequence still has a layer at that position -> that layer.

  Returns:
    A list of layer tuples / (-1,) markers as long as the longer sequence.
  """
  layers_a = [layer for layer in ind1.part1 if layer[0] != 1]
  layers_b = [layer for layer in ind2.part1 if layer[0] != 1]

  difference = []
  # Both sequences are walked in lockstep; zip_longest pads the shorter one with None
  for layer_a, layer_b in zip_longest(layers_a, layers_b):
    if layer_a is None:
      difference.append(layer_b)
    elif layer_b is None:
      difference.append(layer_a)
    elif layer_a[0] == layer_b[0]:
      difference.append((-1,))
    else:
      difference.append(layer_a)

  return difference



def merge(best_individual, r1_minus_r2):
  """Apply a difference sequence to the best individual and build a new one.

  The part1 of `best_individual` (ReLUs removed) is walked position by
  position. Where `r1_minus_r2` has an entry other than (-1,), that entry
  replaces the layer; otherwise the original layer is kept. Entries of
  `r1_minus_r2` beyond the length of the best individual are ignored.

  Conv layers get their in_channels rewired to the output of the previous
  conv (3 for the first) and are followed by a ReLU. At most 4 pooling
  layers are kept; further ones are dropped. A fresh part2 is generated for
  the resulting feature size.

  Returns:
    A new, not yet scored Individual.
  """
  base_layers = [layer for layer in best_individual.part1 if layer[0] != 1] # Remove ReLU
  merged_part1 = []
  pools_kept = 0
  channels = 3

  for position, base_layer in enumerate(base_layers):
    # Take the difference entry when there is one and it is not the "remain the same" marker
    use_difference = position < len(r1_minus_r2) and r1_minus_r2[position][0] != -1
    chosen = r1_minus_r2[position] if use_difference else base_layer

    layer_code = chosen[0]
    if layer_code == 0:
      # Rewire the conv to the current channel count and follow it with a ReLU
      merged_part1.append((layer_code, channels, chosen[2], chosen[3]))
      merged_part1.append((1, ))
      channels = chosen[2]
    elif layer_code in (2, 3):
      # Keep the pool only while fewer than 4 have been kept so far
      if pools_kept < 4:
        merged_part1.append(chosen)
        pools_kept += 1
    else:
      merged_part1.append(chosen)

  # Each kept pool halved the 32x32 image
  side = 32 >> pools_kept
  features = channels * side * side
  hidden_wide = max(10, features // 2)
  hidden_narrow = max(10, features // 4)
  new_part2 = [
      (5, ),
      (4, features, hidden_wide),
      (1, ),
      (5, ),
      (4, hidden_wide, hidden_narrow),
      (1, ),
      (4, hidden_narrow, 10),
  ]

  return Individual(merged_part1, new_part2)



In [9]:
def get_score(err_max, err_min, para_max, para_min, err, para):
  """Combine error and parameter count into one fitness value (lower is better).

  Both objectives are min-max normalised with the supplied population-wide
  bounds and combined as 0.7 * error + 0.3 * parameters.

  Args:
    err_max, err_min: largest and smallest error in the population.
    para_max, para_min: largest and smallest parameter count in the population.
    err: error of the individual being scored.
    para: parameter count of the individual being scored.

  Returns:
    The weighted score. An objective on which the whole population ties
    (max == min) contributes 0 instead of raising ZeroDivisionError.
  """
  err_range = err_max - err_min
  para_range = para_max - para_min

  # Normalization; a zero range means the objective cannot discriminate between individuals
  err_norm = (err - err_min) / err_range if err_range else 0.0
  para_norm = (para - para_min) / para_range if para_range else 0.0

  return 0.7 * err_norm + 0.3 * para_norm

In [10]:

# Build and score the first generation
population = initialization(population_size)

# Population-wide bounds of both objectives, used for min-max normalisation
accuracies = [individual.score['accuracy'] for individual in population]
parameter_counts = [individual.score['parameters'] for individual in population]
err_max, err_min = 1 - min(accuracies), 1 - max(accuracies)
para_max, para_min = max(parameter_counts), min(parameter_counts)

# Rank the population in place, lowest combined score first
population.sort(
    key=lambda individual: get_score(
        err_max,
        err_min,
        para_max,
        para_min,
        1 - individual.score['accuracy'],
        individual.score['parameters'],
    )
)


In [11]:
# The initial population is generation 1, so evolve generations 2..numbers_of_generation
# (range's end is exclusive; without the +1 one generation fewer than configured would run).
for g in range(2, numbers_of_generation + 1):
  # Mutation
  new_population = mutation(population)
  population.extend(new_population)

  # Recompute the normalisation bounds over parents and children together
  err_max = 1 - min(x.score['accuracy'] for x in population)
  err_min = 1 - max(x.score['accuracy'] for x in population)
  para_max = max(x.score['parameters'] for x in population)
  para_min = min(x.score['parameters'] for x in population)

  # Selection: rank by combined score (lower is better) and keep the best population_size
  population.sort(key=lambda x: get_score(err_max, err_min, para_max, para_min, 1 - x.score['accuracy'], x.score['parameters']))
  population = population[:population_size]


In [12]:
# Fully train the top-ranked architecture (first entry of the sorted population)
best_individual = population[0]
best_model = best_individual.decodeCNN()
train_model(fully_epoches, best_model)

Epoch: 1/100, Test Accuracy: 0.11


KeyboardInterrupt: ignored