In [None]:
import os
import torch
import torch.nn as nn
from torchvision import transforms, models
from torchvision.models import ResNet101_Weights
from torch.utils.data import Dataset, DataLoader
from transformers import BertModel, BertTokenizer
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, roc_auc_score
import optuna
import pickle
from PIL import Image
import re
import gc

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# ─── Select Compute Device (GPU When Available) ─────────────────────
# Later cells move the model and every batch onto this device.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [3]:
# ─── Dataset Class ──────────────────────────────────────────────────
class GarbageDataset(Dataset):
    """Image + filename-text dataset read from one folder per class.

    ``root_dir`` must contain one sub-directory for each name in ``classes``
    ("Black", "Blue", "Green", "TTR"). Every ``.jpg``/``.png`` file found there
    becomes one sample ``(image_path, text, label)`` where ``text`` is the file
    name without its extension and with all digits removed, and ``label`` is
    the index of the class folder in ``classes``.
    """

    # Extensions accepted as images; compared case-insensitively.
    IMAGE_EXTENSIONS = (".jpg", ".png")

    def __init__(self, root_dir, transform=None, tokenizer=None):
        """Index every image file under ``root_dir``.

        Args:
            root_dir: Directory holding one sub-folder per class.
            transform: Optional callable applied to the loaded RGB PIL image.
            tokenizer: Optional callable applied to the text description.

        Raises:
            FileNotFoundError: If a class sub-folder is missing (from ``os.listdir``).
        """
        self.root_dir = root_dir
        self.transform = transform
        self.tokenizer = tokenizer
        self.classes = ["Black", "Blue", "Green", "TTR"]
        self.data = []
        for label_idx, label in enumerate(self.classes):
            class_dir = os.path.join(root_dir, label)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample index -> file mapping reproducible across runs and machines.
            for file_name in sorted(os.listdir(class_dir)):
                # splitext only strips the final extension, so names that
                # contain extra dots keep their full description.
                stem, ext = os.path.splitext(file_name)
                if ext.lower() not in self.IMAGE_EXTENSIONS:
                    continue
                text_description = re.sub(r"\d+", "", stem)
                self.data.append(
                    (
                        os.path.join(class_dir, file_name),
                        text_description,
                        label_idx,
                    )
                )

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, text, label)``.

        ``image`` is the RGB image after ``transform`` (if given); ``text`` is
        the tokenizer output (if a tokenizer was given) or the raw string;
        ``label`` is the integer class index.
        """
        img_path, text, label = self.data[idx]
        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of waiting for garbage collection.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)
        if self.tokenizer:
            text = self.tokenizer(
                text,
                return_tensors="pt",
                padding="max_length",
                truncation=True,
                max_length=128,
            )
        return image, text, label

In [4]:
# ─── Image Preprocessing Pipeline ───────────────────────────────────
# Images are resized to 224x224, converted to float tensors in [0, 1], and
# then standardized with the ImageNet channel statistics. The ResNet101
# backbone used below is initialized from ImageNet weights, which were
# trained on inputs normalized this way.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# ─── Text Tokenizer ─────────────────────────────────────────────────
# Must match the "bert-base-uncased" checkpoint loaded by the model.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

In [None]:
# ─── Initiate Datasets ──────────────────────────────────────────────
# All three splits share the same image transform and tokenizer.
_shared_dataset_args = {"transform": transform, "tokenizer": tokenizer}

# ─── Training ───────────────────────────────────────────────────────
train_dataset = GarbageDataset(
    root_dir="garbage_data/CVPR_2024_dataset_Train", **_shared_dataset_args
)
# ─── Validation ─────────────────────────────────────────────────────
val_dataset = GarbageDataset(
    root_dir="garbage_data/CVPR_2024_dataset_Val", **_shared_dataset_args
)
# ─── Testing ────────────────────────────────────────────────────────
test_dataset = GarbageDataset(
    root_dir="garbage_data/CVPR_2024_dataset_Test", **_shared_dataset_args
)

# ─── Initiate Dataloaders ───────────────────────────────────────────
# Only the training loader shuffles; evaluation order stays fixed.
_shared_loader_args = {"batch_size": 40, "num_workers": 4, "pin_memory": True}

train_loader = DataLoader(train_dataset, shuffle=True, **_shared_loader_args)
val_loader = DataLoader(val_dataset, shuffle=False, **_shared_loader_args)
test_loader = DataLoader(test_dataset, shuffle=False, **_shared_loader_args)

In [None]:
# ─── Evaluation Function ────────────────────────────────────────────
def evaluate_model(model, val_loader, criterion, title="Validation"):
    """Run ``model`` over ``val_loader`` without gradients and print metrics.

    Prints the sample-weighted mean loss, accuracy, weighted F1, weighted
    one-vs-rest ROC AUC and the confusion matrix, prefixed with ``title``.

    Args:
        model: Callable as ``model(images, texts)`` returning per-class scores.
        val_loader: Iterable of ``(images, texts, labels)`` batches; ``texts``
            is a dict of tensors with a singleton dimension at index 1.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        title: Label printed in front of the metrics line.

    Returns:
        The mean loss per sample over ``val_loader.dataset``.
    """
    model.eval()
    total_loss = 0.0
    y_true, y_pred, y_prob = [], [], []

    with torch.no_grad():
        for batch_images, batch_texts, batch_labels in val_loader:
            batch_images = batch_images.to(device, non_blocking=True)
            batch_labels = batch_labels.to(device, non_blocking=True)
            # Drop the per-sample singleton dimension the tokenizer added.
            batch_texts = {
                name: tensor.squeeze(1).to(device, non_blocking=True)
                for name, tensor in batch_texts.items()
            }

            scores = model(batch_images, batch_texts)
            batch_loss = criterion(scores, batch_labels)
            # Weight by batch size so the final average is per sample.
            total_loss += batch_loss.item() * batch_images.size(0)

            class_probs = torch.softmax(scores, dim=1)
            predicted = class_probs.argmax(dim=1)
            y_true.extend(batch_labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
            y_prob.extend(class_probs.cpu().numpy())

    val_loss = total_loss / len(val_loader.dataset)
    accuracy = accuracy_score(y_true, y_pred)
    conf_matrix = confusion_matrix(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="weighted")
    auc = roc_auc_score(y_true, y_prob, average="weighted", multi_class="ovr")

    print('<========================================================================================>')
    print(
        f"{title} | Loss: {val_loss:.4f}, Accuracy: {accuracy:.4f}, F1 Score: {f1:.4f}, AUC: {auc:.4f}"
    )
    print()
    print(f"Confusion Matrix:\n{conf_matrix}")
    print()

    return val_loss

In [None]:
# ─── Define The Model Class Outside The Objective Function ────
class MultimodalModel(nn.Module):
    """Classifier that fuses ResNet101 image features with BERT text features.

    The image branch is an ImageNet-pretrained ResNet101 whose final ``fc``
    layer is replaced by ``nn.Identity`` so it emits its pooled feature vector.
    The text branch is ``bert-base-uncased``; the hidden state of the first
    token is used as the sentence feature. Both vectors are concatenated and
    passed through an MLP head that returns raw class logits.
    """

    def __init__(
        self, hidden_layer_sizes, dropout_rate, num_classes=4
    ):
        """Build both backbones and the fusion head.

        Args:
            hidden_layer_sizes: Iterable with the width of each hidden layer of
                the head, in order.
            dropout_rate: Dropout probability applied after every hidden layer.
            num_classes: Number of output classes.
        """
        super().__init__()
        self.image_model = models.resnet101(weights=ResNet101_Weights.IMAGENET1K_V2)
        num_ftrs = self.image_model.fc.in_features
        # Strip the ImageNet classifier so the backbone returns features only.
        self.image_model.fc = nn.Identity()
        self.text_model = BertModel.from_pretrained("bert-base-uncased")
        combined_input_size = num_ftrs + self.text_model.config.hidden_size

        layers = []
        input_size = combined_input_size
        for hidden_size in hidden_layer_sizes:
            layers.append(nn.Linear(input_size, hidden_size))
            layers.append(nn.Sigmoid())
            layers.append(nn.Dropout(dropout_rate))
            input_size = hidden_size
        # The head ends with a plain Linear layer: the notebook trains with
        # nn.CrossEntropyLoss, which applies log-softmax itself and expects
        # unnormalized logits. A trailing Sigmoid would squash the scores into
        # (0, 1), capping how confident the model can become and weakening
        # gradients. Dropping it does not change any parameter names.
        layers.append(nn.Linear(input_size, num_classes))
        self.fc = nn.Sequential(*layers)

    def forward(self, image, text):
        """Return class logits for a batch.

        Args:
            image: Batch of image tensors accepted by the ResNet backbone.
            text: Dict of tokenizer tensors, unpacked as keyword arguments to
                the BERT model.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with unnormalized logits.
        """
        image_features = self.image_model(image)
        # Position 0 of the sequence is the first token's hidden state.
        text_features = self.text_model(**text).last_hidden_state[:, 0, :]
        combined_features = torch.cat((image_features, text_features), dim=1)
        output = self.fc(combined_features)
        return output

In [None]:
# ─── Define Optimization Objective Function ─────────────────────────
def objective(trial):
    """Optuna objective: train one ``MultimodalModel`` and score it.

    Samples the number and width of hidden layers, the learning rate and the
    dropout rate, trains for a fixed number of epochs on ``train_loader`` and
    evaluates on ``val_loader`` after every epoch. The validation loss is
    reported to Optuna each epoch so unpromising trials can be pruned.

    On completion the model, restored to the weights of its best validation
    epoch, is attached to the trial as user attribute ``"model"``.

    Args:
        trial: The ``optuna`` trial used to sample hyperparameters.

    Returns:
        The lowest validation loss observed across the epochs of this trial.

    Raises:
        optuna.exceptions.TrialPruned: When the pruner stops the trial early.
    """

    # ─── Print Trial Information ────────────────────────────────
    print()
    print()
    print(
        f"<--------------------------        || Trial Number: {trial.number} ||       -------------------------->"
    )
    print()

    # ─── Suggest Hyperparameters ────────────────────────────────
    num_hidden_layers = trial.suggest_int("num_hidden_layers", 2, 4)
    hidden_layer_sizes = [
        trial.suggest_int(f"hidden_layer_size_{i}", 128, 512, step=64)
        for i in range(num_hidden_layers)
    ]
    learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
    dropout_rate = trial.suggest_float("dropout_rate", 0, 0.2, step=0.1)

    # ─── Initialize Model Instance ──────────────────────────────
    model = MultimodalModel(hidden_layer_sizes, dropout_rate).to(
        device
    )
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # ─── Train The Model ────────────────────────────────────────
    num_epochs = 10
    best_val_loss = float("inf")
    best_state = None  # CPU copy of the weights from the best epoch so far

    try:
        for epoch in range(num_epochs):
            model.train()
            running_loss = 0.0
            for images, texts, labels in train_loader:
                images = images.to(device, non_blocking=True)
                labels = labels.to(device, non_blocking=True)
                texts = {
                    key: val.squeeze(1).to(device, non_blocking=True)
                    for key, val in texts.items()
                }

                optimizer.zero_grad()
                outputs = model(images, texts)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * images.size(0)

            epoch_loss = running_loss / len(train_loader.dataset)
            val_loss = evaluate_model(model, val_loader, criterion)

            print(
                f"<|||  Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}  |||>"
            )

            # ─── Track The Best Epoch ───────────────────────────
            # Without this update the function would always return +inf and
            # the study could not tell trials apart. The snapshot lives on the
            # CPU so it does not compete with training for GPU memory.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_state = {
                    name: tensor.detach().cpu().clone()
                    for name, tensor in model.state_dict().items()
                }

            # ─── Report Intermediate Results To Optuna ──────────
            trial.report(val_loss, epoch)

            # ─── Prune Unpromising Trials ───────────────────────
            if trial.should_prune():
                raise optuna.exceptions.TrialPruned()

        # ─── Add Model To Trial Attributes ──────────────────────
        # Restore the best-epoch weights so the stored model is the one that
        # produced the returned score rather than the last epoch's weights.
        if best_state is not None:
            model.load_state_dict(best_state)
        # NOTE(review): the study keeps a reference to every completed trial's
        # model, so that memory is not released by the cleanup below. Saving
        # checkpoints to disk would avoid this but requires changing the cell
        # that reads ``user_attrs["model"]``.
        trial.set_user_attr("model", model)
    finally:
        # ─── Clear Cuda Memory After Training ───────────────────
        # Runs for pruned and failed trials too. Collect garbage first so the
        # freed tensors are back in the cache before it is emptied.
        del model, criterion, optimizer, best_state
        gc.collect()
        torch.cuda.empty_cache()

    return best_val_loss

In [None]:
# ─── Create A Study ─────────────────────────────────────────────────
# The objective returns a validation loss, so lower is better.
N_TRIALS = 20
study = optuna.create_study(direction="minimize")

# ─── Optimize The Study Objective ───────────────────────────────────
study.optimize(func=objective, n_trials=N_TRIALS)

In [None]:
# ─── Save The Study To A Pickle File ────────────────────────────────
# The pickle includes each trial's user attributes, i.e. the stored models.
with open("optuna_study.pkl", mode="wb") as study_file:
    pickle.dump(study, study_file)

# ─── Save The Best Model ────────────────────────────────────────────
# The whole module object is serialized, not just its state_dict.
best_trial = study.best_trial
best_model = best_trial.user_attrs["model"]
torch.save(best_model, "best_model.pth")

# ─── Print The Best Hyperparameters ─────────────────────────────────
print("Best hyperparameters: ", study.best_params)

In [None]:
# ─── Release Memory Before Final Evaluation ─────────────────────────
# Collect unreachable Python objects first so any CUDA tensors they owned are
# returned to PyTorch's caching allocator, then release the cached blocks.
# In the opposite order, memory freed by the collector would stay cached.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# ─── Load The Best Model ────────────────────────────────────────────
# "best_model.pth" holds a fully pickled nn.Module (not a state_dict), so it
# must be loaded with weights_only=False; PyTorch releases that default to
# weights-only loading would otherwise refuse it. Unpickling can execute
# arbitrary code, so only load checkpoints produced by this notebook.
# map_location places the weights on the current device even if the
# checkpoint was written on a different one.
model = torch.load("best_model.pth", map_location=device, weights_only=False)
model.to(device)

# One loss instance is enough; CrossEntropyLoss holds no per-call state.
eval_criterion = nn.CrossEntropyLoss()

# ─── Evaluate On Training Set ───────────────────────────────────────
evaluate_model(model, train_loader, eval_criterion, "Training")

# ─── Evaluate On Validation Set ─────────────────────────────────────
evaluate_model(model, val_loader, eval_criterion, "Validation")

# ─── Evaluate On Test Set ───────────────────────────────────────────
evaluate_model(model, test_loader, eval_criterion, "Test")