In [2]:
# Initialization for Colab: mount Google Drive at /content/gdrive so that the
# data paths used by later cells (under /content/gdrive/MyDrive/...) resolve.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from scipy import signal


def segment(file):
  """Split a text annotation file into dummy and contraction intervals.

  The file is read with ``np.loadtxt(file, dtype=str)``, so it must be
  whitespace-separated text. Rows are consumed in consecutive pairs: the first
  row of a pair gives the start of a period and the second row its end.
  Column 1 of each row is a sample number (stored minus one, i.e. presumably
  converted from 1-based to 0-based -- confirm against the annotation format).
  Column 6 of the *first* row of a pair is the label: 'BD' marks a dummy
  period, any other value is treated as a contraction period.

  file: path of the annotation file (text export of an .atr file).
  Returns: (D_ints, C_ints) -- two lists of [start, end] lists, for the dummy
           periods and the contraction periods respectively.
  Raises: ValueError if the file holds an odd number of rows, because the
          last start row would have no matching end row.
  """
  # ndmin=2 keeps a one-row file two-dimensional so column indexing is valid.
  data = np.loadtxt(file, dtype=str, ndmin=2)
  n_rows = data.shape[0]
  if n_rows % 2 != 0:
    raise ValueError(
        "annotation file must contain start/end rows in pairs, got %d rows"
        % n_rows)

  D_ints = []
  C_ints = []
  for p in range(0, n_rows, 2):
    t = [int(data[p, 1]) - 1, int(data[p + 1, 1]) - 1]
    if data[p, 6] == 'BD':
      D_ints.append(t)
    else:
      C_ints.append(t)
  return D_ints, C_ints


def filter(file, cutoff, dataset, d_periods=None, c_periods=None):
  """Band-pass filter the signals of one record, optionally cut into periods.

  Three columns are taken from the numeric text file and a fourth signal is
  derived as ``col0 - col1 + col2`` of those three. Each of the four signals
  is filtered with a Butterworth band-pass design (``signal.butter`` with
  N=4, ``fs=20`` Hz) applied forward and backward by ``signal.filtfilt``.

  file: path of a whitespace-separated numeric text file readable by
        ``np.loadtxt`` (one row per sample).
  cutoff: pair of the lower and upper cutoff frequencies.
  dataset: which dataset is used. 0 refers to 'TPEHGT' (columns 1, 3, 5 are
        read); any other value refers to 'TPEHG' (columns 1, 5, 9 are read).
  d_periods: default None. If not None, a sequence of [start, end] pairs
        giving the (inclusive) sample indices of dummy periods.
  c_periods: default None. Must be None exactly when d_periods is None;
        otherwise a sequence of [start, end] pairs for contraction periods.
  Returns: if d_periods is None, an (n_samples, 4) array with one filtered
        signal per column. Otherwise two lists (dummy periods first, then
        contraction periods), each holding one 4-column array per period.
  Raises: AssertionError if only one of d_periods / c_periods is given.

  Note: this function shadows the built-in ``filter`` in this notebook.
  """
  data = np.loadtxt(file)
  data_ind = [1, 3, 5] if dataset == 0 else [1, 5, 9]

  data_signals = np.empty([data.shape[0], 4])
  data_signals[:, 0:3] = data[:, data_ind]
  data_signals[:, 3] = data_signals[:, 0] - data_signals[:, 1] + data_signals[:, 2]

  # butter() returns the numerator first and the denominator second.
  b, a = signal.butter(4, cutoff, btype='bandpass', fs=20)
  # axis=0 filters every column along the sample axis in a single call.
  sig_out = signal.filtfilt(b, a, data_signals, axis=0)

  # Identity checks: "== None" is evaluated element-wise on NumPy arrays and
  # would make the "if" raise for array-valued periods.
  if d_periods is None:
    assert c_periods is None, "c_periods must be None when d_periods is None"
    return sig_out

  assert c_periods is not None, "c_periods is required when d_periods is given"
  # Period ends are inclusive, hence the +1 on the slice stop.
  D_freqs = [sig_out[d[0]:d[1]+1, :] for d in d_periods]
  C_freqs = [sig_out[c[0]:c[1]+1, :] for c in c_periods]
  return D_freqs, C_freqs



def getStats(path):
    """Read three values from fixed lines of a record header file.

    Lines 15, 16 and 17 (0-based) are each split on single spaces and the
    token at index 5 is used. The code names these values gst, rtm and age
    (presumably gestation, recording time and age -- confirm against the
    header format).

    path: path of the header file.
    Returns: [gst, rtm, age]. gst and rtm are floats; age is a float, or -1
        when the header holds the literal 'None' for it.
    Raises: IndexError if the file has fewer than 18 lines or a line has
        fewer than 6 space-separated tokens; ValueError if gst or rtm is not
        numeric (including 'None').
    """
    # The context manager closes the file even if parsing fails below.
    with open(path) as f:
        lines = f.readlines()

    # Strip so the 'None' check does not depend on a trailing newline or spaces.
    lin_gst, lin_rtm, lin_age = [lines[k].split(' ')[5].strip() for k in (15, 16, 17)]

    res = [float(lin_gst), float(lin_rtm)]
    if lin_age == 'None':
        res.append(-1)
    else:
        res.append(float(lin_age))
    return res



In [4]:
# File name pieces: a record number is inserted between each prefix and suffix
# to form the header (.hea) path and the text signal (.txt) path.
path_head_pre = "/content/gdrive/MyDrive/lvrk_data2/tpehg"
path_head_post = ".hea"
path_text_pre = "/content/gdrive/MyDrive/lvrk_data2/txt/tpehg"
path_text_post = ".txt"

Length = 15000  # number of samples kept from every record
n = 300         # capacity of X and y (maximum number of records)

# One row per record: 4 pass bands x 4 filtered signals = 16 feature columns.
X = np.zeros([n, Length, 16])
y = np.zeros(n, dtype='int')

# Pass bands (lower, upper) handed to filter().
cutoff1 = [0.08,1]
cutoff2 = [1, 2.2]
cutoff3 = [2.2, 3.5]
cutoff4 = [3.5, 5]

j = 0
for i in range(540, 1760):
    path_txt = path_text_pre + str(i) + path_text_post
    path_head = path_head_pre + str(i) + path_head_post
    if not Path(path_txt).is_file():
        continue
    if j >= n:
        # Fail with an explicit message before any filtering work is done.
        raise IndexError("more than n=%d record files found; increase n" % n)

    for band, cutoff in enumerate((cutoff1, cutoff2, cutoff3, cutoff4)):
        # Skip the first 60 samples and keep the next `Length` samples.
        X[j, :, 4*band:4*band+4] = filter(path_txt, cutoff, 1)[60:(Length+60), :]

    d = getStats(path_head)
    # Label 1 when the first header value is below 37 (presumably gestation in
    # weeks, i.e. a preterm record -- confirm), otherwise 0.
    y[j] = 1 if d[0] < 37 else 0
    j = j+1

# Drop rows that were never filled; otherwise they would remain as all-zero
# samples labelled 0.
X = X[:j]
y = y[:j]

print(X.shape)
print(y.shape)

(300, 15000, 16)
(300,)


In [5]:
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

print(X.shape, y.shape)

# Hold out 50 records for testing; the split is seeded so it is repeatable.
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=50, random_state = 48)

# Upsampling: repeat the whole class-1 block until the training set is at
# least twice the size of class 0.
ones = np.where(y_train == 1)
zeros = np.where(y_train == 0)

x_ones = x_train[ones[0], :, :]
x_zeros = x_train[zeros[0], :, :]

one_size = x_ones.shape[0]
zero_size = x_zeros.shape[0]

if one_size == 0 and zero_size > 0:
    # With nothing to repeat, the target size could never be reached.
    raise ValueError("training split contains no class-1 samples to upsample")

y_ones = np.ones(one_size)
target_size = zero_size*2

# Smallest number of class-1 copies with zero_size + copies*one_size >= target_size.
copies = -(-zero_size // one_size) if one_size else 0

# A single concatenate avoids re-copying the growing array once per repetition.
x_train_usp = np.concatenate((x_zeros,) + (x_ones,) * copies, axis=0)
y_train_usp = np.concatenate((np.zeros(zero_size),) + (y_ones,) * copies, axis=0)

# Seeded so that re-running the notebook yields the same sample order.
x_train_usp, y_train_usp = shuffle(x_train_usp, y_train_usp, random_state=48)

#print(x_train_usp)
#print(y_train_usp)


(300, 15000, 16) (300,)


In [19]:
#https://machinelearningmastery.com/cnn-models-for-human-activity-recognition-time-series-classification/
### LOAD PACKAGES
import numpy as np
from pandas import read_csv, DataFrame
from sklearn.preprocessing import minmax_scale
from keras.layers.convolutional import Conv1D, MaxPooling1D
from keras.optimizers import SGD, Adam
from keras.models import Sequential
from keras.regularizers import l2
from keras.layers import Dense, Flatten, Dropout
from keras.utils.np_utils import to_categorical

# Seed NumPy's global generator before the network is created.
np.random.seed(1984)

# L2 penalty applied to the kernel of the output layer only.
reg = l2(0.2)

# Two stacked 1-D convolutions, dropout, pooling, then a dense classifier
# ending in a 2-unit softmax over the one-hot labels.
model = Sequential([
    Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=[Length, 16]),
    Conv1D(filters=32, kernel_size=3, activation='relu'),
    Dropout(0.5),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(100, activation='relu'),
    Dropout(0.5),
    Dense(2, activation='softmax', kernel_regularizer=reg),
])

opt = Adam(learning_rate=0.00001)
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

# Labels are one-hot encoded to match the 2-unit output layer.
model.fit(
    x_train_usp,
    to_categorical(y_train_usp),
    batch_size=500,
    epochs=20,
    verbose=1,
)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<tensorflow.python.keras.callbacks.History at 0x7f03d00f8a50>

In [20]:
from sklearn.metrics import balanced_accuracy_score

# Sequential.predict_classes() was removed from later Keras/TensorFlow
# releases; taking the argmax over the 2-unit softmax output yields the same
# class indices and works on old and new versions alike.
y_pred = np.argmax(model.predict(x_test), axis=-1)
# The value printed below is the balanced accuracy (mean of per-class recall).
score = balanced_accuracy_score(y_test, y_pred)
print("Accuracy: ", score)



Accuracy:  0.5




In [None]:
import numpy as np
from pandas import read_csv, DataFrame
from sklearn.preprocessing import minmax_scale
from keras.layers.convolutional import Conv1D, MaxPooling1D
from keras.optimizers import SGD, Adam
from keras.regularizers import l2
from keras.models import Sequential
from keras.layers import Dense, Flatten, Dropout
from keras.utils.np_utils import to_categorical

# Seed NumPy's global generator before the network is created.
np.random.seed(1984)

# L2 penalty applied to the kernel of the output layer.
reg = l2(0.001)

# Smaller variant: one convolution, pooling, then a 2-unit sigmoid output.
conv = Sequential([
    Conv1D(filters=20, kernel_size=4, input_shape=[Length, 16], activation='relu'),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(2, activation='sigmoid', kernel_regularizer=reg),
])

# Alternative optimizer left disabled:
#sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False)
opt = Adam(learning_rate=0.00001)
conv.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

# Labels are one-hot encoded to match the 2-unit output layer.
conv.fit(
    x_train_usp,
    to_categorical(y_train_usp),
    batch_size=250,
    epochs=20,
    verbose=1,
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<tensorflow.python.keras.callbacks.History at 0x7f24f5f951d0>