# Pruning a neural network by evolutionary algorithm

Author: Richard Klem

E-mail: xklemr00@fit.vutbr.cz

Date: 8.5.2022

Copyright 2020 The TensorFlow Datasets Authors, Licensed under the Apache License, Version 2.0
This copyright applies on the sections specifically annotated by _TAKEN FROM TF DEMO_.
Other code is my (see author at the top of the source code) own.

## Libraries
Import required libraries.

In [None]:
import random
from itertools import permutations

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import tensorflow_datasets as tfds
from adjustText import adjust_text

## Create your input pipeline

Start by building an efficient input pipeline using advices from:
* The [Performance tips](https://www.tensorflow.org/datasets/performances) guide
* The [Better performance with the `tf.data` API](https://www.tensorflow.org/guide/data_performance#optimize_performance) guide

_TAKEN FROM TF DEMO_


### Load a dataset

Load the MNIST dataset with the following arguments:

* `shuffle_files=True`: The MNIST data is only stored in a single file, but for larger datasets with multiple files on disk, it's good practice to shuffle them when training.
* `as_supervised=True`: Returns a tuple `(img, label)` instead of a dictionary `{'image': img, 'label': label}`.

_TAKEN FROM TF DEMO_

In [None]:
# TAKEN FROM TF DEMO
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

### Build a training pipeline

Apply the following transformations:

* `tf.data.Dataset.map`: TFDS provide images of type `tf.uint8`, while the model expects `tf.float32`. Therefore, you need to normalize images.
* `tf.data.Dataset.cache` As you fit the dataset in memory, cache it before shuffling for a better performance.<br/>
__Note:__ Random transformations should be applied after caching.
* `tf.data.Dataset.shuffle`: For true randomness, set the shuffle buffer to the full dataset size.<br/>
__Note:__ For large datasets that can't fit in memory, use `buffer_size=1000` if your system allows it.
* `tf.data.Dataset.batch`: Batch elements of the dataset after shuffling to get unique batches at each epoch.
* `tf.data.Dataset.prefetch`: It is good practice to end the pipeline by prefetching [for performance](https://www.tensorflow.org/guide/data_performance#prefetching).

_TAKEN FROM TF DEMO_

In [None]:
# _TAKEN FROM TF DEMO_
def normalize_img(image, label):
    """Normalizes images: `uint8` -> `float32`."""
    return tf.cast(image, tf.float32) / 255., label


ds_train = ds_train.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)

### Build an evaluation pipeline

Your testing pipeline is similar to the training pipeline with small differences:

 * You don't need to call `tf.data.Dataset.shuffle`.
 * Caching is done after batching because batches can be the same between epochs.

_TAKEN FROM TF DEMO_

In [None]:
# TAKEN FROM TF DEMO
ds_test = ds_test.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)

## Create and train the model
Swish function is smarter ReLU: https://towardsdatascience.com/swish-booting-relu-from-the-activation-function-throne-78f87e5ab6eb

We can use SparseCategoricalAccuracy as we have non-overlapping categories. It will save some computational time and memory.

See architecture bellow.
![SNOWFALL](matlab_schema.png)

In [None]:
def get_model():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(filters=3, kernel_size=(3, 3), activation='swish', input_shape=(28, 28, 1)),
        tf.keras.layers.AveragePooling2D(),
        tf.keras.layers.Conv2D(filters=6, kernel_size=(3, 3), activation='swish'),
        tf.keras.layers.AveragePooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(0.005),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )

    model.fit(
        ds_train,
        epochs=5,
        validation_data=ds_test
    )
    return model

model = get_model()

## Model architecture
We can inspect the model architecture in code:

In [None]:
model.summary()

There is analyzed structure from Matlab's Deep Network Analyzer.
You can see we have $150*10=1500$ trainable parameters in the dense layer.
Hence the rest of the model has about 200 parameters, we will aim to reduce insignificant or less significant weights in the dense layer by evolutionary algorithm.
![SNOWFALL](matlab_analyze.png)

## Individual class and some methods

### Individual class
Create a class to represent one individual "solution".

In [None]:
class Individual:
    def __init__(self, threshold):
        super(Individual, self).__init__()
        self.threshold = threshold
        self.acc = 0
        self.w_size = np.inf
        self.fitness = 0

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return f"threshold={round(self.threshold, 4)}, acc={round(self.acc, 4)}, w_size={self.w_size}, fitness={round(self.fitness, 4)}"

### Fitness
Function to evaluate fitness of the individual based on accuracy and model size.

In [None]:
def get_fitness(size, acc, baseline_w_size, baseline_acc, ):
    return (acc / baseline_acc) ** 2 + (1 - size / baseline_w_size) / 2

### Mating -- cross-mutation
Function to get child from parents â€“ genetic cross-over.

In [None]:
def mating(mother: Individual, father: Individual):
    return Individual((mother.threshold + father.threshold) / 2)

### Eval model size and accuracy
Function to get size and accuracy of the certain model. `w_size` like _weights_size_ as we are optimizing only weights, not biases.

In [None]:
def eval_phenotype(model):
    res = model.evaluate(ds_test, return_dict=True)
    w_size = np.count_nonzero(model.layers[5].weights[0])
    acc = res.get('sparse_categorical_accuracy')
    return w_size, acc

### Population
Function which creates one population based on `population_size`, `min_threshold` and `max_threshold` values.

In [None]:
def get_population(population_size=20, min_threshold=0, max_threshold=np.finfo(np.float64).max):
    # We want to start with rather lower thresholds, so thus the logspace.
    population = np.logspace(min_threshold, max_threshold, population_size - 1, base=16)
    # Normalize the thresholds.
    population = [min_threshold] + list(population / max(population) * max_threshold)
    # Create Individual instances.
    population = [Individual(threshold) for threshold in population]
    return population

### Epoch
Make a function to simulate one epoch of the EA.

In [None]:
def one_epoch(population: [Individual], min_t, max_t, base_w_size, base_acc):
    population_size = len(population)
    num_of_children = int(population_size // 4)
    num_of_randoms = int(population_size // 20)
    num_of_elites = population_size - num_of_children - num_of_randoms
    mating_possibilities = list(permutations(population, 2))
    pairs = random.sample(mating_possibilities, num_of_children)
    children: [Individual] = [mating(x, y) for (x, y) in pairs]

    pop = dict(zip(range(0, population_size), population))

    for one in pop.values():
        weights = orig_weights.copy()
        # Calculate new weights according to the threshold value.
        weights = weights * (abs(weights) > one.threshold)
        # Update the model.
        model.layers[5].set_weights([weights, orig_biases])
        # Evaluate the new model with the new weights.
        one.w_size, one.acc = eval_phenotype(model)
        one.fitness = get_fitness(one.w_size, one.acc, base_w_size, base_acc)

    # Filter elite individuals - the best ones.
    elites: [Individual] = list(dict(sorted(pop.items(), key=lambda x: x[1].fitness, reverse=True)[:num_of_elites]).values())

    # Get some randomly set individual(s) to discover something new.
    randoms: [Individual] = []
    for one in random.sample(list(pop.values()), num_of_randoms):
        print(one.threshold)
        one.threshold = np.random.uniform(min_t, max_t)
        randoms.append(one)
        print(one.threshold)

    new_population = elites + children + randoms
    return new_population

### Evolution
Function that runs N epochs of the EA.

In [None]:
def run(population, min_t, max_t, base_w_size, base_acc, epochs=30):
    for i in range(epochs):
        population = one_epoch(population, min_t, max_t, base_w_size, base_acc)
    return population

## Init a population and required variables
Create an initial population and set required variables.

In [None]:
base_w_size, base_acc = eval_phenotype(model)
orig_weights = np.array(model.layers[5].weights[0])
orig_biases = np.array(model.layers[5].weights[1])
weights = orig_weights.copy()
model.layers[5].set_weights([weights, orig_biases])

population_size = 20
min_threshold = 1e-6
max_threshold = max(abs(np.amin(orig_weights)), abs(np.amax(orig_weights)))
max_size = weights.shape[0] * weights.shape[1]

population = get_population(population_size, min_threshold, max_threshold)

## Run the evolution
Let's run 30 epochs of the EA. Each time, the population is updated according to the previously defined methods.

In [None]:
epochs = 30
population = run(population, min_threshold, max_threshold, base_w_size, base_acc, epochs=epochs)

## Pareto front
Create data for plotting a pareto front.
We plot top 75% solutions, the rest 25% are new children, so we cannot plot them as they were not evaluated.
We want to see points on the top, on the right and on the frontier which will be facing towards top-right corner.

In [None]:
output = list(sorted(population, key=lambda x: x.fitness, reverse=True)[:int(population_size * 0.75)])
labels = ['acc', 'w_size', 'fitness', 'threshold']
data = []

for ind in output:
    if ind.w_size > 10 and ind.acc > 0.5:
        data.append([ind.acc, max_size/ind.w_size, ind.fitness, ind.threshold])

data = pd.DataFrame(data, columns=labels)
plt.title("Pareto front")
plt.xlabel("Accuracy")
plt.ylabel("Compression")

x_labels = []
for i in range(len(data['threshold'])):
    x_labels.append(plt.text(data['acc'][i], data['w_size'][i], str(round(data['threshold'][i], 2))))

sns.scatterplot(data=data, x='acc', y='w_size')
adjust_text(x_labels, x=list(data['acc']), y=list(data['w_size']), force_points=1.5,
            arrowprops=dict(arrowstyle='->', color='red'))

## Statistics
Now we can run the code on the 10 different models and see if the EA is producing consistent outputs.

In [None]:
num_of_runs = 10
def run_multiple(num_of_runs=10):
    boxplot_accs = []
    boxplot_w_sizes = []
    boxplot_fitnesses = []
    for i in range(num_of_runs):
        # 1. Init model
        model = get_model()
        # 2. Init variables
        base_w_size, base_acc = eval_phenotype(model)
        orig_weights = np.array(model.layers[5].weights[0])
        orig_biases = np.array(model.layers[5].weights[1])
        weights = orig_weights.copy()
        model.layers[5].set_weights([weights, orig_biases])
        population_size = 20
        min_threshold = 1e-6
        max_threshold = max(abs(np.amin(orig_weights)), abs(np.amax(orig_weights)))
        max_size = weights.shape[0] * weights.shape[1]

        population = get_population(population_size, min_threshold, max_threshold)

        # 3. Run EA in N epochs
        epochs = 30
        population = run(population, min_threshold, max_threshold, base_w_size, base_acc, epochs=epochs)
        # 4. Save stats
        output = list(sorted(population, key=lambda x: x.fitness, reverse=True)[:int(population_size *0.75)])
        labels = ['acc', 'w_size', 'fitness', 'threshold']
        data = []

        for ind in output:
            if ind.w_size > 10:
                data.append([ind.acc, max_size/ind.w_size, ind.fitness, ind.threshold])

        data = pd.DataFrame(data, columns=labels)
        boxplot_accs = boxplot_accs + list(data['acc'])
        boxplot_w_sizes = boxplot_w_sizes + list(data['w_size'])
        boxplot_fitnesses = boxplot_fitnesses + list(data['fitness'])
    return boxplot_accs, boxplot_w_sizes, boxplot_fitnesses

boxplot_accs, boxplot_w_sizes, boxplot_fitnesses = run_multiple(num_of_runs=num_of_runs)

## Boxplot accuracy, Compression and Fitness values

In [None]:
fig, axes = plt.subplots(1, 3, constrained_layout=True)
axes[0].boxplot([np.array(boxplot_accs).transpose().flatten()], notch=True)
axes[0].set_title('Accuracy')
axes[1].boxplot([max_size / np.array(boxplot_w_sizes).transpose().flatten()], notch=True)
axes[1].set_title('Compression ratio')
axes[2].boxplot([np.array(boxplot_fitnesses).transpose().flatten()], notch=True)
axes[2].set_title('Fitness')
plt.suptitle(f'Collected values fror {num_of_runs} runs, each of {epochs} epochs.')

for ax in axes:
    ax.tick_params(bottom=False, labelbottom=False)