# In [None]:
import os
import zipfile
import subprocess
import numpy as np
from scipy.io import loadmat
from scipy.signal import butter, lfilter
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
import gdown

# Step 1: Clone the GitHub Repository
# Clone only when the checkout is missing so re-runs do not fail on an
# existing directory.
if not os.path.exists("MHyEEG"):
    # Argument list (no shell) and check=True: a failed clone raises
    # CalledProcessError here instead of surfacing later as a confusing
    # FileNotFoundError from chdir.
    subprocess.run(
        ["git", "clone", "https://github.com/alessioborgi/MHyEEG.git"],
        check=True,
    )

# Navigate to the repository so that relative paths ("data", "models")
# resolve inside the checkout.
os.chdir("MHyEEG")

# Step 2: Download and Extract DEAP Dataset
def download_and_extract_deap(dataset_url, output_dir="data"):
    """Download the DEAP archive with gdown and unpack it into ``output_dir``.

    The archive is saved as ``<output_dir>/DEAP.zip`` and extracted in place.

    Args:
        dataset_url: URL handed to ``gdown.download``.
        output_dir: Directory that receives the zip file and its contents;
            created if it does not exist.

    Raises:
        RuntimeError: If gdown reports no downloaded file, or the downloaded
            file is not a valid zip archive.
    """
    os.makedirs(output_dir, exist_ok=True)
    zip_path = os.path.join(output_dir, "DEAP.zip")
    downloaded = gdown.download(dataset_url, zip_path, quiet=False)
    # NOTE(review): depending on the gdown version a failed retrieval may
    # return None rather than raise - confirm against the installed version.
    # The is_zipfile check additionally rejects non-archive payloads (for
    # example an error page saved under the .zip name).
    if downloaded is None or not zipfile.is_zipfile(zip_path):
        raise RuntimeError(
            f"Download from {dataset_url} did not produce a valid zip archive at {zip_path}"
        )
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_dir)
    print(f"Dataset extracted to {output_dir}")

# Provide the dataset URL
# Download target: everything fetched or generated below is placed in data_dir.
data_dir = "data"
# Google Drive link to the DEAP archive; the placeholder must be replaced
# before this runs.
DEAP_URL = "https://drive.google.com/uc?id=YOUR_FILE_ID"  # Replace YOUR_FILE_ID with actual ID
download_and_extract_deap(dataset_url=DEAP_URL, output_dir=data_dir)

# Step 3: Preprocess the Dataset
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter.

    Args:
        lowcut: Lower band edge, in the same unit as ``fs``.
        highcut: Upper band edge, in the same unit as ``fs``.
        fs: Sampling frequency.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        The ``(b, a)`` coefficient pair produced by ``scipy.signal.butter``.
    """
    # butter() expects band edges as a fraction of the Nyquist frequency.
    nyquist = fs / 2
    band_edges = [edge / nyquist for edge in (lowcut, highcut)]
    return butter(order, band_edges, btype='band')

def bandpass_filter(data, lowcut, highcut, fs, order=5, axis=0):
    """Band-pass filter ``data`` with a Butterworth filter.

    Args:
        data: Array-like signal to filter.
        lowcut: Lower band edge, in the same unit as ``fs``.
        highcut: Upper band edge, in the same unit as ``fs``.
        fs: Sampling frequency.
        order: Order forwarded to ``butter_bandpass``.
        axis: Axis of ``data`` along which the filter is applied. Defaults to
            0, which matches the previous hard-coded behaviour; pass the time
            axis explicitly when samples are not on the first axis.

    Returns:
        The filtered array, same shape as ``data``.
    """
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    # Single forward pass with lfilter along the requested axis.
    return lfilter(b, a, data, axis=axis)

def preprocess_deap(input_dir, output_file):
    """Band-pass filter every ``.mat`` recording in ``input_dir`` and save one ``.npz``.

    NOTE(review): this assumes the DEAP MATLAB layout, where ``data`` is
    (trials, channels, samples) and ``labels`` is (trials, ratings) - confirm
    against the files actually downloaded. Under that layout the time axis is
    the last one, so the filter is applied along ``axis=-1``, and trials from
    all files are concatenated so that one row of the saved arrays is one
    trial (which is what ``labels[idx][:2]`` in the dataset class expects).

    Args:
        input_dir: Directory containing the ``.mat`` files.
        output_file: Path of the ``.npz`` file to write, with keys ``eeg``
            and ``labels``.

    Raises:
        FileNotFoundError: If ``input_dir`` contains no ``.mat`` file.
    """
    fs = 128  # Sampling frequency for DEAP
    # The pass band is the same for every file, so design the filter once.
    b, a = butter_bandpass(1, 45, fs)

    # Sorted so the trial order in the output does not depend on the
    # arbitrary order returned by os.listdir.
    mat_files = sorted(
        name for name in os.listdir(input_dir) if name.endswith('.mat')
    )
    if not mat_files:
        raise FileNotFoundError(f"No .mat files found in {input_dir!r}")

    eeg_data = []
    labels = []
    for file_name in mat_files:
        data = loadmat(os.path.join(input_dir, file_name))
        eeg = data['data'][:, :32]  # First 32 channels are EEG
        label = data['labels'][:, :2]  # First two labels are valence and arousal
        # Filter along the last (time) axis rather than across trials.
        eeg_data.append(lfilter(b, a, eeg, axis=-1))
        labels.append(label)

    # Concatenate along the trial axis instead of stacking per file, so the
    # first dimension indexes individual trials.
    np.savez(
        output_file,
        eeg=np.concatenate(eeg_data, axis=0),
        labels=np.concatenate(labels, axis=0),
    )

# The .mat files are read from <data_dir>/DEAP; the filtered arrays are
# written next to that folder as a single .npz file.
output_file = os.path.join(data_dir, "preprocessed_deap_data.npz")
input_dir = os.path.join(data_dir, "DEAP")
preprocess_deap(input_dir=input_dir, output_file=output_file)

# Step 4: Create PyTorch Dataset Loader
class DEAPDataset(Dataset):
    """Dataset over the ``eeg`` and ``labels`` arrays stored in an ``.npz`` file.

    Item ``idx`` is the pair ``(eeg[idx], labels[idx][:2])``, both converted
    to ``float32`` tensors.
    """

    def __init__(self, data_file):
        """Load both arrays into memory from ``data_file``.

        Args:
            data_file: Path to an ``.npz`` archive with keys ``eeg`` and
                ``labels``.
        """
        # np.load on an .npz returns a lazy NpzFile that keeps the file open;
        # the context manager closes it once both arrays are materialised.
        with np.load(data_file) as data:
            self.eeg = data['eeg']
            self.labels = data['labels']

    def __len__(self):
        """Return the number of entries along the first axis of ``labels``."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(eeg, label)`` tensor pair at position ``idx``."""
        eeg = self.eeg[idx]
        label = self.labels[idx][:2]  # Valence and Arousal
        return torch.tensor(eeg, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)

# Step 5: Reuse Hypercomplex Model
from models.hyperfusenet import HyperFuseNet  # Assuming HyperFuseNet is in models

# Step 6: Define Training Script
def train(model, dataloader, optimizer, criterion, device):
    """Run one training pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Module to train; switched to training mode.
        dataloader: Iterable yielding ``(eeg, labels)`` tensor pairs. It does
            not need to support ``len()``; batches are counted while iterating.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The sum of per-batch losses divided by the number of batches.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for eeg, labels in dataloader:
        eeg, labels = eeg.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(eeg)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    # An empty loader would otherwise end in an opaque ZeroDivisionError.
    if num_batches == 0:
        raise ValueError("dataloader yielded no batches; cannot compute a mean loss")
    return total_loss / num_batches

# Training Configuration
def main():
    """Build the dataset, model and optimizer, then train for 20 epochs.

    Reads the preprocessed file path from the module-level ``output_file``
    and prints the value returned by ``train`` after every epoch.
    """
    # Prefer the GPU when one is visible to torch.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    dataloader = DataLoader(DEAPDataset(output_file), batch_size=32, shuffle=True)

    # Constructor arguments are passed through as-is; their meaning is
    # defined by HyperFuseNet in models.hyperfusenet.
    model = HyperFuseNet(input_dims={'eeg': 32}, hidden_dim=128, output_dim=2, n=4)
    model = model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(20):
        loss = train(model, dataloader, optimizer, criterion, device)
        print(f"Epoch {epoch + 1}, Loss: {loss:.4f}")

# Start training only when executed directly (as a script or in a notebook
# cell, where __name__ is "__main__"), not when imported.
if __name__ == "__main__":
    main()
