In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
from torch.utils.data import DataLoader
from scipy.stats import truncnorm
import random

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

Using device: cuda


In [2]:
class CNN(nn.Module):
    """Convolutional classifier assembled from a hyperparameter dictionary.

    The network is a stack of ``num_conv_layers`` blocks (convolution,
    optional activation, optional pooling), followed by a flatten step,
    ``num_dense_layers`` fully connected layers (each with an optional
    activation) and a final linear layer producing ``num_classes`` logits.

    Args:
        config: dict with the keys
            ``num_conv_layers`` (int) and, indexed per conv layer,
            ``filters``, ``filter_sizes``, ``conv_activations``,
            ``pooling_types`` (0 none, 1 max, 2 average), ``pool_sizes``;
            ``num_dense_layers`` (int) and, indexed per dense layer,
            ``dense_units`` and ``dense_activations``.
            Activation codes: 1 ReLU, 2 Tanh, 3 linear (identity), 4 Sigmoid.
            Any other code adds no activation module.
        num_classes: number of output logits.
        input_shape: ``(channels, height, width)`` of one input sample.
            Defaults to the 1x28x28 images this class was originally
            hard-coded for.

    Raises:
        ValueError: if the convolutional stack produces an empty feature map.
    """

    # Integer codes used by the search space -> module class to instantiate.
    _ACTIVATIONS = {1: nn.ReLU, 2: nn.Tanh, 3: nn.Identity, 4: nn.Sigmoid}
    _POOLINGS = {1: nn.MaxPool2d, 2: nn.AvgPool2d}

    def __init__(self, config, num_classes=10, input_shape=(1, 28, 28)):
        super().__init__()
        self.layers = nn.ModuleList()
        in_channels, current_h, current_w = (int(v) for v in input_shape)

        for i in range(config['num_conv_layers']):
            out_channels = int(config['filters'][i])
            # Never let the kernel exceed the current feature map.
            kernel_size = min(int(config['filter_sizes'][i]), current_h, current_w)
            padding = kernel_size // 2
            self.layers.append(nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding))
            # Stride-1 convolution: out = in + 2*pad - kernel + 1. For odd
            # kernels this keeps the size; a kernel clamped to an even value
            # grows the map by one pixel, so the tracked size must follow it.
            grow = 2 * padding - kernel_size + 1
            current_h += grow
            current_w += grow

            self._append_activation(config['conv_activations'][i])

            pool_type = int(config['pooling_types'][i])
            if pool_type != 0:
                pool_size = min(int(config['pool_sizes'][i]), current_h, current_w)
                pool_cls = self._POOLINGS.get(pool_type)
                # A 1x1 pooling window would be a no-op, so it is skipped.
                if pool_size >= 2 and pool_cls is not None:
                    self.layers.append(pool_cls(pool_size))
                    # Default stride equals the window and ceil_mode is off,
                    # so the output size is floor(in / window) >= 1.
                    current_h //= pool_size
                    current_w //= pool_size
            in_channels = out_channels

        self.layers.append(nn.Flatten())
        # Probe the stack with one dummy sample to size the first dense layer.
        with torch.no_grad():
            probe = torch.zeros(1, *(int(v) for v in input_shape))
            flattened_size = nn.Sequential(*self.layers)(probe).shape[1]
        if flattened_size == 0:
            raise ValueError("Flattened size is 0")

        in_features = flattened_size
        for i in range(config['num_dense_layers']):
            out_features = int(config['dense_units'][i])
            self.layers.append(nn.Linear(in_features, out_features))
            # Every hidden dense layer gets its activation; the output layer
            # below is separate and stays linear (raw logits).
            self._append_activation(config['dense_activations'][i])
            in_features = out_features

        self.output_layer_final = nn.Linear(in_features, num_classes)

    def _append_activation(self, code):
        """Append the activation module for ``code``; unknown codes add nothing."""
        activation_cls = self._ACTIVATIONS.get(int(code))
        if activation_cls is not None:
            self.layers.append(activation_cls())

    def forward(self, x):
        """Return class logits for a batch ``x`` shaped ``(N, *input_shape)``."""
        for layer in self.layers:
            x = layer(x)
        return self.output_layer_final(x)


In [3]:
class HSA:
    """Harmony Search Algorithm (HSA) for tuning the hyperparameters of ``CNN``.

    A *harmony* is a config dict (epochs, batch size, per-layer conv / pooling /
    dense settings, optimizer and learning rate). Its fitness is the best
    per-epoch accuracy reached while training a ``CNN`` built from it.
    ``run`` fills the harmony memory with random harmonies, then repeatedly
    improvises a new harmony and lets it replace the worst stored one whenever
    it scores strictly higher.

    NOTE: fitness is measured on the EMNIST ``train=False`` split, so the
    accuracy returned by ``run`` has also been used for model selection and is
    an optimistic estimate of generalisation.

    Args:
        hms: harmony memory size (number of stored harmonies).
        hmcr: harmony memory considering rate, i.e. the probability of taking
            a value from memory instead of sampling it at random.
        par: pitch adjusting rate, i.e. the probability of perturbing a value
            that was taken from memory.
        bw_factor: bandwidth as a fraction of a range's width, used when
            perturbing range-type hyperparameters.
        max_improvs: number of improvisation iterations.
        dataset_name: EMNIST ``split`` name passed to torchvision.
    """

    # Upper bound on random configurations trained per initial-memory slot
    # before a zero-fitness harmony is accepted. Each attempt is a full
    # training run, so this is kept small.
    MAX_INIT_ATTEMPTS = 5

    def __init__(self, hms=10, hmcr=0.85, par=0.3, bw_factor=0.1, max_improvs=50, dataset_name='digits'):
        self.hms = hms  # Harmony Memory Size
        self.hmcr = hmcr  # Harmony Memory Considering Rate
        self.par = par  # Pitch Adjusting Rate
        self.bw_factor = bw_factor  # Bandwidth factor for pitch adjustment of range-type params
        self.max_improvs = max_improvs
        self.dataset_name = dataset_name

        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ])
        self.train_dataset = torchvision.datasets.EMNIST(root='./data', split=dataset_name, train=True, transform=transform, download=True)
        self.test_dataset = torchvision.datasets.EMNIST(root='./data', split=dataset_name, train=False, transform=transform)
        self.num_classes = len(self.train_dataset.classes)

        # Search space. A tuple is an inclusive (min, max) range; a list is a
        # set of discrete choices.
        self.hyperparameter_ranges = {
            'num_epochs': (1, 49),
            'batch_size': [32, 64, 128, 256],
            'num_conv_layers': (1, 9),
            'filters': (1, 65),
            'filter_sizes': [3, 5],
            'conv_activations': (1, 4),  # 1: ReLU, 2: Tanh, 3: Linear, 4: Sigmoid
            'pooling_types': (0, 2),  # 0 none, 1 max, 2 avg
            'pool_sizes': [2, 3],  # Pooling kernel sizes
            'num_dense_layers': (1, 9),
            'dense_units': (1, 65),  # Range for units in dense layers
            'dense_activations': (1, 4),
            'optimizer': (1, 4),  # 1: SGD, 2: Adam, 3: RMSprop, 4: Adadelta
            'learning_rate': [0.0001, 0.001, 0.01, 0.1]  # Discrete choices
        }
        self.harmony_memory = []

    def _generate_random_value(self, key):
        """Sample one value for ``key`` uniformly from its search space.

        Raises:
            ValueError: if the spec for ``key`` is neither a list of choices
                nor a 2-tuple range.
        """
        spec = self.hyperparameter_ranges[key]
        if isinstance(spec, list):
            return random.choice(spec)
        if isinstance(spec, tuple) and len(spec) == 2:
            min_val, max_val = spec
            if isinstance(min_val, int):
                # randint's upper bound is exclusive; the range is inclusive.
                return int(np.random.randint(min_val, max_val + 1))
            return float(np.random.uniform(min_val, max_val))
        raise ValueError(f"Unknown type for hyperparameter range: {key}")

    def initialize_harmony(self):
        """Return a fully random config whose per-layer lists match its layer counts."""
        config = {}
        config['num_epochs'] = self._generate_random_value('num_epochs')
        config['batch_size'] = self._generate_random_value('batch_size')

        num_conv = config['num_conv_layers'] = self._generate_random_value('num_conv_layers')
        for key in ('filters', 'filter_sizes', 'conv_activations', 'pooling_types', 'pool_sizes'):
            config[key] = [self._generate_random_value(key) for _ in range(num_conv)]

        num_dense = config['num_dense_layers'] = self._generate_random_value('num_dense_layers')
        for key in ('dense_units', 'dense_activations'):
            config[key] = [self._generate_random_value(key) for _ in range(num_dense)]

        config['optimizer'] = self._generate_random_value('optimizer')
        config['learning_rate'] = self._generate_random_value('learning_rate')
        return config

    @staticmethod
    def _build_optimizer(model, config):
        """Create the optimizer selected by ``config['optimizer']`` (Adam if unknown)."""
        lr = config['learning_rate']
        code = config['optimizer']
        if code == 1:
            return optim.SGD(model.parameters(), lr=lr, momentum=0.9)
        if code == 3:
            return optim.RMSprop(model.parameters(), lr=lr)
        if code == 4:
            return optim.Adadelta(model.parameters(), lr=lr)
        # Code 2 and any unrecognised code both use Adam.
        return optim.Adam(model.parameters(), lr=lr)

    @staticmethod
    def _accuracy(model, loader):
        """Return the fraction of samples in ``loader`` that ``model`` classifies correctly."""
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in loader:
                images, labels = images.to(device), labels.to(device)
                predicted = model(images).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        return correct / total

    def evaluate_harmony(self, config):
        """Train a ``CNN`` built from ``config`` and return its fitness.

        The model is trained for ``config['num_epochs']`` epochs and scored
        after every epoch on ``self.test_dataset``. The best per-epoch accuracy
        is returned.

        Returns:
            float in [0, 1]; 0.0 when the configuration cannot be built or
            trained (the error is printed, not raised).
        """
        try:
            model = CNN(config, self.num_classes).to(device)
            optimizer = self._build_optimizer(model, config)
            criterion = nn.CrossEntropyLoss()

            batch_size = int(config['batch_size'])
            # Pinned host memory only helps host-to-GPU copies.
            pin_memory = device.type == 'cuda'
            train_loader = DataLoader(self.train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=pin_memory)
            test_loader = DataLoader(self.test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=pin_memory)

            best_val_accuracy_for_this_config = 0.0
            for _ in range(int(config['num_epochs'])):
                model.train()
                for images, labels in train_loader:
                    images, labels = images.to(device), labels.to(device)
                    optimizer.zero_grad()
                    loss = criterion(model(images), labels)
                    loss.backward()
                    optimizer.step()

                current_val_accuracy = self._accuracy(model, test_loader)
                if current_val_accuracy > best_val_accuracy_for_this_config:
                    best_val_accuracy_for_this_config = current_val_accuracy

            return best_val_accuracy_for_this_config

        except (ValueError, RuntimeError) as e:
            # Invalid architectures (and e.g. CUDA out-of-memory) score 0.
            print(f"Evaluation failed for config. Error: {e}")
            return 0.0
        except Exception as e:
            print(f"An unexpected error occurred during evaluation: {e}")
            return 0.0

    def _pitch_adjust_value(self, current_value, key, parent_config_for_lengths=None):
        """Return a perturbed copy of ``current_value`` for hyperparameter ``key``.

        * list spec: pick a different choice (or keep the value if there is none);
        * integer range: move by ``-step``, ``0`` or ``+step`` with
          ``step = max(1, int(bw_factor * width))``, clamped to the range;
        * float range: draw from a normal truncated to the range, centred on
          the current value with standard deviation ``bw_factor * width``.

        Integer results are plain Python ``int`` and float results plain
        ``float``, so stored configs never contain NumPy scalars.
        ``parent_config_for_lengths`` is accepted for call compatibility and
        is not used.
        """
        param_spec = self.hyperparameter_ranges[key]

        if isinstance(param_spec, list):
            alternatives = [choice for choice in param_spec if choice != current_value]
            return random.choice(alternatives) if alternatives else current_value

        if isinstance(param_spec, tuple) and len(param_spec) == 2:
            min_val, max_val = param_spec
            if isinstance(min_val, int):
                step = max(1, int(self.bw_factor * (max_val - min_val)))
                change = random.choice([-step, step, 0])
                return int(min(max(current_value + change, min_val), max_val))

            std_dev = self.bw_factor * (max_val - min_val)
            if std_dev <= 0:
                # Zero-width range or zero bandwidth: nothing to perturb, and
                # the truncation bounds below would divide by zero.
                return float(current_value)
            a, b = (min_val - current_value) / std_dev, (max_val - current_value) / std_dev
            new_val = truncnorm.rvs(a, b, loc=current_value, scale=std_dev)
            return float(np.clip(new_val, min_val, max_val))

        return current_value

    def _improvise_value(self, key, position=None):
        """Improvise one value for ``key`` following the HMCR / PAR rules.

        With probability ``hmcr`` the value is copied from a random harmony in
        memory (and then pitch-adjusted with probability ``par``); otherwise it
        is sampled at random. For per-layer parameters ``position`` is the
        layer index: the parent's value at that index is used when it exists,
        else a random element of the parent's list.
        """
        if random.random() >= self.hmcr:
            return self._generate_random_value(key)

        parent_config = self.harmony_memory[random.randrange(len(self.harmony_memory))]['config']
        stored = parent_config[key]
        if position is None:
            value = stored
        elif position < len(stored):
            value = stored[position]
        else:
            value = random.choice(stored) if stored else self._generate_random_value(key)

        if random.random() < self.par:
            value = self._pitch_adjust_value(value, key, parent_config)
        return value

    def _improvise_harmony(self):
        """Build a new config: layer counts and scalars first, then per-layer lists."""
        config = {}
        for key in ('num_conv_layers', 'num_dense_layers',
                    'num_epochs', 'optimizer', 'learning_rate', 'batch_size'):
            config[key] = self._improvise_value(key)

        per_layer_params = (
            ('num_conv_layers', ('filters', 'filter_sizes', 'conv_activations', 'pooling_types', 'pool_sizes')),
            ('num_dense_layers', ('dense_units', 'dense_activations')),
        )
        for count_key, list_keys in per_layer_params:
            for key in list_keys:
                config[key] = [self._improvise_value(key, position)
                               for position in range(config[count_key])]
        return config

    def run(self):
        """Run the search and return ``(best_config, best_fitness)``.

        Raises:
            ValueError: if ``hms`` is smaller than 1.
        """
        if self.hms < 1:
            raise ValueError("hms must be at least 1")

        # Start from an empty memory so that calling run() again does not
        # stack a second population on top of the first.
        self.harmony_memory = []
        max_attempts = max(1, self.MAX_INIT_ATTEMPTS)

        print("Initializing Harmony Memory...")
        for i in range(self.hms):
            # Try a few times to get a valid harmony, then accept fitness 0.
            for attempt in range(1, max_attempts + 1):
                config = self.initialize_harmony()
                fitness = self.evaluate_harmony(config)
                if fitness > 0.0 or attempt == max_attempts:
                    break
                print("Initial config invalid, retrying...")
            self.harmony_memory.append({'config': config, 'fitness': fitness})
            print(f"Initial Harmony {i+1}/{self.hms}, Fitness: {fitness:.4f}")

        # Memory is kept sorted best-first; the worst harmony is always last.
        self.harmony_memory.sort(key=lambda x: x['fitness'], reverse=True)
        print(f"\nInitial Best Fitness: {self.harmony_memory[0]['fitness']:.4f}")

        for impro_count in range(self.max_improvs):
            print(f"\nImprovisation {impro_count + 1}/{self.max_improvs}")

            new_harmony_config = self._improvise_harmony()
            new_fitness = self.evaluate_harmony(new_harmony_config)
            print(f"New Harmony improvised. Fitness: {new_fitness:.4f}")

            worst_fitness_in_hm = self.harmony_memory[-1]['fitness']
            if new_fitness > worst_fitness_in_hm:
                print(f"New harmony is better than worst in HM ({new_fitness:.4f} > {worst_fitness_in_hm:.4f}). Replacing.")
                self.harmony_memory[-1] = {'config': new_harmony_config, 'fitness': new_fitness}
                self.harmony_memory.sort(key=lambda x: x['fitness'], reverse=True)
            else:
                print(f"New harmony not better than worst in HM ({new_fitness:.4f} <= {worst_fitness_in_hm:.4f}). Discarding.")

            print(f" Current Best Fitness in HM: {self.harmony_memory[0]['fitness']:.4f}")

        best_solution = self.harmony_memory[0]
        return best_solution['config'], best_solution['fitness']


In [None]:
.....

In [7]:
# Smoke-test search: 3 harmonies in memory, 2 improvisations, EMNIST 'digits'.
hsa = HSA(
    hms=3,
    hmcr=0.9,
    par=0.3,
    bw_factor=0.1,
    max_improvs=2,
    dataset_name='digits',
)
bestConfig, bestAcc = hsa.run()

# Report the winning configuration, one hyperparameter per line.
for key, value in bestConfig.items():
    print(f"{key}: {value}")
print(f"Best accuracy: {bestAcc}")

Initializing Harmony Memory...
Initial Harmony 1/3, Fitness: 0.8521
Initial Harmony 2/3, Fitness: 0.9763
Initial Harmony 3/3, Fitness: 0.1000

Initial Best Fitness: 0.9763

Improvisation 1/2
New Harmony improvised. Fitness: 0.8031
New harmony is better than worst in HM (0.8031 > 0.1000). Replacing.
 Current Best Fitness in HM: 0.9763

Improvisation 2/2
New Harmony improvised. Fitness: 0.9049
New harmony is better than worst in HM (0.9049 > 0.8031). Replacing.
 Current Best Fitness in HM: 0.9763
num_epochs: 7
batch_size: 256
num_conv_layers: 1
filters: [42]
filter_sizes: [3]
conv_activations: [4]
pooling_types: [1]
pool_sizes: [3]
num_dense_layers: 2
dense_units: [93, 115]
dense_activations: [2, 2]
optimizer: 2
learning_rate: 0.0001
Best accuracy: 0.976325


In [4]:
# Larger search: 10 harmonies in memory, 3 improvisations, EMNIST 'digits'.
hsa = HSA(
    hms=10,
    hmcr=0.9,
    par=0.3,
    bw_factor=0.1,
    max_improvs=3,
    dataset_name='digits',
)
bestConfig, bestAcc = hsa.run()

# Report the winning configuration, one hyperparameter per line.
for key, value in bestConfig.items():
    print(f"{key}: {value}")
print(f"Best accuracy: {bestAcc}")

Initializing Harmony Memory...
Initial Harmony 1/10, Fitness: 0.1000
Initial Harmony 2/10, Fitness: 0.8253
Initial Harmony 3/10, Fitness: 0.9741
Initial Harmony 4/10, Fitness: 0.9931
Initial Harmony 5/10, Fitness: 0.5711
Initial Harmony 6/10, Fitness: 0.9860
Initial Harmony 7/10, Fitness: 0.4269
Initial Harmony 8/10, Fitness: 0.9231
Initial Harmony 9/10, Fitness: 0.9928
Initial Harmony 10/10, Fitness: 0.1000

Initial Best Fitness: 0.9931

Improvisation 1/3
New Harmony improvised. Fitness: 0.9910
New harmony is better than worst in HM (0.9910 > 0.1000). Replacing.
 Current Best Fitness in HM: 0.9931

Improvisation 2/3
New Harmony improvised. Fitness: 0.8050
New harmony is better than worst in HM (0.8050 > 0.1000). Replacing.
 Current Best Fitness in HM: 0.9931

Improvisation 3/3
New Harmony improvised. Fitness: 0.9273
New harmony is better than worst in HM (0.9273 > 0.4269). Replacing.
 Current Best Fitness in HM: 0.9931
num_epochs: 11
batch_size: 64
num_conv_layers: 3
filters: [54, 37,

In [None]:
# Widest initial population: 20 harmonies in memory, 2 improvisations, EMNIST 'digits'.
hsa = HSA(
    hms=20,
    hmcr=0.9,
    par=0.3,
    bw_factor=0.1,
    max_improvs=2,
    dataset_name='digits',
)
bestConfig, bestAcc = hsa.run()

# Report the winning configuration, one hyperparameter per line.
for key, value in bestConfig.items():
    print(f"{key}: {value}")
print(f"Best accuracy: {bestAcc}")