In [1]:
import numpy as np
from tensorflow import keras
from keras import layers
from random import choice, choices, randint
from copy import deepcopy

In [2]:
# Model / data parameters
num_classes = 10

# Fashion-MNIST comes pre-split into a training and a test set
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Scale images to the [0, 1] range and append a trailing channel axis,
# so every image ends up with shape (28, 28, 1)
x_train = np.expand_dims(x_train.astype("float32") / 255, -1)
x_test = np.expand_dims(x_test.astype("float32") / 255, -1)

print("x_train shape:", x_train.shape)
print(x_train.shape[0], "train samples")
print(x_test.shape[0], "test samples")

# Turn the class vectors into binary class matrices (one column per class)
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


In [3]:
# Shape of a single input image, passed to keras.Input when a model is built
INPUT_SHAPE = (28, 28, 1)

#Pooling layer
# Inclusive bounds for each dimension of a pooling window
MIN_POOL_SIZE = 2
MAX_POOL_SIZE = 4

#Convolution layer
# Inclusive bounds for the number of filters of a convolution
MIN_FEATURES = 5
MAX_FEATURES = 50
# Standard deviation used when the number of filters is mutated
FEATURES_STD = 2.5
# Inclusive bounds for each dimension of a convolution kernel
MIN_KERNEL_SIZE = 2
MAX_KERNEL_SIZE = 7

#VGG Block
# Inclusive bounds for the number of convolutions stacked in one block
MIN_CONV_LAYERS = 1
MAX_CONV_LAYERS = 4

#Search
# Number of candidate networks kept in the population
POP_SIZE = 2

# Training settings handed to model.fit for every candidate
BATCH_SIZE = 128
# Original (misspelled) name, kept as an alias so existing references keep working
BACH_SIZE = BATCH_SIZE
EPOCHS = 2

# Verbosity level forwarded to model.fit and model.evaluate
VERBOSE = 1

In [4]:
class Util:
    """Helper routines shared by the evolvable building blocks."""

    @staticmethod
    def normalMutation(value, min, max, std=0.8):
        """Return an integer drawn around ``value`` and confined to ``[min, max]``.

        A sample is taken from a normal distribution centred on ``value`` with
        standard deviation ``std`` and rounded to the nearest integer. Samples
        that fall outside the inclusive range are discarded and redrawn, so a
        ``value`` lying far outside the range can take many draws.

        Args:
            value: Current value; used as the mean of the distribution.
            min: Smallest result that is accepted (inclusive).
            max: Largest result that is accepted (inclusive).
            std: Standard deviation of the distribution.

        Returns:
            int: The accepted, rounded sample.

        Raises:
            ValueError: If ``min`` is greater than ``max``; no sample could
                ever be accepted and the redraw loop would never finish.
        """
        if min > max:
            raise ValueError(f"empty mutation range: min={min} > max={max}")

        while True:
            sample = np.random.normal(loc=value, scale=std, size=1)[0]
            # round() on a NumPy scalar may hand back a NumPy type rather than
            # a built-in int; callers use the result in range() and as layer
            # sizes, so normalise it explicitly.
            new_value = int(round(sample))
            if min <= new_value <= max:
                return new_value

In [5]:
class ConvLayer:
    """Evolvable description of one convolution followed by a ReLU activation."""

    def __init__(self):
        """Pick the filter count and both kernel dimensions at random."""
        self.features = randint(MIN_FEATURES, MAX_FEATURES)
        self.kernel_x = randint(MIN_KERNEL_SIZE, MAX_KERNEL_SIZE)
        self.kernel_y = randint(MIN_KERNEL_SIZE, MAX_KERNEL_SIZE)

    def getKernelSize(self):
        """Return the kernel dimensions as an ``(x, y)`` tuple."""
        return self.kernel_x, self.kernel_y

    def getLayerList(self):
        """Return freshly built Keras layers: the convolution, then its ReLU."""
        return [
            layers.Conv2D(self.features, self.getKernelSize()),
            layers.Activation(activation='relu'),
        ]

    def mutate(self):
        """Perturb every attribute in place, keeping each inside its bounds.

        ``features`` is mutated with ``FEATURES_STD`` inside the feature
        bounds; every other attribute is treated as a kernel dimension.
        """
        attributes = vars(self)
        # Iterate over a snapshot of the names; only values are reassigned.
        for name in list(attributes):
            current = attributes[name]
            if name == 'features':
                attributes[name] = Util.normalMutation(
                    current, MIN_FEATURES, MAX_FEATURES, FEATURES_STD)
            else:
                attributes[name] = Util.normalMutation(
                    current, MIN_KERNEL_SIZE, MAX_KERNEL_SIZE)

In [6]:
class PoolingLayer:
    """Evolvable description of a max-pooling layer."""

    def __init__(self):
        """Pick both pooling-window dimensions at random."""
        self.pool_x = randint(MIN_POOL_SIZE, MAX_POOL_SIZE)
        self.pool_y = randint(MIN_POOL_SIZE, MAX_POOL_SIZE)

    def getLayerList(self):
        """Return a one-element list holding a freshly built MaxPooling2D layer."""
        window = (self.pool_x, self.pool_y)
        # The second positional argument of MaxPooling2D is the stride.
        return [layers.MaxPooling2D(window, 1)]

    def mutate(self):
        """Perturb every attribute in place within the pooling-size bounds."""
        state = vars(self)
        # Iterate over a snapshot of the names; only values are reassigned.
        for name in list(state):
            state[name] = Util.normalMutation(state[name], MIN_POOL_SIZE, MAX_POOL_SIZE)

In [7]:
class VGGBlock:
    """A stack of convolution descriptions closed by one pooling description."""

    def __init__(self):
        """Create a random number of ConvLayer objects and one PoolingLayer."""
        conv_count = randint(MIN_CONV_LAYERS, MAX_CONV_LAYERS)
        self.convs = [ConvLayer() for _ in range(conv_count)]
        self.pooling = PoolingLayer()

    def getLayers(self):
        """Return a new list: the convolutions in order, then the pooling layer."""
        return [*self.convs, self.pooling]

    def mutate(self):
        """Mutate every contained layer, then resize the convolution stack."""
        for conv in self.convs:
            conv.mutate()
        self.pooling.mutate()

        old_count = len(self.convs)
        target_count = Util.normalMutation(old_count, MIN_CONV_LAYERS, MAX_CONV_LAYERS)
        self.updateConvLength(target_count - old_count)

    def updateConvLength(self, change):
        """Grow or shrink the convolution stack by ``change`` entries.

        A positive ``change`` inserts that many new ConvLayer objects at random
        positions; a negative one removes that many randomly chosen entries;
        zero leaves the stack untouched.
        """
        if change > 0:
            for _ in range(change):
                position = randint(0, len(self.convs))
                self.convs.insert(position, ConvLayer())
        elif change < 0:
            for _ in range(-change):
                victim = choice(self.convs)
                self.convs.remove(victim)

In [8]:
class NeuralNetwork:
    """Candidate architecture: two VGG-style blocks plus a fixed classifier head."""

    def __init__(self):
        """Create two random blocks and start with an accuracy of 0."""
        self.blocks = [VGGBlock(), VGGBlock()]
        self.val_acc = 0

    def evaluate(self):
        """Build, train and score the network, storing the result in ``val_acc``.

        The model is trained on ``x_train``/``y_train`` and ``val_acc`` is set
        to the accuracy reported by ``model.evaluate`` on ``x_test``/``y_test``.
        If a ``ValueError`` is raised along the way, ``val_acc`` is set to 0.

        NOTE(review): the score comes from the test set even though
        ``validation_split=0.2`` already holds data out during training;
        consider scoring on the validation data instead.
        """
        # Every call builds a brand-new model. Release the global state Keras
        # keeps for previously built models so memory does not grow over a search.
        keras.backend.clear_session()
        try:
            model = self.createModel()

            model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

            model.fit(x_train, y_train, batch_size=BACH_SIZE, epochs=EPOCHS, validation_split=0.2, verbose=VERBOSE)
            self.val_acc = model.evaluate(x_test, y_test, verbose=VERBOSE)[1]
        except ValueError as error:
            # Presumably raised when the sampled layers cannot be stacked
            # (e.g. a kernel larger than what is left of the image) - TODO confirm.
            if VERBOSE:
                print(f'Architecture rejected: {error}')
            self.val_acc = 0

    def mutate(self):
        """Mutate every block in place."""
        for block in self.blocks:
            block.mutate()

    def createModel(self):
        """Return an uncompiled Sequential model for the current blocks."""
        model = keras.Sequential([keras.Input(shape=INPUT_SHAPE)])

        self.insertModelHiddenLayers(model)
        self.insertModelOutputLayers(model)

        return model

    def insertModelHiddenLayers(self, model):
        """Add the Keras layers of every block to ``model``, in block order."""
        for block in self.blocks:
            for layer_spec in block.getLayers():
                for layer in layer_spec.getLayerList():
                    model.add(layer)

    def insertModelOutputLayers(self, model):
        """Add the classifier head: flatten, dropout, dense, batch norm, softmax."""
        model.add(layers.Flatten())
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(num_classes))
        model.add(layers.BatchNormalization(axis=1))
        model.add(layers.Activation(activation='softmax'))

In [9]:
class ENAS:
    """Evolutionary architecture search over a population of NeuralNetwork objects."""

    def __init__(self):
        """Create the initial population and evaluate it straight away."""
        self.population = [NeuralNetwork() for _ in range(POP_SIZE)]
        self.best_net = None
        self.acc_history = []

        self.initialize()

    def initialize(self):
        """Evaluate the starting population and log the best accuracy found."""
        self.evaluatePopulation(self.population)
        self.acc_history.append(self.best_net.val_acc)

    def search(self, max_iter):
        """Run ``max_iter`` generations and return the best network seen.

        Each generation selects parents, mutates the copies, evaluates them,
        replaces the population and appends the best accuracy so far to
        ``acc_history``.
        """
        for iteration in range(max_iter):
            print(f'Current iteration: {iteration}')
            offspring = self.selection()
            self.mutation(offspring)
            self.evaluatePopulation(offspring)
            self.succession(offspring)
            self.acc_history.append(self.best_net.val_acc)
        return self.best_net

    def selection(self):
        """Return ``POP_SIZE`` deep copies chosen by two-way tournaments.

        Both contestants are drawn with replacement; on a tie the first
        contestant wins.
        """
        winners = []
        for _ in range(POP_SIZE):
            first, second = choices(self.population, k=2)
            winner = first if first.val_acc >= second.val_acc else second
            winners.append(deepcopy(winner))
        return winners

    def mutation(self, pop):
        """Mutate every network of ``pop`` in place."""
        for network in pop:
            network.mutate()

    def succession(self, new_pop):
        """Replace the current population with ``new_pop``."""
        self.population = new_pop

    def evaluatePopulation(self, pop):
        """Evaluate every network in ``pop`` and keep track of the best one.

        ``best_net`` is replaced only by a strictly higher ``val_acc``.
        """
        for network in pop:
            network.evaluate()
            if self.best_net is None or network.val_acc > self.best_net.val_acc:
                self.best_net = network

In [10]:
# Constructing ENAS already evaluates the initial population
arch_finder = ENAS()
# Run five generations and keep the best network found
best_ann = arch_finder.search(max_iter=5)
print(best_ann.val_acc)
print(arch_finder.acc_history)

Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Current iteration: 0
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Current iteration: 1
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Current iteration: 2
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Current iteration: 3
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
Current iteration: 4
Epoch 1/2
Epoch 2/2
Epoch 1/2
Epoch 2/2
0.8794999718666077
[0.8705999851226807, 0.8705999851226807, 0.8709999918937683, 0.8723000288009644, 0.8794999718666077, 0.8794999718666077]
