<a href="https://colab.research.google.com/github/NolanRink/CS4540/blob/main/Project3/Project3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
import numpy as np
import h5py
from scipy import signal, stats
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn

files = {
    "Rat08_HPC": "Rat08-20130711_017.h5",
    "SubjectHB10_BLA": "Part1SubjectHB10.h5",
    "SubjectHB13_BLA": "Part2SubjectHB13.h5"
}


In [17]:
# Create a dictionary to store segments, labels, and sampling frequency for each subject.
# Here, WAKE is assigned label 0 and NREM label 1.
data_dict = {}

for rat, filename in files.items():
    segments = []  # to store individual recording segments
    labels = []    # to store corresponding state labels
    # Open the file for each subject.
    with h5py.File(filename, 'r') as f:
        print(f"{rat} keys: {list(f.keys())}")  # Should show ['NREM', 'WAKE']
        print(f['NREM'])
        for state in f.keys():
            label = 1 if state == "NREM" else 0  # assign state label
            group = f[state]
            for seg_id in group.keys():
                seg_data = group[seg_id][()]  # load the segment as a NumPy array
                segments.append(seg_data.astype(float))
                labels.append(label)

    with h5py.File(filename, 'r') as f:
        fs = f.attrs.get('fs', 1250)  # default to 1250 Hz if not found
    data_dict[rat] = {"segments": segments, "labels": np.array(labels), "fs": fs}
    print(f"{rat}: Loaded {len(segments)} segments")

Rat08_HPC keys: ['NREM', 'WAKE']
<HDF5 group "/NREM" (59 members)>
Rat08_HPC: Loaded 96 segments
SubjectHB10_BLA keys: ['NREM', 'WAKE']
<HDF5 group "/NREM" (17 members)>
SubjectHB10_BLA: Loaded 55 segments
SubjectHB13_BLA keys: ['NREM', 'WAKE']
<HDF5 group "/NREM" (19 members)>
SubjectHB13_BLA: Loaded 41 segments


In [18]:
# Define clip duration (in seconds) and calculate number of samples for 5 seconds.
default_fs = 1250  # Use 1250 Hz if no sampling frequency is provided in file
clip_duration = 5  # seconds
segment_length = int(clip_duration * default_fs)  # samples per clip

lfp_clips = {}       # dictionary to hold fixed-length clips (per subject)
clip_labels = {}     # dictionary to hold label for each clip

for rat, data in data_dict.items():
    fs = data.get("fs", default_fs)
    seg_list = data["segments"]  # list of variable-length segments
    label_list = data["labels"]  # list of corresponding labels
    clips = []
    labels_out = []
    for seg, lab in zip(seg_list, label_list):
        if len(seg) >= segment_length:
            n_clips = len(seg) // segment_length
            seg_trunc = seg[:n_clips * segment_length]  # truncate extra samples
            seg_clips = seg_trunc.reshape(n_clips, segment_length)
            clips.append(seg_clips)
            # Each clip gets the parent segment's label
            labels_out.extend([lab] * n_clips)
        else:
            continue  # skip segments that are too short
    # Concatenate clips along axis 0 for each subject
    if clips:
        clips = np.concatenate(clips, axis=0)
    else:
        clips = np.empty((0, segment_length))
    lfp_clips[rat] = clips
    clip_labels[rat] = np.array(labels_out)
    print(f"{rat}: segmented into {clips.shape[0]} clips of 5s each")


Rat08_HPC: segmented into 3682 clips of 5s each
SubjectHB10_BLA: segmented into 703 clips of 5s each
SubjectHB13_BLA: segmented into 713 clips of 5s each


In [19]:
# Define a function to extract features from a clip.
# Features: mean, std, skew, kurtosis, and band power in delta, theta, alpha, beta, gamma bands.
def extract_features(segment, fs):
    mean_val = np.mean(segment)
    std_val = np.std(segment)
    skew_val = stats.skew(segment)
    kurt_val = stats.kurtosis(segment, fisher=False)  # non-Fisher, so normal distribution ~3
    freqs, psd = signal.welch(segment, fs=fs, nperseg=fs*2)  # 2-second window
    band_powers = []
    bands = {"delta": (0.5, 4), "theta": (4, 8), "alpha": (8, 15), "beta": (15, 30), "gamma": (30, 100)}
    for band, (f_low, f_high) in bands.items():
        band_mask = (freqs >= f_low) & (freqs < f_high)
        # Use np.trapezoid (instead of the deprecated trapz)
        band_power = np.trapezoid(psd[band_mask], freqs[band_mask])
        band_powers.append(band_power)
    return [mean_val, std_val, skew_val, kurt_val] + band_powers

# Compute feature matrices for each subject.
feature_matrices = {}
for rat, clips in lfp_clips.items():
    fs = data_dict[rat].get("fs", default_fs)
    features = [extract_features(clip, fs) for clip in clips]
    feature_matrices[rat] = np.array(features)
    print(f"{rat}: Feature matrix shape = {feature_matrices[rat].shape}")


Rat08_HPC: Feature matrix shape = (3682, 9)
SubjectHB10_BLA: Feature matrix shape = (703, 9)
SubjectHB13_BLA: Feature matrix shape = (713, 9)


In [20]:
# We'll use the Rat08_HPC data as an example.
features_all = feature_matrices["Rat08_HPC"]   # shape (n_clips, 9)
labels_all   = clip_labels["Rat08_HPC"]

print("Total number of clips: ", features_all.shape[0])

# Split data into Train (70%), Validation (15%), and Test (15%)
X_train_feat, X_temp, y_train, y_temp = train_test_split(features_all, labels_all,
                                                         test_size=0.30, random_state=42, stratify=labels_all)
X_val_feat, X_test_feat, y_val, y_test = train_test_split(X_temp, y_temp,
                                                         test_size=0.5, random_state=42, stratify=y_temp)

print("Train features shape:", X_train_feat.shape)
print("Validation features shape:", X_val_feat.shape)
print("Test features shape:", X_test_feat.shape)

# Normalize using StandardScaler based on training features.
scaler = StandardScaler().fit(X_train_feat)
X_train_feat_norm = scaler.transform(X_train_feat)
X_val_feat_norm   = scaler.transform(X_val_feat)
X_test_feat_norm  = scaler.transform(X_test_feat)


Total number of clips:  3682
Train features shape: (2577, 9)
Validation features shape: (552, 9)
Test features shape: (553, 9)


In [21]:
# Define a simple feedforward neural network for sleep state classification.
class FFNeuralNet(nn.Module):
    def __init__(self, input_dim):
        super(FFNeuralNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 2)  # two classes: WAKE (0) and NREM (1)
        )
    def forward(self, x):
        return self.net(x)

# Instantiate the model (input_dim = 9 features)
model = FFNeuralNet(input_dim=9)
print(model)

# Convert numpy arrays to PyTorch tensors.
X_train_t = torch.from_numpy(X_train_feat_norm).float()
y_train_t = torch.from_numpy(y_train).long()
X_val_t   = torch.from_numpy(X_val_feat_norm).float()
y_val_t   = torch.from_numpy(y_val).long()

# Set up loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
num_epochs = 50  # Adjust epochs as needed

# Train the model.
for epoch in range(1, num_epochs+1):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_t)
    loss = criterion(outputs, y_train_t)
    loss.backward()
    optimizer.step()

    # Compute training accuracy
    _, train_preds = torch.max(outputs, 1)
    train_acc = (train_preds == y_train_t).float().mean().item()

    # Compute validation accuracy
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_t)
        _, val_preds = torch.max(val_outputs, 1)
        val_acc = (val_preds == y_val_t).float().mean().item()

    if epoch % 5 == 0:
        print(f"Epoch {epoch}: Train Acc = {train_acc:.3f}, Val Acc = {val_acc:.3f}")


FFNeuralNet(
  (net): Sequential(
    (0): Linear(in_features=9, out_features=64, bias=True)
    (1): ReLU()
    (2): Linear(in_features=64, out_features=32, bias=True)
    (3): ReLU()
    (4): Linear(in_features=32, out_features=2, bias=True)
  )
)
Epoch 5: Train Acc = 0.787, Val Acc = 0.793
Epoch 10: Train Acc = 0.871, Val Acc = 0.841
Epoch 15: Train Acc = 0.894, Val Acc = 0.877
Epoch 20: Train Acc = 0.904, Val Acc = 0.880
Epoch 25: Train Acc = 0.908, Val Acc = 0.893
Epoch 30: Train Acc = 0.920, Val Acc = 0.902
Epoch 35: Train Acc = 0.919, Val Acc = 0.902
Epoch 40: Train Acc = 0.926, Val Acc = 0.911
Epoch 45: Train Acc = 0.931, Val Acc = 0.917
Epoch 50: Train Acc = 0.933, Val Acc = 0.909


In [22]:
# Convert test data to PyTorch and evaluate performance
X_test_t = torch.from_numpy(X_test_feat_norm).float()
y_test_t = torch.from_numpy(y_test).long()

model.eval()
with torch.no_grad():
    test_outputs = model(X_test_t)
    _, test_preds = torch.max(test_outputs, 1)
    test_acc = (test_preds == y_test_t).float().mean().item()

print(f"Test Accuracy: {test_acc:.3f}")


Test Accuracy: 0.937
