In [None]:
import torch
import numpy as np
import pandas as pd
import torchvision
import pathlib
from torchvision.transforms import transforms
from torch.utils.data import DataLoader, Dataset, random_split
from torch.utils.checkpoint import checkpoint
import torch.nn.functional as F
from torch import nn
from torch import Tensor
from einops import rearrange, repeat
from einops.layers.torch import Rearrange
import time
from tqdm import tqdm, trange
from accelerate import Accelerator
import random
import os
from sklearn.metrics import confusion_matrix, recall_score, precision_score, mean_absolute_error
from datasets import load_dataset
from PIL import Image
from pyDOE2 import lhs
from scipy.stats import f_oneway
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import seaborn as sns
import matplotlib.pyplot as plt
from langchain_groq import ChatGroq
# from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
torch.cuda.empty_cache()

In [None]:
# Try to get torchinfo, install it if it doesn't work
try:
    from torchinfo import summary
except:
    print("[INFO] Couldn't find torchinfo... installing it.")
    !pip install -q torchinfo
    from torchinfo import summary

In [None]:
# Seed every random number generator used in this notebook (Python, NumPy, PyTorch CPU and CUDA)
# so that runs are repeatable as far as these libraries allow.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)

In [None]:
# Differential-evolution settings: population size and number of generations.
POP_SIZE = 10
MAX_GEN = 10
# Directory where the per-generation CSV log is written.
save_dir = './'
# Data paths (placeholders: replace with the real image-folder locations before running).
train_path = 'Training data path'
test_path = 'Testing data path'
# Batch size shared by the train, validation and test loaders.
BATCH_SIZE = 2

In [None]:
train_split = 0.8  # fraction of the training folder used for training; the rest is validation


def dataloader():
    """Build the train, validation and test loaders plus the class-name list.

    The images under ``train_path`` are split ``train_split`` / ``1 - train_split``
    into a training and a validation subset. Training images are augmented
    (random flip and rotation); validation and test images only get the
    deterministic resize / tensor / normalise pipeline so that the validation
    loss used for early stopping is not perturbed by random augmentation.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader, classes)`` where
        ``classes`` is the list of class names in label-index order.
    """
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    train_transformer = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=10),
        transforms.ToTensor(),
        normalize,
    ])
    eval_transformer = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ])

    data = torchvision.datasets.ImageFolder(train_path, transform=train_transformer)

    tr_s = int(train_split * len(data))
    val_s = len(data) - tr_s
    train, val = random_split(data, [tr_s, val_s])

    # Re-read the same folder with the deterministic transform and reuse the
    # validation indices, so the validation images are exactly the ones held
    # out above but are no longer randomly augmented.
    val_data = torchvision.datasets.ImageFolder(train_path, transform=eval_transformer)
    val = torch.utils.data.Subset(val_data, val.indices)

    train_loader = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val, batch_size=BATCH_SIZE, shuffle=False)
    test_loader = DataLoader(
        torchvision.datasets.ImageFolder(test_path, transform=eval_transformer),
        batch_size=BATCH_SIZE, shuffle=False
    )

    # Use the dataset's own class list: it is what the integer labels index
    # into, and it cannot be polluted by stray files in the root folder the
    # way a raw directory listing can.
    classes = list(data.classes)

    return train_loader, val_loader, test_loader, classes

In [None]:
# Loading Data Set
accelerator = Accelerator()
# Use the device chosen by Accelerate (the previous hard-coded "cuda" assignment
# was overwritten immediately and has been dropped).
device = accelerator.device
# dataloader() returns four values; unpacking only three raised a ValueError.
train_loader, val_loader, test_loader, classes = dataloader()

In [None]:
# Hand the train and validation loaders to Accelerate; the returned loaders replace the originals.
train_loader, val_loader = accelerator.prepare(train_loader, val_loader)
# Number of target classes; used as the output size of the classification head.
num_class = len(classes)
print(f"Number of classes: {num_class}")

In [None]:
# Number of batches in the prepared train and validation loaders.
len(train_loader), len(val_loader)

In [None]:
# Latin Hypercube sampling of the DE control parameters (currently only the scaling factor F)

# Search range of every control parameter drawn by LHS.
rate = {
    'F': {
        'low': 0.5,
        'high': 1,
        'type': 'float'
    }
}
param_keys = list(rate.keys())
# One LHS point with exactly one dimension per control parameter. The explicit
# random_state makes the drawn value repeatable across kernel restarts.
rate_lhs = lhs(len(param_keys), samples=1, random_state=RANDOM_SEED)[0]

rates = {}
for i, param in enumerate(param_keys):
    param_info = rate[param]
    if param_info["type"] == "float":
        # Scale the [0, 1] sample to the float range and keep one decimal place
        rates[param] = round(param_info["low"] + rate_lhs[i] * (param_info["high"] - param_info["low"]), 1)


SCALING_FAC  = rates['F']         # F
print(rates)

# Model

In [None]:
class Embedding(nn.Module):
    """Patch embedding for the ViT: conv patchifier, class token, position embedding, dropout.

    Args:
        DR: dropout probability applied to the embedded sequence.
        patch_size: side length of a square patch; must divide ``img_size``.
        emb_dim: embedding width of every token.
        in_channels: number of image channels.
        img_size: side length of the (square) input image.
    """

    def __init__(self,
                 DR,
                 patch_size: int,
                 emb_dim: int,
                 in_channels: int = 3,
                 img_size: int = 224):
        super().__init__()
        self.in_channels = in_channels
        self.patch_size = patch_size
        self.emb_dim = emb_dim
        assert img_size % self.patch_size == 0, f"Input img must be divisble by patch size {self.patch_size}"
        self.num_patches = (img_size * img_size) // (self.patch_size ** 2)

        # A strided convolution whose kernel equals the stride cuts the image
        # into non-overlapping patches and projects each one to emb_dim.
        self.patcher = nn.Conv2d(self.in_channels,
                                 self.emb_dim,
                                 kernel_size=self.patch_size,
                                 stride=self.patch_size,
                                 padding=0)
        self.flatten = nn.Flatten(2)

        # Learnable class token and position embedding (one position per patch plus the class token).
        self.cls_token = nn.Parameter(torch.randn(1, 1, self.emb_dim), requires_grad=True)
        self.pos_embd = nn.Parameter(torch.randn(1, self.num_patches + 1, self.emb_dim), requires_grad=True)
        self.emb_dropout = nn.Dropout(p=DR)

    def forward(self, x):
        """Return the token sequence (class token first) for the image batch ``x``."""
        self.batch_size = x.shape[0]
        img_res = x.shape[-1]
        assert img_res % self.patch_size == 0, f"Input img must be divisble by patch size {self.patch_size} and current image shape {img_res}"

        class_tokens = self.cls_token.expand(self.batch_size, -1, -1)
        # conv output -> flatten the spatial grid -> put the patch axis before the channel axis
        patch_tokens = self.flatten(self.patcher(x)).transpose(1, 2)
        sequence = torch.cat((class_tokens, patch_tokens), dim=1) + self.pos_embd
        return self.emb_dropout(sequence)


# Step(2) ---> Create ViT Model
class MyViT(nn.Module):
    """Vision Transformer classifier: patch embedding, Transformer encoder, linear head.

    Args:
        DR: dropout probability used by the embedding layer.
        activation: activation passed to every encoder layer.
        patch_size: side length of a square patch; must divide ``img_size``.
        emb_dim: token embedding width.
        num_layers: number of stacked encoder layers.
        num_heads: attention heads per encoder layer.
        num_classes: size of the classification output.
        d_ff: hidden width of the encoder feed-forward block.
        in_channels: number of image channels.
        img_size: side length of the (square) input image.
    """

    def __init__(self,
                DR,
                activation,
                patch_size,
                emb_dim,
                num_layers,
                num_heads,
                num_classes,
                d_ff,
                in_channels: int = 3,
                img_size: int = 224):
        super().__init__()
        self.mlp_size = d_ff
        assert img_size % patch_size == 0, "Img Size must be divisble patch size"

        self.embedding = Embedding(DR=DR,
                                   patch_size=patch_size,
                                   emb_dim=emb_dim,
                                   in_channels=in_channels,
                                   img_size=img_size)

        # Template for one pre-norm, batch-first encoder layer; the encoder stacks num_layers of them.
        layer_template = nn.TransformerEncoderLayer(d_model=emb_dim,
                                                    nhead=num_heads,
                                                    dim_feedforward=self.mlp_size,
                                                    activation=activation,
                                                    batch_first=True,
                                                    norm_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer=layer_template, num_layers=num_layers)

        self.mlp_head = nn.Sequential(
            nn.LayerNorm(normalized_shape=emb_dim, eps=1e-12),
            nn.Linear(in_features=emb_dim, out_features=num_classes),
        )

    def forward(self, x):
        """Return class logits computed from the first (class) token of the encoded sequence."""
        tokens = self.embedding(x)
        encoded = self.encoder(tokens)
        return self.mlp_head(encoded[:, 0])

# Evaluate Model

In [None]:
def calc_lr(step, dim_embed, warmup_steps):
    """Warm-up / inverse-square-root learning rate for a given optimisation step.

    Computes ``dim_embed**-0.5 * min(step**-0.5, step * warmup_steps**-1.5)``:
    the rate grows linearly for ``warmup_steps`` steps and then decays with the
    inverse square root of the step.

    Args:
        step: optimisation step counter.
        dim_embed: model embedding width.
        warmup_steps: number of warm-up steps.

    Returns:
        float: the learning rate. ``0.0`` for ``step <= 0`` (the value the
        warm-up branch tends to at step zero); without warm-up
        (``warmup_steps <= 0``) only the inverse-square-root decay is applied.
    """
    if step <= 0:
        # step**-0.5 is undefined at zero; the linear warm-up starts from 0.
        return 0.0
    decay = step**(-0.5)
    if warmup_steps <= 0:
        # warmup_steps**-1.5 is undefined at zero; fall back to pure decay.
        return dim_embed**(-0.5) * decay
    return dim_embed**(-0.5) * min(decay, step * warmup_steps**(-1.5))

In [None]:
class Scheduler(torch.optim.lr_scheduler._LRScheduler):
    """Per-step scheduler that assigns ``calc_lr(step, dim_embed, warmup_steps)`` to every param group.

    NOTE(review): ``get_lr`` never reads the learning rate the optimizer was
    created with, so that value appears to be replaced by the schedule as soon
    as this scheduler is attached - confirm this is intended wherever a
    learning-rate hyperparameter is being tuned.

    Args:
        optimizer: wrapped optimizer.
        dim_embed: model embedding width, forwarded to ``calc_lr``.
        warmup_steps: number of warm-up steps, forwarded to ``calc_lr``.
        steps_in_epoch: optimisation steps per epoch; used to position the
            step counter when resuming from ``last_epoch``.
        last_epoch: index of the last finished epoch (``-1`` for a fresh run).
        verbose: forwarded to the base class only when truthy, because the
            argument is deprecated in recent PyTorch releases.
    """

    def __init__(self,
                 optimizer,
                 dim_embed,
                 warmup_steps,
                 steps_in_epoch,
                 last_epoch=-1,
                 verbose=False):

        # These must exist before the base constructor runs, since get_lr() reads them.
        self.dim_embed = dim_embed
        self.warmup_steps = warmup_steps
        self.num_param_groups = len(optimizer.param_groups)

        if verbose:
            super().__init__(optimizer, last_epoch, verbose)
        else:
            # Avoid passing the deprecated `verbose` argument when it is not requested.
            super().__init__(optimizer, last_epoch)
        # Express the resume position in optimisation steps rather than epochs.
        self._step_count = (last_epoch+1)*steps_in_epoch

    def get_lr(self):
        """Return the scheduled learning rate, repeated once per param group."""
        lr = calc_lr(self._step_count, self.dim_embed, self.warmup_steps)
        return [lr] * self.num_param_groups

In [None]:
def evaluate_candidate(candidate, N):
    """Train a ViT described by an index-encoded candidate and return its validation result.

    ``candidate`` holds, for each of ``emb_size``, ``num_head``, ``d_ff``,
    ``acf`` and ``patch_size``, an index into ``hp[<name>]["choices"]``.
    Dropout (0.1), depth (12 layers) and the optimizer learning rate are fixed.

    Args:
        candidate: dict of choice indices as described above.
        N: maximum number of epochs; training stops early after 4 epochs
            without an improvement of the validation loss.

    Returns:
        tuple: ``(val_loss, val_accuracy)`` of the last completed epoch, or
        ``(float("inf"), 0.0)`` when no epoch ran or any error occurred.
    """
    # Result when no epoch completes (e.g. N <= 0).
    test_loss, test_acc = float("inf"), 0.0
    try:
        scaler = torch.cuda.amp.GradScaler()
        # LOAD PARAMS
        emb_size = hp["emb_size"]["choices"][candidate["emb_size"]]
        num_heads = hp["num_head"]["choices"][candidate["num_head"]]
        d_ff = hp["d_ff"]["choices"][candidate["d_ff"]]
        acf = hp["acf"]["choices"][candidate["acf"]]
        patch_size = hp["patch_size"]["choices"][candidate["patch_size"]]

        # Create Model
        print(".....Creating model.....")
        model = MyViT(
                DR = 0.1,
                activation = acf,
                patch_size = patch_size,
                emb_dim = emb_size,
                num_layers = 12,
                num_heads = num_heads,
                num_classes = num_class,
                d_ff = d_ff,
                in_channels = 3,
                img_size = 224
        ).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr = 0.000050,
            betas = (0.9, 0.999),
            eps = 1.0e-9,
            weight_decay = 1e-4
        )
        model, optimizer, criterion = accelerator.prepare(model, optimizer, criterion)
        warmup_steps = len(train_loader)*5

        scheduler = Scheduler(
            optimizer,
            dim_embed = emb_size,
            warmup_steps = warmup_steps,
            steps_in_epoch = len(train_loader),
        )
        N_EPOCHS = N
        early_stopping_patience = 4
        best_val_loss = float("inf")
        # Initialised up front: a first-epoch loss that is not below inf
        # (e.g. NaN) would otherwise hit an unbound counter.
        no_improvement_counter = 0
        print("------------TRAIN - VAL - LOOP--------------")

        total_start_time = time.time()  # Start total timer

        for epoch in range(N_EPOCHS):
            # ------------ TRAIN -- LOOP ----------------
            train_loss = 0.0
            model.train()
            correctt, totalt = 0, 0
            for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1} in training", leave=True):
                x, y = batch
                x, y = x.to(device), y.to(device)
                optimizer.zero_grad()
                with torch.amp.autocast(device_type = 'cuda', dtype = torch.float16):
                    y_hat = model(x)
                    del x
                    loss = criterion(y_hat, y)
                # Scaled backward / step to keep fp16 gradients from underflowing
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                scheduler.step()
                train_loss = train_loss + loss.item() / len(train_loader)
                correctt = correctt + (torch.argmax(y_hat, dim=1) == y).sum().item()
                totalt = totalt + y.size(0)

            # ----------- VALIDATION -------------
            # Switch off dropout so the validation loss is deterministic;
            # model.train() at the top of the loop switches it back on.
            model.eval()
            with torch.no_grad():
                correct, total = 0, 0
                test_loss = 0.0
                for batch in tqdm(val_loader, desc="Testing"):
                    x, y = batch
                    x, y = x.to(device), y.to(device)
                    y_hat = model(x)
                    del x
                    loss = criterion(y_hat, y)
                    test_loss = test_loss + loss.item() / len(val_loader)

                    correct = correct + (torch.argmax(y_hat, dim=1) == y).sum().item()
                    total = total + y.size(0)

            test_acc = round(correct / total, 4)
            print(f"Epoch: {epoch + 1}/{N_EPOCHS} Train loss: {train_loss:.2f} Train accuracy: {correctt / totalt * 100:.2f}% Val loss: {test_loss:.2f} Val accuracy: {correct / total * 100:.2f}%")
            # Early Stopping
            if test_loss < best_val_loss:
                best_val_loss = test_loss
                no_improvement_counter = 0
            else:
                no_improvement_counter = no_improvement_counter + 1
                if no_improvement_counter >= early_stopping_patience:
                    print(f"Early stopping triggered at epoch {epoch + 1}")
                    break
        total_end_time = time.time()  # End total timer
        print(f"Total time taken: {(total_end_time - total_start_time):.2f} seconds")

    except Exception as err:
        # A failing candidate (e.g. out of memory, invalid head/width combination)
        # must not abort the search: report it and score it as the worst possible.
        print("ERROR while evaluating candidate!")
        print(err)
        return float("inf"), 0.0
    return test_loss, test_acc

In [None]:
def experiment(candidate, N):
    """Train a ViT from a candidate holding literal hyperparameter values (one DOE run).

    Unlike index-encoded candidates, ``candidate`` here carries the actual
    values under the keys ``emb_size``, ``num_head``, ``d_ff``, ``num_layers``,
    ``DR``, ``acf``, ``patch_size`` and ``lr``.

    NOTE(review): ``lr`` is given to Adam, but the attached ``Scheduler``
    computes its rate from the step, embedding width and warm-up only, so the
    ``lr`` level may have no effect on training - confirm before interpreting
    its importance.

    Args:
        candidate: dict of hyperparameter values as described above.
        N: maximum number of epochs; training stops early after 4 epochs
            without an improvement of the validation loss.

    Returns:
        tuple: ``(val_loss, val_accuracy)`` of the last completed epoch, or
        ``(float("inf"), 0.0)`` when no epoch ran or any error occurred.
    """
    # Result when no epoch completes (e.g. N <= 0).
    test_loss, test_acc = float("inf"), 0.0
    try:
        scaler = torch.cuda.amp.GradScaler()
        # LOAD PARAMS
        emb_size = candidate["emb_size"]
        num_heads = candidate["num_head"]
        d_ff = candidate["d_ff"]
        num_layers = candidate["num_layers"]
        DR = candidate["DR"]
        acf = candidate["acf"]
        patch_size = candidate["patch_size"]
        lr = candidate["lr"]

        # Create Model
        print(".....Creating model.....")
        model = MyViT(
                DR = DR,
                activation = acf,
                patch_size = patch_size,
                emb_dim = emb_size,
                num_layers = num_layers,
                num_heads = num_heads,
                num_classes = num_class,
                d_ff = d_ff,
                in_channels = 3,
                img_size = 224
        ).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr = lr,
            betas = (0.9, 0.999),
            eps = 1.0e-9,
            weight_decay = 1e-4
        )
        model, optimizer, criterion = accelerator.prepare(model, optimizer, criterion)
        warmup_steps = len(train_loader)*5

        scheduler = Scheduler(
            optimizer,
            dim_embed = emb_size,
            warmup_steps = warmup_steps,
            steps_in_epoch = len(train_loader),
        )
        N_EPOCHS = N
        early_stopping_patience = 4
        best_val_loss = float("inf")
        # Initialised up front: a first-epoch loss that is not below inf
        # (e.g. NaN) would otherwise hit an unbound counter.
        no_improvement_counter = 0
        print("------------TRAIN - VAL - LOOP--------------")

        total_start_time = time.time()  # Start total timer

        for epoch in range(N_EPOCHS):
            # ------------ TRAIN -- LOOP ----------------
            train_loss = 0.0
            model.train()
            correctt, totalt = 0, 0
            for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1} in training", leave=True):
                x, y = batch
                x, y = x.to(device), y.to(device)
                optimizer.zero_grad()
                with torch.amp.autocast(device_type = 'cuda', dtype = torch.float16):
                    y_hat = model(x)
                    del x
                    loss = criterion(y_hat, y)
                # Scaled backward / step to keep fp16 gradients from underflowing
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                scheduler.step()
                train_loss = train_loss + loss.item() / len(train_loader)
                correctt = correctt + (torch.argmax(y_hat, dim=1) == y).sum().item()
                totalt = totalt + y.size(0)

            # ----------- VALIDATION -------------
            # Switch off dropout so the validation loss is deterministic;
            # model.train() at the top of the loop switches it back on.
            model.eval()
            with torch.no_grad():
                correct, total = 0, 0
                test_loss = 0.0
                for batch in tqdm(val_loader, desc="Testing"):
                    x, y = batch
                    x, y = x.to(device), y.to(device)
                    y_hat = model(x)
                    del x
                    loss = criterion(y_hat, y)
                    test_loss = test_loss + loss.item() / len(val_loader)

                    correct = correct + (torch.argmax(y_hat, dim=1) == y).sum().item()
                    total = total + y.size(0)

            test_acc = round(correct / total, 4)
            print(f"Epoch: {epoch + 1}/{N_EPOCHS} Train loss: {train_loss:.2f} Train accuracy: {correctt / totalt * 100:.2f}% Val loss: {test_loss:.2f} Val accuracy: {correct / total * 100:.2f}%")
            # Early Stopping
            if test_loss < best_val_loss:
                best_val_loss = test_loss
                no_improvement_counter = 0
            else:
                no_improvement_counter = no_improvement_counter + 1
                if no_improvement_counter >= early_stopping_patience:
                    print(f"Early stopping triggered at epoch {epoch + 1}")
                    break
        total_end_time = time.time()  # End total timer
        print(f"Total time taken: {(total_end_time - total_start_time):.2f} seconds")

    except Exception as err:
        # A failing run (e.g. out of memory) must not abort the whole design:
        # report it and record the worst possible result.
        print("ERROR while evaluating candidate!")
        print(err)
        return float("inf"), 0.0
    return test_loss, test_acc

# DOE

In [None]:
def generate_candidates_taguchi_L18():
    """
    Generate hyperparameter candidates from the Taguchi L18 (2^1 x 3^7) orthogonal array.

    The array has one two-level column (activation) and seven three-level
    columns. It is orthogonal: for every pair of columns, each combination of
    levels occurs equally often, so the effect of each hyperparameter can be
    estimated without being confounded with another one.

    Returns:
        list: 18 candidate dictionaries with the keys ``acf``, ``patch_size``,
        ``emb_size``, ``num_layers``, ``num_head``, ``DR``, ``d_ff`` and ``lr``.
    """
    # Discrete levels of each hyperparameter, in array-column order.
    levels = {
        "acf": ["relu", "gelu"],
        "patch_size": [4, 8, 16],
        "emb_size": [32, 128, 512],
        "num_layers": [4, 12, 16],
        "num_head": [4, 8, 16],
        "DR": [0.1, 0.3, 0.5],
        "d_ff": [48, 192, 768],
        "lr": [1e-6, 5e-5, 1e-4]
    }

    # Standard L18 array (1-based level numbers). The previously used table was
    # not orthogonal: e.g. its num_head and DR columns contained the level pair
    # (1, 1) four times and the pair (1, 2) never.
    taguchi_array = (
        (1, 1, 1, 1, 1, 1, 1, 1), (1, 1, 2, 2, 2, 2, 2, 2),
        (1, 1, 3, 3, 3, 3, 3, 3), (1, 2, 1, 1, 2, 2, 3, 3),
        (1, 2, 2, 2, 3, 3, 1, 1), (1, 2, 3, 3, 1, 1, 2, 2),
        (1, 3, 1, 2, 1, 3, 2, 3), (1, 3, 2, 3, 2, 1, 3, 1),
        (1, 3, 3, 1, 3, 2, 1, 2), (2, 1, 1, 3, 3, 2, 2, 1),
        (2, 1, 2, 1, 1, 3, 3, 2), (2, 1, 3, 2, 2, 1, 1, 3),
        (2, 2, 1, 2, 3, 1, 3, 2), (2, 2, 2, 3, 1, 2, 1, 3),
        (2, 2, 3, 1, 2, 3, 2, 1), (2, 3, 1, 3, 2, 3, 1, 2),
        (2, 3, 2, 1, 3, 1, 2, 3), (2, 3, 3, 2, 1, 2, 3, 1),
    )

    names = list(levels.keys())
    candidates = []
    for row in taguchi_array:
        # Column k of the array selects the (1-based) level of the k-th hyperparameter.
        candidates.append({name: levels[name][level - 1] for name, level in zip(names, row)})

    return candidates

# Example usage:
# Build the Taguchi candidates once and list them; later cells iterate over `candidates_L18`.
candidates_L18 = generate_candidates_taguchi_L18()
print("Generated Candidates using Taguchi L18:")
for i, cand in enumerate(candidates_L18):
    print(f"Candidate {i+1}: {cand}")

In [None]:
# Run every Taguchi candidate for up to 50 epochs and collect the returned
# validation accuracy and loss, in candidate order.
taguchi_acc = []
taguchi_loss = []
for i, cand in enumerate(candidates_L18):
    print(f"Candidate {i+1}: {cand}")
    loss, acc = experiment(cand, 50)
    taguchi_acc.append(acc)
    taguchi_loss.append(loss)

# Finding the ranking of Hp

In [None]:
# One row per Taguchi run: the hyperparameter values plus the measured accuracy and loss.
df = pd.DataFrame(candidates_L18)
df["accuracy"] = taguchi_acc
df["loss"] = taguchi_loss
df

In [None]:
### Statistical Analysis
## Mean and Standard Deviation of accuracy and loss over all runs
# Note: experiment() reports a failed run as loss = inf, which makes the mean loss inf as well.
mean_acc = df["accuracy"].mean()
mean_loss = df["loss"].mean()
std_acc = df["accuracy"].std()
std_loss = df["loss"].std()

print(f"Mean Of Accuracy And Loss: {mean_acc:.2f} and {mean_loss:.2f}")
print(f"Standard Deviation Of Accuracy And Loss: {std_acc:.2f} and {std_loss:.2f}")

In [None]:
### Perform ANOVA: one one-way test per hyperparameter
# One-hot encode the activation. get_dummies removes the original "acf" column,
# so the guard keeps this cell safe to re-run.
if "acf" in df.columns:
    df = pd.get_dummies(df, columns = ["acf"], drop_first = True)

anova_factors = ["lr", "patch_size", "emb_size", "num_layers", "num_head", "DR", "acf_relu", "d_ff"]


def anova_per_factor(frame, response):
    """Run a one-way ANOVA of ``response`` across the levels of each hyperparameter.

    For every hyperparameter the runs are grouped by its level and the group
    means of ``response`` are compared. (Passing the response and the raw
    hyperparameter columns to a single test, as before, only compares the
    means of unrelated quantities.)

    Args:
        frame: table with one row per run.
        response: name of the outcome column ("accuracy" or "loss").

    Returns:
        pandas.DataFrame: F statistic and p-value per hyperparameter, sorted by p-value.
    """
    rows = []
    for factor in anova_factors:
        groups = [level_runs[response].to_numpy() for _, level_runs in frame.groupby(factor)]
        result = f_oneway(*groups)
        rows.append({"hp": factor, "statistic": result.statistic, "pvalue": result.pvalue})
    return pd.DataFrame(rows).set_index("hp").sort_values("pvalue")


anova_acc = anova_per_factor(df, "accuracy")
anova_loss = anova_per_factor(df, "loss")

print("ANOVA For Accuracy (test statistic and p-value per hyperparameter):")
print(anova_acc)
print("\nANOVA For Loss (test statistic and p-value per hyperparameter):")
print(anova_loss)

In [None]:
### Feature Importance Analysis
# Select the hyperparameter columns by name instead of by position, so the
# selection cannot silently pick up a result column if the column order changes.
hp_columns = ["patch_size", "emb_size", "num_layers", "num_head", "DR", "d_ff", "lr", "acf_relu"]
X = df[hp_columns]
y = df[["accuracy", "loss"]]
# A random forest fitted on both outcomes; its impurity-based importances rank the hyperparameters.
rf = RandomForestRegressor(n_estimators = 100, random_state = 42)
rf.fit(X, y)
feature_imp = pd.Series(rf.feature_importances_, index = X.columns)
print("\nHP Importance:\n")
print(feature_imp.sort_values(ascending = False))

In [None]:
# Horizontal bar chart of the random-forest importance of each hyperparameter.
plt.figure(figsize = (6, 4))
sns.barplot(x = feature_imp.values,
            y = feature_imp.index,
            palette = "viridis")
plt.title("HP Importance In Accuracy And Loss Prediction")
plt.show()

In [None]:
# One box plot of accuracy per selected column, laid out on a 3x3 grid.
# NOTE(review): columns are picked by position (0-6 and 9) - presumably the
# hyperparameter columns of `df`; verify if the column order of `df` changes.
plt.figure(figsize = (15, 10))
for i, col in enumerate(df.columns[[0,1,2,3,4,5,6,9]]):
    plt.subplot(3,3,i+1)
    sns.boxplot(x = df[col], y = df["accuracy"])
    plt.xticks(rotation = 45)
    plt.title(f"Impact of {col} on accuracy")
    
plt.tight_layout()
plt.show()

In [None]:
# One box plot of loss per selected column, laid out on a 3x3 grid.
# NOTE(review): columns are picked by position (0-6 and 9) - presumably the
# hyperparameter columns of `df`; verify if the column order of `df` changes.
plt.figure(figsize = (15, 10))
for i, col in enumerate(df.columns[[0,1,2,3,4,5,6,9]]):
    plt.subplot(3,3,i+1)
    sns.boxplot(x = df[col], y = df["loss"])
    plt.xticks(rotation = 45)
    plt.title(f"Impact of {col} on loss")
    
plt.tight_layout()
plt.show()

# Redefine Hps

In [None]:
# Redefine the hyperparameter search space.
# As written, this literal is a placeholder: a set holding one string. The cells
# below index it as a dict of per-parameter specs, reading hp[param]["type"]
# ("float", "int" or "choice"), hp[param]["low"], hp[param]["high"] and, for
# "choice" parameters, hp[param]["choices"]. evaluate_candidate() looks up the
# entries "emb_size", "num_head", "d_ff", "acf" and "patch_size" through
# ["choices"]. Replace the placeholder with such a dict before running the DE cells.
hp = {
    'Based on importance ranking redefine the hp'
}

# LLM+DE

In [None]:
def clamp(x, low, high):
    """Limit ``x`` to the closed interval [``low``, ``high``]."""
    # Cap at the upper bound first, then lift to the lower bound.
    capped = high if high < x else x
    return capped if capped > low else low

In [None]:
# Randomly Generate candidate 
def generate_candidate():
    """Draw one candidate uniformly at random from the global search space ``hp``.

    "float" parameters get a uniform real value in [low, high]; "int" and
    "choice" parameters get a uniform integer in [low, high] (both ends
    included). Parameters of any other type are left out.
    """
    candidate = {}
    for param, spec in hp.items():
        kind = spec["type"]
        if kind == "float":
            candidate[param] = random.uniform(spec["low"], spec["high"])
        elif kind in ("int", "choice"):
            candidate[param] = random.randint(spec["low"], spec["high"])
    return candidate

# Using DOE generate candidate
def generate_candidate_doe(hp):
    """Draw one candidate from a single Latin-hypercube sample over the search space.

    Args:
        hp: dict mapping each parameter name to a spec with the keys "type"
            ("float", "int" or "choice"), "low" and "high".

    Returns:
        dict: one value per parameter. "float" values are scaled into
        [low, high]; "int" and "choice" values are scaled the same way and then
        rounded to the nearest integer. Parameters of any other type are left out.
    """
    names = list(hp.keys())
    # A single LHS point in the unit cube, one coordinate per parameter
    unit_point = lhs(len(names), samples=1)[0]

    candidate = {}
    for position, name in enumerate(names):
        spec = hp[name]
        kind = spec["type"]
        if kind not in ("float", "int", "choice"):
            continue
        # Stretch the [0, 1] coordinate onto the parameter's own range
        scaled = spec["low"] + unit_point[position] * (spec["high"] - spec["low"])
        candidate[name] = scaled if kind == "float" else int(round(scaled))
    return candidate

In [None]:
# Print Candidate
def print_candidate(candidate):
    """Print a candidate with every "choice" index replaced by the value it selects in ``hp``."""
    readable_candidate = {
        param: (hp[param]["choices"][value] if hp[param]["type"] == "choice" else value)
        for param, value in candidate.items()
    }
    print(readable_candidate)

In [None]:
# Generate Population
def generate_population(POP_SIZE = 5):
    """Create ``POP_SIZE`` unevaluated members, each with an LHS-drawn candidate.

    Every member is a dict with the keys "candidate", "score" (initially
    ``inf``) and "Acc" (initially 0).
    """
    members = []
    for _ in range(POP_SIZE):
        members.append({"candidate": generate_candidate_doe(hp), "score": float("inf"), "Acc": 0})
    return members

In [None]:
# Build the initial DE population of POP_SIZE members (scores stay at inf until evaluated).
population = generate_population(POP_SIZE = POP_SIZE)
print(population)

In [None]:
def print_population(population):
    """Print one line per member: score, accuracy and the readable candidate."""
    for member in population:
        score_text = f"{member['score']:.4f}"
        acc_text = f"{member['Acc']:.4f}"
        print(score_text, end=" -> ")
        print(acc_text, end=" -> ")
        print_candidate(member['candidate'])

In [None]:
# CALCULATE SCORES FOR INITIAL POPULATION
def cal_score_intial_pop(population, N):
    """Evaluate every member of the initial population in place and return the best one.

    Args:
        population: list of member dicts; "score" and "Acc" are overwritten.
        N: maximum number of training epochs per candidate.

    Returns:
        tuple: ``(best_candidate, best_score, best_Acc)`` of the member with the lowest score.
    """
    for position, member in enumerate(population):
        print(f"-------------------------[ CANDIDATE {position+1:2d} ]-------------------------")
        candidate = member["candidate"]
        print_candidate(candidate)
        score, Acc = evaluate_candidate(candidate , N)
        member["score"] = score
        member["Acc"] = Acc

    # Lowest validation loss wins
    winner = population[np.argmin([member["score"] for member in population])]
    return winner["candidate"], winner["score"], winner["Acc"]

In [None]:
# Groq API key: read from the GROQ_API_KEY environment variable so that the secret is not
# stored in the notebook; the placeholder is only used when the variable is not set.
API_KEY = os.environ.get("GROQ_API_KEY", "Your API Key")
def prompt(target_vector, donor_vector):
    """
    Build the natural-language crossover request sent to the LLM.

    Args:
        target_vector: current population member (dict of parameter values).
        donor_vector: mutated vector produced by the DE mutation step.

    Returns:
        str: the prompt text, asking for the trial vector as JSON only.
    """
    # The wording names the inputs correctly (donor and target; the trial vector
    # is the output) and asks for the same keys so the answer can be used directly.
    return (
        f" I am running hyperparameter tuning of a ViT model using the differential evolution algorithm. I give you my donor vector and target vector after mutation. Your role is to perform the crossover, as you are the expert on the crossover operation.\n"
        f" - Here's the donor vector: {donor_vector}\n"
        f" - Here's the target vector: {target_vector}\n"
        f" - Now perform the crossover. Choose the float values from the target and donor vector on the basis of normal distribution.\n"
        f" Return the trial vector in JSON format with exactly the same keys (NO PREAMBLE)."
    )

In [None]:
def llm_guided_crossover(target_vector, donor_vector):
    """Ask the LLM to cross the target and donor vectors and return a usable trial vector.

    The model's JSON answer is not trusted as-is: the result always has exactly
    the keys of ``target_vector``; a missing or non-numeric value falls back to
    the target's value; "int" and "choice" parameters are rounded to integers;
    and every value is clamped to the parameter's [low, high] range from ``hp``.

    Args:
        target_vector: current population member (dict of parameter values).
        donor_vector: mutated vector produced by the DE mutation step.

    Returns:
        dict: the sanitised trial vector.

    Raises:
        ValueError: if the answer is not a JSON object.
        Exception: whatever the LLM client or the JSON parser raises.
    """
    llm = ChatGroq(
        temperature = 0,
        groq_api_key = API_KEY,
        model_name = 'llama-3.3-70b-versatile'
    )
    response = llm.invoke(prompt(target_vector, donor_vector))
    raw_trial = JsonOutputParser().parse(response.content)
    if not isinstance(raw_trial, dict):
        raise ValueError(f"LLM crossover did not return a JSON object: {raw_trial!r}")

    trial_vector = {}
    for param, target_value in target_vector.items():
        value = raw_trial.get(param, target_value)
        # bool is a subclass of int but is never a valid hyperparameter value here
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            value = target_value
        if hp[param]["type"] in ("int", "choice"):
            value = int(round(value))
        trial_vector[param] = clamp(value, hp[param]["low"], hp[param]["high"])
    return trial_vector

In [None]:
def De(population = population,
       MAX_GEN = MAX_GEN,
       F = SCALING_FAC,
       N = 50):
    """Run differential evolution with LLM-guided crossover over ``population``.

    Each generation, every member (target) is challenged by a trial vector
    built from a DE/rand/1 mutation (``x1 + F * (x2 - x3)``) followed by the
    LLM crossover; the trial replaces the target when its score (validation
    loss) is lower. Scores are appended to a CSV log after every generation.

    Args:
        population: list of member dicts ("candidate", "score", "Acc");
            modified in place. Needs at least 4 members.
        MAX_GEN: number of generations.
        F: DE scaling factor.
        N: maximum number of training epochs per evaluation.

    Returns:
        tuple: ``(best_candidate, best_score, best_Acc)`` found over the whole run.

    Raises:
        ValueError: if the population has fewer than 4 members.
    """
    pop_size = len(population)
    if pop_size < 4:
        # Three distinct partners other than the target are required; fail
        # before spending time on evaluating the initial population.
        raise ValueError("De needs a population of at least 4 members")

    print_population(population)
    best_candidate, best_score, best_Acc = cal_score_intial_pop(population, N)
    for G in range(MAX_GEN):
        print(f"=========================[ GENERATION {G+1:2d} ]=========================")
        for i in range(pop_size):
            print(f"-------------------------[ CANDIDATE  {i+1:2d} ]-------------------------")
            target_vector = population[i]["candidate"]
            print("target_vector:", end=" ")
            print_candidate(target_vector)
            # GENERATE: three distinct partners, never the target itself
            choices = [j for j in range(pop_size) if j != i]
            a, b, c = np.random.choice(choices, 3, replace=False)
            x1 = population[a]["candidate"]
            x2 = population[b]["candidate"]
            x3 = population[c]["candidate"]
            print(f"X_r1 = {x1}")
            print(f"X_r2 = {x2}")
            print(f"X_r3 = {x3}")
            # MUTATION
            donor_vector = {}
            for param in target_vector.keys():
                donor_vector[param] = x1[param] + F * (x2[param] - x3[param])
                if hp[param]["type"] in ("int", "choice"):
                    donor_vector[param] = round(donor_vector[param])
                donor_vector[param] = clamp(donor_vector[param], hp[param]["low"], hp[param]["high"])
            print("donor_vector:", end=" ")
            print_candidate(donor_vector)

            # LLM BASED CROSSOVER
            try:
                trial_vector = llm_guided_crossover(target_vector, donor_vector)
                print("trial_vector:", end=" ")
                print_candidate(trial_vector)
            except Exception as e:
                print("Error during LLM-guided crossover:", e)
                # Without a fallback the trial vector would be undefined, or
                # left over from the previous candidate. Use the donor vector.
                trial_vector = dict(donor_vector)
                print("Falling back to donor_vector as trial_vector")

            # EVALUATE
            trial_score, trial_acc = evaluate_candidate(trial_vector, N)
            if trial_score < population[i]["score"]:
                print(f"{trial_score:0.5f} < {population[i]['score']:0.5f}, picking trial_vector")
                population[i]["candidate"] = trial_vector
                population[i]["score"] = trial_score
                population[i]["Acc"] = trial_acc
            else:
                print(f"{trial_score:0.5f} >= {population[i]['score']:0.5f}, keeping target_vector")

        # FIND BEST TILL NOW
        best_index = np.argmin([c["score"] for c in population])
        new_best_score = population[best_index]["score"]
        new_best_acc = population[best_index]["Acc"]
        if new_best_score < best_score:
            print(f"Best score improved from {best_score:0.4f} to {new_best_score:0.4f}")
            best_score = new_best_score
            best_Acc = new_best_acc
            best_candidate = population[best_index]["candidate"]
            print("Best candidate: ", end="")
            print_candidate(best_candidate)

        print("====================================")
        print("Final Best candidate: ", end="")
        print_candidate(best_candidate)

        # WRITE TO CSV LOG (the header is written only when the file is new)
        logfilepath = os.path.join(save_dir,f"logs_p{POP_SIZE}_bs{BATCH_SIZE}.csv")
        if not os.path.isfile(logfilepath):
            with open(logfilepath, "a") as logfile:
                logfile.write("gen," + ",".join(map(str, range(len(population)))) + "\n")

        with open(logfilepath, "a") as logfile:
            logfile.write(f"{G+1}," + ",".join(map(str, [c["score"] for c in population])) + "\n")

        print("\nPopulation at end of generation", G+1)
        print_population(population)
        torch.cuda.empty_cache()

    return best_candidate, best_score, best_Acc

In [None]:
# Clear PyTorch's cached GPU memory, then run the DE search with its default arguments.
torch.cuda.empty_cache()
De()

# **RUNNING BEST MODEL ARCHITECTURE**

In [None]:
# Empty placeholder - presumably to be filled in with the best hyperparameter set
# found by the search above; TODO confirm.
best = {}