In [2]:
from __future__ import print_function

import warnings
import matplotlib.pyplot as plt
import numpy as np
from keras.utils import to_categorical
from sklearn import metrics
from sklearn.metrics import confusion_matrix
from sklearn.utils import compute_class_weight

# Install a blanket "ignore" filter: from this point on, every warning category
# is suppressed for the whole kernel session.
# NOTE(review): this also hides numpy RuntimeWarnings (e.g. divide-by-zero) and
# library deprecation notices — consider narrowing the filter to specific
# categories/modules once the notebook is stable.
warnings.filterwarnings("ignore")

import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM, Bidirectional, LeakyReLU
from keras.optimizers import Adamax
from sklearn.model_selection import train_test_split
import sklearn.model_selection as model_selection

In [1]:
class BLSTM:
    """Bidirectional-LSTM binary classifier trained on a class-balanced subset.

    The constructor takes a pandas-style table (anything exposing ``.iloc``)
    whose first column holds the labels (``0`` or ``1``) and whose second column
    holds one array per row.  The arrays are stacked, both classes are
    undersampled to the size of the smaller one, and the result is split 80/20
    (stratified) into train and test sets.  The network itself is built with a
    fixed ``input_shape=(50, 50)``, so every stacked sample must have that shape.
    """

    def __init__(self, data, name="", batch_size=64, seed=None):
        """Prepare the train/test split and build the (untrained) model.

        Args:
            data: table with labels in column 0 and per-sample arrays in column 1.
            name: prefix of the weights file (``<name>_model.h5``).
            batch_size: batch size used by ``train`` and ``test``.
            seed: optional int controlling the undersampling and the train/test
                split.  ``None`` (default) keeps using numpy's global random
                state.  It does not seed Keras weight initialisation or dropout.

        Raises:
            ValueError: if either class (label 0 or label 1) has no samples.
        """
        vectors = np.stack(data.iloc[:, 1].values)
        labels = data.iloc[:, 0].values

        rng = np.random if seed is None else np.random.RandomState(seed)
        resampled_idxs = self._balanced_indices(labels, rng)
        X_train, X_test, y_train, y_test = train_test_split(
            vectors[resampled_idxs],
            labels[resampled_idxs],
            test_size=0.2,
            stratify=labels[resampled_idxs],
            random_state=seed,
        )

        print("\n\ndata: ",data)
        print("\n\nvectors: ",vectors)
        print("\n\n\nlabels: ",str(set(labels)))
        print("shape of input - training set", X_train.shape)
        print("shape of output - training set", y_train.shape)
        print("shape of input - testing set", X_test.shape)
        print("shape of output - testing set", y_test.shape)
        print('\n\n\n\n')

        self.X_train = X_train
        self.X_test = X_test
        # Pin the one-hot width to 2 so it always matches the 2-unit softmax
        # output layer, regardless of which labels ended up in each split.
        self.y_train = to_categorical(y_train, num_classes=2)
        self.y_test = to_categorical(y_test, num_classes=2)
        self.name = name
        self.batch_size = batch_size

        model = Sequential()
        model.add(Bidirectional(LSTM(300), input_shape=(50, 50)))
        model.add(Dense(300))
        model.add(LeakyReLU())
        model.add(Dropout(0.5))
        model.add(Dense(300))
        model.add(LeakyReLU())
        model.add(Dropout(0.5))
        model.add(Dense(2, activation='softmax'))
        # Original author's note: lower learning rate to prevent divergence.
        # NOTE(review): newer Keras releases spell this argument
        # `learning_rate` — confirm against the installed Keras version.
        adamax = Adamax(lr=0.002)
        model.compile(adamax, 'categorical_crossentropy', metrics=['accuracy'])
        self.model = model

    @staticmethod
    def _balanced_indices(labels, rng=None):
        """Return row indices holding an equal number of label-1 and label-0 rows.

        Whichever class is larger is randomly undersampled (without
        replacement) to the size of the smaller one.  Positive indices come
        first in the returned array, followed by negative indices.

        Args:
            labels: 1-D array-like of 0/1 labels.
            rng: object exposing ``choice`` (``np.random`` module or a
                ``RandomState``); defaults to numpy's global random state.

        Raises:
            ValueError: if there is no sample with label 1 or none with label 0.
        """
        rng = np.random if rng is None else rng
        labels = np.asarray(labels)
        positive_idxs = np.where(labels == 1)[0]
        negative_idxs = np.where(labels == 0)[0]

        n_per_class = min(len(positive_idxs), len(negative_idxs))
        if n_per_class == 0:
            raise ValueError(
                "need at least one sample of each class (labels 0 and 1) to build a balanced set"
            )

        # Positives are only resampled when they are the majority, so the
        # common "few positives, many negatives" case draws from the random
        # state exactly once (negatives only).
        if len(positive_idxs) > n_per_class:
            positive_idxs = rng.choice(positive_idxs, n_per_class, replace=False)
        negative_idxs = rng.choice(negative_idxs, n_per_class, replace=False)
        return np.concatenate([positive_idxs, negative_idxs])

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return ``numerator / denominator`` as a float, or 0.0 if the denominator is 0."""
        return float(numerator) / float(denominator) if denominator else 0.0

    @staticmethod
    def _binary_metrics(tn, fp, fn, tp):
        """Derive rate metrics from the four confusion-matrix counts.

        Returns a dict with keys ``false_positive_rate``, ``false_negative_rate``,
        ``recall``, ``precision`` and ``f1``.  Any ratio whose denominator is
        zero is reported as 0.0 instead of NaN.
        """
        ratio = BLSTM._safe_ratio
        recall = ratio(tp, tp + fn)
        precision = ratio(tp, tp + fp)
        return {
            "false_positive_rate": ratio(fp, fp + tn),
            "false_negative_rate": ratio(fn, fn + tp),
            "recall": recall,
            "precision": precision,
            "f1": ratio(2 * precision * recall, precision + recall),
        }

    def train(self, epochs=4):
        """Fit the model on the training split and save its weights.

        Args:
            epochs: number of training epochs (default 4).

        Weights are written to ``<name>_model.h5``.
        """
        self.model.fit(self.X_train, self.y_train, batch_size=self.batch_size, epochs=epochs)
        self.model.save_weights(self.name + "_model.h5")

    def test(self):
        """Evaluate the model on the test split and print/plot the results.

        The weights are always (re)loaded from ``<name>_model.h5`` first, so
        that file must exist (i.e. ``train`` must have been run for this name).
        Prints accuracy, the confusion matrix, FPR, FNR, recall, precision and
        F1, and shows the confusion matrix as a heatmap.
        """
        print('\n\nTesting Model *****')
        self.model.load_weights(self.name + "_model.h5")
        values = self.model.evaluate(self.X_test, self.y_test, batch_size=self.batch_size)
        print("\nAccuracy is...", values[1])

        probabilities = self.model.predict(self.X_test, batch_size=self.batch_size)
        true_classes = np.argmax(self.y_test, axis=1)
        # argmax on the raw probabilities: rounding first can turn a near-tie
        # into an all-zero row and lose the actual winner.
        predicted_classes = np.argmax(probabilities, axis=1)

        # labels=[0, 1] forces a 2x2 matrix even if only one class shows up,
        # so the four-way unpacking below cannot fail.
        tn, fp, fn, tp = confusion_matrix(true_classes, predicted_classes, labels=[0, 1]).ravel()

        print("Confusion Matrix : ")
        print(f"[{tp}] [{fp}]")
        print(f"[{fn}] [{tn}]")

        data = [[tp, fp],[fn, tn]]
        fig, ax = plt.subplots()
        heatmap = ax.pcolor(data)
        # pcolor draws row 0 at the bottom; flip the axis so the picture has
        # the same layout as the matrix printed above.
        ax.invert_yaxis()
        ax.set_xticks([0.5, 1.5])
        ax.set_xticklabels(["actual 1", "actual 0"])
        ax.set_yticks([0.5, 1.5])
        ax.set_yticklabels(["predicted 1", "predicted 0"])
        ax.set_title("Confusion matrix")
        fig.colorbar(heatmap, ax=ax)
        plt.show()

        scores = self._binary_metrics(tn, fp, fn, tp)
        print('\nFalse positive rate is :', scores["false_positive_rate"])
        print('\nFalse negative rate is :', scores["false_negative_rate"])
        print('\nTrue positive rate is  :', scores["recall"])
        print('\nPrecision is: ', scores["precision"])
        print('\nF1 score is : ', scores["f1"])

    def predict(self, data, batch_size=64):
        """Predict classes for new samples, print them and return them.

        Args:
            data: table whose second column holds one array per row (same
                layout as the constructor input; column 0 is not read).
            batch_size: batch size for inference.

        Returns:
            The model's softmax outputs rounded to 0/1, one row per sample.
        """
        vectors = np.stack(data.iloc[:, 1].values)
        predictions = self.model.predict(vectors, batch_size=batch_size).round()
        print(predictions)
        return predictions