In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
from sklearn.utils import shuffle
from keras.callbacks import EarlyStopping

import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

In [None]:
# Subject ID that is labelled 1 (the positive class); every other subject is labelled 0.
targetUser = "s002"

In [None]:
# Read the keystroke-timing dataset from the working directory.
data = pd.read_csv('DSL-StrongPasswordData.csv')

# Independent working copy: held-out test rows are removed from this frame
# later on, while `data` itself stays untouched.
trainSet = data.copy(deep=True)

In [None]:
# Empty accumulators for the held-out test set: a (0, 31) feature matrix
# (one column per timing feature) and a flat label vector. Rows are stacked
# onto these when the test set is built.
X_test = np.empty(shape=(0, 31))
y_test = np.empty(shape=(0,))

In [None]:
# Build the held-out test set: 50 randomly chosen records from each of the
# first three subjects are removed from the training pool and kept for testing.

# Timing features fed to the model, in the column order used for X_test.
feature_columns = [
    'H.period', 'DD.period.t', 'UD.period.t',
    'H.t', 'DD.t.i', 'UD.t.i',
    'H.i', 'DD.i.e', 'UD.i.e',
    'H.e', 'DD.e.five', 'UD.e.five',
    'H.five', 'DD.five.Shift.r', 'UD.five.Shift.r',
    'H.Shift.r', 'DD.Shift.r.o', 'UD.Shift.r.o',
    'H.o', 'DD.o.a', 'UD.o.a',
    'H.a', 'DD.a.n', 'UD.a.n',
    'H.n', 'DD.n.l', 'UD.n.l',
    'H.l', 'DD.l.Return', 'UD.l.Return',
    'H.Return',
]

# Start again from the pristine `data` frame so this cell can be re-run safely.
# Without this, a second run would sample from a `trainSet` whose 'subject'
# column has already been converted to 0/1 and would stack duplicate test rows.
trainSet = data.copy()
held_out_features = []
held_out_labels = []

for user in data['subject'].unique()[0:3]:
    # Select 50 random records for this user (fixed seed for reproducibility)
    user_records = trainSet[trainSet['subject'] == user].sample(n=50, random_state=42)

    # Remove the held-out records from the training pool
    trainSet = trainSet.drop(user_records.index)

    # Collect the typing features and the binary label (1 = target user)
    held_out_features.append(user_records[feature_columns].values)
    held_out_labels.append(np.where(user_records['subject'] == targetUser, 1, 0))

# Stack once after the loop instead of growing the arrays on every iteration.
X_test = np.vstack(held_out_features)
# Kept as float, matching the dtype the labels had when stacked onto an empty float array.
y_test = np.hstack(held_out_labels).astype(float)

# Convert the remaining training labels to binary: 1 = target user, 0 = anyone else.
trainSet['subject'] = np.where(trainSet['subject'] == targetUser, 1, 0)

In [None]:
#scaler = StandardScaler()
#X_test = scaler.fit_transform(X_test) # Normalising X for testSet

In [None]:
# Empty sequential container; layers are appended to it afterwards.
model = tf.keras.Sequential()

In [None]:
# Three fully connected ReLU layers of 128 units each; the first one also
# declares the input width, taken from the number of columns in X_test.
model.add(tf.keras.layers.Dense(128, input_shape=X_test.shape[1:], activation='relu'))
for _ in range(2):
    model.add(tf.keras.layers.Dense(128, activation='relu'))

# Single sigmoid unit: outputs a score in [0, 1] for the positive class.
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

In [None]:
# Binary classification setup: Adam optimiser, cross-entropy loss, accuracy reported per epoch.
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [None]:
# Stop a training run once the validation loss has not improved for 10
# consecutive epochs, then roll the model back to its best-scoring weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1,  # report on the console when training is stopped early
)

In [None]:
# Learning-curve experiment: for each training-set size, train a fresh model on
# `size` target-user records plus `size` other-user records, then score it on
# the held-out test set.

# Number of records drawn per class for each run.
train_sizes = [25, 50, 75, 100, 125, 150, 175, 200, 225, 250, 275, 300, 325, 350]

# Sigmoid score above which a test record is classified as the target user.
decision_threshold = 0.90

roc_auc_scores = []         # per-run ROC data computed from the raw scores
roc_auc_binary_scores = []  # per-run ROC data and F1/precision/recall from thresholded labels

# The two class pools do not change between runs, so split them once up front.
user_records = trainSet[trainSet['subject'] == 1]
non_user_records = trainSet[trainSet['subject'] == 0]

# Loop through each training data subset size and train/evaluate the model
for size in train_sizes:
    # Start every run from an untrained model. `model.reset_states()` does not
    # do this: it leaves the learned weights in place, so each run would simply
    # continue training the previous run's model. Cloning creates new layers
    # with newly initialised weights, and recompiling gives a new optimizer.
    model = tf.keras.models.clone_model(model)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    # Draw `size` records per class with a fixed seed so the run is
    # reproducible, then shuffle the two classes together.
    train_data_subset = shuffle(
        pd.concat([
            user_records.sample(n=size, random_state=42),
            non_user_records.sample(n=size, random_state=42),
        ]),
        random_state=42,
    )

    # Split dataset into X and y
    X_train = train_data_subset.drop(['subject', 'sessionIndex', 'rep'], axis=1).values
    y_train = train_data_subset['subject'].values

    # Train the model on the subset of training data.
    # NOTE(review): the test set doubles as the validation set that drives
    # early stopping / best-weight restoration, so the metrics below are
    # optimistic. Consider holding out a separate validation split.
    model.fit(X_train, y_train, epochs=50, validation_data=(X_test, y_test), callbacks=[early_stopping])

    # Predict the test set (flattened to one score per test record)
    prediction = model.predict(X_test).ravel()

    # Classify the prediction results
    prediction_binary = np.where(prediction > decision_threshold, 1, 0)

    # ROC / AUC of the thresholded (hard) labels
    fpr, tpr, _ = roc_curve(y_test, prediction_binary)
    roc_auc = auc(fpr, tpr)

    tn, fp, fn, tp = confusion_matrix(y_test, prediction_binary, labels=[0, 1]).ravel()

    # Guard each ratio against a zero denominator (e.g. no positive predictions).
    precision = tp / (tp + fp) if tp + fp > 0 else 0
    recall = tp / (tp + fn) if tp + fn > 0 else 0
    if precision + recall > 0:
        f1 = 2 * (precision * recall) / (precision + recall)
    else:
        f1 = 0

    roc_auc_binary_scores.append({'Size': size, 'FPR': fpr, 'TPR': tpr, 'AUC': roc_auc, 'F1': f1, 'P': precision, 'R': recall})

    # ROC / AUC of the raw sigmoid scores
    fpr, tpr, _ = roc_curve(y_test, prediction)
    roc_auc = auc(fpr, tpr)

    roc_auc_scores.append({'Size': size, 'FPR': fpr, 'TPR': tpr, 'AUC': roc_auc})


In [None]:
# Overlay one ROC curve per training-set size, built from the raw model scores.
fig, ax = plt.subplots()

curve_label = '{} Records Per User (AUC = {:.2f})'
for run in roc_auc_scores:
    ax.plot(run['FPR'], run['TPR'], label=curve_label.format(run['Size'], run['AUC']))

# Diagonal reference: the curve a random classifier would produce.
ax.plot([0, 1], [0, 1], 'k--', label='Random guessing Line')
ax.set_xlabel('False Positive Rate (FPR)')
ax.set_ylabel('True Positive Rate (TPR)')
ax.set_title('Receiver Operating Characteristic (ROC) Curve')
ax.legend(loc="lower right", fontsize=9)
plt.show()

In [None]:
# Overlay one ROC curve per training-set size, built from the thresholded
# (hard 0/1) predictions.
fig, ax = plt.subplots()

curve_label = '{} Records Per User (AUC = {:.2f})'
for run in roc_auc_binary_scores:
    ax.plot(run['FPR'], run['TPR'], label=curve_label.format(run['Size'], run['AUC']))

# Diagonal reference: the curve a random classifier would produce.
ax.plot([0, 1], [0, 1], 'k--', label='Random guessing Line')
ax.set_xlabel('False Positive Rate (FPR)')
ax.set_ylabel('True Positive Rate (TPR)')
ax.set_title('Receiver Operating Characteristic (ROC) Curve')
ax.legend(loc="lower right", fontsize=8)
plt.show()

In [None]:
# Flatten the per-run result dictionaries into parallel lists (one entry per
# training size) for plotting and printing.

# AUC computed from the raw model scores.
AUC = [run['AUC'] for run in roc_auc_scores]

# Metrics computed from the thresholded predictions.
Binary_AUC = [run['AUC'] for run in roc_auc_binary_scores]
F1 = [run['F1'] for run in roc_auc_binary_scores]
Precision = [run['P'] for run in roc_auc_binary_scores]
Recall = [run['R'] for run in roc_auc_binary_scores]

In [None]:
# AUC of the thresholded predictions as a function of training-set size.
fig, ax = plt.subplots()
ax.plot(train_sizes, Binary_AUC, label="Classified AUC with 0.90 Threshold")
ax.set_title('Comparison of Area under ROC Curves')
ax.set_xlabel('Number of Records Per Class')
ax.set_ylabel('AUC Value')
ax.legend(loc="lower right", fontsize=8)
plt.show()

In [None]:
# Change in thresholded AUC from each training size to the next.
for previous, current in zip(Binary_AUC, Binary_AUC[1:]):
    print(round(current - previous, 4))

# The thresholded AUC values themselves, rounded to two decimals.
for value in Binary_AUC:
    print(round(value, 2))

In [None]:
# F1, precision and recall of the thresholded predictions versus training-set
# size. The legend states 0.90 because that is the cut-off applied when the
# predictions were binarised; the labels previously claimed 0.75.
plt.plot(train_sizes, F1, label="F1 Scores with 0.90 Threshold")
plt.plot(train_sizes, Precision, label="Precision Scores with 0.90 Threshold")
plt.plot(train_sizes, Recall, label="Recall Scores with 0.90 Threshold")
plt.title('F1, Precision and Recall Score')
plt.xlabel('Number of Records Per Class')
plt.ylabel('Score')
plt.legend(loc="lower right", fontsize=8)
plt.show()

In [None]:
# Dump the per-size metric lists, one metric per line.
for heading, values in (("F1:", F1), ("Precision:", Precision), ("Recall:", Recall)):
    print(heading, values)

# Number of predictions produced by the last training run.
print(len(prediction))

In [None]:
# Change in F1 from each training size to the next.
for previous, current in zip(F1, F1[1:]):
    print(round(current - previous, 4))

# The F1 scores themselves, rounded to two decimals.
for score in F1:
    print(round(score, 2))