# IMPORT ALL LIBRARIES

In [None]:
#!pip install einops

In [None]:
import torch
import numpy as np
import torchvision
import pathlib
from torchvision.transforms import transforms
from torch.utils.data import DataLoader, Dataset, random_split
from torch.utils.checkpoint import checkpoint
import torch.nn.functional as F
from torch import nn
from torch import Tensor
from einops import rearrange, repeat
from einops.layers.torch import Rearrange
import time
from tqdm import tqdm, trange
from accelerate import Accelerator
import random
import os
from sklearn.metrics import confusion_matrix, recall_score, precision_score
# from torch.cuda.amp import autocast, GradScaler
from datasets import load_dataset
from PIL import Image
torch.cuda.empty_cache()

In [None]:
# Seed every random number generator used in this file (Python, NumPy,
# PyTorch CPU and PyTorch CUDA) with the same value.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)
# Request deterministic cuDNN kernels and switch off the cuDNN benchmark mode.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
# Settings of the differential-evolution (DE) search; they are the default
# argument values of generate_population() and De().
POP_SIZE = 10               # number of candidates in the population
N = 30                      # maximum number of training epochs per candidate
MAX_GEN = 10                # number of DE generations
CROSSOVER_RATE = 0.9        # CR: probability of taking a parameter from the donor vector
SCALING_FAC  = 0.8         # F: weight of the difference vector in the mutation step
save_dir = './'             # directory that receives the per-generation CSV log

# DATA PRE-PROCESSING

In [None]:
# Class name -> integer class index for the ten tomato leaf classes.
# CustomDataSet selects a class name by *position* in this dict (insertion
# order) and then returns the mapped value, so the key order matters.
# NOTE(review): presumably this order matches the label ids produced by
# load_dataset() — confirm against the dataset's own class ordering.
label = {
    "Tomato___Bacterial_spot": 0,
    "Tomato___Early_blight": 1,
    "Tomato___Late_blight": 2,
    "Tomato___Leaf_Mold": 3,
    "Tomato___Septoria_leaf_spot": 4,
    "Tomato___Spider_mites Two-spotted_spider_mite": 5,
    "Tomato___Target_Spot": 6,
    "Tomato___Tomato_Yellow_Leaf_Curl_Virus": 7,
    "Tomato___Tomato_mosaic_virus": 8,
    "Tomato___healthy": 9
}

In [None]:
class CustomDataSet(Dataset):
    """Expose a dataset of ``{'image', 'label'}`` records as ``(image, index)`` pairs.

    Args:
        dataset: Indexable, sized collection whose items are mappings with an
            ``'image'`` entry (an object providing ``convert("RGB")``) and an
            integer ``'label'`` entry.
        label: Mapping from class name to class index. An item's integer
            label selects an entry of this mapping by position (insertion
            order); the value stored at that position is returned.
        transform: Optional callable applied to the RGB image.
    """

    def __init__(self, dataset, label, transform=None):
        self.dataset = dataset
        self.label = label
        self.transform = transform
        # Position -> class index table, built once here instead of
        # rebuilding the list of keys on every __getitem__ call.
        self._label_indices = list(label.values())

    def __len__(self):
        """Return the number of records in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(image, class_index)`` for record ``idx``."""
        item = self.dataset[idx]
        image = item['image'].convert("RGB")  # Ensure image is in RGB format

        # Apply the transform if provided
        if self.transform:
            image = self.transform(image)

        # Map the dataset's label id to the class index stored in ``label``
        label_idx = self._label_indices[item['label']]

        return image, label_idx

In [None]:
# Data Path
# Directory whose entries are listed by dataloader() to obtain the class names.
train_path = './tomato/New Plant Diseases Dataset(Augmented)/train'
# NOTE(review): only referenced from a commented-out line in the visible code.
test_path = './tomato/New Plant Diseases Dataset(Augmented)/valid'
# Batch size of every DataLoader; also part of the CSV log file name.
BATCH_SIZE = 32
# Transforms
def dataloader():
    """Build the train / validation / test loaders and the class-name list.

    The dataset's "train" split is divided 75 % / 25 % into a training and a
    validation subset; its "validation" split serves as the test set.
    Random augmentation (flip, rotation) is applied to the training subset
    only. Validation and test images go through the deterministic
    resize + normalise pipeline, so their losses are not perturbed by random
    augmentation and stay comparable between epochs and between candidates.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader, classes)`` where
        ``classes`` is the sorted list of entry names found in ``train_path``.
    """
    # Function-scope import: Subset is not part of the file-level imports.
    from torch.utils.data import Subset

    # Load the dataset
    ds = load_dataset("./tomato/New Plant Diseases Dataset(Augmented)/")
    train_data = ds["train"]
    validation_data = ds["validation"]

    # Same normalisation statistics for both pipelines.
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]

    train_transformer = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=10),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ])

    test_transformer = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ])

    train_ratio = 0.75

    # Two views of the same training records: one augmented (for training),
    # one deterministic (for validation).
    augmented_train = CustomDataSet(train_data, label, train_transformer)
    plain_train = CustomDataSet(train_data, label, test_transformer)
    # The held-out test set must not be randomly augmented.
    test_dataset = CustomDataSet(validation_data, label, test_transformer)

    # Calculate the sizes for each split
    dataset_size = len(augmented_train)
    train_size = int(train_ratio * dataset_size)
    validation_size = dataset_size - train_size

    # Perform the split once, then re-point the validation indices at the
    # un-augmented view so both subsets stay disjoint.
    train_dataset, val_split = random_split(augmented_train, [train_size, validation_size])
    validation_dataset = Subset(plain_train, val_split.indices)

    train_loader = DataLoader(
        train_dataset,
        batch_size = BATCH_SIZE, shuffle = True,
    )

    val_loader = DataLoader(
        validation_dataset,
        batch_size = BATCH_SIZE, shuffle = False,
    )

    test_loader = DataLoader(
        test_dataset,
        batch_size = BATCH_SIZE, shuffle = False,
    )

    # Class names are the entry names of the training directory.
    root = pathlib.Path(train_path)
    classes = sorted(entry.name for entry in root.iterdir())

    return train_loader, val_loader, test_loader, classes

In [None]:
# Loading Data Set
# The device comes from the Accelerator; the former hard-coded
# torch.device("cuda:2") assignment was overwritten immediately and has been
# dropped as a dead store.
accelerator = Accelerator()
device = accelerator.device
train_loader, val_loader, test_loader, classes = dataloader()
# Hand the loaders to the accelerator before they are iterated.
train_loader, val_loader, test_loader = accelerator.prepare(train_loader, val_loader, test_loader)
# Number of output classes of the model = number of entries found in train_path.
num_class = len(classes)

# Notebook cell output: number of batches per loader.
len(train_loader), len(val_loader), len(test_loader)

# ViT MODEL

In [None]:
# Create Patch embedding
class Embedding(nn.Module):
    """Turn an image batch into a sequence of patch tokens.

    A strided convolution cuts the image into ``patch_size`` x ``patch_size``
    patches and projects each one to ``emb_dim`` channels. A learnable class
    token is put in front of the patch tokens, a learnable position embedding
    of length ``num_patches + 1`` is added, and dropout with rate ``DR`` is
    applied to the result.

    Args:
        DR: Dropout probability applied to the embedded sequence.
        patch_size: Side length of one square patch.
        emb_dim: Size of every token vector.
        in_channels: Number of channels of the input images.
        img_size: Side length of the (square) input images; must be a
            multiple of ``patch_size``.
    """

    def __init__(self,
                 # batch_size,
                 DR,
                 patch_size: int,
                 emb_dim: int,
                 in_channels: int = 3,
                 img_size: int = 224):
        super().__init__()
        self.in_channels = in_channels
        self.patch_size = patch_size
        self.emb_dim = emb_dim
        assert img_size % self.patch_size == 0, f"Input img must be divisble by patch size {self.patch_size}"
        patch_area = self.patch_size ** 2
        self.num_patches = (img_size * img_size) // patch_area

        # Non-overlapping patches: kernel and stride both equal the patch size.
        self.patcher = nn.Conv2d(
            in_channels=self.in_channels,
            out_channels=self.emb_dim,
            kernel_size=self.patch_size,
            stride=self.patch_size,
            padding=0,
        )
        self.flatten = nn.Flatten(2)

        # Learnable class token and position embedding (one slot per patch
        # plus one for the class token).
        self.cls_token = nn.Parameter(torch.randn(1, 1, self.emb_dim), requires_grad=True)
        self.pos_embd = nn.Parameter(
            torch.randn(1, self.num_patches + 1, self.emb_dim),
            requires_grad=True,
        )
        self.emb_dropout = nn.Dropout(p=DR)

    def forward(self, x):
        """Embed ``x`` and return the token sequence including the class token."""
        self.batch_size = x.shape[0]
        img_res = x.shape[-1]
        assert img_res % self.patch_size == 0, f"Input img must be divisble by patch size {self.patch_size} and current image shape {img_res}"

        # Conv output -> flatten the two spatial axes -> put tokens before channels.
        patch_tokens = self.flatten(self.patcher(x)).permute(0, 2, 1)
        class_tokens = self.cls_token.expand(self.batch_size, -1, -1)

        sequence = torch.cat([class_tokens, patch_tokens], dim=1)
        sequence = sequence + self.pos_embd
        return self.emb_dropout(sequence)

In [None]:
# Create ViT Model

class MyViT(nn.Module):
    """Vision Transformer classifier: patch embedding, encoder stack, linear head.

    Args:
        DR: Dropout probability handed to the patch ``Embedding``.
        activation: Activation of the encoder feed-forward blocks.
        patch_size: Side length of one square patch.
        emb_dim: Token / model dimension.
        num_layers: Number of stacked encoder layers.
        num_heads: Number of attention heads per encoder layer.
        num_classes: Number of output logits.
        d_ff: Hidden size of the encoder feed-forward blocks.
        in_channels: Number of channels of the input images.
        img_size: Side length of the input images; must be a multiple of
            ``patch_size``.
    """

    def __init__(self,
                 DR,
                 activation,
                 patch_size,
                 emb_dim,
                 num_layers,
                 num_heads,
                 num_classes,
                 d_ff,
                 in_channels: int = 3,
                 img_size: int = 224):
        super().__init__()
        self.mlp_size = d_ff
        assert img_size % patch_size == 0, "Img Size must be divisble patch size"

        self.embedding = Embedding(DR=DR,
                                   patch_size=patch_size,
                                   emb_dim=emb_dim,
                                   in_channels=in_channels,
                                   img_size=img_size)

        # One pre-norm, batch-first encoder layer used as the template for the stack.
        layer_template = nn.TransformerEncoderLayer(
            d_model=emb_dim,
            nhead=num_heads,
            dim_feedforward=self.mlp_size,
            activation=activation,
            batch_first=True,
            norm_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer=layer_template,
                                             num_layers=num_layers)

        # Classification head: layer norm followed by a linear projection.
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(emb_dim, eps=1e-12),
            nn.Linear(emb_dim, num_classes),
        )

    def forward(self, x):
        """Return the class logits computed from the first (class) token."""
        encoded = self.encoder(self.embedding(x))
        class_token = encoded[:, 0]
        return self.mlp_head(class_token)

# HYPER-PARAMETER DECLARE

In [None]:
# Hyper parameters
# Search space of the differential-evolution search. Every entry carries
# inclusive "low"/"high" bounds and a "type":
#   "float"  - the candidate stores a real value in [low, high]
#   "int"    - the candidate stores an integer in [low, high]
#   "choice" - the candidate stores an integer *index* in [low, high] that
#              selects one element of "choices"
hp = {
    # Patch side length handed to the model.
    'patch_size': {
        "low": 0,
        "high": 2,
        "type": "choice",
        "choices": [8, 4, 16]
    },
    # Embedding (model) dimension.
    'emb_size': {
        "low": 0,
        "high": 5,
        "type": "choice",
        "choices": [32, 64, 128, 256, 512, 768],
    },
    # Number of encoder layers, used directly as an integer.
    'num_layers': {
        "low": 4,
        "high": 16,
        "type": "int",
    },
    # Number of attention heads.
    'num_head': {
        "low": 0,
        "high": 3,
        "type": "choice",
        "choices": [2, 4, 8, 16]
    },
    # Dropout rate passed to the model as DR.
    "DR": {
        "low": 0.1,
        "high": 0.5,
        "type": "float"
    },
    # Activation function of the encoder feed-forward blocks.
    "acf": {
        "low": 0,
        "high": 1,
        "type": "choice",
        "choices": ["relu", "gelu"]
    },
    # Hidden size of the encoder feed-forward blocks.
    "d_ff": {
        "low": 0,
        "high": 4,
        "type": "choice",
        "choices": [128, 256, 512, 1024, 3072]
    }
}

# EVALUATE MODEL

In [None]:
class Scheduler(torch.optim.lr_scheduler._LRScheduler):
    """Learning-rate scheduler whose rate is computed by ``calc_lr``.

    Every parameter group receives the same rate,
    ``calc_lr(self._step_count, dim_embed, warmup_steps)``.

    Args:
        optimizer: Optimizer whose parameter groups are scheduled.
        dim_embed: Embedding dimension forwarded to ``calc_lr``.
        warmup_steps: Warm-up length forwarded to ``calc_lr``.
        steps_in_epoch: Scheduler steps per epoch; only used to initialise
            the step counter from ``last_epoch``.
        last_epoch: Forwarded to the base class. With the default ``-1`` the
            step counter is initialised to 0.
        verbose: Forwarded to the base class.
    """

    def __init__(self,
                 optimizer,
                 dim_embed,
                 warmup_steps,
                 steps_in_epoch,
                 last_epoch=-1,
                 verbose=False):

        # NOTE(review): these attributes are assigned before super().__init__,
        # presumably because the base constructor already calls get_lr() —
        # confirm against the installed PyTorch version.
        self.dim_embed = dim_embed
        self.warmup_steps = warmup_steps
        self.num_param_groups = len(optimizer.param_groups)

        super().__init__(optimizer, last_epoch, verbose)
        # NOTE(review): _step_count looks like a counter that the base class
        # also maintains; this overwrites whatever the base constructor left
        # in it — confirm that step() increments it before get_lr() is called.
        self._step_count = (last_epoch+1)*steps_in_epoch

    def get_lr(self):
        """Return the current rate once per parameter group."""
        lr = calc_lr(self._step_count, self.dim_embed, self.warmup_steps)
        return [lr] * self.num_param_groups


def calc_lr(step, dim_embed, warmup_steps):
    """Return the warm-up / inverse-square-root learning rate for ``step``.

    The rate is
    ``dim_embed**-0.5 * min(step**-0.5, step * warmup_steps**-1.5)``:
    it grows linearly for ``warmup_steps`` steps and then decays with the
    inverse square root of the step number.

    Args:
        step: Number of optimisation steps taken so far.
        dim_embed: Embedding dimension used to scale the rate.
        warmup_steps: Length of the linear warm-up phase.

    Returns:
        float: The learning rate. ``0.0`` for ``step <= 0`` (the limit of the
        formula at step 0, which would otherwise divide by zero); pure
        inverse-square-root decay when ``warmup_steps <= 0``.
    """
    if step <= 0:
        return 0.0
    scale = dim_embed**(-0.5)
    decay = step**(-0.5)
    if warmup_steps <= 0:
        # No warm-up phase: the linear branch is unbounded, so decay wins.
        return scale * decay
    return scale * min(decay, step * warmup_steps**(-1.5))


def evaluate_candidate(candidate, N):
    """Train a ViT built from ``candidate`` and report its validation result.

    Args:
        candidate: Mapping with one entry per key of ``hp``. Entries of type
            "choice" hold an index into ``hp[param]["choices"]``; the other
            entries hold the value itself.
        N: Maximum number of training epochs. Training stops earlier when the
            validation loss has not improved for 4 consecutive epochs.

    Returns:
        tuple: ``(val_loss, val_acc)`` of the last completed epoch, or
        ``(float("inf"), 0.0)`` when an exception is raised or no epoch ran.
    """
    # Defined before the try block so the final return is valid even when
    # the epoch loop never runs (N == 0).
    test_loss, test_acc = float("inf"), 0.0
    try:
        scaler = torch.amp.GradScaler()
        # LOAD PARAMS ("choice" entries are indices into hp[...]["choices"])
        emb_size = hp["emb_size"]["choices"][candidate["emb_size"]]
        num_heads = hp["num_head"]["choices"][candidate["num_head"]]
        d_ff = hp["d_ff"]["choices"][candidate["d_ff"]]
        num_layers = candidate["num_layers"]
        DR = candidate["DR"]
        acf = hp["acf"]["choices"][candidate["acf"]]
        patch_size = hp["patch_size"]["choices"][candidate["patch_size"]]

        # Create Model
        print(".....Creating model.....")
        model = MyViT(
            DR = DR,
            activation = acf,
            patch_size = patch_size,
            emb_dim = emb_size,
            num_layers = num_layers,
            num_heads = num_heads,
            num_classes = num_class,
            d_ff = d_ff,
            in_channels = 3,
            # Must equal the size produced by the Resize((224, 224))
            # transforms in dataloader(). With 256 the position embedding
            # length disagreed with the number of patches, the forward pass
            # raised, and every candidate was scored as inf.
            img_size = 224,
        ).to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr = 0.0001,
            betas = (0.9, 0.999),
            eps = 1.0e-9,
            weight_decay = 1e-4
        )
        model, optimizer, criterion = accelerator.prepare(model, optimizer, criterion)
        n_train_batches = len(train_loader)
        n_val_batches = len(val_loader)
        warmup_steps = n_train_batches * 5

        scheduler = Scheduler(
            optimizer,
            dim_embed = emb_size,
            warmup_steps = warmup_steps,
            steps_in_epoch = n_train_batches,
        )
        N_EPOCHS = N
        early_stopping_patience = 4
        best_val_loss = float("inf")
        # Initialised here so the early-stopping branch can never read an
        # unbound name (e.g. when the first validation loss is NaN).
        no_improvement_counter = 0
        print("------------TRAIN - VAL - LOOP--------------")

        total_start_time = time.time()  # Start total timer

        for epoch in range(N_EPOCHS):
            # ------------ TRAIN -- LOOP ----------------
            train_loss = 0.0
            model.train()
            correctt, totalt = 0, 0
            for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1} in training", leave=True):
                x, y = batch
                x, y = x.to(device), y.to(device)
                optimizer.zero_grad()
                with torch.amp.autocast(device_type = 'cuda', dtype = torch.bfloat16):
                    y_hat = model(x)
                    del x
                    loss = criterion(y_hat, y)
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                scheduler.step()
                # Running mean of the batch losses over the epoch.
                train_loss = train_loss + loss.item() / n_train_batches

                correctt = correctt + (torch.argmax(y_hat, dim=1) == y).sum().item()
                totalt = totalt + y.size(0)

            # ----------- VALIDATION -------------
            # Evaluation mode switches dropout off; model.train() at the top
            # of the next epoch switches it back on.
            model.eval()
            with torch.no_grad():
                correct, total = 0, 0
                test_loss = 0.0
                for batch in tqdm(val_loader, desc="Testing"):
                    x, y = batch
                    x, y = x.to(device), y.to(device)
                    y_hat = model(x)
                    del x
                    loss = criterion(y_hat, y)
                    # Average over the loader that is actually iterated
                    # (val_loader, not test_loader).
                    test_loss = test_loss + loss.item() / n_val_batches

                    correct = correct + (torch.argmax(y_hat, dim=1) == y).sum().item()
                    total = total + y.size(0)

            test_acc = round(correct / total, 4)
            print(f"Epoch: {epoch + 1}/{N_EPOCHS} Train loss: {train_loss:.2f} Train accuracy: {correctt / totalt * 100:.2f}% Val loss: {test_loss:.2f} Val accuracy: {correct / total * 100:.2f}%")
            # Early Stopping
            if test_loss < best_val_loss:
                best_val_loss = test_loss
                no_improvement_counter = 0
            else:
                no_improvement_counter = no_improvement_counter + 1
                if no_improvement_counter >= early_stopping_patience:
                    print(f"Early stopping triggered at epoch {epoch + 1}")
                    break
        total_end_time = time.time()  # End total timer
        print(f"Total time taken: {(total_end_time - total_start_time):.2f} seconds")

    except Exception as err:
        # Deliberate best-effort: a candidate that cannot be built or trained
        # gets the worst possible score instead of aborting the whole search.
        print("ERROR while evaluating candidate!")
        print(err)
        return float("inf"), 0.0
    return test_loss, test_acc


# DIFFERENTIAL EVOLUTION

In [None]:
def clamp(x, low, high):
    """Limit ``x`` to the closed interval [``low``, ``high``]."""
    capped = high if high < x else x
    return capped if capped > low else low

# Generate candidate 
def generate_candidate():
    """Draw one random candidate from the search space ``hp``.

    "float" parameters are sampled uniformly from [low, high]; "int" and
    "choice" parameters get a random integer from the inclusive range
    [low, high]. Parameters of any other type are left out.
    """
    drawn = {}
    for name, spec in hp.items():
        kind = spec["type"]
        if kind == "float":
            drawn[name] = random.uniform(spec["low"], spec["high"])
        elif kind in ("int", "choice"):
            drawn[name] = random.randint(spec["low"], spec["high"])
    return drawn
# Print Candidate
def print_candidate(candidate):
    """Print ``candidate`` with every "choice" index replaced by its value."""
    print({
        name: hp[name]["choices"][value] if hp[name]["type"] == "choice" else value
        for name, value in candidate.items()
    })

# Generate Population
def generate_population(POP_SIZE = POP_SIZE):
    """Create ``POP_SIZE`` random, not yet evaluated population members.

    Each member is a dict with a fresh "candidate", a "score" of infinity
    and an "Acc" of 0.
    """
    members = []
    for _ in range(POP_SIZE):
        members.append({"candidate": generate_candidate(), "score": float("inf"), "Acc": 0})
    return members
def print_population(population):
    """Print one ``score -> Acc -> candidate`` line per population member."""
    for member in population:
        print(f"{member['score']:.4f} -> {member['Acc']:.4f} -> ", end="")
        print_candidate(member['candidate'])


# CALCULATE SCORES FOR INITIAL POPULATION
def cal_score_intial_pop(population, N):
    """Evaluate every member of ``population`` in place and return the best.

    Each member's "score" and "Acc" are overwritten with the result of
    ``evaluate_candidate``. The member with the lowest score wins.

    Returns:
        tuple: ``(best_candidate, best_score, best_Acc)``.
    """
    for position, member in enumerate(population, start=1):
        print(f"-------------------------[ CANDIDATE {position:2d} ]-------------------------")
        genome = member["candidate"]
        print_candidate(genome)
        member["score"], member["Acc"] = evaluate_candidate(genome, N)

    scores = [member["score"] for member in population]
    winner = population[np.argmin(scores)]
    return winner["candidate"], winner["score"], winner["Acc"]


def De(population = None, MAX_GEN = MAX_GEN, F = SCALING_FAC, CR = CROSSOVER_RATE, N = N):
    """Run differential evolution (rand/1 mutation, binomial crossover) over ``hp``.

    Args:
        population: List of members as produced by ``generate_population``.
            ``None`` (default) creates a fresh population of ``POP_SIZE``
            members on every call; a population built once at definition
            time would be shared, and mutated, across calls.
        MAX_GEN: Number of generations.
        F: Scaling factor applied to the difference vector.
        CR: Crossover rate.
        N: Maximum number of training epochs per evaluated candidate.

    Returns:
        tuple: ``(best_candidate, best_score, best_Acc)`` found over the
        initial population and all generations.

    Raises:
        ValueError: If the population has fewer than 4 members (a target
            plus three distinct partners are needed).
    """
    if population is None:
        population = generate_population(POP_SIZE = POP_SIZE)
    pop_size = len(population)
    if pop_size < 4:
        raise ValueError("De needs a population of at least 4 candidates")

    logfilepath = os.path.join(save_dir,f"logs_p{POP_SIZE}_bs{BATCH_SIZE}.csv")

    print_population(population)
    best_candidate, best_score, best_Acc = cal_score_intial_pop(population, N)
    for G in range(MAX_GEN):
        print(f"=========================[ GENERATION {G+1:2d} ]=========================")
        for i, member in enumerate(population):
            print(f"-------------------------[ CANDIDATE  {i+1:2d} ]-------------------------")
            target_vector = member["candidate"]
            print("target_vector:", end=" ")
            print_candidate(target_vector)

            # GENERATE: three distinct partners, never the target itself.
            # The range follows the actual population size, not the global
            # POP_SIZE, so caller-supplied populations work too.
            choices = [j for j in range(pop_size) if j != i]
            a, b, c = np.random.choice(choices, 3, replace=False)

            x1 = population[a]["candidate"]
            x2 = population[b]["candidate"]
            x3 = population[c]["candidate"]

            # MUTATION: donor = x1 + F * (x2 - x3), rounded for integer-valued
            # parameters and clamped back into the search space.
            donor_vector = {}
            for param in target_vector:
                value = x1[param] + F * (x2[param] - x3[param])
                if hp[param]["type"] in ("int", "choice"):
                    value = round(value)
                donor_vector[param] = clamp(value, hp[param]["low"], hp[param]["high"])

            print("donor_vector:", end=" ")
            print_candidate(donor_vector)

            # CROSSOVER
            keep_param = random.choice(list(target_vector.keys()))      # R: random param always taken from the donor
            trial_vector = {}
            for param in target_vector:
                r = random.random()
                if r < CR or param == keep_param:
                    trial_vector[param] = donor_vector[param]
                else:
                    trial_vector[param] = target_vector[param]

            print("trial_vector:", end=" ")
            print_candidate(trial_vector)

            # EVALUATE: the trial replaces the target only if it scores lower.
            trial_score, trial_acc = evaluate_candidate(trial_vector, N)
            if trial_score < member["score"]:
                print(f"{trial_score:0.5f} < {member['score']:0.5f}, picking trial_vector")
                member["candidate"] = trial_vector
                member["score"] = trial_score
                member["Acc"] = trial_acc
            else:
                print(f"{trial_score:0.5f} >= {member['score']:0.5f}, keeping target_vector")

        # FIND BEST TILL NOW
        scores = [c["score"] for c in population]
        best_index = np.argmin(scores)
        new_best_score = population[best_index]["score"]
        new_best_acc = population[best_index]["Acc"]
        if new_best_score < best_score:
            print(f"Best score improved from {best_score:0.4f} to {new_best_score:0.4f}")
            best_score = new_best_score
            best_Acc = new_best_acc
            best_candidate = population[best_index]["candidate"]
            print("Best candidate: ", end="")
            print_candidate(best_candidate)

        # WRITE TO CSV LOG (header only when the file is created)
        write_header = not os.path.isfile(logfilepath)
        with open(logfilepath, "a") as logfile:
            if write_header:
                logfile.write("gen," + ",".join(map(str, range(pop_size))) + "\n")
            logfile.write(f"{G+1}," + ",".join(map(str, scores)) + "\n")

        print("\nPopulation at end of generation", G+1)
        print_population(population)
        torch.cuda.empty_cache()

    return best_candidate, best_score, best_Acc

In [None]:
# torch.backends.cuda.matmul.allow_tf32 = True
# Release cached CUDA memory, then start the search with the default settings.
torch.cuda.empty_cache()
De()