<a href="https://colab.research.google.com/github/JerryEnes/Multimodal-Biometrics/blob/main/Untitled62.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c2c7034f",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import torch\n",
    "import torch.nn as nn\n",
    "from torch.utils.data import Dataset, DataLoader, random_split\n",
    "from torchvision import transforms\n",
    "from PIL import Image\n",
    "from transformers import ViTFeatureExtractor, ViTModel\n",
    "import timm\n",
    "import numpy as np\n",
    "from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "from tqdm import tqdm\n",
    "\n",
    "# --- CONSTANTS ---\n",
    "IMG_SIZE = 224\n",
    "BATCH_SIZE = 32\n",
    "EPOCHS = 50\n",
    "PATIENCE = 5\n",
    "SEED = 42\n",
    "DATASET_PATH = \"/content/drive/MyDrive/Multimodal Biometric 3\"\n",
    "DEVICE = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "17c2ea3f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- TRANSFORMS ---\n",
    "transform = transforms.Compose([\n",
    "    transforms.Resize((IMG_SIZE, IMG_SIZE)),\n",
    "    transforms.ToTensor(),\n",
    "    transforms.Normalize([0.5]*3, [0.5]*3)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "40eb6e06",
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- DATASET ---\n",
    "class MultiModalDataset(Dataset):\n",
    "    def __init__(self, root_dir, transform=None):\n",
    "        self.samples = []\n",
    "        self.transform = transform\n",
    "        self.class_to_idx = {}\n",
    "        for idx, class_name in enumerate(sorted(os.listdir(root_dir))):\n",
    "            self.class_to_idx[class_name] = idx\n",
    "            class_path = os.path.join(root_dir, class_name)\n",
    "            iris_dir = os.path.join(class_path, 'iris-eye')\n",
    "            vein_dir = os.path.join(class_path, 'finger-vein')\n",
    "            iris_imgs = sorted(os.listdir(iris_dir))\n",
    "            vein_imgs = sorted(os.listdir(vein_dir))\n",
    "            for i in range(min(len(iris_imgs), len(vein_imgs))):\n",
    "                self.samples.append({\n",
    "                    'iris': os.path.join(iris_dir, iris_imgs[i]),\n",
    "                    'vein': os.path.join(vein_dir, vein_imgs[i]),\n",
    "                    'label': idx\n",
    "                })\n",
    "\n",
    "    def __len__(self):\n",
    "        return len(self.samples)\n",
    "\n",
    "    def __getitem__(self, idx):\n",
    "        sample = self.samples[idx]\n",
    "        iris = Image.open(sample['iris']).convert('RGB')\n",
    "        vein = Image.open(sample['vein']).convert('RGB')\n",
    "        label = sample['label']\n",
    "\n",
    "        if self.transform:\n",
    "            iris = self.transform(iris)\n",
    "            vein = self.transform(vein)\n",
    "\n",
    "        return iris, vein, label"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f7337e64",
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- MODEL ---\n",
    "class MultiModalFusionModel(nn.Module):\n",
    "    def __init__(self, num_classes):\n",
    "        super().__init__()\n",
    "\n",
    "        self.iris_model = timm.create_model('vit_tiny_patch16_224', pretrained=True, num_classes=num_classes)\n",
    "        self.vein_model = timm.create_model('vit_tiny_patch16_224', pretrained=True, num_classes=num_classes)\n",
    "\n",
    "        self.feature_extractor_iris = timm.create_model('vit_tiny_patch16_224', pretrained=True, num_classes=0)\n",
    "        self.feature_extractor_vein = timm.create_model('vit_tiny_patch16_224', pretrained=True, num_classes=0)\n",
    "\n",
    "        self.feature_fusion = nn.Sequential(\n",
    "            nn.Linear(192 * 2, 1024),\n",
    "            nn.GELU(), nn.BatchNorm1d(1024), nn.Dropout(0.3),\n",
    "            nn.Linear(1024, 1024), nn.ReLU(), nn.BatchNorm1d(1024), nn.Dropout(0.3),\n",
    "            nn.Linear(1024, 512), nn.GELU(), nn.BatchNorm1d(512), nn.Dropout(0.3),\n",
    "            nn.Linear(512, num_classes)\n",
    "        )\n",
    "\n",
    "        self.score_fusion_layer = nn.Sequential(\n",
    "            nn.Linear(num_classes * 2, 512),\n",
    "            nn.GELU(), nn.BatchNorm1d(512), nn.Dropout(0.4),\n",
    "            nn.Linear(512, 256), nn.GELU(), nn.BatchNorm1d(256), nn.Dropout(0.3),\n",
    "            nn.Linear(256, 128), nn.ReLU(), nn.BatchNorm1d(128), nn.Dropout(0.2),\n",
    "            nn.Linear(128, num_classes)\n",
    "        )\n",
    "\n",
    "    def forward(self, iris, vein, return_all=False):\n",
    "        score_iris = self.iris_model(iris)\n",
    "        score_vein = self.vein_model(vein)\n",
    "        score_concat = torch.cat((score_iris, score_vein), dim=1)\n",
    "        score_fusion = self.score_fusion_layer(score_concat)\n",
    "\n",
    "        feat_iris = self.feature_extractor_iris(iris)\n",
    "        feat_vein = self.feature_extractor_vein(vein)\n",
    "        feat_fusion = torch.cat((feat_iris, feat_vein), dim=1)\n",
    "        feat_output = self.feature_fusion(feat_fusion)\n",
    "\n",
    "        final_output = 0.6 * score_fusion + 0.4 * feat_output\n",
    "\n",
    "        if return_all:\n",
    "            return final_output, score_fusion, feat_output\n",
    "        return final_output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6d32a2ee",
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- TRAINING ---\n",
    "def train_model(model, train_loader, val_loader, device):\n",
    "    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)\n",
    "    criterion = nn.CrossEntropyLoss()\n",
    "    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=2)\n",
    "\n",
    "    best_val_acc = 0\n",
    "    patience_counter = 0\n",
    "    train_accuracies, val_accuracies = [], []\n",
    "\n",
    "    model.to(device)\n",
    "\n",
    "    for epoch in range(EPOCHS):\n",
    "        model.train()\n",
    "        train_acc = 0\n",
    "        for iris, vein, labels in tqdm(train_loader, desc=f\"Epoch {epoch+1}\"):\n",
    "            iris, vein, labels = iris.to(device), vein.to(device), labels.to(device)\n",
    "            optimizer.zero_grad()\n",
    "            outputs = model(iris, vein)\n",
    "            loss = criterion(outputs, labels)\n",
    "            loss.backward()\n",
    "            optimizer.step()\n",
    "            train_acc += accuracy_score(labels.cpu(), outputs.argmax(dim=1).cpu())\n",
    "\n",
    "        model.eval()\n",
    "        val_acc = 0\n",
    "        with torch.no_grad():\n",
    "            for iris, vein, labels in val_loader:\n",
    "                iris, vein, labels = iris.to(device), vein.to(device), labels.to(device)\n",
    "                outputs = model(iris, vein)\n",
    "                val_acc += accuracy_score(labels.cpu(), outputs.argmax(dim=1).cpu())\n",
    "\n",
    "        avg_train_acc = train_acc / len(train_loader)\n",
    "        avg_val_acc = val_acc / len(val_loader)\n",
    "\n",
    "        train_accuracies.append(avg_train_acc)\n",
    "        val_accuracies.append(avg_val_acc)\n",
    "\n",
    "        scheduler.step(avg_val_acc)\n",
    "\n",
    "        print(f\"Epoch {epoch+1} | Train Acc: {avg_train_acc:.4f} | Val Acc: {avg_val_acc:.4f}\")\n",
    "\n",
    "        if avg_val_acc > best_val_acc:\n",
    "            best_val_acc = avg_val_acc\n",
    "            patience_counter = 0\n",
    "            torch.save(model.state_dict(), 'best_model.pt')\n",
    "            print(\"✅ Best model saved\")\n",
    "        else:\n",
    "            patience_counter += 1\n",
    "            if patience_counter >= PATIENCE:\n",
    "                print(\"🛑 Early stopping triggered\")\n",
    "                break\n",
    "\n",
    "    plt.plot(train_accuracies, label=\"Train Accuracy\")\n",
    "    plt.plot(val_accuracies, label=\"Val Accuracy\")\n",
    "    plt.title(\"Accuracy over Epochs\")\n",
    "    plt.xlabel(\"Epoch\")\n",
    "    plt.ylabel(\"Accuracy\")\n",
    "    plt.legend()\n",
    "    plt.grid(True)\n",
    "    plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "98e3f090",
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- CONFUSION MATRIX PLOT ---\n",
    "def plot_conf_matrix(y_true, y_pred, title=\"Confusion Matrix\"):\n",
    "    cm = confusion_matrix(y_true, y_pred)\n",
    "    plt.figure(figsize=(8, 6))\n",
    "    sns.heatmap(cm, annot=True, fmt=\"d\", cmap=\"Blues\")\n",
    "    plt.title(title)\n",
    "    plt.xlabel(\"Predicted\")\n",
    "    plt.ylabel(\"True\")\n",
    "    plt.show()\n",
    "\n",
    "# --- METRICS ---\n",
    "def print_metrics(y_true, y_pred, label=\"\"):\n",
    "    print(f\"\\n📊 {label} Metrics\")\n",
    "    print(f\"Accuracy:  {accuracy_score(y_true, y_pred):.4f}\")\n",
    "    print(f\"Precision: {precision_score(y_true, y_pred, average='macro'):.4f}\")\n",
    "    print(f\"Recall:    {recall_score(y_true, y_pred, average='macro'):.4f}\")\n",
    "    print(f\"F1 Score:  {f1_score(y_true, y_pred, average='macro'):.4f}\")\n",
    "    plot_conf_matrix(y_true, y_pred, f\"{label} Confusion Matrix\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1a0144e7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- LOAD DATA ---\n",
    "dataset = MultiModalDataset(DATASET_PATH, transform)\n",
    "train_size = int(0.8 * len(dataset))\n",
    "val_size = len(dataset) - train_size\n",
    "train_set, val_set = random_split(dataset, [train_size, val_size])\n",
    "train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)\n",
    "val_loader = DataLoader(val_set, batch_size=BATCH_SIZE)\n",
    "\n",
    "# --- TRAIN ---\n",
    "model = MultiModalFusionModel(num_classes=len(dataset.class_to_idx))\n",
    "train_model(model, train_loader, val_loader, DEVICE)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "307ace53",
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- EVALUATION ---\n",
    "model.load_state_dict(torch.load('best_model.pt'))\n",
    "model.eval()\n",
    "\n",
    "score_preds, feature_preds, final_preds, targets = [], [], [], []\n",
    "with torch.no_grad():\n",
    "    for iris, vein, labels in val_loader:\n",
    "        iris, vein, labels = iris.to(DEVICE), vein.to(DEVICE), labels.to(DEVICE)\n",
    "        final_out, score_out, feature_out = model(iris, vein, return_all=True)\n",
    "        score_preds.extend(score_out.argmax(dim=1).cpu().numpy())\n",
    "        feature_preds.extend(feature_out.argmax(dim=1).cpu().numpy())\n",
    "        final_preds.extend(final_out.argmax(dim=1).cpu().numpy())\n",
    "        targets.extend(labels.cpu().numpy())\n",
    "\n",
    "print_metrics(targets, score_preds, \"Score-Level Fusion\")\n",
    "print_metrics(targets, feature_preds, \"Feature-Level Fusion\")\n",
    "print_metrics(targets, final_preds, \"Final Averaged Output\")"
   ]
  }
 ],
 "metadata": {},
 "nbformat": 4,
 "nbformat_minor": 5
}
