<a href="https://colab.research.google.com/github/EdDee296/seizure-forecasting-system/blob/EdDee/AI_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **0. Install dependencies**

**Install Dependencies**

In [None]:
# Install required libraries
!pip install tensorflow numpy pandas scipy scikit-learn matplotlib


**Verify Installation**

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import scipy
import sklearn

# Report the installed version of each core dependency so that the
# environment this notebook ran in can be reproduced later.
print("TensorFlow version:", tf.__version__)
print("NumPy version:", np.__version__)
print("Pandas version:", pd.__version__)
print("SciPy version:", scipy.__version__)
print("Scikit-Learn version:", sklearn.__version__)

# **1. Data Processing**
**Import Libraries**

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from scipy import signal


**Feature Extraction Function**

In [None]:
def extract_features(window: pd.DataFrame, feature_config: dict) -> np.ndarray:
    """Flatten one window of wearable data into a 1-D feature vector.

    For every ``signal_type`` / ``channel`` pair in ``feature_config`` the
    column named ``f'{signal_type}_{channel}'`` is read from ``window`` and
    summarised with five time-domain statistics, in this order: mean,
    standard deviation, median, maximum, minimum.  Columns whose signal type
    is ``'acc'`` or ``'eda'`` additionally contribute three statistics of
    their Welch power spectral density: total power, mean power and the
    frequency of the largest PSD bin.

    Args:
        window: Rows belonging to a single analysis window.  Must contain
            every column named by ``feature_config`` (a missing column
            raises ``KeyError``).
        feature_config: Maps a signal-type prefix to the list of its channel
            suffixes.

    Returns:
        A 1-D array holding the features in the iteration order of
        ``feature_config``.
    """
    time_stats = (np.mean, np.std, np.median, np.max, np.min)
    spectral_signal_types = ('acc', 'eda')

    values = []
    for signal_type, channels in feature_config.items():
        wants_spectrum = signal_type in spectral_signal_types
        for channel in channels:
            series = window[f'{signal_type}_{channel}']
            values += [stat(series) for stat in time_stats]
            if not wants_spectrum:
                continue
            # welch() runs with SciPy's defaults, so the reported peak
            # frequency is relative to the default sampling frequency.
            freqs, psd = signal.welch(series)
            peak_bin = np.argmax(psd)
            values += [np.sum(psd), np.mean(psd), freqs[peak_bin]]
    return np.array(values)

**Data Processing Function**

In [None]:
def process_wearable_data(raw_data: pd.DataFrame, window_size: int = 60 * 30,
                          overlap: float = 0.5) -> np.ndarray:
    """Slice a recording into overlapping windows and featurise each one.

    Only complete windows are used; a trailing partial window is dropped.
    Each window is passed to ``extract_features`` together with the fixed
    channel layout below, so ``raw_data`` must provide the columns
    ``acc_x``, ``acc_y``, ``acc_z``, ``eda_conductance``, ``hr_bpm`` and
    ``temp_celsius``.

    Args:
        raw_data: The raw recording, one row per sample.
        window_size: Number of rows per window.  The default of ``60 * 30``
            rows corresponds to 30 minutes only if the data is sampled once
            per second -- TODO confirm the sampling rate of the input.
        overlap: Fraction of each window shared with the next one; must
            satisfy ``0 <= overlap < 1``.  The default of 0.5 advances by
            half a window each step.

    Returns:
        Array with one row of features per complete window.  When the
        recording is shorter than ``window_size`` the result is an empty
        array of shape ``(0,)``.

    Raises:
        ValueError: If ``window_size`` is not positive or ``overlap`` is
            outside ``[0, 1)``.
    """
    if window_size <= 0:
        raise ValueError(f"window_size must be positive, got {window_size}")
    if not 0 <= overlap < 1:
        raise ValueError(f"overlap must satisfy 0 <= overlap < 1, got {overlap}")

    features = {
        'acc': ['x', 'y', 'z'],
        'eda': ['conductance'],
        'hr': ['bpm'],
        'temp': ['celsius']
    }

    # The hop between window starts is the part of the window that is NOT
    # shared with its successor.  (window_size * overlap is only correct for
    # overlap == 0.5.)  Never let the hop reach zero.
    step = max(1, round(window_size * (1 - overlap)))

    # Stop at the last start index that still yields a full window.
    last_start = len(raw_data) - window_size
    processed_windows = [
        extract_features(raw_data.iloc[start:start + window_size], features)
        for start in range(0, last_start + 1, step)
    ]
    return np.array(processed_windows)


# **2. LSTM Model Architecture**
**Import TensorFlow Libraries**

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam

**Build Model Function**

In [None]:
def build_model(input_shape: tuple, lstm_units: list = None, dropout: float = 0.5) -> tf.keras.Model:
    """Build and compile a stacked-LSTM binary classifier.

    The network is a stack of LSTM layers, each followed by dropout, topped
    by a single sigmoid unit.  Every LSTM except the last returns its full
    sequence so that the next LSTM receives 3-D input; the last one returns
    only its final state so the dense layer emits one value per sample.

    Args:
        input_shape: Shape of one input sample, ``(timesteps, n_features)``.
        lstm_units: Number of units for each LSTM layer, in order.  ``None``
            (the default) means ``[64, 32]``.
        dropout: Dropout rate applied after every LSTM layer.

    Returns:
        A compiled model using Adam (learning rate 0.001), binary
        cross-entropy loss and an AUC metric reported under the name
        ``auc`` (``val_auc`` on validation data).

    Raises:
        ValueError: If ``lstm_units`` is empty.
    """
    # Resolve the default here instead of using a shared mutable default.
    units_per_layer = [64, 32] if lstm_units is None else list(lstm_units)
    if not units_per_layer:
        raise ValueError("lstm_units must contain at least one layer size")

    model = Sequential()
    # Declare the input explicitly rather than passing input_shape to a layer.
    model.add(tf.keras.Input(shape=input_shape))

    last_index = len(units_per_layer) - 1
    for index, units in enumerate(units_per_layer):
        # Only the final LSTM collapses the time axis.
        model.add(LSTM(units, return_sequences=index < last_index))
        model.add(Dropout(dropout))

    model.add(Dense(1, activation='sigmoid'))
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='binary_crossentropy',
        # Pin the metric name so callbacks monitoring 'val_auc' keep working
        # even when several models are built in the same session.
        metrics=[tf.keras.metrics.AUC(name='auc')],
    )
    return model


# **3. Training Pipeline**
**Seizure Forecaster Class**

In [None]:
class SeizureForecaster:
    """Sequence classifier that flags whether a seizure label occurs soon.

    The forecaster turns a series of per-window feature vectors into
    fixed-length sequences, standardises the features, and trains the model
    returned by ``build_model`` to output the probability that any label in
    the ``prediction_horizon`` steps following a sequence is truthy.

    Attributes set by ``__init__``:
        prediction_horizon: Number of label steps after a sequence that are
            inspected to produce its target.
        sequence_length: Number of consecutive feature vectors per sequence.
        model: The trained Keras model, or ``None`` before ``train``.
        scaler: ``StandardScaler`` fitted on the training features.
    """

    def __init__(self, prediction_horizon: int = 30, sequence_length: int = 60):
        self.prediction_horizon = prediction_horizon
        self.sequence_length = sequence_length
        self.model = None
        self.scaler = StandardScaler()

    def _scale(self, sequences: np.ndarray) -> np.ndarray:
        """Apply the fitted scaler per feature and restore the input shape."""
        flat = sequences.reshape(-1, sequences.shape[-1])
        return self.scaler.transform(flat).reshape(sequences.shape)

    def prepare_sequences(self, features: np.ndarray, labels: np.ndarray) -> tuple:
        """Build sliding-window sequences and their binary targets.

        Sequence ``i`` covers ``features[i:i + sequence_length]``.  Its
        target is 1 if any entry of ``labels`` in the ``prediction_horizon``
        positions directly after the sequence is truthy, else 0.  Near the
        end of the data that look-ahead slice is shorter than
        ``prediction_horizon``; such sequences are still emitted.

        Args:
            features: Array whose first axis indexes consecutive windows.
            labels: Per-window labels, indexed by position like ``features``.

        Returns:
            ``(X, y)`` where ``X`` has shape
            ``(n_sequences, sequence_length, *features.shape[1:])`` and ``y``
            is an integer array of length ``n_sequences``.  When there are
            not more than ``sequence_length`` windows, both arrays are empty
            but keep that shape and dtype.
        """
        features = np.asarray(features)
        n_sequences = max(0, len(features) - self.sequence_length)

        # Pre-allocating keeps the shape/dtype well defined even when no
        # sequence can be formed.
        X = np.empty((n_sequences, self.sequence_length) + features.shape[1:],
                     dtype=features.dtype)
        y = np.zeros(n_sequences, dtype=int)
        for i in range(n_sequences):
            end = i + self.sequence_length
            X[i] = features[i:end]
            future_window = labels[end:end + self.prediction_horizon]
            y[i] = 1 if np.any(future_window) else 0
        return X, y

    def train(self, train_data: tuple, val_data: tuple, epochs: int = 100, batch_size: int = 32):
        """Fit the scaler and the model, with early stopping on validation AUC.

        Args:
            train_data: ``(X_train, y_train)``; the last axis of ``X_train``
                is the feature axis.
            val_data: ``(X_val, y_val)`` used for validation only.
            epochs: Maximum number of training epochs.
            batch_size: Mini-batch size.

        Returns:
            The object returned by the model's ``fit`` call.
        """
        X_train, y_train = train_data
        X_val, y_val = val_data

        # Fit the scaler on training data only, then reuse it for validation.
        self.scaler.fit(X_train.reshape(-1, X_train.shape[-1]))
        X_train_scaled = self._scale(X_train)
        X_val_scaled = self._scale(X_val)

        self.model = build_model(input_shape=(self.sequence_length, X_train.shape[-1]))

        # AUC is higher-is-better; state the direction explicitly instead of
        # relying on the callback's name-based 'auto' inference.
        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor='val_auc',
            mode='max',
            patience=10,
            restore_best_weights=True,
        )
        history = self.model.fit(
            X_train_scaled, y_train,
            validation_data=(X_val_scaled, y_val),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[early_stopping]
        )
        return history

    def predict(self, features: np.ndarray) -> np.ndarray:
        """Scale ``features`` with the fitted scaler and return model outputs.

        ``train`` must have been called first so that both the scaler and
        the model exist.
        """
        return self.model.predict(self._scale(features))


# **4. Evaluation**
**Import Evaluation Libraries**

In [None]:
from sklearn.metrics import roc_auc_score, precision_recall_curve


**Evaluation Function**

In [None]:
def evaluate_forecaster(y_true: np.ndarray, y_pred: np.ndarray, random_predictions: np.ndarray) -> dict:
    """Score forecaster outputs against a random-prediction baseline.

    Args:
        y_true: Ground-truth binary labels.
        y_pred: Scores produced by the forecaster for the same samples.
        random_predictions: Baseline scores for the same samples.

    Returns:
        A dict with the ROC AUC of the forecaster (``'auc_roc'``), the ROC
        AUC of the baseline (``'random_auc'``), and the three arrays returned
        by ``precision_recall_curve`` for the forecaster (``'precision'``,
        ``'recall'``, ``'thresholds'``).
    """
    report = {'auc_roc': roc_auc_score(y_true, y_pred)}
    report['random_auc'] = roc_auc_score(y_true, random_predictions)

    # precision_recall_curve yields (precision, recall, thresholds) in that order.
    curve = precision_recall_curve(y_true, y_pred)
    report.update(zip(('precision', 'recall', 'thresholds'), curve))
    return report


# **5. Usage Example**
**Load and Process Data**

In [None]:
# Load the raw wearable recording and the seizure annotations from CSV.
raw_data = pd.read_csv('wearable_data.csv')  # Replace with actual file path
seizure_labels = pd.read_csv('seizure_events.csv')  # Replace with actual file path

# Convert the recording into one feature vector per overlapping window.
processed_data = process_wearable_data(raw_data)


**Initialize and Train the Forecaster**

In [None]:
# Build sequences of 60 feature vectors; each target looks 30 steps ahead.
forecaster = SeizureForecaster(prediction_horizon=30, sequence_length=60)
# NOTE(review): seizure_labels is the DataFrame returned by read_csv, while
# prepare_sequences is annotated to take an ndarray and slices it by position.
# Presumably it needs one label per row of processed_data -- confirm and
# convert before relying on the targets.
X, y = forecaster.prepare_sequences(processed_data, seizure_labels)

# Split data into training and validation sets
# (first 80% for training, remaining 20% for validation; order is kept, no shuffling).
train_idx = int(len(X) * 0.8)
X_train, y_train = X[:train_idx], y[:train_idx]
X_val, y_val = X[train_idx:], y[train_idx:]

# Train the model
history = forecaster.train(train_data=(X_train, y_train), val_data=(X_val, y_val))


**Generate Predictions and Evaluate**

In [None]:
# Score the held-out validation sequences with the trained forecaster.
predictions = forecaster.predict(X_val)
# Uniform random scores, one per validation sample, serve as the baseline.
random_predictions = np.random.random(len(y_val))
metrics = evaluate_forecaster(y_val, predictions, random_predictions)

# Print the forecaster's ROC AUC next to the random baseline's for comparison.
print(f"AUC-ROC: {metrics['auc_roc']:.2f}")
print(f"Random AUC-ROC: {metrics['random_auc']:.2f}")
