In [None]:
!pip install deap

Collecting deap
  Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/135.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deap
Successfully installed deap-1.4.1


In [None]:
import tensorflow as tf
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical
from deap import base, creator, tools
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Set random seeds for reproducibility.
# Four independent generators are seeded with the same value: Python's
# `random` (used by the architecture generator, mutation, crossover and
# selection below), NumPy, TensorFlow and PyTorch.
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)
torch.manual_seed(42)

# Data Loading, Preprocessing, and Splitting Function
def load_data(limit_samples=10000, batch_size=32):
    """
    Load CIFAR-10 and wrap train/validation/test subsets in DataLoaders.

    The first `limit_samples` images of the CIFAR-10 training set form the
    working pool. The first 60% of that pool becomes the training subset and
    the following 20% the validation subset; the last 20% of the pool is not
    used. The test subset is taken from the start of the CIFAR-10 test set
    and holds at most as many samples as the validation subset.

    Parameters:
    - limit_samples: Number of leading training images kept before splitting.
    - batch_size: Batch size shared by the three DataLoaders.

    Returns:
    - (train_loader, val_loader, test_loader). Images are float32 tensors in
      channels-first layout scaled to [0, 1]; labels are int64 class indices.
      Only the training loader shuffles.
    """
    print("Loading and preprocessing CIFAR-10 dataset...")
    (x_train, y_train), (x_test, y_test) = cifar10.load_data()

    # Limit the training pool *before* normalising so that the float32 copy
    # is only made for the samples that are actually used.
    x_train = x_train[:limit_samples]
    y_train = y_train[:limit_samples]

    # Sizes of the 60% / 20% slices taken from the limited pool
    total_samples = len(x_train)
    train_size = int(0.6 * total_samples)
    val_size = int(0.2 * total_samples)

    # Split the data
    x_train_split = x_train[:train_size]
    y_train_split = y_train[:train_size]
    x_val_split = x_train[train_size:train_size + val_size]
    y_val_split = y_train[train_size:train_size + val_size]
    x_test_split = x_test[:val_size]
    y_test_split = y_test[:val_size]

    print(f"Data split into: {train_size} train, {val_size} validation, {len(x_test_split)} test samples.")

    def _to_dataset(images, labels):
        # Normalise pixel values to [0, 1] and move channels first (NHWC -> NCHW).
        pixels = torch.from_numpy(images.astype('float32') / 255.0).permute(0, 3, 1, 2)
        # Labels are used directly as class indices; the loader is assumed to
        # return integer labels of shape (N, 1), which are flattened to (N,).
        targets = torch.from_numpy(np.asarray(labels).reshape(-1).astype('int64'))
        return TensorDataset(pixels, targets)

    # Create DataLoaders
    train_loader = DataLoader(_to_dataset(x_train_split, y_train_split), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(_to_dataset(x_val_split, y_val_split), batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(_to_dataset(x_test_split, y_test_split), batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

# Load data, keeping the first 30,000 CIFAR-10 training images as the pool
# that load_data splits into training and validation subsets.
train_loader, val_loader, test_loader = load_data(limit_samples=30000)

# Define DEAP's Fitness and Individual
# The weights are positional and line up with the tuple returned by
# evaluate_cnn: (accuracy, complexity, validation loss). In DEAP a positive
# weight marks an objective to maximise and a negative one to minimise.
# The membership checks skip creation when the class already exists on the
# `creator` module (presumably so this cell can be re-run safely).
if "FitnessMulti" not in creator.__dict__:
    creator.create("FitnessMulti", base.Fitness, weights=(2.0, -0.5, -1.0))

if "Individual" not in creator.__dict__:
    creator.create("Individual", list, fitness=creator.FitnessMulti)

# Generate random CNN architecture
def generate_individual():
    """
    Build a random CNN description as a `creator.Individual`.

    The individual is a list of layer dicts: between 3 and 10 randomly chosen
    layers (conv, batchnorm, dropout or dense, each with randomly drawn
    hyper-parameters) followed by a fixed 10-unit dense output layer.
    """
    def _conv():
        return {
            'type': 'conv',
            'filters': random.choice([16, 32, 64]),
            'kernel_size': random.choice([3, 5]),
            'stride': random.choice([1, 2]),
            'activation': random.choice(['relu', 'tanh'])
        }

    def _batchnorm():
        return {'type': 'batchnorm'}

    def _dropout():
        return {'type': 'dropout', 'rate': random.uniform(0.1, 0.5)}

    def _dense():
        return {
            'type': 'dense',
            'units': random.choice([64, 128, 256]),
            'activation': random.choice(['relu', 'tanh'])
        }

    builders = {
        'conv': _conv,
        'batchnorm': _batchnorm,
        'dropout': _dropout,
        'dense': _dense,
    }

    # 3-10 hidden layers; the kind is drawn first, then its hyper-parameters
    depth = random.randint(3, 10)
    genome = [
        builders[random.choice(['conv', 'batchnorm', 'dropout', 'dense'])]()
        for _ in range(depth)
    ]

    # Every architecture ends with the same output layer
    genome.append({'type': 'dense', 'units': 10, 'activation': 'softmax'})

    return creator.Individual(genome)

# Define CNN Model
class CustomCNN(nn.Module):
    """
    Sequential CNN assembled from a list of layer-description dicts.

    Supported layer dicts (keyed by 'type'):
    - 'conv': needs 'filters', 'kernel_size', 'stride', 'activation'.
    - 'batchnorm': no further keys.
    - 'dropout': needs 'rate'.
    - 'dense': needs 'units', 'activation'.

    Only the 'relu' and 'tanh' activations add a module; any other value
    (e.g. 'softmax') adds none, so the network outputs raw scores. The first
    dense layer flattens the feature map; conv and batchnorm entries that
    appear after that point are skipped.

    Parameters:
    - architecture: Iterable of layer dicts as described above.
    - num_classes: Number of output features of the network.
    - input_shape: (channels, height, width) of one input sample.
    """

    # Activation names that map to a module; anything else adds no module.
    _ACTIVATIONS = {'relu': nn.ReLU, 'tanh': nn.Tanh}

    def __init__(self, architecture, num_classes=10, input_shape=(3, 32, 32)):
        super(CustomCNN, self).__init__()
        layers = []
        in_channels, current_height, current_width = input_shape
        in_features = None
        flattened = False

        for layer in architecture:
            if layer['type'] == 'conv' and not flattened:
                kernel_size = layer['kernel_size']
                stride = layer['stride']
                # "Same"-style padding for odd kernels, so only the stride
                # shrinks the feature map.
                padding = (kernel_size - 1) // 2

                layers.append(
                    nn.Conv2d(in_channels, layer['filters'], kernel_size=kernel_size, stride=stride, padding=padding)
                )
                self._append_activation(layers, layer['activation'])

                # Track the output shape so the first Linear layer can be sized.
                in_channels = layer['filters']
                current_height = (current_height + 2 * padding - kernel_size) // stride + 1
                current_width = (current_width + 2 * padding - kernel_size) // stride + 1

            elif layer['type'] == 'batchnorm' and not flattened:
                layers.append(nn.BatchNorm2d(in_channels))

            elif layer['type'] == 'dropout':
                layers.append(nn.Dropout(layer['rate']))

            elif layer['type'] == 'dense':
                if not flattened:
                    layers.append(nn.Flatten())
                    flattened = True
                    in_features = current_height * current_width * in_channels

                layers.append(nn.Linear(in_features, layer['units']))
                in_features = layer['units']
                self._append_activation(layers, layer['activation'])

        if not flattened:
            layers.append(nn.Flatten())
            in_features = current_height * current_width * in_channels

        # Add the classification head only when the architecture does not
        # already end in a bare Linear layer of the right width; otherwise a
        # redundant Linear(num_classes, num_classes) would be stacked on top.
        last = layers[-1]
        if not (isinstance(last, nn.Linear) and last.out_features == num_classes):
            layers.append(nn.Linear(in_features, num_classes))

        self.model = nn.Sequential(*layers)

    @classmethod
    def _append_activation(cls, layers, name):
        """Append the activation module for `name`, if it is a known one."""
        activation = cls._ACTIVATIONS.get(name)
        if activation is not None:
            layers.append(activation())

    def forward(self, x):
        """Run `x` through the assembled layer stack."""
        return self.model(x)

# Define Mutation
def mutate(individual):
    """
    Return a mutated copy of `individual`.

    With probability 0.5 the hyper-parameters of one randomly chosen layer
    are redrawn (a batchnorm layer has none, so it stays as it is). Otherwise
    a layer is either removed (only when more than 3 layers are present) or
    a random new layer is inserted. The last layer is treated as the output
    layer: it is never tweaked or removed, and insertions happen before it.

    The input is not modified. Layer dicts are copied first because several
    individuals may reference the same dict objects, and editing them in
    place would silently change those other individuals as well.

    Parameters:
    - individual: List of layer dicts.

    Returns:
    - A new `creator.Individual` (with no fitness assigned).
    """
    genome = [dict(layer) for layer in individual]

    if random.random() < 0.5:
        # Parameter tweak; needs at least one layer in front of the output layer.
        if len(genome) >= 2:
            layer = genome[random.randint(0, len(genome) - 2)]
            if layer['type'] == 'conv':
                layer['filters'] = random.choice([16, 32, 64])
                layer['kernel_size'] = random.choice([3, 5])
                layer['stride'] = random.choice([1, 2])
                layer['activation'] = random.choice(['relu', 'tanh'])
            elif layer['type'] == 'dropout':
                layer['rate'] = random.uniform(0.1, 0.5)
            elif layer['type'] == 'dense':
                layer['units'] = random.choice([64, 128, 256])
                layer['activation'] = random.choice(['relu', 'tanh'])
    else:
        if random.random() < 0.5 and len(genome) > 3:
            del genome[random.randint(0, len(genome) - 2)]
        else:
            # Borrow one hidden layer from a freshly generated architecture.
            new_layer = dict(random.choice(generate_individual()[:-1]))
            # max(..., 0) keeps the insertion valid for an empty genome.
            genome.insert(random.randint(0, max(len(genome) - 1, 0)), new_layer)
    return creator.Individual(genome)

# Define Crossover
def crossover(ind1, ind2):
    """
    One-point crossover of two layer lists.

    A single cut index is drawn and applied to both parents; each child
    takes the head of one parent and the tail of the other. Layer dicts are
    copied so the children share no dict objects with the parents or with
    each other. The parents are left unchanged.

    When either parent has fewer than 2 layers no cut point exists; the
    children are then plain copies of the parents.

    Returns:
    - (child1, child2) as new `creator.Individual` objects.
    """
    shortest = min(len(ind1), len(ind2))
    if shortest < 2:
        head1, tail1, head2, tail2 = ind1, [], ind2, []
    else:
        point = random.randint(1, shortest - 1)
        head1, tail1 = ind1[:point], ind2[point:]
        head2, tail2 = ind2[:point], ind1[point:]

    child1 = creator.Individual([dict(layer) for layer in list(head1) + list(tail1)])
    child2 = creator.Individual([dict(layer) for layer in list(head2) + list(tail2)])
    return child1, child2

# Register Mutation and Crossover
# The toolbox exposes each registered callable under the given alias
# (toolbox.individual, toolbox.population, toolbox.mate, toolbox.mutate).
toolbox = base.Toolbox()
# generate_individual already returns a creator.Individual; initIterate
# passes its result through creator.Individual once more.
toolbox.register("individual", tools.initIterate, creator.Individual, generate_individual)
# Must come after "individual" is registered, since toolbox.individual is
# looked up here; the population size `n` is supplied at call time.
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("mate", crossover)
toolbox.register("mutate", mutate)

# Evaluate Individual Fitness
def evaluate_cnn(individual):
    """
    Score an architecture by training it briefly and validating it.

    A CustomCNN is built from `individual`, trained on CPU for a single pass
    over the module-level `train_loader` with Adam (lr=0.001) and
    cross-entropy loss, then evaluated on the module-level `val_loader`.

    Returns:
    - (accuracy, complexity, mean_val_loss): fraction of correctly classified
      validation samples, total number of model parameters, and validation
      loss averaged over batches.
    """
    net = CustomCNN(individual).to("cpu")
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(net.parameters(), lr=0.001)

    # Single training epoch
    net.train()
    for inputs, targets in train_loader:
        adam.zero_grad()
        batch_loss = loss_fn(net(inputs), targets)
        batch_loss.backward()
        adam.step()

    # Validation pass without gradient tracking
    net.eval()
    hits, seen = 0, 0
    loss_total = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            scores = net(inputs)
            loss_total += loss_fn(scores, targets).item()
            predicted = scores.max(1)[1]
            seen += targets.size(0)
            hits += (predicted == targets).sum().item()

    hit_rate = hits / seen
    n_params = sum(param.numel() for param in net.parameters())
    return hit_rate, n_params, loss_total / len(val_loader)

toolbox.register("evaluate", evaluate_cnn)

# Lexicase Selection
def lexicase_selection(population, fitnesses, weights=None):
    """
    Select one individual by lexicase selection over the objectives.

    The objectives are visited once each in random order. At every step only
    the candidates that are best on the current objective are kept. If
    several candidates remain after all objectives were used (they tie on
    every objective), one of them is picked at random, so the function
    always terminates.

    Parameters:
    - population: Sequence of individuals.
    - fitnesses: Sequence of raw objective tuples, aligned with `population`.
    - weights: Optional per-objective weights; a negative weight means the
      objective is minimised, otherwise it is maximised. When omitted, the
      weights are read from `population[0].fitness.weights` if present, and
      every objective is maximised if not.

    Returns:
    - The selected element of `population`.

    Raises:
    - IndexError: if `fitnesses` is empty.
    - ValueError: if the number of weights differs from the number of
      objectives.
    """
    n_objectives = len(fitnesses[0])

    if weights is None:
        weights = getattr(getattr(population[0], "fitness", None), "weights", None)
    if weights is None:
        weights = (1.0,) * n_objectives
    if len(weights) != n_objectives:
        raise ValueError(
            f"expected {n_objectives} weights, got {len(weights)}"
        )

    def score(index, obj):
        # Negate minimised objectives so that "best" is always the maximum.
        value = fitnesses[index][obj]
        return -value if weights[obj] < 0 else value

    candidates = list(range(len(population)))
    order = list(range(n_objectives))
    random.shuffle(order)

    for obj in order:
        if len(candidates) == 1:
            break
        best_value = max(score(i, obj) for i in candidates)
        candidates = [i for i in candidates if score(i, obj) == best_value]

    return population[random.choice(candidates)]

# Evolutionary Algorithm
def evolutionary_algorithm_lexicase(n_gen=3, pop_size=5, cxpb=0.7, mutpb=0.2):
    """
    Evolve CNN architectures with lexicase parent selection.

    Each generation evaluates the whole population, selects parents with
    `lexicase_selection`, and then applies crossover to consecutive pairs
    and mutation to single offspring. After the last generation, offspring
    that were changed by variation are evaluated once more so that the
    returned individual is ranked on an actual fitness value.

    Parameters:
    - n_gen: Number of generations.
    - pop_size: Number of individuals in the population.
    - cxpb: Probability of mating each consecutive pair of offspring.
    - mutpb: Probability of mutating each offspring.

    Returns:
    - The best individual of the final population according to
      `tools.selBest`, which ranks by the individuals' weighted fitness.
    """
    population = toolbox.population(n=pop_size)
    for gen in range(n_gen):
        print(f"\n-- Generation {gen} --")
        fitnesses = list(map(toolbox.evaluate, population))

        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit
            print(f"Individual Fitness: {fit}")

        # Clone every selected parent: the same parent can be picked several
        # times, and the offspring must not share state with each other.
        offspring = [
            toolbox.clone(lexicase_selection(population, fitnesses))
            for _ in range(len(population))
        ]

        for i in range(1, len(offspring), 2):
            if random.random() < cxpb:
                offspring[i - 1], offspring[i] = toolbox.mate(offspring[i - 1], offspring[i])
                # The children are new architectures; drop any inherited fitness.
                del offspring[i - 1].fitness.values
                del offspring[i].fitness.values
        for i in range(len(offspring)):
            if random.random() < mutpb:
                offspring[i] = toolbox.mutate(offspring[i])
                del offspring[i].fitness.values

        population[:] = offspring

    # Offspring changed by crossover or mutation have no fitness yet; evaluate
    # them so the final ranking does not compare empty fitness values.
    unevaluated = [ind for ind in population if not ind.fitness.valid]
    if unevaluated:
        print("\n-- Final evaluation --")
    for ind, fit in zip(unevaluated, map(toolbox.evaluate, unevaluated)):
        ind.fitness.values = fit
        print(f"Individual Fitness: {fit}")

    return tools.selBest(population, 1)[0]

# Run Evolutionary Algorithm
# Uses the function's default settings (n_gen=3, pop_size=5, cxpb=0.7,
# mutpb=0.2); every fitness evaluation trains a model, so this call is slow.
best_architecture_lexicase = evolutionary_algorithm_lexicase()
print("Best architecture using lexicase selection:", best_architecture_lexicase)


Loading and preprocessing CIFAR-10 dataset...
Data split into: 18000 train, 6000 validation, 6000 test samples.

-- Generation 0 --

-- Generation 1 --

-- Generation 2 --
Best architecture using lexicase selection: [{'type': 'conv', 'filters': 16, 'kernel_size': 5, 'stride': 2, 'activation': 'relu'}, {'type': 'dense', 'units': 128, 'activation': 'tanh'}, {'type': 'batchnorm'}, {'type': 'dropout', 'rate': 0.3110695149673016}, {'type': 'dropout', 'rate': 0.19204589303863082}, {'type': 'conv', 'filters': 16, 'kernel_size': 3, 'stride': 2, 'activation': 'tanh'}, {'type': 'dropout', 'rate': 0.12647544208461892}, {'type': 'dropout', 'rate': 0.18505061762165625}, {'type': 'dense', 'units': 10, 'activation': 'softmax'}]


In [None]:
def test_model(architecture, train_loader, test_loader):
    """
    Train a model built from `architecture` and measure its test accuracy.

    A new CustomCNN is created on CPU, trained for 5 epochs with Adam
    (lr=0.001) and cross-entropy loss, and then evaluated on the test set.
    The mean training loss is printed after every epoch and the test
    accuracy after evaluation.

    Parameters:
    - architecture: List of layer dicts describing the CNN.
    - train_loader: DataLoader yielding training batches.
    - test_loader: DataLoader yielding test batches.

    Returns:
    - Fraction of test samples classified correctly.
    """
    net = CustomCNN(architecture).to("cpu")
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(net.parameters(), lr=0.001)

    print("Fine-tuning the model on the training set...")

    # Training phase: 5 full passes over the training data
    for epoch in range(5):
        net.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            adam.zero_grad()
            batch_loss = loss_fn(net(inputs), targets)
            batch_loss.backward()
            adam.step()
            running_loss += batch_loss.item()

        print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_loader):.4f}")

    print("Evaluating the model on the test set...")

    # Evaluation phase: count correct predictions without tracking gradients
    net.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            predicted = net(inputs).max(1)[1]
            seen += targets.size(0)
            hits += (predicted == targets).sum().item()

    accuracy = hits / seen
    print(f"Test Accuracy: {accuracy:.4f}")
    return accuracy

# Test the best architecture from the evolutionary algorithm
# test_model builds a new model from the architecture and trains it on
# train_loader before scoring it on test_loader.
print("Testing the best architecture from evolutionary algorithm...")
test_accuracy = test_model(best_architecture_lexicase, train_loader, test_loader)
print(f"Final Test Accuracy: {test_accuracy:.4f}")


Testing the best architecture from evolutionary algorithm...
Fine-tuning the model on the training set...
Epoch 1, Loss: 1.8576
Epoch 2, Loss: 1.5770
Epoch 3, Loss: 1.4859
Epoch 4, Loss: 1.4005
Epoch 5, Loss: 1.3461
Evaluating the model on the test set...
Test Accuracy: 0.5198
Final Test Accuracy: 0.5198
