# Generating CNN architectures automatically with genetic algorithms

In [None]:
import random
import json

import numpy as np
import matplotlib.pyplot as plt
from tabulate import tabulate

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Add, Dense, Activation, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, GlobalMaxPooling2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import to_categorical

print(tf.__version__)

# ResNets

This project is based on ResNets. Using this kind of convolutional neural network allows us to build very deep networks while avoiding vanishing-gradient and overfitting issues.

In [None]:
def random_layer():
    """Return the encoding of one randomly chosen layer.

    Half of the time the result is a skip layer encoded as ``"<f1>_<f2>"``,
    where each filter count is a power of two between 8 and 256. Otherwise
    the result is a pooling layer, ``"max"`` or ``"mean"`` with equal odds.

    Returns:
        str: The gene (layer encoding).
    """
    if random.random() < 0.5:
        # Skip layer: 2**3 .. 2**8 filters for each of the two convolutions
        first_filters = 2 ** random.randint(3, 8)
        second_filters = 2 ** random.randint(3, 8)
        return str(first_filters) + "_" + str(second_filters)

    # Pooling layer: max pooling or average ("mean") pooling
    return "max" if random.random() < 0.5 else "mean"

## Skip Layer

### What is a Skip Layer?

In [None]:
def skip_layer(X, f1, f2, kernel = (3,3), stride = (1,1)):
    """Build a residual (skip) block on top of the tensor ``X``.

    The main path is Conv -> BatchNorm -> ReLU -> Conv -> BatchNorm. The
    shortcut path is a 1x1 convolution that projects the input to ``f2``
    channels so both paths can be added, followed by a final ReLU.

    Args:
        X: Input Keras tensor. ``BatchNormalization(axis=3)`` assumes a
            channels-last 4D tensor (batch, height, width, channels).
        f1 (int): Number of filters of the first convolution.
        f2 (int): Number of filters of the second convolution and of the
            shortcut projection.
        kernel (tuple): Kernel size of the two main-path convolutions.
        stride (tuple): Spatial stride of the block.

    Returns:
        The output tensor of the block.
    """
    shortcut = X

    # First convolution (the only main-path layer that downsamples)
    layer = Conv2D(f1, kernel_size=kernel, strides=stride, padding="same")(X)
    layer = BatchNormalization(axis=3)(layer)
    layer = Activation("relu")(layer)

    # Second convolution. It must use stride 1: applying `stride` here too
    # would downsample the main path twice while the shortcut is downsampled
    # only once, and Add() would fail on mismatched shapes for stride > 1.
    layer = Conv2D(f2, kernel_size=kernel, strides=(1, 1), padding="same")(layer)
    layer = BatchNormalization(axis=3)(layer)

    # Projection shortcut (makes sure both branches have the same shape)
    shortcut = Conv2D(f2, kernel_size=(1,1), strides=stride, padding="same")(shortcut)

    # Add the shortcut and the main path, then apply the non-linearity
    outputs = Add()([shortcut, layer])
    outputs = Activation("relu")(outputs)

    return outputs

## Pooling Layer

In [None]:
def pooling_layer(X, pooling_type, kernel = (2,2), stride = (2,2)):
    """Apply a 2D pooling layer to the tensor ``X``.

    Args:
        X: Input Keras tensor.
        pooling_type (str): ``"max"`` for max pooling or ``"mean"`` for
            average pooling. Any other key raises ``KeyError``.
        kernel (tuple): Pooling window size.
        stride (tuple): Pooling stride.

    Returns:
        The pooled tensor.
    """
    # Map the gene name to the Keras layer class that implements it
    layer_by_name = {"max": MaxPooling2D, "mean": AveragePooling2D}
    layer_class = layer_by_name[pooling_type]

    pooling = layer_class(pool_size=kernel, strides=stride, padding="same")
    return pooling(X)

# Genetic Algorithm

## The CNN class (individual)

In [None]:
# This class represents each individual of our population
class CNN:
    """One individual of the population: a CNN described by an encoding string.

    The encoding is a "-"-separated list of genes. Each gene is either
    ``"max"`` / ``"mean"`` (a pooling layer) or ``"<f1>_<f2>"`` (a skip layer
    with ``f1`` and ``f2`` filters).
    """

    def __init__(self, encoding:str, input_shape:tuple, output_shape:int) -> None:

        """
        Class constructor

        Args:
            encoding (str): Encoding representation of the CNN
            input_shape (tuple): Input shape of the CNN, passed unchanged to
                the Keras ``Input`` layer
            output_shape (int): Number of classes of the CNN
        """

        # Genetic algorithm stuff
        self.genes = encoding.split("-") # List of genes (cnn layers)
        self.num_genes = len(self.genes)
        self.fitness = 0.0               # Adaptation value

        # Convolutional Neural Network stuff
        self.encoding = encoding         # Encoded representation of the CNN
        self.input_shape = input_shape   # Shape of a single input sample
        self.output_shape = output_shape # Output shape (number of classes)
        self.model = None                # Model object (Tensorflow)
        self.accuracy = 0.0              # Accuracy of the model
        self.process_time = 0.0          # Time of the model to make an inference
        self.training_time = 0.0         # Time taken to train

    def generate_model(self) -> tf.keras.Model:
        """
        Generates a Keras model from the encoding

        The layers described by the encoding are stacked in order and
        followed by a global max pooling and a softmax dense layer with
        ``output_shape`` units. The model is stored in ``self.model`` and
        also returned.

        Returns:
            model: tensorflow.keras.Model
        """
        inputs = Input(shape=self.input_shape)
        outputs = inputs

        # Stack one block per gene of the encoding
        for gene in self.encoding.split("-"):
            if gene in ("mean", "max"):
                outputs = pooling_layer(outputs, gene)
            else:
                # Skip layer
                f1, f2 = gene.split("_")
                outputs = skip_layer(outputs, int(f1), int(f2))

        outputs = GlobalMaxPooling2D()(outputs)
        outputs = Dense(self.output_shape, activation="softmax")(outputs)

        self.model = Model(inputs = inputs, outputs = outputs)

        # The signature and docstring promise the model, so return it too
        return self.model

    def __str__(self) -> str:

        """
        String representation of the object.
        """

        return f"""Model encoding: {self.encoding}, \nModel Accuracy: {self.accuracy},\nModel process time: {self.process_time}"""

    def get_info(self) -> dict:

        """
        Returns a dict with the information of the current individual.

        Returns:
            dict: Dictionary containing class important attributes.
        """

        return {
            "encoding" : self.encoding,
            "depth": self.num_genes,
            "fitness" : self.fitness,
            "accuracy" : self.accuracy,
            "process time": self.process_time,
            "training time": self.training_time,
        }

## The Population class

In [None]:
class Population:
    """Group of CNN individuals managed by the genetic algorithm."""

    def __init__(self, n_individuals, min_genes, max_genes):
        """Store the population settings; no individual is created yet.

        Args:
            n_individuals (int): Number of individuals to create.
            min_genes (int): Minimum number of genes (layers) per individual.
            max_genes (int): Maximum number of genes (layers) per individual.
        """
        self.n_individuals = n_individuals
        self.min_genes, self.max_genes = min_genes, max_genes
        self.individuals = None
        self.best_individual = None

    def initialize(self, input_shape, output_shape):
        """Fill the population with ``n_individuals`` random individuals.

        Args:
            input_shape: Input shape handed to every CNN.
            output_shape: Output shape (number of classes) handed to every CNN.
        """
        self.individuals = [
            self.generate_individual(input_shape, output_shape)
            for _ in range(self.n_individuals)
        ]

    def generate_individual(self, input_shape, output_shape):
        """Create one CNN with a random depth and random layers.

        The depth is drawn uniformly from ``[min_genes, max_genes]`` and
        every layer comes from ``random_layer()``.

        Args:
            input_shape: Input shape handed to the CNN.
            output_shape: Output shape (number of classes) handed to the CNN.

        Returns:
            CNN: The new individual.
        """
        n_layers = random.randint(self.min_genes, self.max_genes)
        encoding = "-".join(random_layer() for _ in range(n_layers))

        return CNN(encoding, input_shape, output_shape)

    def print(self):
        """Print a table with the depth, encoding and accuracy of every individual."""
        rows = [
            [member.num_genes, member.encoding, round(member.accuracy, 4)]
            for member in self.individuals
        ]
        table = tabulate(
            rows,
            headers=["CNN Depth", "Encoding", "Accuracy"],
            numalign="center",
            stralign="left",
        )

        print(table)

## The GeneticAlgorithm class

In [None]:
class GeneticAlgorithm:
    """Genetic algorithm that evolves CNN architectures.

    Each generation evaluates the population (training every architecture
    that has not been seen before), selects individuals by binary tournament
    with elitism and recombines them with one-point crossover.
    """

    def __init__(self, population_size, min_genes, max_genes, fitness_func, mutation_rate,
                crossover_rate, num_generations, saved_cnns, training_params):
        """Create the algorithm and its initial random population.

        Args:
            population_size (int): Number of individuals per generation.
            min_genes (int): Minimum depth of the generated CNNs.
            max_genes (int): Maximum depth of the generated CNNs.
            fitness_func: Fitness criterion; stored but not used by the
                methods of this class.
            mutation_rate (float): Per-gene probability of mutation.
            crossover_rate (float): Per-individual probability of being
                selected for crossover.
            num_generations (int): Number of generations run by ``main_loop``.
            saved_cnns (dict): Maps encodings already evaluated to their
                accuracy; it is updated in place.
            training_params (dict): Must provide "X_train", "X_test",
                "y_train_cat", "y_test_cat", "epochs" and "batch_size".
        """
        self.population_size = population_size
        self.min_genes = min_genes
        self.max_genes = max_genes
        self.fitness_func = fitness_func
        self.num_generations = num_generations

        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate

        self.population = Population(population_size, min_genes, max_genes)
        # Shapes are taken from the first training sample / first label row
        self.input_shape = training_params["X_train"][0].shape
        self.output_shape = training_params["y_train_cat"][0].shape[0]
        self.population.initialize(self.input_shape, self.output_shape)

        self.saved_cnns = saved_cnns
        self.training_params = training_params

    def save_architectures(self):
        """Dump the evaluated architectures to 'evaluated_architectures.json'."""
        with open('evaluated_architectures.json', 'w') as outfile:
            json.dump(self.saved_cnns, outfile, indent=4)

    def _train_and_score(self, individual):
        """Build, train and evaluate one individual; return its test accuracy."""
        params = self.training_params

        individual.generate_model()
        individual.model.compile(optimizer="Adam", loss="categorical_crossentropy", metrics=["accuracy"])

        print("Architecture:", individual.encoding)
        print("Training ...")
        individual.model.fit(params["X_train"], params["y_train_cat"], epochs = params["epochs"],
                             batch_size = params["batch_size"], validation_split=0.15, verbose=1)

        print("Evaluating the model with unseen data ...")
        _, test_acc = individual.model.evaluate(params["X_test"], params["y_test_cat"])

        return test_acc

    def evaluate_population(self):
        """Assign an accuracy to every individual and track the best one.

        Architectures whose encoding is already in ``saved_cnns`` are not
        trained again; their stored accuracy is reused. Newly trained
        architectures are added to ``saved_cnns``.
        """
        if self.population.best_individual is not None:
            best_acc = self.population.best_individual.accuracy
        else:
            best_acc = 0.0

        for individual in self.population.individuals:
            print()
            # Check if our architecture is already in the saved cnns dict
            if individual.encoding in self.saved_cnns:
                print("Architecture {} already evaluated".format(individual.encoding))
                # Reuse the stored result; otherwise the individual would keep
                # its default accuracy of 0.0 and always lose the selection
                individual.accuracy = self.saved_cnns[individual.encoding]
            else:
                individual.accuracy = self._train_and_score(individual)
                self.saved_cnns[individual.encoding] = individual.accuracy

            # The first individual seen is accepted even with accuracy 0.0 so
            # that best_individual is never left as None
            if self.population.best_individual is None or individual.accuracy > best_acc:
                self.population.best_individual = individual
                best_acc = individual.accuracy

        print("\nEvery individual has been evaluated! ...")

    def tournament_selection(self):
        """Select the next population by binary tournament with elitism.

        Returns:
            list: ``population_size`` individuals; the first one is the most
            accurate individual of the current population.
        """
        print("Tournament Selection".center(30, "="))

        offspring = []

        # Sort individuals by accuracy
        sorted_individuals = sorted(self.population.individuals, key=lambda x: x.accuracy, reverse=True)

        # Elitism
        # We add to the offspring the best individual of the last population
        offspring.append(sorted_individuals[0])

        # Fill the offspring by tournament
        while len(offspring) < self.population_size:
            # Select two individuals
            ind1 = random.choice(self.population.individuals)
            ind2 = random.choice(self.population.individuals)

            winner = ind1 if ind1.accuracy > ind2.accuracy else ind2

            print(f"Selected individuals: {ind1.encoding} acc: {round(ind1.accuracy, 4)} & {ind2.encoding} acc: {round(ind2.accuracy, 4)} | Winner: {winner.encoding}")

            offspring.append(winner)

        return offspring

    @staticmethod
    def _splice_genes(genes1, genes2, cross_point):
        """Return the two gene lists produced by a one-point crossover.

        Each child keeps the head of one parent and the tail of the other,
        so the order of the layers inside every segment is preserved.
        """
        child1 = genes1[:cross_point] + genes2[cross_point:]
        child2 = genes2[:cross_point] + genes1[cross_point:]
        return child1, child2

    def cross(self, parent1, parent2, cross_point):
        """Create two children from two parents with a one-point crossover.

        Args:
            parent1 (CNN): First parent.
            parent2 (CNN): Second parent.
            cross_point (int): Index at which the gene lists are cut.

        Returns:
            tuple: The two new CNN individuals.
        """
        genes1, genes2 = self._splice_genes(parent1.genes, parent2.genes, cross_point)

        son1 = CNN("-".join(genes1), self.input_shape, self.output_shape)
        son2 = CNN("-".join(genes2), self.input_shape, self.output_shape)

        return son1, son2

    def _select_crossover_pairs(self):
        """Draw the individuals that take part in crossover, as index pairs.

        Every position is selected with probability ``crossover_rate``. The
        selected positions are paired in order; if their number is odd the
        last one is left out.
        """
        selected = [
            index for index in range(self.population_size)
            if random.random() < self.crossover_rate
        ]
        # zip stops at the shorter slice, which drops an unpaired last index
        return list(zip(selected[::2], selected[1::2]))

    def crossover(self):
        """Replace randomly selected pairs of individuals by their children."""
        print("Crossover".center(20, "="))

        individuals = self.population.individuals

        for index1, index2 in self._select_crossover_pairs():
            # Use the selected positions, not the position of the pair in the
            # selection list
            parent1 = individuals[index1]
            parent2 = individuals[index2]

            # We choose a random crossover point from shortest parent
            shortest_parent = parent1 if parent1.num_genes < parent2.num_genes else parent2
            cross_point = random.randint(0, shortest_parent.num_genes-1)

            # We create two individuals based on their parents
            son1, son2 = self.cross(parent1, parent2, cross_point)

            print(f"Crosspoint: {cross_point} | Layer {parent1.genes[cross_point]}")
            print(f"Parent {parent1.encoding} was replaced by {son1.encoding}")
            print(f"Parent {parent2.encoding} was replaced by {son2.encoding}")

            # New individuals replace their parents
            individuals[index1] = son1
            individuals[index2] = son2

    def mutation(self):
        """Randomly insert or remove layers of the individuals.

        Every gene mutates with probability ``mutation_rate``. A mutated
        individual is replaced by a new CNN built from the mutated genes, so
        its encoding and depth stay consistent and other references to the
        original object are not affected. An individual never loses its last
        remaining layer.
        """
        # A tuple is required: random.choice does not accept a set
        possible_mutations = ("increment_depth", "reduce_depth")
        # TODO: "change_layer_type" and "recreate_layer" are not implemented

        individuals = self.population.individuals

        for index, individual in enumerate(individuals):
            # Work on a copy so the gene list is not modified while iterating
            genes = list(individual.genes)
            position = 0      # Position in `genes` of the gene being visited
            mutated = False

            # One mutation roll per gene of the original individual
            for _ in range(len(individual.genes)):
                if random.random() >= self.mutation_rate:
                    position += 1
                    continue

                print("Mutation".center(20, "="))

                mutation_type = random.choice(possible_mutations)

                if mutation_type == "increment_depth":
                    # Put a layer after this layer and skip over it
                    genes.insert(position + 1, random_layer())
                    position += 2
                    mutated = True

                elif len(genes) > 1:
                    # Delete the current layer; the next gene takes its place
                    removed = genes.pop(position)
                    print(f"{removed} Removed from layers")
                    mutated = True

                else:
                    # Keep the only remaining layer
                    position += 1

            if mutated:
                individuals[index] = CNN("-".join(genes), self.input_shape, self.output_shape)

    def main_loop(self):
        """Run the evolution for ``num_generations`` generations."""
        for generation in range(self.num_generations):

            print(f"Generation {generation + 1}")
            # ========== Evaluation ============
            self.evaluate_population()

            best_individual = self.population.best_individual
            print(f"The best individual was {best_individual.encoding} with accuracy {round(best_individual.accuracy, 4)}")

            print("Individuals Summary".center(30, " "))
            self.population.print()
            self.save_architectures()

            # ========== Selection =============
            self.population.individuals = self.tournament_selection()

            # ========== Crossover ==========
            self.crossover()

            # ========== Mutation ==============
            # Mutation is currently disabled
            # self.mutation()
        

# Running our Genetic Algorithm

## Preparing our parameters

### Loading the architectures that have already been evaluated

In [None]:
# Cache of the architectures that have already been evaluated, keyed by
# their encoding string; GeneticAlgorithm.evaluate_population stores each
# architecture's accuracy here.
# TODO: load this from a JSON file instead of starting empty.
saved_cnns = {}

### Loading the dataset with which the individuals will be evaluated

### Our dataset: CIFAR-10
It has 10 classes, which are:

| Label | Description |
|-------|-------------|
|   0   |   airplane  |
|   1   |  automobile |
|   2   |     bird    |
|   3   |     cat     |
|   4   |     deer    |
|   5   |     dog     |
|   6   |     frog    |
|   7   |    horse    |
|   8   |     ship    |
|   9   |    truck    |

In [None]:
# Load the CIFAR-10 train/test splits through Keras
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()
# Convert the label vectors with to_categorical; the "categorical_crossentropy"
# loss used during evaluation is fed these converted labels
y_train_cat = to_categorical(y_train)
y_test_cat = to_categorical(y_test)

### Preparing the training parameters

In [None]:
# Everything GeneticAlgorithm needs to train and evaluate one architecture
training_params = dict(
    epochs=15,
    batch_size=16,
    X_train=X_train,
    X_test=X_test,
    y_train_cat=y_train_cat,
    y_test_cat=y_test_cat,
)

## Creating a GeneticAlgorithm object

In [None]:
# Build the genetic algorithm; this also creates the initial random population
ga = GeneticAlgorithm(
    population_size=10,  # Number of CNNs in the population
    min_genes=5,        # Minimum depth of the CNNs
    max_genes=10,       # Maximum depth of the CNNs
    fitness_func="acc", # TODO: this should be changed to a function that takes response time into account
    num_generations=10, # Number of generations to run
    mutation_rate=0.07, # Mutation rate (value by convention)
    crossover_rate=0.4, # Crossover rate (value by convention)
    saved_cnns=saved_cnns, # Architectures already evaluated, so they are not trained again
    training_params=training_params # Parameters needed for training
)

In [None]:
# Run the evolution for the configured number of generations
ga.main_loop()

In [None]:
import pprint

# Read back the architectures written by GeneticAlgorithm.save_architectures
# and pretty-print them
with open("evaluated_architectures.json") as file:
    data = json.load(file)
    pprint.pprint(data)

In [None]:
# Inspect the Python type of the object parsed from the JSON file
type(data)

In [None]:
# Sample structure that stores several metrics per architecture: four
# encodings that differ only in the second filter count of the first skip
# layer (16 to 19), all with the same placeholder values.
data2 = {
    f"mean-64_{second_filters}-32_32-mean-max": {
        "accuracy": 0.2777000069618225,
        "depth": 5,
        "process time": 0.03,
        "training time": 25,
    }
    for second_filters in range(16, 20)
}

In [None]:
# Write the sample structure to "other_json.json", indented for readability
with open("other_json.json", "w") as outfile:
    json.dump(data2, outfile, indent=4)