In [1]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
from ripser import ripser
from persim import plot_diagrams
from scipy.spatial.distance import pdist, squareform
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.svm import LinearSVC, SVC, SVR
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix
from gtda.time_series import TakensEmbedding
from PyEMD import EMD
from statsmodels.tsa.stattools import adfuller
from pylab import mpl
from sklearn import preprocessing
from scipy.io import arff
import scipy
import sklearn
import torch
from tqdm import tqdm
from gtda.time_series import TakensEmbedding
from gtda.diagrams import BettiCurve

%matplotlib qt

In [2]:
filepath = "../dataset/EEGEyeState.arff"
data = arff.loadarff(filepath)
df = pd.DataFrame(data[0])

sample = df.values[:, 0:len(df.values[0])-1]
label = df.values[:, -1].astype(int).reshape(-1,1)
data = np.concatenate((sample, label),axis=1)
print(data.shape)

(14980, 15)


In [3]:
sample = sklearn.preprocessing.scale(sample, axis=1)
print(sample)

[[ 0.06779371 -1.64898784 -0.14680398 ... -0.19358628  1.71306058
   0.41447629]
 [ 0.06355243 -1.64953356 -0.10117149 ... -0.17804623  1.71346838
   0.38197229]
 [ 0.08411988 -1.65479873 -0.09089869 ... -0.16310536  1.71475557
   0.4174731 ]
 ...
 [-0.05419791 -1.54833733 -0.2145728  ... -0.15578084  1.58421115
   0.26921002]
 [-0.0467027  -1.57767428 -0.22101063 ... -0.13788237  1.58339541
   0.29914223]
 [-0.05442066 -1.57772692 -0.19974487 ... -0.12437991  1.57390319
   0.27663931]]


In [4]:
# DL part

class VanillaRNN(torch.nn.Module):
    def __init__(self, input_size, hidden_size, num_layer=2, batch_first=True):
        super(VanillaRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layer = num_layer
        self.batch_first = batch_first

        self.lstm = torch.nn.LSTM(input_size=input_size, hidden_size=hidden_size, \
            num_layers=num_layer, batch_first=batch_first)
        self.mlp = torch.nn.Linear(in_features=hidden_size, out_features=1)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        # x: [batch_size, seq_len, input_size=1] need check
        x = x.unsqueeze(2)    
        x, _ = self.lstm(x)
        x = x[:, -1, :]
        x = self.mlp(x)
        x = x.view(-1, 1)
        x = self.sigmoid(x)
        return x

In [5]:
input_size = 1
seq_len = 14
batch_size = 20
hidden_size = 28
epochs = 120
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")

data = np.concatenate((sample, label),axis=1)
data_train, data_test = train_test_split(data)

train_loader = torch.utils.data.DataLoader(
    dataset = torch.utils.data.TensorDataset(torch.Tensor(data_train[:, :-1]).to(device), torch.Tensor(data_train[:, -1]).to(device)),
    batch_size = batch_size,
    shuffle = True
)

model = VanillaRNN(input_size=input_size, hidden_size=hidden_size)
loss_function = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

model.to(device)

model.train()
for i in range(epochs):
    for seq, labels in train_loader:
        optimizer.zero_grad()
        y_pred = model(seq).squeeze()
        single_loss = loss_function(y_pred, labels)
        single_loss.backward()
        optimizer.step()
    print("Train Step:", i, " loss: ", single_loss)


Train Step: 0  loss:  tensor(0.7631, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 1  loss:  tensor(0.6677, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 2  loss:  tensor(0.7050, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 3  loss:  tensor(0.6287, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 4  loss:  tensor(0.6068, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 5  loss:  tensor(0.5991, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 6  loss:  tensor(0.5171, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 7  loss:  tensor(0.7072, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 8  loss:  tensor(0.6694, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 9  loss:  tensor(0.4735, device='cuda:0', grad_fn=<BinaryCrossEntropyBackward0>)
Train Step: 10  loss:  tensor(0.5220, device='cuda:0', grad_fn=<Binary

In [6]:
model.eval()
test_loader = torch.utils.data.DataLoader(
    dataset = torch.utils.data.TensorDataset(torch.Tensor(data_test[:, :-1]).to(device), torch.Tensor(data_test[:, -1]).to(device)),
    batch_size = batch_size,
    shuffle = False
)

pred = np.zeros(batch_size)
true = np.zeros(batch_size)

for seq, labels in test_loader:
    y_pred = model(seq).squeeze()
    pred = np.append(pred, np.round(y_pred.cpu().detach().numpy()))
    true = np.append(true, labels.cpu().detach().numpy())

print(accuracy_score(true, pred))
print(precision_score(true, pred))
print(recall_score(true, pred))
print(f1_score(true, pred))

0.8764940239043825
0.8477389811104751
0.881547619047619
0.8643128100379341


In [8]:
te = TakensEmbedding(time_delay=1, dimension=3)
sample_train_tda = te.fit_transform(data_train[:, :-1])
sample_test_tda = te.fit_transform(data_test[:, :-1])

feature_train_list = []
feature_test_list = []

for i in tqdm(range(sample_train_tda.shape[0])):
    r = ripser(sample_train_tda[i, :, :])
    feature = np.delete(r['dgms'][0], -1, axis=0)[:, 1]
    # feature2 = r['dgms'][1]
    f = [feature.shape[0], np.sum(feature), np.mean(feature), np.std(feature), np.max(feature), np.min(feature), len([_ for _ in feature if _>0.5*np.max(feature)]), len([_ for _ in feature if _>np.mean(feature)])]
    f = [np.round(_, 2) for _ in f]
    feature_train_list.append(np.array(f))

for i in tqdm(range(sample_test_tda.shape[0])):
    r = ripser(sample_test_tda[i, :, :])
    feature = np.delete(r['dgms'][0], -1, axis=0)[:, 1]
    f = [feature.shape[0], np.sum(feature), np.mean(feature), np.std(feature), np.max(feature), np.min(feature), len([_ for _ in feature if _>0.5*np.max(feature)]), len([_ for _ in feature if _>np.mean(feature)])]
    f = [np.round(_, 2) for _ in f]
    feature_test_list.append(np.array(f))

100%|██████████| 11235/11235 [00:05<00:00, 2229.36it/s]
100%|██████████| 3745/3745 [00:01<00:00, 2223.77it/s]


In [97]:
i = 1
r = ripser(sample_test_tda[i, :, :])
print(r['dgms'][1])
print(data_train[i, -1])

feature2 = r['dgms'][1]
sub = np.zeros(1)
for i in range(feature2.shape[0]):
    sub = np.append(sub, feature2[i, 1]-feature2[i, 0])



# print(feature2.shape[0], np.min(feature2.flatten()), np.max(feature2.flatten()))

[[2.11420989 2.27933598]
 [2.09824705 2.13405943]]
0.0
[0.16512609 0.03581238]


In [None]:
feature_train_pd = pd.DataFrame(np.concatenate((np.array(feature_train_list), data_train[:, -1].reshape(-1, 1)), axis=1), \
    columns=['Number of TDA barcode points', 'Sum of lifetime', 'Average of lifetime', 'Std of lifetime', 'Max of lifetime', \
        'Min of lifetime', 'Number of points bigger than 0.5*max', 'Number of points bigger than average', 'Class'])

print(feature_train_pd.corr())

In [18]:
X_train = feature_train_pd[['Number of TDA barcode points', 'Sum of lifetime', 'Average of lifetime', 'Std of lifetime', 'Max of lifetime', \
        'Min of lifetime', 'Number of points bigger than 0.5*max', 'Number of points bigger than average']]
Y_train = feature_train_pd[['Class']]

clf = SVC(C=0.3, kernel='linear')
clf.fit(X_train, np.ravel(Y_train))

SVC(C=0.3, kernel='linear')

In [19]:
feature_test_pd = pd.DataFrame(np.concatenate((np.array(feature_test_list), data_test[:, -1].reshape(-1, 1)), axis=1), \
    columns=['Number of TDA barcode points', 'Sum of lifetime', 'Average of lifetime', 'Std of lifetime', 'Max of lifetime', \
        'Min of lifetime', 'Number of points bigger than 0.5*max', 'Number of points bigger than average', 'Class'])
X_test = feature_test_pd[['Number of TDA barcode points', 'Sum of lifetime', 'Average of lifetime', 'Std of lifetime', 'Max of lifetime', \
        'Min of lifetime', 'Number of points bigger than 0.5*max', 'Number of points bigger than average']]

Y_test = feature_test_pd[['Class']]

Y_pred = clf.predict(X_test)
print(accuracy_score(y_pred=Y_pred, y_true=Y_test))

0.5703604806408544
