### Installing required libraries

In [None]:
# DEAP supplies the `base`, `creator` and `tools` modules imported below.
!pip install deap

### imports

In [None]:
import random
from deap import base, creator, tools
from tqdm import tqdm
from copy import deepcopy
import numpy as np
import pandas as pd
import torch

import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from PIL import Image
from imblearn.over_sampling import SMOTE

import matplotlib.pyplot as plt
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn.init as init
from torchvision.transforms import transforms,RandomRotation,ColorJitter,GaussianBlur,RandomHorizontalFlip,Resize,AutoAugment,AutoAugmentPolicy
import torchvision
from torch import Tensor
import torch.nn.functional as F
from PIL import Image
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data.sampler import WeightedRandomSampler
import seaborn as sns
import copy
import cv2
from sklearn.preprocessing import normalize
from imblearn.over_sampling import SMOTE
import random
import os


# Seed every random number generator the notebook draws from so that runs are
# repeatable. The GA code uses the stdlib `random` module (random.random,
# random.sample, random.randrange) as well as NumPy, so both must be seeded
# alongside PyTorch.
random.seed(0)
np.random.seed(0)
# torch.backends.cudnn.deterministic = True
torch.manual_seed(0)
torch.cuda.manual_seed_all(0)

### fetching data from google drive in colab env

In [None]:
# Mount Google Drive and unpack the dataset archive into ./data/.
from google.colab import drive
drive.mount('/content/drive')
!unzip "drive/MyDrive/data/dataset_name.zip" -d ./data/

# Base name of the dataset: used to build the CSV path in get_dataloaders and
# the name of the results file written by the GA.
ds_name='dataset_name'

### fetching data from google drive in kaggle env

In [None]:
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
import shutil  
import os
import zipfile  
# Authenticate, then build a PyDrive2 client from the application-default
# credentials.
# NOTE(review): `auth` is imported from google.colab although this cell is
# labelled as the Kaggle variant - confirm it works in that environment.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

ds_name='dataset_name'
# The 'id' value is a placeholder for the Drive file id of the dataset archive.
downloaded = drive.CreateFile({'id': 'XXXXXXXXXXXXXXXXXX'})
os.makedirs('data', exist_ok=True)
downloaded.GetContentFile(f'data/{ds_name}.zip')

# Extract into ./data and delete the archive afterwards.
with zipfile.ZipFile(f'data/{ds_name}.zip', 'r') as zip_ref:  
    zip_ref.extractall('data')  
os.remove(f'data/{ds_name}.zip')



### Init

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size = 32
# Output width of the final linear layer built in set_layers.
n_classes=3
# Side length of the square images (used for parsing, resizing and warm-up).
img_size=100
# NOTE(review): not referenced anywhere in the visible code.
is_warming_up=False

### Prepare DataLoader

In [None]:
class CustomDataset(Dataset):
    """In-memory image dataset yielding ``(image, label)`` pairs.

    Args:
        imgs: Indexable collection of image arrays; every item is handed to
            ``Image.fromarray``.
        lbls: Integer class labels aligned with ``imgs``. Any array-like is
            accepted, including a pandas Series with a non-contiguous index.
        transform: Optional callable applied to the PIL image before it is
            returned.
    """

    def __init__(self, imgs, lbls, transform=None):
        self.imgs = imgs
        # Convert through NumPy first: a filtered pandas Series keeps its
        # original index labels, and building a tensor directly from it can
        # fail because items are looked up by label rather than position.
        self.lbls = torch.tensor(np.asarray(lbls), dtype=torch.long)
        self.transform = transform

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, idx):
        img = Image.fromarray(self.imgs[idx])
        if self.transform is not None:
            img = self.transform(img)
        return img, self.lbls[idx]

def string_to_array(x, size=None):
    """Parse a whitespace-separated pixel string into a square uint8 image.

    Args:
        x: String of integer pixel values separated by whitespace.
        size: Side length of the square image. Defaults to the module-level
            ``img_size`` when omitted.

    Returns:
        ``numpy.ndarray`` of shape ``(size, size)`` and dtype ``uint8``.

    Raises:
        ValueError: If the number of values does not equal ``size * size``.
    """
    side = img_size if size is None else size
    # split() without an argument tolerates repeated or trailing whitespace,
    # which split(' ') would turn into empty, unparsable tokens.
    return np.array(x.split(), dtype=np.uint8).reshape(side, side)
def get_dataloaders(dataset_name,batch_size,img_size):
    """Build the train/valid/test ``DataLoader``s from ``./data/<dataset_name>.csv``.

    The CSV is read for three columns: ``pixels`` (parsed by
    ``string_to_array``), ``lbl`` (integer class id) and ``usage``
    (``'train'``, ``'valid'`` or ``'test'``). The training split is
    oversampled with SMOTE before the loaders are created.

    Args:
        dataset_name: Base name of the CSV file inside ``./data``.
        batch_size: Batch size shared by all three loaders.
        img_size: Side length every image is resized to by the transforms.

    Returns:
        Dict with the keys ``'train'``, ``'valid'`` and ``'test'`` mapping to
        the corresponding ``DataLoader``.
    """
    df = pd.read_csv(f'./data/{dataset_name}.csv')

    X = np.array(df['pixels'].apply(string_to_array).tolist())
    # Work with plain NumPy arrays: a boolean-filtered pandas Series keeps its
    # original index labels, which positional consumers (tensor creation in
    # the dataset) cannot rely on.
    y = df['lbl'].to_numpy(dtype=np.uint8)
    usage = df['usage'].to_numpy()

    train_mask = usage == 'train'
    valid_mask = usage == 'valid'
    test_mask = usage == 'test'

    x_train, y_train = X[train_mask], y[train_mask]
    x_valid, y_valid = X[valid_mask], y[valid_mask]
    x_test, y_test = X[test_mask], y[test_mask]

    # Balancing training data (SMOTE works on flat feature vectors).
    sm = SMOTE(random_state=2)
    x_train, y_train = sm.fit_resample(
        x_train.reshape(x_train.shape[0], -1), y_train)
    # Restore the per-image shape taken from the parsed data instead of a
    # hard-coded 48x48, and keep the pixels as uint8 so the resampled images
    # are handled like the untouched valid/test images.
    x_train = x_train.reshape(-1, *X.shape[1:]).astype(np.uint8, copy=False)

    train_transform = transforms.Compose([
                transforms.RandomHorizontalFlip(p=0.5),
                Resize((img_size,img_size)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.5], std=[0.5])
                ])
    valid_test_transform = transforms.Compose([
                Resize((img_size,img_size)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.5], std=[0.5])
                ])

    dataset_train = CustomDataset(x_train, y_train, transform=train_transform)
    dataset_valid = CustomDataset(x_valid, y_valid, transform=valid_test_transform)
    dataset_test = CustomDataset(x_test, y_test, transform=valid_test_transform)

    # Only the training loader is shuffled. A fixed order for valid/test keeps
    # the batch composition, and therefore the mean of per-batch losses,
    # identical from one evaluation to the next.
    data_loaders_dict = {
        'train': DataLoader(dataset_train, batch_size=batch_size, shuffle=True, num_workers=2),
        'valid': DataLoader(dataset_valid, batch_size=batch_size, shuffle=False, num_workers=2),
        'test': DataLoader(dataset_test, batch_size=batch_size, shuffle=False, num_workers=2),
    }

    return data_loaders_dict



# Built once at module level; get_fitness passes this global to every
# fitness evaluation.
dataloader_dict= get_dataloaders(ds_name,batch_size=batch_size,img_size=img_size)

### Model

In [None]:
class Simple_CNN(nn.Module):
    """Sequential CNN classifier that also carries its own train/eval loop.

    The network is ``feature_extractor -> Flatten -> coarse_mlp_model``.
    ``optimizer``, ``scheduler`` and ``criterion`` start as ``None`` and are
    expected to be assigned by the caller before training.

    Bookkeeping attributes:
        loss_acc_info: ``{split: {'acc': [...], 'loss': [...]}}`` with one
            entry appended per call to ``eval_model``.
        best_checkpoint: ``best_checkpoint['acc']['acc']`` and
            ``best_checkpoint['loss']['loss']`` hold the best test metrics
            seen so far; the matching weights are stored under
            ``best_checkpoint['model_state_dict_acc']`` and
            ``best_checkpoint['model_state_dict_loss']``.

    Args:
        feature_extractor_layers: Iterable of modules for the convolutional
            part.
        mlp_layers: Iterable of modules applied to the flattened features.
    """

    def __init__(self,feature_extractor_layers,mlp_layers):
        super().__init__()

        self.optimizer = None
        self.scheduler = None
        self.criterion = None

        self.loss_acc_info = {
            split: {'acc': [], 'loss': []}
            for split in ('train', 'valid', 'test')
        }

        self.best_checkpoint = {}
        for metric, worst_value in (('acc', 0), ('loss', float('inf'))):
            self.best_checkpoint[metric] = {
                # Kept for backward compatibility only: this snapshot is taken
                # before any layer is registered. The real snapshots are
                # written by eval_model.
                'best_acc_model_state_dicts': self.state_dict(),
                metric: worst_value,
            }

        self.feature_extractor = nn.Sequential(*feature_extractor_layers)
        self.flt = nn.Flatten()
        self.coarse_mlp_model = nn.Sequential(*mlp_layers)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        features = self.feature_extractor(x)
        flattened_features = self.flt(features)
        return self.coarse_mlp_model(flattened_features)

    def init_weights(self,m):
        """Initialise ``m`` in place; intended for ``self.apply``.

        ``nn.Linear`` weights get Xavier-uniform, ``nn.Conv2d`` weights get
        Kaiming-uniform (ReLU gain); biases, when present, are zeroed. Other
        module types are left untouched.
        """
        if isinstance(m, nn.Linear):
            init.xavier_uniform_(m.weight)
            if m.bias is not None:
                init.zeros_(m.bias)

        if isinstance(m, nn.Conv2d):
            init.kaiming_uniform_(m.weight, nonlinearity='relu')
            if m.bias is not None:
                init.zeros_(m.bias)

    def is_model_initialized(self):
        """Print and return whether every lazy parameter has been materialised."""
        for name, param in self.named_parameters():
            if isinstance(param, nn.parameter.UninitializedParameter):
                print(f"Uninitialized parameter found in: {name}")
                return False
        print("All layers are initialized")
        return True

    def calculate_loss(self,y_pred,y_target):
        """Apply ``self.criterion`` to predictions and targets."""
        return self.criterion(y_pred, y_target)

    def _param_device(self):
        """Return the device holding the parameters (CPU if there are none)."""
        first_param = next(self.parameters(), None)
        return first_param.device if first_param is not None else torch.device('cpu')

    def eval_model(self,dl_dict):
        """Evaluate on every loader in ``dl_dict`` and update the bookkeeping.

        For each ``split -> loader`` pair the accuracy and the mean per-batch
        loss are appended to ``loss_acc_info[split]``. The ``'valid'`` loss
        drives the LR scheduler (when one is set). The ``'test'`` metrics
        update ``best_checkpoint`` together with a deep copy of the weights.

        NOTE(review): best-model tracking uses the 'test' split, so the
        reported best metrics are selected on test data - confirm this is
        intended.
        """
        self.eval()
        target_device = self._param_device()

        with torch.no_grad():
            for split, loader in dl_dict.items():
                n_correct = 0
                n_seen = 0
                batch_losses = []

                for x, y in loader:
                    x = x.to(target_device)
                    y = y.to(target_device)

                    logits = self(x)
                    batch_losses.append(
                        self.calculate_loss(y_pred=logits, y_target=y).item())
                    n_correct += (logits.argmax(dim=1) == y).sum().item()
                    n_seen += logits.shape[0]

                acc = n_correct / n_seen
                avg_loss = np.mean(batch_losses)
                self.loss_acc_info[split]['acc'].append(acc)
                self.loss_acc_info[split]['loss'].append(avg_loss)

                if split == 'valid' and self.scheduler is not None:
                    self.scheduler.step(avg_loss)

                if split != 'test':
                    continue

                if acc >= self.best_checkpoint['acc']['acc']:
                    self.best_checkpoint['acc']['acc'] = acc
                    self.best_checkpoint['model_state_dict_acc'] = copy.deepcopy(self.state_dict())

                if avg_loss <= self.best_checkpoint['loss']['loss']:
                    self.best_checkpoint['loss']['loss'] = avg_loss
                    # Separate key: writing to 'model_state_dict_acc' here
                    # would replace the best-accuracy weights.
                    self.best_checkpoint['model_state_dict_loss'] = copy.deepcopy(self.state_dict())

    def one_step_train(self,dl):
        """Run one optimisation pass (one epoch) over the loader ``dl``."""
        self.train()
        # Follow the model's own device instead of calling .cuda(), which
        # fails on machines without a GPU.
        target_device = self._param_device()

        for x, y in dl:
            self.optimizer.zero_grad()

            x = x.to(target_device)
            y = y.to(target_device)

            logits = self(x)
            batch_loss = self.calculate_loss(y_pred=logits, y_target=y)

            batch_loss.backward()
            self.optimizer.step()

    def train_model(self,num_epochs,dataloader_dict):
        """Train for ``num_epochs`` epochs, evaluating all splits after each."""
        for epoch in tqdm(range(num_epochs)):
            self.one_step_train(dataloader_dict['train'])
            self.eval_model(dataloader_dict)

def create_layer(gen):
    """Build the ``nn.Module`` described by one feature-extraction gene.

    Args:
        gen: Object whose ``config`` dict contains ``'layer_type'`` plus the
            settings that type needs: ``'num_feature_map'`` and
            ``'kernel_size'`` for ``'conv'``/``'deconv'``, ``'kernel_size'``
            and ``'stride'`` for ``'maxpool'``, nothing for ``'identity'``.

    Returns:
        A (transposed) convolution followed by batch-norm and ReLU, a
        max-pooling layer, or ``nn.Identity``.

    Raises:
        ValueError: If ``'layer_type'`` is not one of the four known types.
    """
    config = gen.config
    layer_type = config['layer_type']

    if layer_type == 'conv':
        return nn.Sequential(
            nn.LazyConv2d(config['num_feature_map'], kernel_size=config['kernel_size'], bias=False),
            nn.LazyBatchNorm2d(),
            nn.ReLU(),
        )

    if layer_type == 'deconv':
        return nn.Sequential(
            nn.LazyConvTranspose2d(out_channels=config['num_feature_map'], kernel_size=config['kernel_size'], bias=False),
            nn.LazyBatchNorm2d(),
            nn.ReLU(),
        )

    if layer_type == 'identity':
        return nn.Identity()

    if layer_type == 'maxpool':
        # Pass kernel size and stride as separate arguments. A single
        # (kernel, stride) tuple is read as a non-square kernel_size and the
        # stride gene would never act as a stride.
        return nn.MaxPool2d(kernel_size=config['kernel_size'], stride=config['stride'])

    raise ValueError('invalid layer type')
def set_layers(ind):
    """Translate a chromosome into the layer lists expected by ``Simple_CNN``.

    Every gene except the last is turned into a feature-extraction layer via
    ``create_layer``; the last gene supplies the dropout rate of the
    classifier head.

    Args:
        ind: Indexable, sized sequence of genes whose final element has
            ``config['drop_rate']``.

    Returns:
        Tuple ``(feature_extractor_layers, mlp_layers)``.
    """
    feature_extractor_layers = [
        create_layer(ind[idx]) for idx in range(len(ind) - 1)
    ]
    # Global average pooling makes the head independent of the spatial size.
    feature_extractor_layers.append(nn.AdaptiveAvgPool2d((1, 1)))

    mlp_layers = [
        # Element-wise dropout: the head receives a flattened (batch,
        # features) tensor, which nn.Dropout1d would interpret as an
        # unbatched (channels, length) input and zero out whole samples.
        nn.Dropout(ind[-1].config['drop_rate']),
        nn.LazyLinear(n_classes),
    ]
    return feature_extractor_layers, mlp_layers


def get_acc_loss_nparam(ind,dataloader_dict,epoch):
    """Train the network encoded by ``ind`` and report its quality.

    Args:
        ind: Chromosome describing the architecture (see ``set_layers``).
        dataloader_dict: Dict of loaders with the keys ``'train'``,
            ``'valid'`` and ``'test'``.
        epoch: Number of training epochs.

    Returns:
        ``(best_acc, best_loss, num_params)`` taken from the model's
        ``best_checkpoint``, or ``(-1.0, -1.0, -1.0)`` when the architecture
        cannot be built or trained (callers treat ``-1`` as "invalid").
    """
    criterion = nn.CrossEntropyLoss()

    try:
        my_model = Simple_CNN(*set_layers(ind)).to(device)
        my_model.criterion = criterion

        # Warm-up: one dummy forward pass materialises the lazy layers. It
        # runs in eval mode without gradients so the random input does not
        # touch the batch-norm running statistics.
        temp_x = torch.randn(1, 1, img_size, img_size).to(device)
        my_model.eval()
        with torch.no_grad():
            my_model(temp_x)
        # my_model.is_model_initialized()

        my_model.apply(my_model.init_weights)
        num_params = sum(p.numel() for p in my_model.parameters())

        # Create the optimizer only after the lazy parameters exist.
        my_model.optimizer = optim.AdamW(my_model.parameters())
        my_model.scheduler = ReduceLROnPlateau(my_model.optimizer, mode='min', patience=3, factor=0.95)

        my_model.train_model(num_epochs=epoch, dataloader_dict=dataloader_dict)
        return (my_model.best_checkpoint['acc']['acc'], my_model.best_checkpoint['loss']['loss'], num_params)

    except Exception as e:
        # Invalid architectures are expected during the search, so stay
        # best-effort. Report the reason, though: a silent failure here makes
        # every individual look invalid and hides genuine programming errors.
        print(f'Individual skipped ({type(e).__name__}): {e}')
        return (-1.0, -1.0, -1.0)

### GA

In [None]:

# Separators used when serialising to strings: '*' goes between the settings
# of one gene (Gen.get_gen_name), '***' between the genes of a chromosome
# (Chromosome.get_chromosome_name).
config_seprator='*'
gen_seprator='***'

class Gen():
    """A single gene: a dict of settings describing one layer.

    Item access (``gen[key]`` / ``gen[key] = value``) is forwarded to the
    underlying ``config`` dict.
    """

    def __init__(self, config) -> None:
        self.config = config

    def get_gen_name(self):
        """Return the gene as ``(key_value)`` groups joined by ``config_seprator``."""
        groups = ''.join(f'({key}_{val})' for key, val in self.config.items())
        return groups.replace(')(', f'){config_seprator}(')

    def __getitem__(self, index):
        return self.config[index]

    def __setitem__(self, index, value):
        self.config[index] = value

class Chromosome():
    """Ordered collection of genes with list-like access.

    Iteration, ``len()``, indexing and item assignment are all forwarded to
    ``gens_list``.
    """

    def __init__(self, gens_list) -> None:
        self.gens_list = gens_list

    def get_chromosome_name(self):
        """Return all gene names concatenated, separated by ``gen_seprator``."""
        names = ''.join(gen.get_gen_name() for gen in self.gens_list)
        return names.replace(')(', f'){gen_seprator}(')

    def __iter__(self):
        return iter(self.gens_list)

    def __getitem__(self, index):
        return self.gens_list[index]

    def __len__(self):
        return len(self.gens_list)

    def __setitem__(self, index, value):
        self.gens_list[index] = value


# DEAP types: a single-objective fitness with a positive weight, and an
# Individual that is a Chromosome carrying that fitness.
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", Chromosome, fitness=creator.FitnessMax)

# --- GA hyper-parameters ---
p_mutation = 0.2
p_cross = 0.8
n_generation = 20
n_run = 2
population_size = 10
tournament_size = 2
n_feature_extraction_layers = 6
n_dropout_layers = 1
# Training epochs spent on each fitness evaluation.
epoch = 2

# Column names of the results CSV: one column per gene, then the metrics.
config_names = (
    [f'layer_{i}' for i in range(n_feature_extraction_layers)]
    + [f'drop_{i}' for i in range(n_dropout_layers)]
    + ['acc', 'loss', 'num_param']
)

# --- Search space ---
feature_extraction_layer_types = ['conv', 'deconv', 'identity', 'maxpool']
conv_kernel_size_range = [1, 3, 5]
# Inclusive exponent range: the number of feature maps is 2**exponent.
conv_num_feature_map_range = [0, 5]
# Inclusive range of tenths: the drop rate is value * 0.1.
dropout_range = [0, 5]
maxpool_kernel_range = [2, 3]
maxpool_stride_range = [2, 3]

os.makedirs('output', exist_ok=True)

config_path = f'output/{ds_name} configs.csv'


def create_individual():
    """Sample a random chromosome that has not been evaluated before.

    The chromosome consists of ``n_feature_extraction_layers`` randomly
    configured feature-extraction genes followed by ``n_dropout_layers``
    dropout genes. Sampling is repeated until the chromosome's name is not a
    key of ``seen_fit``.
    """
    while True:
        genes = []

        for _ in range(n_feature_extraction_layers):
            kind = np.random.choice(feature_extraction_layer_types, size=1)[0]
            settings = {'layer_type': kind}

            if kind in ('conv', 'deconv'):
                settings['kernel_size'] = int(np.random.choice(conv_kernel_size_range, size=1)[0])
                exponent = random.randrange(conv_num_feature_map_range[0], conv_num_feature_map_range[1]+1)
                settings['num_feature_map'] = pow(2, exponent)
            elif kind == 'maxpool':
                settings['kernel_size'] = int(np.random.choice(maxpool_kernel_range, size=1)[0])
                settings['stride'] = int(np.random.choice(maxpool_stride_range, size=1)[0])

            genes.append(Gen(config=settings))

        for _ in range(n_dropout_layers):
            tenths = random.randrange(dropout_range[0], dropout_range[1]+1)
            genes.append(Gen(config={
                'layer_type': 'dropout',
                'drop_rate': round(tenths*0.1, 1),
            }))

        candidate = Chromosome(gens_list=genes)
        if candidate.get_chromosome_name() in seen_fit:
            continue
        return candidate

# "individual" wraps the Chromosome produced by create_individual in
# creator.Individual; "population" repeats that to fill a plain list.
toolbox = base.Toolbox()
toolbox.register("individual", tools.initIterate,
                 creator.Individual, create_individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)


def custom_mutation(x, mpb):
    """Return a deep copy of ``x`` in which, with probability ``mpb``, one gene is resampled.

    A position in the feature-extraction part gets a freshly sampled
    feature-extraction gene; a position in the trailing dropout part gets a
    freshly sampled dropout gene. ``x`` itself is never modified.
    """
    mutant = deepcopy(x)

    if random.random() >= mpb:
        return mutant

    position = random.choice(range(len(mutant)))

    if position >= len(mutant)-n_dropout_layers:
        tenths = random.randrange(dropout_range[0], dropout_range[1]+1)
        settings = {
            'layer_type': 'dropout',
            'drop_rate': round(tenths*0.1, 1),
        }
    else:
        kind = np.random.choice(feature_extraction_layer_types, size=1)[0]
        settings = {'layer_type': kind}
        if kind in ('conv', 'deconv'):
            settings['kernel_size'] = int(np.random.choice(conv_kernel_size_range, size=1)[0])
            exponent = random.randrange(conv_num_feature_map_range[0], conv_num_feature_map_range[1]+1)
            settings['num_feature_map'] = pow(2, exponent)
        elif kind == 'maxpool':
            settings['kernel_size'] = int(np.random.choice(maxpool_kernel_range, size=1)[0])
            settings['stride'] = int(np.random.choice(maxpool_stride_range, size=1)[0])

    mutant[position] = Gen(config=settings)
    return mutant


def custom_crossover(ind1, ind2, cxpb):
    """Return two new individuals bred from ``ind1`` and ``ind2``.

    With probability ``cxpb`` the gene lists are recombined with one-point
    crossover; otherwise the children are plain copies of the parents' gene
    lists. The parents are left untouched because only slices are used.
    """
    if random.random() >= cxpb:
        return creator.Individual(ind1[:]), creator.Individual(ind2[:])

    first_genes, second_genes = tools.cxOnePoint(ind1[:], ind2[:])
    return creator.Individual(first_genes), creator.Individual(second_genes)


def get_fitness(ind):
    """Return the fitness tuple ``(accuracy,)`` of ``ind``, using ``seen_fit`` as cache.

    A chromosome that has not been seen yet is trained through
    ``get_acc_loss_nparam``. Its ``(acc, loss, num_param)`` result is stored
    in ``seen_fit`` and the whole cache is rewritten to ``config_path``.
    """
    key = ind.get_chromosome_name()

    if key not in seen_fit:
        # fitnesses=(acc,loss,num_param)
        seen_fit[key] = get_acc_loss_nparam(ind=ind, dataloader_dict=dataloader_dict, epoch=epoch)

        rows = [
            name.split(gen_seprator) + list(scores)
            for name, scores in seen_fit.items()
        ]
        pd.DataFrame(rows, columns=config_names).to_csv(config_path, index=False)

    return (seen_fit[key][0],)

def eval_and_check_duplicate(inds_list,traget_list=None):
    """Evaluate every individual in ``inds_list``, replacing unusable ones.

    An individual is replaced by a fresh random one, and the check repeated,
    when its name already occurs in ``traget_list`` or when its first
    fitness value is ``-1``. Accepted individuals get their ``fitness.values``
    set. The list is modified in place and also returned.
    """
    if traget_list is None:
        taken_names = []
    else:
        taken_names = [t.get_chromosome_name() for t in traget_list]

    for idx in range(len(inds_list)):
        while True:
            fitness = toolbox.evaluate(inds_list[idx])
            is_duplicate = inds_list[idx].get_chromosome_name() in taken_names
            if not is_duplicate and fitness[0] != -1:
                inds_list[idx].fitness.values = fitness
                break
            inds_list[idx] = toolbox.population(n=1)[0]

    return inds_list


# Genetic operators.
toolbox.register("select", tools.selTournament, tournsize=tournament_size)
toolbox.register("evaluate", get_fitness)
toolbox.register("mate", custom_crossover, cxpb=p_cross)
toolbox.register("mutate", custom_mutation, mpb=p_mutation)

max_acc_per_run = []

# Cache: chromosome name -> (acc, loss, num_param).
seen_fit = {}

# load previous inds
has_checkpoint = os.path.isfile(config_path)
print('Previous ckp: ', has_checkpoint)
if has_checkpoint:
    df = pd.read_csv(config_path)
    for _, row in df.iterrows():
        fields = row.values
        # The last three columns are the metrics, everything before them the genes.
        ind_name = gen_seprator.join(fields[:-3])
        seen_fit[ind_name] = (float(fields[-3]), float(fields[-2]), int(fields[-1]))

    print(f'Previous ckp is loaded {len(seen_fit)=}')


# Run the GA `n_run` times. Every run starts from a fresh random population
# but shares the `seen_fit` cache with the previous runs.
for _ in range(n_run):

    population = toolbox.population(n=population_size)

    # One pass evaluates (and, if necessary, replaces) every individual;
    # repeating it once per individual only re-reads cached fitness values.
    population = eval_and_check_duplicate(population, None)

    for current_gen in range(n_generation):
        print(f'{current_gen=}')

        fitnesses = [ind.fitness.values[0] for ind in population]

        print(f'best ind:{population[np.argmax(fitnesses)].get_chromosome_name()} \nfittness :{np.max(fitnesses)}')

        offspring = []

        while len(offspring) < population_size:

            ind1, ind2 = random.sample(population, 2)
            child1, child2 = toolbox.mate(ind1, ind2)

            child1 = toolbox.mutate(child1)
            child2 = toolbox.mutate(child2)

            # Children that duplicate an existing individual or fail to
            # train are replaced by random individuals.
            new_children = [child1, child2]
            new_children = eval_and_check_duplicate(new_children, population + offspring)

            offspring.extend(new_children)

        candidate_for_next_generation = population + offspring

        # Elitism: the single best candidate always survives and is removed
        # from the pool before tournament selection.
        elitism_ind = candidate_for_next_generation.pop(np.argmax(
            [ind.fitness.values[0] for ind in candidate_for_next_generation]))
        population = toolbox.select(
            candidate_for_next_generation, int(0.8*population_size))

        population.append(elitism_ind)

        # Fill the remaining slots with new random individuals.
        new_rand_inds = toolbox.population(n=population_size-len(population))
        new_rand_inds = eval_and_check_duplicate(new_rand_inds, population)

        population.extend(new_rand_inds)
    max_acc_per_run.append(max([ind.fitness.values[0] for ind in population]))

# Report the best individual of the final population of the last run.
fitnesses = [ind.fitness.values[0] for ind in population]

print(f'best ind:{population[np.argmax(fitnesses)].get_chromosome_name()} \nfittness :{np.max(fitnesses)}')

