In [13]:
import torch
# Query CUDA availability and the name of GPU 0. The resulting tuple is neither
# stored nor printed by this statement.
# NOTE(review): torch.cuda.get_device_name(0) presumably raises on a machine
# without CUDA, even though the device selection below falls back to CPU — confirm.
torch.cuda.is_available(), torch.cuda.get_device_name(0)
# IPython shell escape (notebook-only syntax): run the nvidia-smi command.
!nvidia-smi
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Disabled shell escape for continuous monitoring:
# !watch -n 1 nvidia-smi


In [14]:
# model_cnn.py
import torch.nn as nn

class CNN(nn.Module):
    """Convolutional classifier assembled from a gene dictionary.

    ``genes`` must provide:
      * ``'conv_configs'``: list of dicts with ``'filters'`` and ``'kernel_size'``,
        one per convolutional layer;
      * ``'activation'``: ``'relu'`` selects ReLU, any other value LeakyReLU(0.1);
      * ``'pool_type'``: ``'max'`` selects max pooling, any other value average pooling;
      * ``'fc_units'``: width of the hidden fully connected layer.

    Each conv layer is Conv2d -> BatchNorm2d -> activation. A 2x2 stride-2
    pooling layer follows every second conv layer and the final conv layer.
    """

    def __init__(self, genes, input_channels=3, num_classes=10, input_size=32):
        super().__init__()

        conv_specs = genes['conv_configs']
        last_position = len(conv_specs) - 1

        stack = []
        channels = input_channels
        spatial = input_size  # tracked side length, halved by every pooling layer

        for position, spec in enumerate(conv_specs):
            out_channels = spec['filters']
            kernel = spec['kernel_size']

            # padding = kernel // 2 (keeps the spatial size for odd kernels)
            stack.append(nn.Conv2d(channels, out_channels, kernel, padding=kernel // 2))
            stack.append(nn.BatchNorm2d(out_channels))
            stack.append(
                nn.ReLU(inplace=True)
                if genes['activation'] == 'relu'
                else nn.LeakyReLU(0.1, inplace=True)
            )

            # Pool after every second conv layer and after the last one.
            if position % 2 == 1 or position == last_position:
                stack.append(
                    nn.MaxPool2d(2, 2)
                    if genes['pool_type'] == 'max'
                    else nn.AvgPool2d(2, 2)
                )
                spatial //= 2

            channels = out_channels

        self.features = nn.Sequential(*stack)
        # Number of features entering the classifier head.
        self.flat_size = channels * spatial * spatial

        hidden_units = genes['fc_units']
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.flat_size, hidden_units),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_units, num_classes),
        )

    def forward(self, x):
        """Run the feature extractor, then the classifier head, on ``x``."""
        return self.classifier(self.features(x))

In [15]:
# model_ga.py

import torch, random, os, json
import torch.nn as nn
from torch.optim import AdamW
from copy import deepcopy

# from model_cnn import CNN

# Define the search space for CNN architecture
class CNNSearchSpace:
    """Discrete options from which CNN architecture genes are drawn."""

    def __init__(self):
        # Convolutional part: layer count, filters per layer, kernel size.
        self.conv_layers = list(range(1, 5))
        self.filters = [16, 32, 64, 128]
        self.kernel_sizes = [3, 5, 7]

        # Options shared by the whole network.
        self.pool_types = ['max', 'avg']
        self.activations = ['relu', 'leaky_relu']

        # Width of the hidden fully connected layer.
        self.fc_units = [64, 128, 256, 512]

# Encode architecture as a chromosome (gene representation)
class Architecture:
    """A candidate CNN described by a gene dictionary, plus its evaluation results.

    ``fitness``, ``accuracy`` and ``best_epoch`` start at 0 and are meant to be
    filled in once the candidate has been evaluated.
    """

    def __init__(self, genes=None):
        # Without explicit genes, sample a random architecture.
        self.genes = self.random_genes() if genes is None else genes
        self.fitness = 0
        self.accuracy = 0
        self.best_epoch = 0

    def random_genes(self):
        """Sample a gene dictionary uniformly from a fresh ``CNNSearchSpace``."""
        space = CNNSearchSpace()

        depth = random.choice(space.conv_layers)
        pool_type = random.choice(space.pool_types)
        activation = random.choice(space.activations)
        fc_units = random.choice(space.fc_units)

        # One {'filters', 'kernel_size'} entry per convolutional layer.
        conv_configs = [
            {
                'filters': random.choice(space.filters),
                'kernel_size': random.choice(space.kernel_sizes),
            }
            for _ in range(depth)
        ]

        return {
            'num_conv': depth,
            'conv_configs': conv_configs,
            'pool_type': pool_type,
            'activation': activation,
            'fc_units': fc_units,
        }

    def __repr__(self):
        return f"Arch(conv={self.genes['num_conv']}, acc={self.accuracy:.4f})"

# Genetic Algorithm Operations
class GeneticAlgorithm:
    """Genetic algorithm that searches over CNN architectures.

    Each generation every architecture is trained and scored, parents are
    picked by roulette-wheel selection, and the next generation is built from
    the two best architectures (elitism) plus crossed-over / mutated children.
    """

    def __init__(self, population_size=20, generations=10, mutation_rate=0.2, crossover_rate=0.7):
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.population = []
        self.best_architecture = None
        self.search_space = CNNSearchSpace()

    def initialize_population(self):
        """Replace the population with ``population_size`` random architectures."""
        self.population = [Architecture() for _ in range(self.population_size)]

    def _train_and_score(self, architecture, train_loader, val_loader, device, epochs):
        """Train a CNN built from ``architecture.genes`` and count its parameters.

        Returns ``(best_acc, best_epoch, conv_params, fc_params, total_params)``.
        Kept separate so the model and batch tensors are released as soon as
        this helper returns.
        """
        model = CNN(architecture.genes).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = AdamW(model.parameters(), lr=0.001)

        patience = 10  # stop after this many consecutive epochs without improvement
        best_acc = 0
        best_epoch = 1
        stale_epochs = 0
        for epoch in range(1, epochs + 1):
            model.train()
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                loss = criterion(model(inputs), labels)
                loss.backward()
                optimizer.step()

            model.eval()
            correct = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    predicted = model(inputs).argmax(dim=1)
                    correct += (predicted == labels).sum().item()

            accuracy = correct / len(val_loader.dataset)
            if accuracy > best_acc:
                stale_epochs = 0
                best_acc = accuracy
                best_epoch = epoch
            else:
                stale_epochs += 1
            if stale_epochs >= patience:
                break

        # Parameter counts used by the complexity penalty.
        conv_params = 0
        fc_params = 0
        for module in model.modules():
            if isinstance(module, nn.Conv2d):
                conv_params += sum(p.numel() for p in module.parameters())
            elif isinstance(module, nn.Linear):
                fc_params += sum(p.numel() for p in module.parameters())
        total_params = sum(p.numel() for p in model.parameters())

        return best_acc, best_epoch, conv_params, fc_params, total_params

    def evaluate_fitness(self, architecture, train_loader, val_loader, device, epochs=100):
        """Train and evaluate a single architecture.

        Trains for up to ``epochs`` epochs with early stopping, then stores the
        best validation accuracy, the epoch it occurred at, and
        ``fitness = accuracy - penalty`` on ``architecture``. The penalty is
        ``(0.7 * conv_params + 0.3 * fc_params) * 1e-6``.

        Returns the fitness. If anything fails while building or training the
        model, the error is printed, the architecture is scored 0 and 0 is
        returned, so one bad candidate does not abort the whole search.
        """
        try:
            best_acc, best_epoch, conv_params, fc_params, total_params = self._train_and_score(
                architecture, train_loader, val_loader, device, epochs
            )
        except Exception as e:
            print(f"Error evaluating architecture: {e}", flush=True)
            architecture.fitness = 0
            architecture.accuracy = 0
            architecture.best_epoch = 0
            return 0
        finally:
            # Release cached GPU memory on both the success and the failure path.
            torch.cuda.empty_cache()

        # Question 1 - b: weighted complexity penalty (conv weights cost more than FC).
        w_conv = 0.7
        w_fc = 0.3
        penalty = (w_conv * conv_params + w_fc * fc_params) * 1e-6

        architecture.accuracy = best_acc
        architecture.best_epoch = best_epoch
        architecture.fitness = best_acc - penalty

        # Previous scheme, reported for comparison only: every parameter weighted equally.
        old_penalty = total_params / 1e6
        new_penalty = penalty
        difference = new_penalty - old_penalty

        print("\n[Q1B] Architecture Complexity Report:")
        print(f"Conv Parameters: {conv_params}")
        print(f"FC Parameters: {fc_params}")
        print(f"Old Penalty (all params equally):{old_penalty:.8f}")
        print(f"New Penalty (weighted): {new_penalty:.8f}")
        print(f"Difference (New - Old): {difference:.8f}")
        print(f"Final Fitness: {architecture.fitness:.6f}\n")

        return architecture.fitness

    def selection(self, k=2):
        """Roulette-wheel selection (Question 1A).

        Returns a list of ``k`` indices into ``self.population``, each drawn
        with probability proportional to the member's fitness. Negative
        fitness values (penalty larger than accuracy) get a zero share of the
        wheel. When no member has positive fitness, the indices are drawn
        uniformly instead.
        """
        fitnesses = [member.fitness for member in self.population]
        indices = range(len(fitnesses))

        # Clamp at zero so the wheel is built from valid, non-negative weights.
        weights = [max(fit, 0) for fit in fitnesses]
        total = sum(weights)
        if total <= 0:
            if k <= len(fitnesses):
                return random.sample(indices, k)
            return random.choices(indices, k=k)

        probs = [w / total for w in weights]
        cumulative = []
        running = 0
        for p in probs:
            running += p
            cumulative.append(running)

        # random.choices always yields exactly k picks, even when float
        # rounding leaves the last cumulative value slightly below 1.
        selected = random.choices(indices, weights=weights, k=k)

        print("\n[Q1A] Roulette Wheel Selection Details:")
        print("Index | Fitness | Probability | Cumulative")
        for i, (fit, prob, cum) in enumerate(zip(fitnesses, probs, cumulative)):
            print(f"{i:5d} | {fit:.6f} | {prob:.6f} | {cum:.6f}")
        print()

        return selected

    def _random_conv_config(self):
        """Draw one random conv-layer configuration from the search space."""
        return {
            'filters': random.choice(self.search_space.filters),
            'kernel_size': random.choice(self.search_space.kernel_sizes),
        }

    def _resize_conv_configs(self, genes):
        """Truncate or randomly extend ``genes['conv_configs']`` to ``genes['num_conv']`` entries."""
        target = genes['num_conv']
        configs = genes['conv_configs'][:target]
        while len(configs) < target:
            configs.append(self._random_conv_config())
        genes['conv_configs'] = configs

    def crossover(self, parent1, parent2):
        """Produce two children from two parent architectures.

        With probability ``1 - crossover_rate`` the children are deep copies of
        the parents. Otherwise the children start as copies of the parents'
        genes, and then (each with probability 0.5) ``num_conv`` is swapped and
        ``pool_type`` together with ``activation`` is swapped. The conv layer
        lists are then resized to match each child's ``num_conv``.
        """
        if random.random() > self.crossover_rate:
            return deepcopy(parent1), deepcopy(parent2)

        child1_genes = deepcopy(parent1.genes)
        child2_genes = deepcopy(parent2.genes)

        # Swap the number of conv layers.
        if random.random() < 0.5:
            child1_genes['num_conv'], child2_genes['num_conv'] = (
                child2_genes['num_conv'],
                child1_genes['num_conv'],
            )

        # Swap pool type and activation together.
        if random.random() < 0.5:
            for key in ('pool_type', 'activation'):
                child1_genes[key], child2_genes[key] = child2_genes[key], child1_genes[key]

        # Keep conv_configs consistent with the (possibly swapped) num_conv.
        self._resize_conv_configs(child1_genes)
        self._resize_conv_configs(child2_genes)

        return Architecture(child1_genes), Architecture(child2_genes)

    def mutation(self, architecture):
        """Mutate architecture genes.

        With probability ``1 - mutation_rate`` the input object is returned
        unchanged. Otherwise one mutation type is chosen at random and a new
        ``Architecture`` built from the mutated copy of the genes is returned.
        """
        if random.random() > self.mutation_rate:
            return architecture

        genes = deepcopy(architecture.genes)
        mutation_type = random.choice(['conv_param', 'num_layers', 'pool_activation', 'fc_units'])

        if mutation_type == 'conv_param' and genes['conv_configs']:
            # Re-draw both parameters of one randomly chosen conv layer.
            idx = random.randint(0, len(genes['conv_configs']) - 1)
            genes['conv_configs'][idx]['filters'] = random.choice(self.search_space.filters)
            genes['conv_configs'][idx]['kernel_size'] = random.choice(self.search_space.kernel_sizes)

        elif mutation_type == 'num_layers':
            # Re-draw the layer count, then grow or shrink the layer list to match.
            genes['num_conv'] = random.choice(self.search_space.conv_layers)
            self._resize_conv_configs(genes)

        elif mutation_type == 'pool_activation':
            genes['pool_type'] = random.choice(self.search_space.pool_types)
            genes['activation'] = random.choice(self.search_space.activations)

        elif mutation_type == 'fc_units':
            genes['fc_units'] = random.choice(self.search_space.fc_units)

        return Architecture(genes)

    def evolve(self, train_loader, val_loader, device, run=1):
        """Main evolutionary loop.

        Runs ``self.generations`` generations and returns the architecture
        with the highest fitness seen. After each generation the genes of the
        new population are written, one JSON object per line, to
        ``outputs/run_<run>/generation_<g>.jsonl`` under the current working
        directory.
        """
        output_dir = os.path.join(os.path.abspath(''), 'outputs', f'run_{run}')
        os.makedirs(output_dir, exist_ok=True)

        self.initialize_population()
        print(f"Starting with {self.population_size} Population:\n{self.population}\n", flush=True)

        for generation in range(self.generations):
            print(f"\n{'='*60}", flush=True)
            print(f"Generation {generation + 1}/{self.generations}", flush=True)
            print(f"{'='*60}", flush=True)

            # Evaluate fitness
            for i, arch in enumerate(self.population):
                print(f"Evaluating architecture {i+1}/{self.population_size}...", end=' ', flush=True)
                fitness = self.evaluate_fitness(arch, train_loader, val_loader, device)
                print(f"Fitness: {fitness:.4f}, Accuracy: {arch.accuracy:.4f}", flush=True)

            # Sort by fitness score
            print("\nSorting population in terms of fitness score (high -> low) ...", flush=True)
            self.population.sort(key=lambda x: x.fitness, reverse=True)

            # Track best
            if self.best_architecture is None or self.population[0].fitness > self.best_architecture.fitness:
                self.best_architecture = deepcopy(self.population[0])

            print(f"Best in generation: {self.population[0]}\n", flush=True)
            print(f"Best overall: {self.best_architecture}", flush=True)

            # Selection: indices into the fitness-sorted population.
            print(f"\nPerforming roulette wheel selection of total population: {self.population_size} ...", flush=True)
            selected = self.selection()

            # Crossover and Mutation
            print("Performing Crossover & Mutation ...", flush=True)

            # Elitism: carry over the top architectures (at most two, fewer if
            # the population is smaller).
            print("Elitism: Keeping top 2 architectures in next generation.", flush=True)
            next_generation = [deepcopy(arch) for arch in self.population[:2]]

            while len(next_generation) < self.population_size:
                parent1 = self.population[random.choice(selected)]
                parent2 = self.population[random.choice(selected)]

                child1, child2 = self.crossover(parent1, parent2)
                child1 = self.mutation(child1)
                child2 = self.mutation(child2)

                next_generation.append(child1)
                if len(next_generation) < self.population_size:
                    next_generation.append(child2)

            self.population = next_generation
            print(f"Next Generation: {self.population}", flush=True)

            # JSON Lines: one gene dictionary per line.
            with open(os.path.join(output_dir, f"generation_{generation}.jsonl"), 'w') as f:
                for obj in self.population:
                    f.write(json.dumps(obj.genes) + "\n")

        return self.best_architecture

In [16]:
# nas_run.py

import torch, sys, os, pickle
import torchvision.transforms as T
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import CIFAR10

# from model_ga import GeneticAlgorithm
# from model_cnn import CNN

# if __name__ == "__main__":

# Resolve (and create if missing) the directory that holds all run outputs.
parent = os.path.abspath('')
outputs_dir = os.path.join(parent, 'outputs')
os.makedirs(outputs_dir, exist_ok=True)

# Earlier runs are stored as sub-directories named run_<n>; count those to
# number this run, and skip any number that is already taken so repeated
# executions never collide with an existing directory.
all_logs = [i for i in os.listdir(outputs_dir) if i.startswith('run_')]
run_id = len(all_logs) + 1
while os.path.exists(os.path.join(outputs_dir, f'run_{run_id}')):
    run_id += 1
run_dir = os.path.join(outputs_dir, f'run_{run_id}')
os.mkdir(run_dir)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Send everything printed during the search to this run's log file. The
# stream that was active before is restored afterwards (even on error), and
# the log file is closed by the `with` block.
original_stdout = sys.stdout
with open(os.path.join(run_dir, 'nas_run.log'), 'w') as log_file:
    sys.stdout = log_file
    try:
        print(f"Using device: {device}", flush=True)

        # Load CIFAR-10 dataset (reduced for faster NAS)
        transform = T.Compose([
            T.ToTensor(),
            T.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

        trainset = CIFAR10(root='./data', train=True, download=True, transform=transform)
        valset = CIFAR10(root='./data', train=False, download=True, transform=transform)

        # Use only the first 5000 training / 1000 validation samples for quick NAS
        train_subset = Subset(trainset, range(5000))
        val_subset = Subset(valset, range(1000))

        train_loader = DataLoader(train_subset, batch_size=256, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=256, shuffle=False)

        # Run NAS with GA
        ga = GeneticAlgorithm(
            population_size=10,  # Small population for demonstration
            generations=5,       # Few generations for quick results
            mutation_rate=0.3,
            crossover_rate=0.7
        )

        best_arch = ga.evolve(train_loader, val_loader, device, run=run_id)

        print(f"\n{'='*60}", flush=True)
        print("FINAL BEST ARCHITECTURE", flush=True)
        print(f"{'='*60}", flush=True)
        print(f"Genes: {best_arch.genes}", flush=True)
        print(f"Accuracy: {best_arch.accuracy:.4f}", flush=True)
        print(f"Fitness: {best_arch.fitness:.4f}", flush=True)

        # Rebuild the best architecture (untrained weights) to report its size and layout
        final_model = CNN(best_arch.genes).to(device)
        print(f"\nTotal parameters: {sum(p.numel() for p in final_model.parameters()):,}", flush=True)
        print(f"\nModel architecture:\n{final_model}", flush=True)

        # Persist the winning Architecture object next to the log.
        with open(os.path.join(run_dir, "best_arch.pkl"), 'wb') as f:
            pickle.dump(best_arch, f)
    finally:
        sys.stdout = original_stdout