<a href="https://colab.research.google.com/github/Epatsili2000/ParkinsonDetection/blob/main/LSTM_MFCC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### LSTM Model for Parkinson's Disease Detection

Train an LSTM (Long Short-Term Memory) neural network using sequential MFCC features extracted from PC-GITA speech recordings to classify between **Parkinson’s Disease (PD)** and **Healthy Controls (HC)**.

---

####  Dataset
- Input file: `pcgita_features_final_scaled.csv`
- Features: MFCC1–MFCC13, reshaped into sequences for LSTM input
- Labels: `"PD"` = 1, `"HC"` = 0
- Grouping by `Speaker_ID` to prevent speaker leakage during cross-validation

---

####  Preprocessing
- Load the CSV, keep only rows labelled `PD` or `HC`, and drop rows that contain missing values
- Standardize MFCC features
- Reshape feature matrix: `(samples, time_steps, 13)`  
  Example: 11760 samples reshaped to (11760, N, 13), where N = (number of MFCC columns) / 13
- Apply 5-fold **GroupKFold** split using speaker ID

---

####  Model Architecture
```python
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        ...
```
- Input: 13 MFCC coefficients per frame
- Hidden size: 64
- Output: Binary classification (PD vs HC) via sigmoid activation

---

####  Training Configuration
- Loss Function: `BCELoss`
- Optimizer: `Adam (lr=0.001)`
- Batch size: 32
- Epochs: max 30 with early stopping (patience=5)
- Metric: **Validation Accuracy**

---

####  Evaluation (per fold)
- Save best model checkpoint
- Print:
  - Classification Report
  - Confusion Matrix
  - ROC AUC Score
- Plot:
  - 📈 Training Loss vs Epoch
  - 📈 Validation Accuracy vs Epoch
  - 🔷 Confusion Matrix

---

####  Output Files
Saved under: `/content/drive/MyDrive/PCGITA_MODELS/models_lstm/` (`{n}` is the zero-based fold index, 0–4)

- `best_model_fold{n}.pt` — model weights
- `metrics_fold{n}.png` — loss/accuracy plots
- `confusion_matrix_fold{n}.png` — evaluation

---

####  Example Fold Result
```
Fold 1:
Accuracy       = 0.921
Sensitivity    = 0.945
Specificity    = 0.875
ROC AUC Score  = 0.970
```

---

####  Directory Structure
```
/PCGITA_MODELS/
└── models_lstm/
    ├── best_model_fold1.pt
    ├── metrics_fold1.png
    ├── confusion_matrix_fold1.png
    ├── ...
```


In [None]:
# -------------------- SETUP --------------------
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import GroupKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_auc_score
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns

# -------------------- PATHS --------------------
CSV_PATH = "/content/drive/MyDrive/PCGITA_RESULTS/pcgita_features_final_scaled.csv"
SAVE_DIR = "/content/drive/MyDrive/PCGITA_MODELS/models_lstm"
os.makedirs(SAVE_DIR, exist_ok=True)

# -------------------- LOAD & PREPROCESS --------------------
df = pd.read_csv(CSV_PATH)
# Only the header names are normalised; cell values such as "PD"/"HC" keep their case.
df.columns = df.columns.str.strip().str.lower()

# Fail early with a readable message instead of a KeyError further down.
missing_cols = {"label", "speaker_id"} - set(df.columns)
if missing_cols:
    raise ValueError(f"CSV is missing required column(s): {sorted(missing_cols)}")

# Keep rows with a valid label, then drop rows that have a NaN in ANY column.
df = df[df["label"].isin(["PD", "HC"])].dropna()
if df.empty:
    raise ValueError("No usable rows: every row has an invalid label or a missing value")

# Explicit mapping: LabelEncoder assigns codes alphabetically over the labels it
# sees, so it would encode PD as 0 if the file happened to contain no HC rows.
df["label"] = df["label"].map({"HC": 0, "PD": 1})

mfcc_cols = [col for col in df.columns if col.startswith("mfcc")]
if len(mfcc_cols) < 13:
    raise ValueError("Not enough MFCC features")

X = df[mfcc_cols].values
y = df["label"].values
groups = df["speaker_id"].values  # one entry per row; used to keep a speaker inside a single fold

# NOTE: this scaler is fitted on every row, i.e. also on rows that later serve
# as validation data in cross-validation.
scaler = StandardScaler()
X = scaler.fit_transform(X)
if X.shape[1] % 13 != 0:
    raise ValueError("Feature count not divisible by 13")
# Consecutive groups of 13 columns (in mfcc_cols order) become one step:
# (samples, n_columns / 13, 13).
# NOTE(review): whether each group is a real time frame depends on the CSV column layout - confirm.
X = X.reshape(X.shape[0], -1, 13)

# -------------------- DATASET CLASS --------------------
class ParkinsonDataset(Dataset):
    """In-memory dataset that pairs feature arrays with their labels.

    Both inputs are converted to ``float32`` tensors once, at construction
    time, and are exposed as the ``X`` and ``y`` attributes.
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        targets = torch.tensor(y, dtype=torch.float32)
        self.X = features
        self.y = targets

    def __len__(self):
        """Return the number of labels held by the dataset."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

# -------------------- MODEL --------------------
class LSTMClassifier(nn.Module):
    """LSTM encoder followed by a single-unit linear layer and a sigmoid.

    Args:
        input_size: Number of features per step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch-first input ``(batch, steps, input_size)`` to ``(batch,)`` scores in (0, 1)."""
        # nn.LSTM returns (output, (h_n, c_n)); only the final hidden states are used.
        _, (hidden_states, _) = self.lstm(x)
        # hidden_states[-1] is the final hidden state of the top LSTM layer.
        top_layer_state = hidden_states[-1]
        scores = self.sigmoid(self.fc(top_layer_state))
        # Drop the trailing singleton dimension left by the 1-unit linear layer.
        return scores.squeeze(1)

# -------------------- PLOTTING --------------------
def plot_metrics(losses, accs, fold):
    """Save a two-panel figure of per-epoch loss and validation accuracy.

    The image is written to ``{SAVE_DIR}/metrics_fold{fold}.png``.

    Args:
        losses: Per-epoch loss values (left panel).
        accs: Per-epoch validation accuracies (right panel).
        fold: Fold identifier used in the figure title and the file name.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

    # (axes, series, panel title, y-axis label) for each panel
    panels = (
        (loss_ax, losses, "Loss", "Loss"),
        (acc_ax, accs, "Validation Accuracy", "Accuracy"),
    )
    for axis, series, title, y_label in panels:
        axis.plot(series)
        axis.set_title(title)
        axis.set_xlabel("Epoch")
        axis.set_ylabel(y_label)
        axis.grid()

    fig.suptitle(f"Fold {fold} Training Progress")
    fig.savefig(f"{SAVE_DIR}/metrics_fold{fold}.png")
    plt.close(fig)

def plot_confusion_matrix(cm, fold):
    """Save an annotated heatmap of a confusion matrix with HC/PD tick labels.

    The image is written to ``{SAVE_DIR}/confusion_matrix_fold{fold}.png``.

    Args:
        cm: Confusion matrix of integer counts (cells are formatted with ``"d"``).
        fold: Fold identifier used in the figure title and the file name.
    """
    class_names = ["HC", "PD"]
    fig, ax = plt.subplots(figsize=(5, 4))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
        ax=ax,
    )
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")
    ax.set_title(f"Confusion Matrix Fold {fold}")
    fig.tight_layout()
    fig.savefig(f"{SAVE_DIR}/confusion_matrix_fold{fold}.png")
    plt.close(fig)

# -------------------- TRAINING FUNCTION --------------------
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=30, fold=0, patience=5):
    """Train ``model`` and checkpoint the epoch with the best validation accuracy.

    Every epoch runs one pass over ``train_loader`` and then scores the model on
    ``val_loader`` using a 0.5 decision threshold. Whenever validation accuracy
    improves, the weights are written to ``{SAVE_DIR}/best_model_fold{fold}.pt``.
    Training stops early after ``patience`` consecutive epochs without an
    improvement. The per-epoch curves are saved through ``plot_metrics``.

    Args:
        model: Module called as ``model(inputs)``; its outputs are passed to
            ``criterion`` and thresholded at 0.5 for validation.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        num_epochs: Maximum number of epochs.
        fold: Fold identifier used in the checkpoint/plot names and log lines.
        patience: Number of non-improving epochs tolerated before stopping.

    Returns:
        The best validation accuracy reached.
    """
    checkpoint_path = f"{SAVE_DIR}/best_model_fold{fold}.pt"
    # Start below any attainable accuracy so the first epoch always writes a
    # checkpoint. Starting at 0 meant a run stuck at 0.0 accuracy saved nothing,
    # and the caller then failed to load (or loaded a stale file from an older run).
    best_acc = float("-inf")
    patience_counter = 0
    loss_list, acc_list = [], []

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0  # sum of batch losses (not averaged over batches)
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Validation
        model.eval()
        val_preds, val_labels = [], []
        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                val_preds.extend(outputs.numpy())
                val_labels.extend(labels.numpy())

        val_preds_bin = [1 if p > 0.5 else 0 for p in val_preds]
        acc = accuracy_score(val_labels, val_preds_bin)
        loss_list.append(total_loss)
        acc_list.append(acc)
        print(f"Epoch {epoch+1}: Loss={total_loss:.2f}, Val Acc={acc:.4f}")

        if acc > best_acc:
            best_acc = acc
            torch.save(model.state_dict(), checkpoint_path)
            patience_counter = 0
            continue

        # No improvement this epoch.
        patience_counter += 1
        if patience_counter >= patience:
            print(f"⏹ Early stopping triggered at epoch {epoch+1}")
            break

    print(f" Best Accuracy Fold {fold}: {best_acc:.4f}")
    plot_metrics(loss_list, acc_list, fold)
    return best_acc

# -------------------- CROSS-VALIDATION --------------------
N_SPLITS = 5
gkf = GroupKFold(n_splits=N_SPLITS)
# NOTE: `fold` is zero-based; it is used as-is in file names and plot titles,
# while the progress line below shows the one-based position.
for fold, (train_idx, val_idx) in enumerate(gkf.split(X, y, groups)):
    print(f"\n Fold {fold + 1}/{N_SPLITS}")
    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]

    # Re-standardise using statistics from the training speakers only, so no
    # information from the validation speakers reaches training. Because
    # standardisation is affine, this exactly replaces the earlier global
    # scaling, which was fitted on all rows.
    fold_scaler = StandardScaler()
    X_train = fold_scaler.fit_transform(X_train.reshape(len(X_train), -1)).reshape(X_train.shape)
    X_val = fold_scaler.transform(X_val.reshape(len(X_val), -1)).reshape(X_val.shape)
    # The checkpoint expects inputs scaled this way, so keep the statistics next to it.
    np.savez(f"{SAVE_DIR}/scaler_fold{fold}.npz", mean=fold_scaler.mean_, scale=fold_scaler.scale_)

    train_dataset = ParkinsonDataset(X_train, y_train)
    val_dataset = ParkinsonDataset(X_val, y_val)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32)

    model = LSTMClassifier(input_size=X.shape[2], hidden_size=64, num_layers=1)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_model(model, train_loader, val_loader, criterion, optimizer, fold=fold, patience=5)

    # Evaluation: reload the best checkpoint written by train_model for this fold.
    # NOTE: the checkpoint was selected on this same validation fold, so the
    # numbers below are optimistic estimates.
    model.load_state_dict(torch.load(f"{SAVE_DIR}/best_model_fold{fold}.pt"))
    model.eval()

    val_preds, val_labels = [], []
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            val_preds.extend(outputs.numpy())
            val_labels.extend(labels.numpy())

    val_preds_bin = [1 if p > 0.5 else 0 for p in val_preds]
    # labels=[0, 1] keeps the matrix 2x2 and the report aligned with the
    # HC/PD names even if a fold happens to contain only one class.
    cm = confusion_matrix(val_labels, val_preds_bin, labels=[0, 1])

    print(
        " Classification Report:\n",
        classification_report(
            val_labels, val_preds_bin, labels=[0, 1], target_names=["HC", "PD"], zero_division=0
        ),
    )
    print(" Confusion Matrix:\n", cm)
    if len(np.unique(val_labels)) > 1:
        print(" ROC AUC Score:", roc_auc_score(val_labels, val_preds))
    else:
        # roc_auc_score raises when only one class is present in the true labels.
        print(" ROC AUC Score: n/a (validation fold contains a single class)")

    plot_confusion_matrix(cm, fold)



🔁 Fold 1/5
Epoch 1: Loss=48.61, Val Acc=0.6746
Epoch 2: Loss=36.72, Val Acc=0.8270
Epoch 3: Loss=30.12, Val Acc=0.8254
Epoch 4: Loss=26.34, Val Acc=0.8381
Epoch 5: Loss=23.86, Val Acc=0.8460
Epoch 6: Loss=22.08, Val Acc=0.8524
Epoch 7: Loss=20.35, Val Acc=0.8635
Epoch 8: Loss=19.87, Val Acc=0.8619
Epoch 9: Loss=18.39, Val Acc=0.8698
Epoch 10: Loss=17.54, Val Acc=0.8778
Epoch 11: Loss=17.19, Val Acc=0.8746
Epoch 12: Loss=16.32, Val Acc=0.8683
Epoch 13: Loss=15.73, Val Acc=0.8794
Epoch 14: Loss=15.21, Val Acc=0.8762
Epoch 15: Loss=14.96, Val Acc=0.8825
Epoch 16: Loss=14.27, Val Acc=0.8714
Epoch 17: Loss=13.78, Val Acc=0.8810
Epoch 18: Loss=13.26, Val Acc=0.8825
Epoch 19: Loss=12.67, Val Acc=0.8825
Epoch 20: Loss=12.36, Val Acc=0.8873
Epoch 21: Loss=12.08, Val Acc=0.8873
Epoch 22: Loss=11.23, Val Acc=0.8810
Epoch 23: Loss=11.16, Val Acc=0.8873
Epoch 24: Loss=10.62, Val Acc=0.8873
Epoch 25: Loss=10.10, Val Acc=0.8778
⏹️ Early stopping triggered at epoch 25
✅ Best Accuracy Fold 0: 0.8873
📊

#### Fold Evaluation Metrics and ROC Curve

For each fold, we compute and store the following evaluation metrics:

- **Precision**: Proportion of predicted PD samples that are truly PD  
- **Recall (Sensitivity)**: Proportion of actual PD samples correctly identified  
- **F1-Score**: Harmonic mean of precision and recall  
- **AUC**: Area under the ROC Curve

A ROC curve is plotted per fold and saved under `/models_lstm/roc_curve_foldN.png`.

```
Fold 1:
Precision: 0.91
Recall:    0.95
F1-Score:  0.93
AUC:       0.97
```


###  Fold Evaluation Metrics and ROC Curve

For each validation fold, this code computes classification metrics (precision, recall, F1-score, AUC) and saves them to a summary list. It also generates and saves the ROC curve plot for visual inspection of model performance.


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt

# Store metrics for summary
fold_metrics = []

# ---- Inside the for loop over folds, after predictions ----
# NOTE: run as a standalone cell, everything below executes once, using the
# `fold`, `val_labels`, `val_preds` and `val_preds_bin` left over from the last
# loop iteration. To record every fold, keep `fold_metrics = []` before the
# loop and move the rest into the loop body.

# Threshold-free metrics come from the raw scores ...
fpr, tpr, _ = roc_curve(val_labels, val_preds)
roc_auc = auc(fpr, tpr)
# ... while precision/recall/F1 use the thresholded predictions.
precision = precision_score(val_labels, val_preds_bin)
recall = recall_score(val_labels, val_preds_bin)
f1 = f1_score(val_labels, val_preds_bin)

# Save metrics
fold_summary = {"Fold": fold}
fold_summary["Precision"] = precision
fold_summary["Recall"] = recall
fold_summary["F1-Score"] = f1
fold_summary["AUC"] = roc_auc
fold_metrics.append(fold_summary)

# ROC Curve Plot (the dashed diagonal is the chance-level reference)
roc_fig, roc_ax = plt.subplots()
roc_ax.plot(fpr, tpr, label=f"AUC = {roc_auc:.2f}")
roc_ax.plot([0, 1], [0, 1], linestyle='--', color='gray')
roc_ax.set_xlabel("False Positive Rate")
roc_ax.set_ylabel("True Positive Rate")
roc_ax.set_title(f"ROC Curve Fold {fold}")
roc_ax.legend()
roc_ax.grid()
roc_fig.tight_layout()
roc_fig.savefig(f"{SAVE_DIR}/roc_curve_fold{fold}.png")
plt.close(roc_fig)


### Fold-wise Metric Summary for LSTM

This block creates a summary DataFrame of precision, recall, F1-score, and AUC for each fold and saves it as `lstm_fold_metrics.csv`. It then plots precision, recall, and F1-score per fold as a grouped bar chart saved as `lstm_metric_bars.png` (AUC is written to the CSV but is not plotted).


In [None]:
# Create summary dataframe (one row per entry in fold_metrics)
metrics_df = pd.DataFrame(fold_metrics)

# Save to CSV if needed
metrics_df.to_csv(f"{SAVE_DIR}/lstm_fold_metrics.csv", index=False)

# Grouped bars: one group per fold, one bar per metric.
fig, ax = plt.subplots(figsize=(10, 6))
bar_width = 0.25
x = np.arange(len(metrics_df))

# Slots 0, 1, 2 sit one bar-width left of, on, and right of the group centre.
for slot, metric in enumerate(("Precision", "Recall", "F1-Score")):
    ax.bar(x + (slot - 1) * bar_width, metrics_df[metric], width=bar_width, label=metric)

fold_names = [f"Fold {i}" for i in metrics_df["Fold"]]
ax.set_xticks(x)
ax.set_xticklabels(fold_names)
ax.set_ylim(0, 1.05)
ax.set_title("Precision, Recall, and F1-score per Fold")
ax.set_ylabel("Score")
ax.legend()
ax.grid(axis='y')
fig.tight_layout()
fig.savefig(f"{SAVE_DIR}/lstm_metric_bars.png")
plt.close(fig)


###  Spectrogram Metadata Extraction

This script recursively walks through the `mel_augmented` spectrogram folder, collecting the relative path, task, label (PD/HC), and speaker ID from each `.png` image. It filters for valid labels and saves the extracted metadata to `spectrogram_metadata.csv`.


In [None]:
import os
import pandas as pd

IMG_DIR = "/content/drive/MyDrive/pcgita_images/mel_augmented/"
# os.walk silently yields nothing for a missing path (e.g. Drive not mounted),
# which previously surfaced later as an unrelated KeyError on "label".
if not os.path.isdir(IMG_DIR):
    raise FileNotFoundError(f"Spectrogram directory not found: {IMG_DIR}")

output_rows = []

for root, dirs, files in os.walk(IMG_DIR):
    dirs.sort()  # in-place sort makes os.walk descend in a deterministic order
    for fname in sorted(files):
        if not fname.endswith(".png"):
            continue
        full_path = os.path.join(root, fname)
        rel_path = os.path.relpath(full_path, IMG_DIR)
        parts = rel_path.split(os.sep)

        # The first three path components are read as task / label / speaker_id.
        # NOTE(review): `>= 3` also accepts a file stored directly under
        # <task>/<label>/, in which case its file name becomes the speaker_id -
        # confirm against the actual folder layout.
        if len(parts) < 3:
            continue
        task, label, speaker_id = parts[:3]
        output_rows.append({
            "relative_path": rel_path,
            "label": label,
            "speaker_id": speaker_id,
            "task": task
        })

# Explicit columns keep the frame well-formed even when no image was found.
df = pd.DataFrame(output_rows, columns=["relative_path", "label", "speaker_id", "task"])
df = df[df["label"].isin(["PD", "HC"])]
df.to_csv("/content/drive/MyDrive/PCGITA_RESULTS/spectrogram_metadata.csv", index=False)
print(" Saved spectrogram_metadata.csv with", len(df), "rows.")


✅ Saved spectrogram_metadata.csv with 19212 rows.


### Summary

This script scans the spectrogram image directory and generates a metadata CSV file with task, label, speaker ID, filename, and relative path. It then splits the **speakers** (not individual images) into train, validation, and test sets, stratified by label, so that all images from one speaker share the same split: 15% of speakers are held out for test, and 17.65% of the remaining 85% (≈15% of all speakers) for validation, leaving ≈70% for training. The final annotated metadata is saved to `spectrogram_metadata_with_split.csv`.


In [None]:
import os
import pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split

# --- CONFIG ---
SPECTROGRAM_DIR = Path("/content/drive/MyDrive/pcgita_images/mel")
OUTPUT_CSV = Path("/content/drive/MyDrive/PCGITA_RESULTS/spectrogram_metadata_with_split.csv")

# --- STEP 1: Walk through and collect metadata ---
# Expected layout: <task>/<label>/<speaker_id>/<file>.png
# Every listing is sorted: os.listdir returns entries in arbitrary order, and
# the seeded split below depends on the order in which speakers are first seen.
records = []

for task in sorted(os.listdir(SPECTROGRAM_DIR)):
    task_path = SPECTROGRAM_DIR / task
    if not task_path.is_dir():
        continue
    for label in ["HC", "PD"]:
        label_path = task_path / label
        if not label_path.is_dir():
            continue
        for speaker_id in sorted(os.listdir(label_path)):
            speaker_path = label_path / speaker_id
            if not speaker_path.is_dir():
                continue
            for fname in sorted(os.listdir(speaker_path)):
                if fname.endswith(".png"):
                    rel_path = os.path.join(task, label, speaker_id, fname)
                    records.append({
                        "task": task,
                        "label": label,
                        "speaker_id": speaker_id,
                        "filename": fname,
                        "relative_path": rel_path
                    })

if not records:
    # Without this, the column selection below fails with an unhelpful KeyError.
    raise ValueError(f"No .png spectrograms found under {SPECTROGRAM_DIR}")

df = pd.DataFrame(records)

# --- STEP 2: Assign split (speaker-level, stratified by label) ---
# Splitting unique speakers rather than rows keeps all images of a speaker in one split.
speakers = df[["speaker_id", "label"]].drop_duplicates()

# 15% of speakers -> test
train_spk, test_spk = train_test_split(
    speakers, test_size=0.15, stratify=speakers["label"], random_state=42
)
# 0.1765 of the remaining 85% ≈ 15% of all speakers -> val (≈70% stay in train)
train_spk, val_spk = train_test_split(
    train_spk, test_size=0.1765, stratify=train_spk["label"], random_state=42
)

# Map splits
split_map = {}
for split_name, split_speakers in (("train", train_spk), ("val", val_spk), ("test", test_spk)):
    split_map.update(dict.fromkeys(split_speakers["speaker_id"], split_name))

df["split"] = df["speaker_id"].map(split_map)

# --- STEP 3: Save to CSV ---
df.to_csv(OUTPUT_CSV, index=False)
print(f" Saved to: {OUTPUT_CSV}")
print(df["split"].value_counts())


✅ Saved to: /content/drive/MyDrive/PCGITA_RESULTS/spectrogram_metadata_with_split.csv
split
train    4648
test      882
val       881
Name: count, dtype: int64
