<a href="https://colab.research.google.com/github/diviiij/CogniTrack/blob/main/CogniTrack_Alzheimer_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Download the augmented Alzheimer MRI dataset through kagglehub and keep the
# value it returns (presumably the local path of the downloaded dataset).
# NOTE(review): later cells read from hard-coded '/kaggle/input/...' paths
# rather than from this variable — confirm both refer to the same location.
uraninjo_augmented_alzheimer_mri_dataset_path = kagglehub.dataset_download('uraninjo/augmented-alzheimer-mri-dataset')

print('Data source import complete.')


In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found anywhere below the input directory.
input_root = '/kaggle/input'
for folder, _subfolders, file_names in os.walk(input_root):
    full_paths = [os.path.join(folder, file_name) for file_name in file_names]
    for full_path in full_paths:
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
 # Standard libraries
import os
import random
import numpy as np
import matplotlib.pyplot as plt

# PyTorch and Torchvision
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision.datasets import ImageFolder

# For splitting dataset and evaluation
from sklearn.model_selection import StratifiedKFold


In [None]:
# Stage name -> folder holding that stage's images.
# The paths must match the on-disk folder names exactly: a stray leading or
# trailing space makes the recursive glob in the dataset loader match nothing,
# which silently drops the whole class instead of raising an error.
_DATASET_ROOT = "/kaggle/input/augmented-alzheimer-mri-dataset/AugmentedAlzheimerDataset"

dataset_paths = {
    "Mild": _DATASET_ROOT + "/MildDemented",
    "Moderate": _DATASET_ROOT + "/ModerateDemented",
    "Non": _DATASET_ROOT + "/NonDemented",
    "VeryMild": _DATASET_ROOT + "/VeryMildDemented",
}

# Integer class id for each stage name; ids follow the order listed here.
_STAGES = ("Mild", "Moderate", "Non", "VeryMild")
label_map = {stage: class_id for class_id, stage in enumerate(_STAGES)}

In [None]:
 # Preprocessing applied to each image: resize to 224x224, convert to a tensor,
 # then normalize the three channels with the mean/std below (these are the
 # standard ImageNet channel statistics used with torchvision pretrained models).
 transform = transforms.Compose([
     transforms.Resize((224,224)),
     transforms.ToTensor(),
     transforms.Normalize(mean=[0.485,0.456,0.406],
                             std=[0.229,0.224,0.225])
 ])

In [None]:
from PIL import Image
import glob

class AlzMergeDataset(Dataset):
    """Image dataset that merges several per-stage folders into one labelled set.

    Every ``*.jpg`` file found (recursively) under a stage's folder becomes one
    sample whose label is ``label_map[stage]``.

    Args:
        dataset_paths: Mapping of stage name -> folder path.
        label_map: Mapping of stage name -> label; must contain every key of
            ``dataset_paths`` (a missing key raises ``KeyError``).
        transform: Optional callable applied to each loaded RGB PIL image.

    Attributes:
        data: Image file paths, one per sample.
        labels: Labels aligned index-for-index with ``data``.
    """

    def __init__(self, dataset_paths, label_map, transform=None):
        import warnings

        self.data = []
        self.labels = []
        self.transform = transform

        for stage, path in dataset_paths.items():
            label = label_map[stage]
            # glob returns files in a filesystem-dependent order; sorting keeps
            # the index -> sample mapping stable so seeded splits are repeatable.
            # Adjust the pattern if the folder layout or file extension differs.
            image_paths = sorted(
                glob.glob(os.path.join(path, '**', '*.jpg'), recursive=True)
            )
            if not image_paths:
                # A wrong or mistyped folder would otherwise drop a class silently.
                warnings.warn(
                    f"No '*.jpg' images found for stage {stage!r} under {path!r}"
                )
            self.data.extend(image_paths)
            self.labels.extend([label] * len(image_paths))

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is opened from disk, converted to RGB and, when a transform
        was supplied, passed through it.
        """
        # The context manager releases the file handle as soon as the pixel
        # data has been converted into an independent RGB image.
        with Image.open(self.data[idx]) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        return image, self.labels[idx]

In [None]:
# Build the merged dataset from the per-stage folders, applying `transform` to each image.
combined_dataset = AlzMergeDataset(dataset_paths, label_map, transform=transform)

print("Total images:", len(combined_dataset))
# Indexing loads and transforms the first sample, so this prints an (image, label) tuple.
print("Example:", combined_dataset[0])


In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset

# Labels were collected while the dataset was built; one index per sample.
all_labels = combined_dataset.labels
all_indices = list(range(len(combined_dataset)))

# 80/20 split that keeps the class proportions equal in both parts.
split_options = {"test_size": 0.2, "stratify": all_labels, "random_state": 42}
train_idx, val_idx = train_test_split(all_indices, **split_options)

# Index-based views over the same underlying dataset.
train_dataset, val_dataset = (
    Subset(combined_dataset, part) for part in (train_idx, val_idx)
)


In [None]:
from collections import Counter

# Look up the label of every index in each split, then count per class.
label_at = combined_dataset.labels.__getitem__
train_labels = list(map(label_at, train_idx))
val_labels = list(map(label_at, val_idx))

train_counts = Counter(train_labels)
val_counts = Counter(val_labels)
print("Train distribution:", train_counts)
print("Val distribution:", val_counts)


In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 32

# Only the training loader reshuffles each epoch; validation order stays fixed.
train_loader, val_loader = (
    DataLoader(split, batch_size=BATCH_SIZE, shuffle=reshuffle)
    for split, reshuffle in ((train_dataset, True), (val_dataset, False))
)


In [None]:
# Clone the timm DenseNet-121 checkpoint repository from the Hugging Face Hub.
# NOTE(review): large weight files on the Hub are usually stored via Git LFS —
# confirm git-lfs is available so the clone holds real weights, not pointer files.
!git clone https://huggingface.co/timm/densenet121.ra_in1k.git

In [None]:
 import torch
from torchvision import models

# Build the torchvision DenseNet-121 architecture (no weights requested here).
model = models.densenet121()

# Read the cloned checkpoint onto the CPU.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code;
# only load checkpoints from a trusted source (consider weights_only=True).
state_dict = torch.load("densenet121.ra_in1k/pytorch_model.bin", map_location="cpu")

# Remove any prefix like 'model.' if needed
# Example: state_dict = {k.replace("model.", ""): v for k, v in state_dict.items()}

# strict=False tolerates key-name differences between the checkpoint and the
# torchvision model, but it would equally hide a load where nothing matched.
# Report the mismatches so a silently untrained backbone cannot go unnoticed.
load_result = model.load_state_dict(state_dict, strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print(
        f"Warning: {len(load_result.missing_keys)} missing and "
        f"{len(load_result.unexpected_keys)} unexpected keys while loading weights."
    )
    print("  first missing keys:", load_result.missing_keys[:5])
    print("  first unexpected keys:", load_result.unexpected_keys[:5])

# Modify classifier for 4 classes (Alzheimer's stages)
model.classifier = torch.nn.Linear(model.classifier.in_features, 4)

# Move to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print("Model loaded and ready! 🧠💥")


In [None]:
import os

# Print the path of every file inside the cloned checkpoint repository.
repo_dir = "densenet121.ra_in1k"
for folder, _subfolders, file_names in os.walk(repo_dir):
    for file_path in (os.path.join(folder, file_name) for file_name in file_names):
        print(file_path)


In [None]:
import torch.nn as nn

# Replace the final classifier with a fresh 4-class head.
# nn.Linear is created on the CPU, so place it on whatever device the rest of
# the model already lives on; otherwise a model that was moved to the GPU
# earlier would end up with its head on a different device.
model.classifier = nn.Linear(model.classifier.in_features, 4).to(
    next(model.parameters()).device
)


In [None]:
import torchvision.transforms as transforms

# Rebind `transform` to a lighter pipeline: resize to 128x128 and convert to a
# tensor, with no normalization step.
# NOTE(review): a dataset keeps the transform object it was constructed with, so
# rebinding the name here does not change datasets created in earlier cells.
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor()
])


In [None]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn import CrossEntropyLoss

# Pick the GPU when one is present and place the model on it.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
model.to(device)

# Objective and optimiser
criterion = CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Both loaders share batch size and worker settings; only training shuffles.
loader_kwargs = dict(batch_size=32, num_workers=2, pin_memory=True)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)


In [None]:
# Shell command: list the entries of the Kaggle input directory.
!ls /kaggle/input



In [None]:
import os

dataset_root = '/kaggle/input/augmented-alzheimer-mri-dataset'

# Top-level entries first
print(os.listdir(dataset_root))

# Then the contents of each entry that is a directory
for subdir in os.listdir(dataset_root):
    path = os.path.join(dataset_root, subdir)
    if not os.path.isdir(path):
        continue
    print(f"\nContents of '{subdir}':")
    print(os.listdir(path))


In [None]:
import os
# List the entries of /content; as the cell's last expression the result is
# displayed by the notebook.
os.listdir("/content")

In [None]:
import os

# Print each directory under the input root, followed by its sub-folders and
# then its files, each entry prefixed with an icon.
for current_dir, subdir_names, file_names in os.walk("/kaggle/input"):
    print(current_dir)
    entries = [("   📁", name) for name in subdir_names]
    entries += [("   📄", name) for name in file_names]
    for icon, name in entries:
        print(icon, name)


In [None]:
from sklearn.model_selection import train_test_split
import os
import shutil

base_dir = "/kaggle/input/augmented-alzheimer-mri-dataset/AugmentedAlzheimerDataset"
work_dir = "/kaggle/working/alzheimers_split"

# One output folder per (split, class) pair, mirroring the class folders
# found in the source directory.
split_class_dirs = [
    os.path.join(work_dir, split, class_name)
    for split in ["train", "val"]
    for class_name in os.listdir(base_dir)
]
for target_dir in split_class_dirs:
    os.makedirs(target_dir, exist_ok=True)


In [None]:
# Split each class folder 80/20 on its own, then copy the files into the
# matching train/val class folder under the working directory.
for class_name in os.listdir(base_dir):
    class_dir = os.path.join(base_dir, class_name)
    images = os.listdir(class_dir)
    train_imgs, val_imgs = train_test_split(images, test_size=0.2, random_state=42)

    for split_name, split_imgs in (("train", train_imgs), ("val", val_imgs)):
        dest_dir = os.path.join(work_dir, split_name, class_name)
        for img in split_imgs:
            shutil.copy(os.path.join(class_dir, img), os.path.join(dest_dir, img))


In [None]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Resize to the 224x224 input size and convert to tensors.
preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
]
transform = transforms.Compose(preprocessing_steps)

# One ImageFolder per split directory created above.
train_dataset, val_dataset = (
    datasets.ImageFolder(os.path.join(work_dir, split_name), transform=transform)
    for split_name in ("train", "val")
)

# Shuffle only the training data.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=32)


In [None]:
from torchvision.models import densenet121, DenseNet121_Weights

# Use the most up-to-date pre-trained weights
weights = DenseNet121_Weights.DEFAULT
model = densenet121(weights=weights)

# The pretrained head predicts ImageNet classes; swap in a fresh 4-class head so
# the logits line up with the four dementia-stage labels, as the other
# model-building cells in this notebook do.
import torch.nn as nn

model.classifier = nn.Linear(model.classifier.in_features, 4)


In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
model = model.to(device)


In [None]:
import torch.optim as optim

# Cross-entropy loss over the class logits.
criterion = nn.CrossEntropyLoss()
# Adam over all model parameters with a learning rate of 1e-4.
optimizer = optim.Adam(model.parameters(), lr=1e-4)


In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Scale the learning rate by 0.2 when the monitored value (to be minimised)
# stops improving, using a patience of 2 epochs.
plateau_options = {"mode": 'min', "factor": 0.2, "patience": 2}
scheduler = ReduceLROnPlateau(optimizer, **plateau_options)


In [None]:
class EarlyStopping:
    """Flag when a validation loss has stopped improving.

    Call the instance once per epoch with the validation loss. A strictly
    lower loss becomes the new best and resets the counter; any other value
    increments it. ``early_stop`` is set to True once the counter reaches
    ``patience`` and is never cleared afterwards.
    """

    def __init__(self, patience=5):
        self.patience = patience
        self.best_loss = None
        self.early_stop = False
        self.counter = 0

    def __call__(self, val_loss):
        improved = self.best_loss is None or val_loss < self.best_loss
        if improved:
            self.best_loss, self.counter = val_loss, 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True


In [None]:
def save_checkpoint(model, path='best_model.pth'):
    """Write the model's ``state_dict`` to ``path`` using ``torch.save``."""
    weights = model.state_dict()
    torch.save(weights, path)


In [None]:
import time
import torch
from torch.cuda.amp import GradScaler, autocast

def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device):
    scaler = GradScaler(enabled=torch.cuda.is_available())

    best_val_acc = 0.0
    early_stopping_counter = 0
    patience = 5
    best_loss = None

    print(f"Using: {device}")

    # Start the training timer
    start_time = time.time()

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            with autocast(device_type='cuda', enabled=torch.cuda.is_available()):
                outputs = model(images)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_loss = running_loss / total
        train_acc = 100 * correct / total

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)

                with autocast(device_type='cuda', enabled=torch.cuda.is_available()):
                    outputs = model(images)
                    loss = criterion(outputs, labels)

                val_loss += loss.item() * images.size(0)
                _, predicted = torch.max(outputs, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_loss /= val_total
        val_acc = 100 * val_correct / val_total

        print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Acc: {val_acc:.2f}%")

        scheduler.step(val_loss)

        # Early stopping and checkpointing
        if best_loss is None or val_loss <= best_loss:
            best_loss = val_loss
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')
            print("Model checkpoint saved.")
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= patience:
                print(f"Early stopping triggered at epoch {epoch+1}")
                break

    # End the training timer
    end_time = time.time()
    elapsed_time = end_time - start_time
    minutes = int(elapsed_time // 60)
    seconds = int(elapsed_time % 60)

    print(f"\n✅ Training completed in {minutes} minutes and {seconds} seconds.")

    return model


In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from sklearn.metrics import classification_report, confusion_matrix
import os
import numpy as np
from torch.amp import autocast, GradScaler
import time

# ==== DEVICE SETUP ====
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using:", device)

# ==== TRANSFORMS ====
# The pretrained DenseNet-121 weights loaded below were trained on inputs
# normalised with the ImageNet channel statistics, so fine-tuning uses the same
# values (this also matches the first preprocessing pipeline in this notebook).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# ==== DATASETS & LOADERS ====
data_dir = "/kaggle/input/augmented-alzheimer-mri-dataset/AugmentedAlzheimerDataset"
from sklearn.model_selection import train_test_split
from torchvision.datasets import ImageFolder
from torch.utils.data import Subset

# ImageFolder derives one class per sub-folder of data_dir.
dataset = ImageFolder(data_dir, transform=transform)
class_names = dataset.classes

# Stratified 80/20 split over sample indices; dataset.targets is already
# aligned with those indices, so it is passed to `stratify` directly.
# NOTE(review): the folder name suggests augmented images — if augmented
# variants of one scan can land in both splits, validation scores will be
# optimistic. Confirm how the augmentation was produced.
indices = list(range(len(dataset)))
train_idx, val_idx = train_test_split(
    indices, test_size=0.2, stratify=dataset.targets, random_state=42
)
train_dataset = Subset(dataset, train_idx)
val_dataset = Subset(dataset, val_idx)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2, pin_memory=True)

# ==== MODEL ====
from torchvision.models import densenet121, DenseNet121_Weights
model = densenet121(weights=DenseNet121_Weights.DEFAULT)
# Replace the pretrained head with one output per dataset class.
model.classifier = nn.Linear(model.classifier.in_features, len(class_names))
model = model.to(device)

# ==== TRAINING SETUP ====
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.0001)
# Halve the learning rate when the validation loss plateaus (patience of 2).
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)
# Gradient scaling for mixed precision; a no-op when CUDA is unavailable.
scaler = GradScaler(enabled=torch.cuda.is_available())

# ==== EARLY STOPPING CLASS ====
class EarlyStopping:
    """Count epochs without validation-loss improvement and raise a stop flag.

    Calling the instance with a loss that is strictly below the best seen so
    far stores it and resets the counter. Otherwise the counter grows, and
    ``early_stop`` becomes True once it reaches ``patience`` (it is never
    reset afterwards).
    """

    def __init__(self, patience=3):
        self.patience, self.counter = patience, 0
        self.best_loss, self.early_stop = None, False

    def __call__(self, val_loss):
        no_improvement = self.best_loss is not None and not (val_loss < self.best_loss)
        if no_improvement:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0


early_stopping = EarlyStopping(patience=3)

# ==== MODEL CHECKPOINT ====
def save_checkpoint(model, path="best_model.pth"):
    """Persist the model's ``state_dict`` at ``path`` via ``torch.save``."""
    state = model.state_dict()
    torch.save(state, path)

# ==== TRAINING LOOP ====
# Up to 20 epochs of mixed-precision training. After each epoch the model is
# validated, the scheduler is stepped on the validation loss, the weights are
# checkpointed when that loss improves, and early stopping is consulted.
start_time = time.time()

for epoch in range(20):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()

        with autocast(device_type='cuda', enabled=torch.cuda.is_available()):
            outputs = model(images)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # The criterion yields a batch mean; weight it by the batch size so a
        # short final batch does not skew the epoch average.
        running_loss += loss.item() * labels.size(0)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    train_loss = running_loss / total
    train_acc = correct / total * 100

    # ==== VALIDATION ====
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            with autocast(device_type='cuda', enabled=torch.cuda.is_available()):
                outputs = model(images)
                loss = criterion(outputs, labels)

            val_loss += loss.item() * labels.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    val_loss /= total
    val_acc = correct / total * 100

    print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, "
          f"Train Acc: {train_acc:.2f}%, Val Acc: {val_acc:.2f}%")

    scheduler.step(val_loss)

    # Decide on checkpointing before updating the early-stopping state: the
    # call below overwrites best_loss, which this comparison depends on.
    improved = early_stopping.best_loss is None or val_loss < early_stopping.best_loss
    if improved:
        save_checkpoint(model)
        print("Model checkpoint saved.")

    early_stopping(val_loss)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break

end_time = time.time()
elapsed_time = end_time - start_time

hours = int(elapsed_time // 3600)
minutes = int((elapsed_time % 3600) // 60)
seconds = int(elapsed_time % 60)

print(f"\nTraining completed in {hours}h {minutes}m {seconds}s")


# ==== FINAL EVALUATION ====
# Reload the best checkpoint and score it on the validation loader.
# map_location keeps the load working when the checkpoint was written on a
# different device than the one in use now (e.g. saved on GPU, evaluated on CPU).
model.load_state_dict(torch.load("best_model.pth", map_location=device))
model.eval()

all_preds = []
all_labels = []

with torch.no_grad():
    for images, labels in val_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        # Predicted class = index of the largest logit.
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

print("\nClassification Report:\n", classification_report(all_labels, all_preds, target_names=class_names))
print("Confusion Matrix:\n", confusion_matrix(all_labels, all_preds))


In [None]:
import copy
import random
import time
import torch.nn.functional as F
from torch.utils.data import Subset, DataLoader


In [None]:
def _stored_labels(dataset):
    """Return per-sample labels without loading any sample, or None if unavailable.

    Looks through ``Subset`` wrappers and reads a ``targets`` or ``labels``
    attribute when the dataset exposes one of matching length. Datasets with a
    ``target_transform`` are skipped because their stored labels may differ
    from the labels they actually return.
    """
    if isinstance(dataset, Subset):
        parent_labels = _stored_labels(dataset.dataset)
        if parent_labels is None:
            return None
        return [parent_labels[i] for i in dataset.indices]

    if getattr(dataset, "target_transform", None) is not None:
        return None

    for attr in ("targets", "labels"):
        stored = getattr(dataset, attr, None)
        if stored is not None and len(stored) == len(dataset):
            # Unwrap tensor / numpy scalars so equal labels share one dict key.
            return [lbl.item() if hasattr(lbl, "item") else lbl for lbl in stored]
    return None


def get_few_shot_task(dataset, n_classes=2, k_shot=5):
    """Sample a few-shot task: ``n_classes`` classes with ``k_shot`` examples each.

    Args:
        dataset: Indexable dataset yielding ``(sample, label)`` pairs.
        n_classes: Number of distinct classes to draw.
        k_shot: Number of examples to draw per selected class.

    Returns:
        A ``Subset`` of ``dataset`` holding ``n_classes * k_shot`` samples.

    Raises:
        ValueError: If fewer than ``n_classes`` classes exist, or a selected
            class has fewer than ``k_shot`` samples (raised by ``random.sample``).
    """
    # Reading stored labels avoids loading and transforming every sample just
    # to learn its class; fall back to iterating only when none are exposed.
    labels = _stored_labels(dataset)
    if labels is None:
        labels = [label for _, label in dataset]

    class_indices = {}
    for idx, label in enumerate(labels):
        class_indices.setdefault(label, []).append(idx)

    # random.sample requires a sequence; dict views raise TypeError on 3.11+.
    selected_classes = random.sample(list(class_indices), n_classes)
    selected_indices = []
    for cls in selected_classes:
        selected_indices.extend(random.sample(class_indices[cls], k_shot))

    return Subset(dataset, selected_indices)


In [None]:
def reptile_train(model, train_dataset, meta_iterations=500, inner_steps=5, meta_lr=0.05, n_classes=2, k_shot=10,
                  inner_lr=1e-2, task_batch_size=4):
    """Meta-train ``model`` in place with the Reptile update rule.

    Each meta-iteration samples a few-shot task, trains a deep copy of the
    model on it with SGD, then moves every parameter of ``model`` a fraction
    ``meta_lr`` of the way towards the adapted copy.

    Args:
        model: Network to meta-train; moved to CUDA when available, else CPU.
        train_dataset: Dataset the few-shot tasks are sampled from.
        meta_iterations: Number of outer (meta) updates.
        inner_steps: Passes over the task data per meta-iteration.
        meta_lr: Interpolation factor of the outer update.
        n_classes: Classes per sampled task.
        k_shot: Examples per class in a task.
        inner_lr: SGD learning rate of the inner loop.
        task_batch_size: Batch size used when iterating a task.

    Returns:
        None. ``model`` is updated in place.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    use_amp = device.type == "cuda"
    model.to(device)

    scaler = torch.amp.GradScaler(enabled=use_amp)

    def inner_loop(cloned_model, task_loader):
        """Adapt ``cloned_model`` to one task with plain SGD."""
        optimizer = torch.optim.SGD(cloned_model.parameters(), lr=inner_lr)
        cloned_model.train()
        for _ in range(inner_steps):
            for images, labels in task_loader:
                images, labels = images.to(device), labels.to(device)
                optimizer.zero_grad()
                with torch.amp.autocast(device_type='cuda', enabled=use_amp):
                    outputs = cloned_model(images)
                    loss = F.cross_entropy(outputs, labels)
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()

    print("Starting Reptile training...")
    start_time = time.time()

    for iteration in range(meta_iterations):
        task_data = get_few_shot_task(train_dataset, n_classes=n_classes, k_shot=k_shot)
        task_loader = DataLoader(task_data, batch_size=task_batch_size, shuffle=True)

        # Adapt a throwaway copy so the meta-parameters stay untouched until
        # the outer update below.
        cloned_model = copy.deepcopy(model).to(device)
        inner_loop(cloned_model, task_loader)

        # Outer-loop Reptile update: theta <- theta + meta_lr * (phi - theta).
        # NOTE(review): only parameters are interpolated; buffers of the clone
        # (e.g. BatchNorm running statistics) are discarded — confirm intended.
        with torch.no_grad():
            for param, cloned_param in zip(model.parameters(), cloned_model.parameters()):
                param.add_(cloned_param - param, alpha=meta_lr)

        if (iteration + 1) % 50 == 0:
            print(f"[Meta-Iter {iteration + 1}] Reptile step complete.")

    elapsed = time.time() - start_time
    print(f"\n✅ Reptile training completed in {int(elapsed // 60)}m {int(elapsed % 60)}s.")


In [None]:
# Meta-train the current model in place: 500 meta-iterations of 2-class,
# 10-shot tasks, 5 inner passes per task, outer step size 0.05.
reptile_train(model, train_dataset, meta_iterations=500, inner_steps=5, meta_lr=0.05, n_classes=2, k_shot=10)
