# Sleep Disorder Classification - Advanced Architecture Training

This notebook implements the training pipeline for 5 models: KNN, SVM, Random Forest, ANN, and CNN.
It uses Optuna for hyperparameter optimization and saves the best models to Google Drive.

## Quick Start
By default, this notebook contains all the necessary source code embedded within it. 
However, if you have pushed the project code to GitHub, you can use the **'Clone from GitHub'** section below instead of writing the files manually.

In [None]:
# 1. Setup Environment
!pip install optuna imbalanced-learn

In [None]:
# 2. Mount Google Drive
import os

from google.colab import drive

# Make the user's Google Drive visible under /content/drive.
drive.mount('/content/drive')

# Every artifact produced by this project is kept in one dedicated Drive folder;
# exist_ok=True makes re-running the cell harmless.
project_path = '/content/drive/MyDrive/Sleep_Disorder_Project'
os.makedirs(project_path, exist_ok=True)
print(f"Project contents will be saved to: {project_path}")

In [None]:
# 3. Upload Dataset
from google.colab import files
import pandas as pd
import io
import os  # imported here as well so this cell does not rely on an earlier cell

dataset_name = 'sleep_dataset.csv'

print("Please upload 'sleep_dataset.csv' from your local machine:")
uploaded = files.upload()

for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(uploaded[fn])))

# Ensure the dataset is named correctly for the script.
# The rename is decided once for the whole upload instead of once per file:
# renaming every uploaded file to the same target would let a later file
# overwrite the dataset (including a correctly named one).
if dataset_name in uploaded:
    # Already has the expected name; leave any other uploaded files untouched.
    pass
elif len(uploaded) == 1:
    (only_file,) = uploaded
    os.replace(only_file, dataset_name)
    print("Renamed uploaded file to 'sleep_dataset.csv'")
elif uploaded:
    # Ambiguous: cannot tell which of several files is the dataset.
    print("Several files were uploaded and none is named 'sleep_dataset.csv'; "
          "re-run this cell and upload only the dataset.")

---

## Option A: Clone from GitHub
Use this if you have pushed the project code to a GitHub repository.
Replace the repository URL in the cell below with the URL of your own repository.

In [None]:
# Run this cell only if you are using Option A (skip it when using the embedded source in Option B)
!git clone https://github.com/Pradeep-1803/buhbuybnu.git
%cd buhbuybnu

---

## Option B: Embedded Source Code
Run these cells if you are NOT cloning from GitHub. This will create the necessary Python files directly in the Colab environment.

In [None]:
# 4. Create Source Directory
os.makedirs('src', exist_ok=True)

In [None]:
%%writefile src/data_loader.py
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
import warnings

warnings.filterwarnings('ignore')

def load_and_process_data(filepath):
    """Load the sleep dataset and return balanced training data plus a held-out test set.

    Processing steps:
      1. Read the CSV, drop ``Person ID`` when present, and label rows whose
         ``Sleep Disorder`` value is missing as the class ``'None'``.
      2. Split ``Blood Pressure`` on ``'/'`` into float columns ``Systolic_BP``
         and ``Diastolic_BP``; merge the BMI label ``'Normal Weight'`` into
         ``'Normal'``.
      3. Label-encode the target and make a stratified 70/30 train/test split.
      4. Fit the scaler and one-hot encoder on the training rows only, then
         apply them to the test rows.
      5. Oversample the training rows with SMOTE; the test rows are untouched.

    Args:
        filepath: Location of the CSV file, passed straight to ``pandas.read_csv``.

    Returns:
        A tuple ``(X_train_resampled, X_test, y_train_resampled, y_test,
        label_encoder)`` where the ``X`` values are the preprocessed feature
        matrices, the ``y`` values are integer-encoded labels, and
        ``label_encoder`` is the fitted ``LabelEncoder`` for the target.
    """
    print(f"Loading data from {filepath}...")
    df = pd.read_csv(filepath)

    if 'Person ID' in df.columns:
        df = df.drop(columns=['Person ID'])

    df['Sleep Disorder'] = df['Sleep Disorder'].fillna('None')
    print("Class distribution before balancing:")
    print(df['Sleep Disorder'].value_counts())

    if 'Blood Pressure' in df.columns:
        df[['Systolic_BP', 'Diastolic_BP']] = df['Blood Pressure'].str.split('/', expand=True).astype(float)
        df = df.drop(columns=['Blood Pressure'])

    if 'BMI Category' in df.columns:
        df['BMI Category'] = df['BMI Category'].replace({'Normal Weight': 'Normal'})

    target_col = 'Sleep Disorder'
    categorical_cols = ['Gender', 'Occupation', 'BMI Category']
    # Everything that is neither categorical nor the target is treated as numeric.
    numeric_cols = [col for col in df.columns if col not in categorical_cols + [target_col]]

    numeric_transformer = Pipeline(steps=[('scaler', StandardScaler())])
    categorical_transformer = Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_cols),
            ('cat', categorical_transformer, categorical_cols)
        ]
    )

    X = df.drop(columns=[target_col])
    y = df[target_col]

    label_encoder = LabelEncoder()
    y_encoded = label_encoder.fit_transform(y)

    # Split BEFORE fitting the preprocessor so that the scaler's mean/variance
    # and the encoder's category list are learned from training rows only.
    # Fitting on the full frame would leak test-set statistics into training.
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y_encoded, test_size=0.3, random_state=42, stratify=y_encoded
    )

    print("Preprocessing features...")
    X_train = preprocessor.fit_transform(X_train_raw)
    # handle_unknown='ignore' encodes categories unseen in training as all zeros.
    X_test = preprocessor.transform(X_test_raw)

    print("Applying SMOTE to training data...")
    smote = SMOTE(random_state=42)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

    return X_train_resampled, X_test, y_train_resampled, y_test, label_encoder

In [None]:
%%writefile src/models.py
import torch
import torch.nn as nn
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from torch.utils.data import Dataset

class SleepDataset(Dataset):
    """Map-style dataset pairing feature rows with integer class labels.

    Both inputs are converted to tensors once at construction time: features
    as ``float32`` and labels as ``long`` (int64).
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        labels = torch.tensor(y, dtype=torch.long)
        self.X, self.y = features, labels

    def __len__(self):
        # Number of samples is the length of the feature tensor's first axis.
        return len(self.X)

    def __getitem__(self, idx):
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

class ANN(nn.Module):
    """Fully connected classifier built from Linear/BatchNorm/ReLU/Dropout blocks.

    The network always contains one block mapping ``input_dim`` to
    ``units_per_layer``, followed by ``hidden_layers - 1`` further blocks of
    width ``units_per_layer`` and a final ``Linear`` layer producing
    ``num_classes`` raw scores (no softmax is applied).
    """

    def __init__(self, input_dim, hidden_layers=1, units_per_layer=24, dropout_rate=0.2, num_classes=3):
        super().__init__()

        def make_block(in_features):
            # One hidden block: affine -> batch norm -> ReLU -> dropout.
            return [
                nn.Linear(in_features, units_per_layer),
                nn.BatchNorm1d(units_per_layer),
                nn.ReLU(),
                nn.Dropout(dropout_rate),
            ]

        modules = make_block(input_dim)
        for _ in range(hidden_layers - 1):
            modules.extend(make_block(units_per_layer))
        modules.append(nn.Linear(units_per_layer, num_classes))

        self.network = nn.Sequential(*modules)

    def forward(self, x):
        logits = self.network(x)
        return logits

class CNN(nn.Module):
    """1-D convolutional classifier over a flat feature vector.

    Each input row is treated as a single-channel sequence of length
    ``input_dim``. It passes through one ``Conv1d`` + ``BatchNorm1d`` + ReLU
    stage, max pooling (window 2), dropout, and a final linear layer that
    outputs ``num_classes`` raw scores.

    When the convolution output is too short for pooling to leave anything
    (pooled length would be 0), pooling is replaced by an identity layer.
    """

    def __init__(self, input_dim, filters=32, kernel_size=2, dropout_rate=0.3, num_classes=3):
        super().__init__()

        # Sequence length after an unpadded, stride-1 convolution, and after
        # halving by the pooling window.
        conv_len = input_dim - kernel_size + 1
        pooled_len = conv_len // 2
        can_pool = pooled_len > 0
        flat_len = pooled_len if can_pool else conv_len

        self.conv1 = nn.Conv1d(in_channels=1, out_channels=filters, kernel_size=kernel_size)
        self.bn1 = nn.BatchNorm1d(filters)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.pool = nn.MaxPool1d(kernel_size=2) if can_pool else nn.Identity()
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(filters * flat_len, num_classes)

    def forward(self, x):
        # (batch, features) -> (batch, 1, features): add the channel axis Conv1d expects.
        features = self.conv1(x.unsqueeze(1))
        features = self.relu(self.bn1(features))
        features = self.dropout(self.pool(features))
        return self.fc(self.flatten(features))

def get_sklearn_model(name, params):
    """Instantiate the scikit-learn classifier identified by ``name``.

    Args:
        name: ``'KNN'``, ``'SVM'`` or ``'RF'``.
        params: Keyword arguments forwarded to the estimator's constructor.
            KNN and RF are additionally given ``n_jobs=-1``, so ``params``
            must not contain ``n_jobs`` for those two.

    Returns:
        An unfitted ``KNeighborsClassifier``, ``SVC`` or
        ``RandomForestClassifier``.

    Raises:
        ValueError: If ``name`` is not one of the supported identifiers.
    """
    # Lambdas keep construction lazy: only the requested estimator is built.
    builders = {
        'KNN': lambda: KNeighborsClassifier(**params, n_jobs=-1),
        'SVM': lambda: SVC(**params),
        'RF': lambda: RandomForestClassifier(**params, n_jobs=-1),
    }
    build = builders.get(name)
    if build is None:
        raise ValueError(f"Unknown sklearn model: {name}")
    return build()

In [None]:
# 5. Training and Evaluation Function
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report
import numpy as np
import pandas as pd
from src.data_loader import load_and_process_data
from src.models import SleepDataset, ANN, CNN, get_sklearn_model
import joblib
import os

# CSV file handed to load_and_process_data(); a relative path, so it is
# resolved against the current working directory.
DATA_PATH = "sleep_dataset.csv"
# Number of Optuna trials run per model; increase for a more thorough (slower) search.
N_TRIALS = 20
# Epochs used when retraining the final ANN/CNN in run_all(); the Optuna
# objective passes its own hard-coded epoch count (10) instead.
EPOCHS = 20
# Mini-batch size for every PyTorch DataLoader built in this cell.
BATCH_SIZE = 64
# Train on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Folder the trained models are written to (.joblib for sklearn, .pth for PyTorch).
SAVE_DIR = '/content/drive/MyDrive/Sleep_Disorder_Project'

def train_torch_model(model, train_loader, val_loader, epochs, lr):
    """Train ``model`` in place and return its best validation F1 score.

    The model is moved to ``DEVICE`` and optimized with Adam on a
    cross-entropy loss. After every epoch it is evaluated on ``val_loader``
    and the weighted F1 score is recorded.

    Note that only the *score* of the best epoch is tracked: when the function
    returns, ``model`` holds the weights from the final epoch, not necessarily
    those of the best-scoring one.

    Args:
        model: ``nn.Module`` producing one score per class for each sample.
        train_loader: Iterable of ``(features, labels)`` training batches.
        val_loader: Iterable of ``(features, labels)`` evaluation batches.
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate for the Adam optimizer.

    Returns:
        The highest weighted F1 observed on ``val_loader`` across all epochs
        (``0.0`` if ``epochs`` is 0).
    """
    model.to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    best_val_f1 = 0.0

    for _ in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            # In training mode BatchNorm1d needs more than one value per
            # channel and raises on a single-sample batch. Such a batch can
            # appear as the trailing batch when the dataset size is one more
            # than a multiple of the batch size, so skip it instead of
            # crashing mid-run.
            if X_batch.size(0) < 2:
                continue
            X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

        # Evaluation pass: eval() switches dropout/batch norm to inference
        # behavior, and no_grad() avoids building the autograd graph.
        model.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE)
                outputs = model(X_batch)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(y_batch.cpu().numpy())

        val_f1 = f1_score(all_labels, all_preds, average='weighted')
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
    return best_val_f1

def objective(trial, model_name, X_train, X_val, y_train, y_val, input_dim):
    """Optuna objective: sample hyperparameters, train once, return validation F1.

    Args:
        trial: Optuna trial used to sample hyperparameters.
        model_name: One of ``'KNN'``, ``'SVM'``, ``'RF'``, ``'ANN'``, ``'CNN'``.
        X_train: Training feature matrix (indexed with an integer array for SVM).
        X_val: Validation feature matrix.
        y_train: Training labels aligned with ``X_train``.
        y_val: Validation labels aligned with ``X_val``.
        input_dim: Number of input features; used only by the ANN/CNN models.

    Returns:
        Weighted F1 score on the validation data. For ANN/CNN this is the best
        score over the training epochs, as reported by ``train_torch_model``.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name not in ('KNN', 'SVM', 'RF', 'ANN', 'CNN'):
        # Without this guard the function fell off the end and returned None,
        # which hides the real cause (a misspelled model name) from the caller.
        raise ValueError(f"Unknown model: {model_name}")

    if model_name in ('ANN', 'CNN'):
        lr = trial.suggest_float('lr', 1e-4, 1e-2, log=True)
        dropout = trial.suggest_float('dropout_rate', 0.1, 0.5)
        train_dl = DataLoader(SleepDataset(X_train, y_train), batch_size=BATCH_SIZE, shuffle=True)
        val_dl = DataLoader(SleepDataset(X_val, y_val), batch_size=BATCH_SIZE)
        if model_name == 'ANN':
            model = ANN(input_dim, trial.suggest_int('hidden_layers', 1, 3), trial.suggest_int('units_per_layer', 16, 128), dropout)
        else:
            model = CNN(input_dim, trial.suggest_int('filters', 16, 64), trial.suggest_int('kernel_size', 2, 3), dropout)
        # Each trial gets a fixed budget of 10 epochs.
        return train_torch_model(model, train_dl, val_dl, 10, lr)

    # scikit-learn models: pick the parameters (and, for SVM, the training
    # subset), then share a single fit/score path.
    fit_X, fit_y = X_train, y_train
    if model_name == 'KNN':
        params = {'n_neighbors': trial.suggest_int('n_neighbors', 3, 20), 'weights': trial.suggest_categorical('weights', ['uniform', 'distance'])}
    elif model_name == 'SVM':
        # Cap the SVM training set at 5000 rows. The generator is seeded so
        # every trial trains on the same subset; with an unseeded draw, score
        # differences between trials would partly reflect the subset rather
        # than the hyperparameters.
        rng = np.random.default_rng(42)
        idx = rng.choice(len(X_train), size=min(len(X_train), 5000), replace=False)
        fit_X, fit_y = X_train[idx], y_train[idx]
        params = {'C': trial.suggest_float('C', 0.1, 10.0, log=True), 'kernel': 'rbf'}
    else:  # 'RF'
        params = {'n_estimators': trial.suggest_int('n_estimators', 50, 150), 'max_depth': trial.suggest_int('max_depth', 5, 20)}

    model = get_sklearn_model(model_name, params)
    model.fit(fit_X, fit_y)
    return f1_score(y_val, model.predict(X_val), average='weighted')

def run_all():
    """Tune, retrain, save and evaluate all five models.

    For each of KNN, SVM, RF, ANN and CNN:
      1. Run an Optuna study (``N_TRIALS`` trials) that maximizes the weighted
         F1 on a validation split carved out of the training data.
      2. Rebuild the model with the best parameters and fit it on the full
         training data.
      3. Save it under ``SAVE_DIR`` (``.joblib`` for scikit-learn models, a
         ``state_dict`` ``.pth`` file for PyTorch models).
      4. Report accuracy and weighted F1 on the held-out test set.

    A summary table of all results is printed at the end. Nothing is returned.
    """
    # Imported locally because the enclosing cell does not import it.
    from sklearn.model_selection import train_test_split

    X_full, X_test, y_full, y_test, _ = load_and_process_data(DATA_PATH)
    input_dim = X_full.shape[1]

    # Validation split for Optuna.
    # A plain head/tail slice is not safe here: the training data comes out of
    # SMOTE unshuffled, so the tail would over-represent synthetic samples and
    # the class mix of the two parts would differ. Use a shuffled, stratified
    # 80/20 split instead.
    X_train_opt, X_val_opt, y_train_opt, y_val_opt = train_test_split(
        X_full, y_full, test_size=0.2, random_state=42, stratify=y_full
    )

    # Make sure the output folder exists before the first model is saved.
    os.makedirs(SAVE_DIR, exist_ok=True)

    models = ['KNN', 'SVM', 'RF', 'ANN', 'CNN']
    results = {}

    for m in models:
        print(f"\nOptimizing {m}...")
        study = optuna.create_study(direction='maximize')
        # m=m binds the current model name at definition time.
        study.optimize(
            lambda t, m=m: objective(t, m, X_train_opt, X_val_opt, y_train_opt, y_val_opt, input_dim),
            n_trials=N_TRIALS,
        )
        print(f"Best Params: {study.best_params}")

        # Retrain Best Model
        print(f"Training final {m}...")
        if m in ['KNN', 'SVM', 'RF']:
            model = get_sklearn_model(m, study.best_params)
            model.fit(X_full, y_full)
            preds = model.predict(X_test)
            # Save Model
            model_path = os.path.join(SAVE_DIR, f'{m}_best_model.joblib')
            joblib.dump(model, model_path)
            print(f"Saved {m} to {model_path}")
        else:
            p = study.best_params
            if m == 'ANN':
                model = ANN(input_dim, p['hidden_layers'], p['units_per_layer'], p['dropout_rate'])
            else:
                model = CNN(input_dim, p['filters'], p['kernel_size'], p['dropout_rate'])

            ds_train, ds_test = SleepDataset(X_full, y_full), SleepDataset(X_test, y_test)
            dl_train = DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True)
            dl_test = DataLoader(ds_test, batch_size=BATCH_SIZE)

            # The test loader only fills the function's required evaluation
            # slot; the returned score is discarded, so the test data does not
            # influence training or model selection.
            train_torch_model(model, dl_train, dl_test, epochs=EPOCHS, lr=p['lr'])

            # Save PyTorch Model
            model_path = os.path.join(SAVE_DIR, f'{m}_best_model.pth')
            torch.save(model.state_dict(), model_path)
            print(f"Saved {m} to {model_path}")

            model.eval()
            preds = []
            with torch.no_grad():
                for Xb, _ in dl_test:
                    out = model(Xb.to(DEVICE))
                    preds.extend(torch.max(out, 1)[1].cpu().numpy())

        acc = accuracy_score(y_test, preds)
        f1 = f1_score(y_test, preds, average='weighted')
        results[m] = {'Accuracy': acc, 'F1': f1}
        print(f"{m} Result -> Accuracy: {acc:.4f}, F1: {f1:.4f}")

    print("\n--- Final Results ---")
    print(pd.DataFrame(results).T)

run_all()