In [0]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
import numpy as np
import pandas as pd
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
import keras
from numpy import array
from numpy import argmax
from keras.layers import Layer
from keras.models import Sequential
from keras.models import Model
from keras.layers import Dense, Input, Flatten, Dropout, Reshape, Concatenate
from keras.layers import LSTM, ConvLSTM2D, TimeDistributed, Conv1D, CuDNNGRU 
from keras.layers import MaxPooling1D, BatchNormalization
from keras import initializers, regularizers, constraints, optimizers
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from keras.utils import to_categorical
from keras import backend as K
from matplotlib import pyplot as plt
import seaborn as sn
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
%matplotlib inline

In [0]:
class AttentionWithoutContext(Layer):
    def __init__(self, step_dim,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        self.step_dim = step_dim
        self.features_dim = 0
        super(AttentionWithoutContext, self).__init__(**kwargs)

    def build(self, input_shape):
        if(len(input_shape) != 3):
            print(len(input_shape))
        assert len(input_shape) == 3

        self.W = self.add_weight((input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        self.features_dim = input_shape[-1]

        if self.bias:
            self.b = self.add_weight((input_shape[1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True


    def call(self, x):
        features_dim = self.features_dim
        step_dim = self.step_dim

        eij = K.reshape(K.dot(K.reshape(x, (-1, features_dim)),
                        K.reshape(self.W, (features_dim, 1))), (-1, step_dim))

        if self.bias:
            eij += self.b

        eij = K.tanh(eij)
        a = K.exp(eij)

        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        a = K.expand_dims(a)
        weighted_input = x * a
        return K.sum(weighted_input, axis=1)

    def compute_output_shape(self, input_shape):
        return input_shape[0],  self.features_dim


In [0]:
def dot_product(x, kernel):
    """
    Wrapper for dot product operation, in order to be compatible with both
    Theano and Tensorflow
    Args:
        x (): input
        kernel (): weights
    Returns:
    """
    if K.backend() == 'tensorflow':
        return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)
    else:
        return K.dot(x, kernel)
    

class AttentionWithContext(Layer):
    """
        Attention operation, with a context/query vector, for temporal data.
        Supports Masking.
        Follows the work of Yang et al. [https://www.cs.cmu.edu/~diyiy/docs/naacl16.pdf]
        "Hierarchical Attention Networks for Document Classification"
        by using a context vector to assist the attention
        # Input shape
            3D tensor with shape: `(samples, steps, features)`.
        # Output shape
            2D tensor with shape: `(samples, features)`.
        :param kwargs:
        Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
        The dimensions are inferred based on the output shape of the RNN.
        Example:
            model.add(LSTM(64, return_sequences=True))
            model.add(AttentionWithContext())
        """

    def __init__(self,
                 W_regularizer=None, u_regularizer=None, b_regularizer=None,
                 W_constraint=None, u_constraint=None, b_constraint=None,
                 bias=True,
                 return_attention=False, **kwargs):

        self.supports_masking = True
        self.return_attention = return_attention
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.u_regularizer = regularizers.get(u_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.u_constraint = constraints.get(u_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        super(AttentionWithContext, self).__init__(**kwargs)

    def build(self, input_shape):
        assert len(input_shape) == 3

        self.W = self.add_weight((input_shape[-1], input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight((input_shape[-1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)

        self.u = self.add_weight((input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_u'.format(self.name),
                                 regularizer=self.u_regularizer,
                                 constraint=self.u_constraint)

        super(AttentionWithContext, self).build(input_shape)

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        uit = dot_product(x, self.W)

        if self.bias:
            uit += self.b

        uit = K.tanh(uit)
        # ait = K.dot(uit, self.u)
        ait = dot_product(uit, self.u)

        a = K.exp(ait)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        # a /= K.cast(K.sum(a, axis=1, keepdims=True), K.floatx())
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        a = K.expand_dims(a)
        weighted_input = x * a
        result = K.sum(weighted_input, axis=1)

        if self.return_attention:
            return [result, a]
        return result

    def compute_output_shape(self, input_shape):
        if self.return_attention:
            return [(input_shape[0], input_shape[-1]),
                    (input_shape[0], input_shape[1])]
        else:
            return input_shape[0], input_shape[-1]

In [0]:
#gru model with attention
def get_attn_lstm_model(n_timesteps, n_features, n_outputs):
    inputs = Input(shape=(n_timesteps,n_features,))
    gru = CuDNNGRU(196,return_sequences=True) (inputs)
    gru = BatchNormalization() (gru)
    x1 = AttentionWithoutContext(n_timesteps)  (gru)
    x2 = AttentionWithContext() (gru)
    x = Concatenate() ([x1,x2])
    x = BatchNormalization() (x)
    x = Dense(512, activation='relu') (x)
    x = Dropout(0.2) (x)
    x = Dense(128, activation='relu') (x)
    predictions = Dense(n_outputs, activation='softmax') (x)
    model = Model(inputs=inputs, outputs=predictions)
    return model
    

In [0]:
#2 layer gru model with attention
def get_attn_lstm_model2(n_timesteps, n_features, n_outputs):
    inputs = Input(shape=(n_timesteps,n_features,))
    gru1 = CuDNNGRU(200,return_sequences=True) (inputs)
    gru1 = BatchNormalization() (gru1)
    xg1 = AttentionWithoutContext(n_timesteps)  (gru1)
    xg2 = AttentionWithContext() (gru1)
    gru2 = CuDNNGRU(200,return_sequences=True) (gru1)
    gru2 = BatchNormalization() (gru2)
    xg3 = AttentionWithoutContext(n_timesteps)  (gru2)
    xg4 = AttentionWithContext() (gru2)
    x = Concatenate() ([xg1,xg2,xg3,xg4])
    x = BatchNormalization() (x)
    x = Dense(512, activation='relu') (x)
    x = Dropout(0.25) (x)
    x = Dense(128, activation='relu') (x)
    x = Dropout(0.1) (x)
    predictions = Dense(n_outputs, activation='softmax') (x)
    model = Model(inputs=inputs, outputs=predictions)
    return model
    

In [0]:




def getxy(df,sub,act,n):
  
  b=df[act]
  a=df.drop(columns=[sub,act])
  
  
  x=a.values
  y=b.values
  
  a=x.reshape(x.shape[0]//n,n,-1)
  b=y.reshape(y.shape[0]//n,n,-1)
  
  return a,b


def to_cata(col):
  data = col.values
  data = array(data)
  encoded = to_categorical(data)
  
  return encoded
  
  
  
def recall_m(y_true, y_pred):
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        recall = true_positives / (possible_positives + K.epsilon())
        return recall

def precision_m(y_true, y_pred):
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = true_positives / (predicted_positives + K.epsilon())
        return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))



def getc(y_test,y_pred):
  matrix = confusion_matrix(y_test, y_pred)
  return matrix

In [None]:
verbose, epochs, batch_size = 0, 10, 128
earlyStopping = EarlyStopping(monitor='val_acc', patience=20,verbose=0, mode='max')
mcp_save = ModelCheckpoint('test_3_best.hdf5', save_best_only=True, monitor='val_acc', mode='max')
reduce_lr_loss = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=6, verbose=0, min_delta=1e-4, mode='min')
from sklearn.metrics import roc_auc_score, precision_score, recall_score,accuracy_score, confusion_matrix, f1_score



from keras import backend as K
from sklearn.metrics import classification_report, confusion_matrix




df=pd.read_csv('drive/My Drive/total.csv')

a=df['subject'].unique()
N=50
#n_timesteps, n_features, n_outputs = train_x.shape[1], train_x.shape[2], train_y.shape[1]
#model = get_attn_lstm_model2(n_timesteps, n_features, n_outputs)

#model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

avac=[]
avrec=[]
avprec=[]
avf=[]

for i in range(6):
  train=df[(df.subject != a[i])]
  test=df.loc[df['subject'].isin([a[i]])]
  
  train_x,train_y=getxy(train,'subject','activityID',N)
  
  test_x,test_y=getxy(test,'subject','activityID',N)
  train_y = to_categorical(train_y[:,0,0])
  testgt = test_y[:,0,0]
  test_y = to_categorical(test_y[:,0,0])
  
  n_timesteps, n_features, n_outputs = train_x.shape[1], train_x.shape[2], train_y.shape[1]
  model = get_attn_lstm_model2(n_timesteps, n_features, n_outputs)
  model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
  
  
  
  model.fit(train_x, train_y, epochs=epochs, batch_size=batch_size,
              verbose=verbose, validation_data=(test_x, test_y),
             callbacks=[earlyStopping, mcp_save,reduce_lr_loss])

#predict
  test_model = get_attn_lstm_model2(n_timesteps,n_features,n_outputs)
  test_model.load_weights('test_3_best.hdf5')
#   modelpred = test_model.predict(test_x,batch_size=batch_size, verbose=0)
#   pred = prediction(modelpred)
#   print(test_x.shape, testgt.shape, pred.shape)

# evaluate model
  
  test_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc',f1_m,precision_m, recall_m])
  loss, accuracy, f1_score, precision, recall= test_model.evaluate(test_x, test_y, batch_size=batch_size, verbose=0)
  print("folding",i+1)
  print("loss",loss)
  print('accuracy: ' + str(accuracy))
  print('precision: ' + str(precision))
  print('recall: ' + str(recall))
  print('F score: ' + str(f1_score))
  print(" ")
  avac.append(accuracy)
  avrec.append(recall)
  avprec.append(precision)
  avf.append(f1_score)
  

print("************average result********")
print(" ")
print('accuracy: ' + str(mean(avac)))
print('precision: ' + str(mean(avprec)))
print('recall: ' + str(mean(avrec)))
print('F score: ' + str(mean(avf)))
print(" ")