# A Case Study - Seizure Detection

## 1. Seizure detection dataset

In [None]:
import torch
import memtorch
from torch.utils.data import Dataset
import torch.nn.functional as F
import torch.nn as nn
import pandas as pd
import numpy as np
import sklearn
from sklearn import preprocessing


class SeizureDataset(Dataset):

    def __init__(self, path_to_csv):
        self.features = pd.read_csv(path_to_csv)
        self.labels = self.features.pop('y')
        self.features = preprocessing.scale(self.features.iloc[:, 1:], axis=0)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, i):
        if self.labels[i] == 1:
            label = 1
        else:
            label = 0

        return np.asarray(self.features[i, :]).astype(np.float), label

csv_path = 'http://archive.ics.uci.edu/ml/machine-learning-databases/00388/data.csv'
dataset = SeizureDataset(path_to_csv=csv_path)

## 2. Network architecture

In [None]:
class EEGNet(nn.Module):
    def __init__(self):
        super(EEGNet, self).__init__()
        self.fc1 = nn.Linear(178, 200)
        self.batchnorm1 = nn.BatchNorm1d(200)
        self.fc2 = nn.Linear(200, 200)
        self.batchnorm2 = nn.BatchNorm1d(200)
        self.fc3 = nn.Linear(200, 200)
        self.batchnorm3 = nn.BatchNorm1d(200)
        self.fc4 = nn.Linear(200, 2)
        self.batchnorm4 = nn.BatchNorm1d(2)

    def forward(self, x):
        x = self.batchnorm1(F.relu(self.fc1(x)))
        x = self.batchnorm2(F.relu(self.fc2(x)))
        x = self.batchnorm3(F.relu(self.fc3(x)))
        x = self.batchnorm4(self.fc4(x))
        return F.log_softmax(x, dim=1)

## 3. Training methodology

In [None]:
import sklearn
from sklearn.model_selection import KFold


init_lr = 1e-1
batch_size = 1024
device = torch.device('cpu' if 'cpu' in memtorch.__version__ else 'cuda')

def adjust_lr(optimizer, epoch):
    lr = init_lr * (0.1 ** (epoch // 20))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    return lr

def train(net, train_loader, test_loader, epochs=10, fold=0):
    print('fold %d' % fold)
    best_f1_score = 0
    for epoch in range(epochs):
        lr = adjust_lr(optimizer, epoch)
        running_loss = 0
        for data in train_loader:
            inputs, labels = data
            inputs = inputs.float()
            if device == torch.device('cuda'):
                inputs = inputs.cuda()
                labels = labels.cuda()

            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        f1_score = test(net, test_loader)
        if f1_score > best_f1_score:
            torch.save(net.state_dict(), 'trained_net_fold_%d.pt' % fold)
            best_f1_score = f1_score

        print('[Epoch %d] lr: %.4f f1_score: %.4f\ttraining loss: %.4f' % (epoch + 1, lr, f1_score, running_loss / len(train_loader)))

def test(net, test_loader):
    confusion_matrix = torch.zeros(2, 2)
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            inputs, labels = data
            inputs = inputs.float()
            if device == torch.device('cuda'):
                inputs = inputs.cuda()
                labels = labels.cuda()

            outputs = net(inputs)
            _, predicted = torch.max(outputs.data, 1)
            for t, p in zip(labels.view(-1), predicted.view(-1)):
                confusion_matrix[t.long(), p.long()] += 1

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    f1_score = 2 * confusion_matrix[0][0] / (2 * confusion_matrix[0][0] + confusion_matrix[0][1] + confusion_matrix[1][0])
    return f1_score.item()

dataset = SeizureDataset(csv_path)
torch.manual_seed(0)
kf = KFold(n_splits=5, shuffle=True)
train_loaders = []
test_loaders = []
for i, (train_index, test_index) in enumerate(kf.split(dataset)):
    train_ = torch.utils.data.Subset(dataset, train_index)
    test_ = torch.utils.data.Subset(dataset, test_index)
    train_loaders.append(torch.utils.data.DataLoader(train_, batch_size=batch_size, shuffle=True))
    test_loaders.append(torch.utils.data.DataLoader(test_, batch_size=batch_size, shuffle=False))

torch.manual_seed(torch.initial_seed())
torch.save(test_loaders, 'test_loaders.pth')
assert(len(train_loaders) == len(test_loaders))

# Determine the baseline F1 score
df = pd.DataFrame(columns=['indentifier', 'fold', 'f1_score'])
for indentifier in range(100):
    test_loaders = torch.load('test_loaders.pth')
    for fold in range(len(test_loaders)):
        net = EEGNet().to(device)
        f1_score = test(net, test_loaders[fold])
        df = df.append({'indentifier': indentifier, 'fold': fold, 'f1_score': f1_score}, ignore_index=True)

    df.to_csv('baseline.csv', index=False)

# Determine the F1 score
for fold in range(len(train_loaders)):
    net = EEGNet().to(device)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=init_lr)
    train(net, train_loaders[fold], test_loaders[fold], epochs=50, fold=fold)

fold_f1_scores = []
for fold in range(len(train_loaders)):
    net = EEGNet().to(device)
    net.load_state_dict(torch.load('trained_net_fold_%d.pt' % fold), strict=True)
    fold_f1_scores.append(test(net, test_loaders[fold]))
    print('f1_score of fold %d: %0.4f' % (fold, fold_f1_scores[fold]))

print('baseline -> mean: %0.4f\tstddev: %0.4f' % (np.mean(df['f1_score'].values), np.std(df['f1_score'].values)))
print('trained  -> mean: %0.4f\tstddev: %0.4f' % (np.mean(fold_f1_scores), np.std(fold_f1_scores)))

## 4. Network conversion

In [None]:
from memtorch.mn.Module import patch_model
from memtorch.map.Parameter import naive_map
from memtorch.bh.crossbar.Program import naive_program
from memtorch.bh.nonideality.NonIdeality import apply_nonidealities
import copy


test_loaders = torch.load('test_loaders.pth')
reference_memristor = memtorch.bh.memristor.VTEAM
reference_memristor_params = {'time_series_resolution': 1e-6,
                              'alpha_off': 1,
                              'alpha_on': 3,
                              'v_off': 0.5,
                              'v_on': -0.53,
                              'r_off': 2.5e3,
                              'r_on': 100,
                              'k_off': 4.03e-8,
                              'k_on': -80,
                              'd': 10e-9,
                              'x_on': 0,
                              'x_off': 10e-9}
    
# Determine the first baseline F1 score
df = pd.DataFrame(columns=['indentifier', 'fold', 'f1_score'])
for indentifier in range(100):
    for fold in range(len(test_loaders)):
        net = EEGNet()
        net.load_state_dict(torch.load('trained_net_fold_%d.pt' % fold), strict=True).to(device)
        patched_net = patch_model(copy.deepcopy(net),
                                  memristor_model=reference_memristor,
                                  memristor_model_params=reference_memristor_params,
                                  module_parameters_to_patch=[torch.nn.Linear],
                                  mapping_routine=naive_map,
                                  transistor=True,
                                  programming_routine=None,
                                  scheme=memtorch.bh.Scheme.DoubleColumn)

        for i, (name, m) in enumerate(list(patched_net.named_modules())):
            if isinstance(m, memtorch.mn.Linear):
                m.crossbars[0].conductance_matrix = m.crossbars[0].conductance_matrix.uniform_(1 / 2.5e3, 1 / 100)
                m.crossbars[1].conductance_matrix = m.crossbars[1].conductance_matrix.uniform_(1 / 2.5e3, 1 / 100)

        patched_net.tune_()
        f1_score = test(patched_net, test_loaders[fold])
        df = df.append({'indentifier': indentifier, 'fold': fold, 'f1_score': f1_score}, ignore_index=True)

    df.to_csv('baseline_variability.csv', index=False)

# Determine the second baseline F1 score
df_2 = pd.DataFrame(columns=['indentifier', 'fold', 'f1_score'])
for indentifier in range(100):
    for fold in range(len(test_loaders)):
        net = EEGNet().to(device)
        patched_net = patch_model(copy.deepcopy(net),
                                  memristor_model=reference_memristor,
                                  memristor_model_params=reference_memristor_params,
                                  module_parameters_to_patch=[torch.nn.Linear],
                                  mapping_routine=naive_map,
                                  transistor=True,
                                  programming_routine=None,
                                  scheme=memtorch.bh.Scheme.DoubleColumn)

        for i, (name, m) in enumerate(list(patched_net.named_modules())):
            if isinstance(m, memtorch.mn.Linear):
                m.crossbars[0].conductance_matrix = m.crossbars[0].conductance_matrix.uniform_(1 / 2.5e3, 1 / 100)
                m.crossbars[1].conductance_matrix = m.crossbars[1].conductance_matrix.uniform_(1 / 2.5e3, 1 / 100)

        f1_score = test(patched_net, test_loaders[fold])
        df_2 = df_2.append({'indentifier': indentifier, 'fold': fold, 'f1_score': f1_score}, ignore_index=True)

    df_2.to_csv('baseline_variability_no_tune.csv', index=False)

# Determine the F1 score
fold_f1_scores = []
for fold in range(len(test_loaders)):
    net = EEGNet()
    net.load_state_dict(torch.load('trained_net_fold_%d.pt' % fold), strict=False).to(device)
    patched_net = patch_model(net,
                              memristor_model=reference_memristor,
                              memristor_model_params=reference_memristor_params,
                              module_parameters_to_patch=[torch.nn.Linear],
                              mapping_routine=naive_map,
                              transistor=True,
                              programming_routine=None,
                              scheme=memtorch.bh.Scheme.DoubleColumn)

    patched_net.tune_()
    f1_score = test(patched_net, test_loaders[fold])
    fold_f1_scores.append(f1_score)

tuned_baseline = np.mean(df['f1_score'].values)
print('baseline          -> mean: %0.4f\tstddev: %0.4f' % (np.mean(df_2['f1_score'].values), np.std(df_2['f1_score'].values)))    
print('tuned baseline    -> mean: %0.4f\tstddev: %0.4f' % (np.mean(df['f1_score'].values), np.std(df['f1_score'].values)))
print('trained and tuned -> mean: %0.4f\tstddev: %0.4f' % (np.mean(fold_f1_scores), np.std(fold_f1_scores)))

In [None]:
import matplotlib.pyplot as plt


# Plot the hysteresis loop
hysteresis_loop_reference_memristor_params = {'time_series_resolution': 1e-6,
                              'alpha_off': 1,
                              'alpha_on': 3,
                              'v_off': 0.5,
                              'v_on': -0.53,
                              'r_off': memtorch.bh.StochasticParameter(loc=2.5e3, scale=50, min=2),
                              'r_on': memtorch.bh.StochasticParameter(loc=100, scale=25, min=1),
                              'k_off': 4.03e-8,
                              'k_on': -80,
                              'd': 10e-9,
                              'x_on': 0,
                              'x_off': 10e-9}

palette = ["#DA4453", "#8CC152", "#4A89DC", "#F6BB42", "#B600B0", "#535353"]
f = plt.figure(figsize=(16/3, 4))
plt.title('Hysteresis Loop')
plt.xlabel('Voltage (V)')
plt.ylabel('Current (A)')
j = 0
for i in range(10):
    j = j + 1
    if j == 6:
        j = 0

    memristor = reference_memristor(**hysteresis_loop_reference_memristor_params)
    voltage_signal, current_signal = memristor.plot_hysteresis_loop(duration=2, voltage_signal_amplitude=1, voltage_signal_frequency = 0.5, return_result=True)
    plt.plot(voltage_signal, current_signal, color=palette[j])

plt.grid()
plt.show()

## 5. Device-to-device variability investigation

In [None]:
# Determine the F1 score
df = pd.DataFrame(columns=['sigma', 'mean', 'std'])
sigma_values = np.linspace(0, 500, 21)
for sigma in sigma_values:
    non_linear_reference_memristor_params = {'time_series_resolution': 1e-6,
                              'alpha_off': 1,
                              'alpha_on': 3,
                              'v_off': 0.5,
                              'v_on': -0.53,
                              'r_off': memtorch.bh.StochasticParameter(loc=2.5e3, scale=sigma*2, min=1),
                              'r_on': memtorch.bh.StochasticParameter(loc=100, scale=sigma, min=1),
                              'k_off': 4.03e-8,
                              'k_on': -80,
                              'd': 10e-9,
                              'x_on': 0,
                              'x_off': 10e-9}
    f1_scores = []
    for fold in range(len(test_loaders)):
        net = EEGNet()
        net.load_state_dict(torch.load('trained_net_fold_%d.pt' % fold), strict=True).to(device)
        patched_net = patch_model(copy.deepcopy(net),
                                  memristor_model=reference_memristor,
                                  memristor_model_params=non_linear_reference_memristor_params,
                                  module_parameters_to_patch=[torch.nn.Linear],
                                  mapping_routine=naive_map,
                                  transistor=True,
                                  programming_routine=None,
                                  scheme=memtorch.bh.Scheme.DoubleColumn)

        patched_net.tune_()
        f1_score = test(patched_net, test_loaders[fold])
        f1_scores.append(f1_score)

    df = df.append({'sigma': sigma, 'mean': np.mean(f1_scores), 'std': np.std(f1_scores)}, ignore_index=True)

df.to_csv('variability.csv', index=False)

In [None]:
f = plt.figure(figsize=(16/3, 4))
plt.axhline(y=tuned_baseline, color='k', linestyle='--', zorder=1)
b = plt.bar(df['sigma'], df['mean'], width=12.5, zorder=2)
plt.xlabel('$\sigma$')
plt.ylabel('F1 Score')
for bar in b:
    bar.set_edgecolor('black')
    bar.set_facecolor(palette[0])
    bar.set_linewidth(1)

f.tight_layout()
plt.grid()
plt.ylim([0.9, 1.0])
plt.show()

## 6. Non-linear IV characteristics investigation

In [None]:
# Determine the F1 score
df = pd.DataFrame(columns=['sigma', 'mean', 'std'])
sigma_values = np.linspace(0, 500, 11)
f1_scores = []
for fold in range(len(test_loaders)):
    net = EEGNet()
    net.load_state_dict(torch.load('trained_net_fold_%d.pt' % fold), strict=False).to(device)
    patched_net = patch_model(net,
                              memristor_model=reference_memristor,
                              memristor_model_params=reference_memristor_params,
                              module_parameters_to_patch=[torch.nn.Linear],
                              mapping_routine=naive_map,
                              transistor=True,
                              programming_routine=None,
                              scheme=memtorch.bh.Scheme.DoubleColumn)

    patched_net = apply_nonidealities(patched_net,
                    non_idealities=[memtorch.bh.nonideality.NonIdeality.NonLinear],
                    sweep_duration=2,
                    sweep_voltage_signal_amplitude=1,
                    sweep_voltage_signal_frequency=0.5)

    patched_net.tune_()
    f1_score = test(patched_net, test_loaders[fold])
    f1_scores.append(f1_score)

print('mean: %0.4f\tstddev: %0.4f' % (np.mean(f1_scores), np.std(f1_scores)))   