<a href="https://colab.research.google.com/github/TheYoungBeast/Data-Augmentation-Keystrokes-Dynamics/blob/main/Data_Augmentation_Keystrokes_Dynamics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

print(tf.__version__)

2.8.2


In [2]:
import math
from numpy.ma.core import array

<h1>Data Types</h1>


In [3]:
from typing import NamedTuple

class Set(NamedTuple):
  id: int
  keystrokes: array = []
  aug: bool = False

class Keystroke(NamedTuple):
  key: str
  up: int
  down: int
  dtime: int

<h1>Loading dataset</h1>

In [4]:
dataset = {};

for i in range(1, 15):
  dataset[str(i)] = []

  for j in range(0, 3):
    filepath = "drive/MyDrive/Keystrokes/#" + str(i).zfill(2) + "_" + str(j)  +".txt"
    dataset[str(i)].append(Set(i-1, []))

    with open(filepath, 'r') as f:
            line = f.readline()
            while line:

              if len(line.strip().split(',')) == 3:
                key, down, up = line.strip().split(',')
                keystroke = Keystroke(key, abs(int(up)), abs(int(down)), abs(int(down) - int(up)))
                dataset[str(i)][-1].keystrokes.append(keystroke)

              line = f.readline()

<h1>Preprocessing and Normalization</h1>

In [5]:
def preprocess(set):
  dict = {}
  count = {}

  for keystroke in set.keystrokes:
    if not keystroke.key:
      continue

    if keystroke.key not in dict:
      dict[keystroke.key] = 0
      count[keystroke.key] = 0

    dict[keystroke.key] += abs(keystroke.dtime) # Absolute value, dwell time
    count[keystroke.key] += 1

  # Average dwell time per Key
  avgs = []
  
  for key in dict:
    avgs.append(math.floor(dict[key]/count[key]))

  if len(avgs) is not 27:
    raise Exception('Dataset is corrupted', 'The number of unique keys in the set exceeds 27')

  return avgs, set.id

In [6]:
def normalize(data):
  norm_data = []
  max = np.max(data)

  for i in range(len(data)):
    norm_data.append(data[i] / max) # local normalization

  return norm_data

In [7]:
def convert_all(dataset):
  train_data = []
  test_data = []
  train_labels = []
  test_labels = []

  for key in dataset:
    for i in range(0, len(dataset[key])-1): # skip last set
      tdata, sid = preprocess(dataset[key][i])

      train_data.append(normalize(tdata)) # normalization
      train_labels.append(sid)
    
    tdata, sid = preprocess(dataset[key][-1]) # treat last set as test data
    test_data.append(normalize(tdata)) # normalizaton
    test_labels.append(sid)

  return train_data, train_labels, test_data, test_labels

In [8]:
train_data, train_labels, test_data, test_labels = convert_all(dataset)

assert len(test_data) == len(test_labels), 'The size of data is not equal'
assert len(train_data) == len(train_labels), 'The size of data is not equal'

print(train_data[0])

[0.30760095011876487, 0.23871733966745842, 0.29453681710213775, 0.38836104513064135, 0.47268408551068886, 0.35510688836104515, 0.3016627078384798, 0.34441805225653205, 0.3669833729216152, 0.45486935866983375, 0.48931116389548696, 0.23990498812351543, 0.5831353919239906, 0.40498812351543945, 0.2529691211401425, 0.3087885985748218, 0.38954869358669836, 0.47862232779097386, 0.4358669833729216, 0.27909738717339666, 0.3859857482185273, 0.19833729216152018, 0.7327790973871734, 0.9085510688836105, 0.9275534441805225, 1.0, 0.2850356294536817]


<h1>Neural Network Model</h1>

In [9]:
class TrainingGuardCallback(keras.callbacks.Callback):

  def on_train_batch_end(self, batch, logs=None):
    pass

  def on_epoch_end(self, epoch, logs={}):
    if logs.get('accuracy') is not None:
      if epoch > 200 or logs.get('accuracy') > 0.95:
        self.model.stop_training = True
        print("\nTraining goals met. Training has been stopped!!")   

In [10]:
model = keras.Sequential()
model.add(keras.layers.Flatten(input_shape = np.shape(train_data[0])))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.Dense(1024, activation=keras.activations.relu)) # 1st hidden layer
model.add(keras.layers.Dropout(0.3) ) # helps prevent overfitting
model.add(keras.layers.Dense(2048, activation=keras.activations.relu)) # 2nd hidden layer
model.add(keras.layers.Dense(14, activation=keras.activations.softmax)) # output layer

In [11]:
model.compile(                                          # use adam optimizer
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metrics=['accuracy'], 
    loss=keras.losses.SparseCategoricalCrossentropy()   # labels provided as integers, for one-hot vectors use keras.losses.CategoricalCrossentropy
    )

In [None]:
history = model.fit(train_data, train_labels, verbose=1, epochs=999, callbacks=[TrainingGuardCallback()]) # train the model

In [13]:
print("NN accuracy: {:.2f}% \t loss: {:.5f}".format(history.history['accuracy'][-1]*100, history.history['loss'][-1]))

NN accuracy: 96.43% 	 loss: 0.42896


<h1>Testing the model</h1>

In [14]:
loss, accuracy = model.evaluate(test_data, test_labels, verbose=0)
print("Model loss (test data): {:.3f} \t Model accuracy (test data): {:.2f}%".format(loss, accuracy*100))

Model loss (test data): 1.805 	 Model accuracy (test data): 35.71%


<h1>Data augmentation</h1>


In [15]:
import random as r

def data_augmentation(set, **kwargs):
  aug_set = Set(set.id, [], True)
  valid = False

  # random Method
  if 'random' in kwargs:
    valid = True
    for i in range(0, len(set.keystrokes)):
      r1 = math.floor((r.random() * 100) - 50)
      r2 = math.floor((r.random() * 100) - 50)
      keystroke = Keystroke(set.keystrokes[i].key, 
                            math.floor(set.keystrokes[i].up + r1),#dst[i]),
                            math.floor(set.keystrokes[i].down + r2),# dst[i]),
                            math.floor(set.keystrokes[i].down + r2) - math.floor(set.keystrokes[i].up + r1))#dst[i]) - math.floor(set.keystrokes[i].up + dst[i]))
      
      aug_set.keystrokes.append(keystroke)

  # Gaussian dst Method
  if 'gaussian_rand' in kwargs:
    valid = True
    if 'mu' not in kwargs:
        mu = 25 # mean
    if 'sigma' not in kwargs:
        sigma = 50; # standard deviation

    dst = np.random.normal(mu, sigma, len(set.keystrokes))

    for i in range(0, len(set.keystrokes)):
      keystroke = Keystroke(set.keystrokes[i].key, 
                            math.floor(set.keystrokes[i].up + dst[i]),
                            math.floor(set.keystrokes[i].down + dst[i]),
                            math.floor(set.keystrokes[i].down + dst[i]) - math.floor(set.keystrokes[i].up + dst[i]))
      
      aug_set.keystrokes.append(keystroke)

  # Replace random samples Method
  if 'random_replace' in kwargs:
    valid = True

    if 'replace_rate' not in kwargs:
      rate = 10

    next = rate
    for i in range(len(set.keystrokes)):
      if i == next:
        index = i - math.ceil(r.random() * next)
        index = 0 if index < 0 else index
        sub = aug_set.keystrokes[index]

        keystroke = Keystroke(set.keystrokes[i].key, sub.up, sub.down, sub.dtime)
        keystrokeSub = Keystroke(sub.key, set.keystrokes[i].up, set.keystrokes[i].down, set.keystrokes[i].dtime)

        aug_set.keystrokes[index] = keystrokeSub
        aug_set.keystrokes.append(keystroke)
        
        next += rate
      else:
        aug_set.keystrokes.append(set.keystrokes[i])

  # Generated new values based on average value per sample, add stdev
  if 'average_gen' in kwargs:
    valid = True

    if 'dataset' not in kwargs:
      raise Exception('No dataset was provided for the average based generation method')

    dataset = kwargs['dataset']
    avgUp = np.zeros((len(set.keystrokes),))
    avgDown = np.zeros((len(set.keystrokes),))

    for kSet in dataset[str(set.id+1)]:
      for i in range(len(kSet.keystrokes)):
        avgUp[i] = avgUp[i] + kSet.keystrokes[i].up
        avgDown[i] = avgDown[i] + kSet.keystrokes[i].down

    assert len(avgUp) == len(avgDown)

    datasets_no = len(dataset[str(set.id+1)])
    for i in range(len(avgUp)):
      avgUp[i] = math.floor(avgUp[i] / datasets_no)
      avgDown[i] = math.floor(avgDown[i] / datasets_no)

    dst = np.random.normal(5, 5, len(avgUp))
    
    for i in range(len(set.keystrokes)):
      aug_set.keystrokes.append(Keystroke(set.keystrokes[i].key, 
                                math.floor(avgUp[i] + dst[i]),
                                math.floor(avgDown[i] + dst[i]),
                                math.floor(abs((avgDown[i] + dst[i]) - (avgUp[i] + dst[i])))))

  # None of the above was specified
  if valid is False:
    raise Exception('Augmentation method not chosen')

  return aug_set

In [16]:

for key in dataset:
  for i in range(len(dataset[key])):
    #aug = data_augmentation(dataset[key][i], random=True, gaussian_rand=True, random_replace=True)
    aug = data_augmentation(dataset[key][i], average_gen=True, dataset=dataset)
    dataset[key].insert(0, aug) # prepend
  
tdata, tlabels, testd, testl = convert_all(dataset)

#tdata += test_data
#tlabels += test_labels

print('Augmented dataset size: ', len(tdata))
assert len(tlabels) == len(tdata), 'Dataset corrupted'

print("format: ", testd[0])

Augmented dataset size:  70
format:  [0.42596810933940776, 0.21981776765375854, 0.49430523917995445, 0.2517084282460137, 0.31890660592255127, 0.3211845102505695, 0.1252847380410023, 0.7164009111617312, 0.40774487471526194, 0.4202733485193622, 0.30751708428246016, 0.23690205011389523, 0.23234624145785876, 0.5091116173120729, 0.31662870159453305, 0.7198177676537585, 0.2984054669703872, 0.3678815489749431, 0.30751708428246016, 0.5216400911161732, 0.10933940774487472, 0.36332574031890663, 0.47038724373576307, 0.7323462414578588, 0.06719817767653759, 1.0, 0.1947608200455581]


<h1>Retrain the model with augmented data</h1>


In [17]:
model.fit(tdata, tlabels, verbose=1, epochs=999, shuffle=True, callbacks=[TrainingGuardCallback()])

Epoch 1/999
Epoch 2/999
Epoch 3/999
Epoch 4/999
Epoch 5/999
Epoch 6/999
Epoch 7/999
Epoch 8/999
Epoch 9/999
Epoch 10/999
Epoch 11/999
Epoch 12/999
Epoch 13/999
Epoch 14/999
Epoch 15/999
Epoch 16/999
Epoch 17/999
Epoch 18/999
Epoch 19/999
Epoch 20/999
Epoch 21/999
Epoch 22/999
Epoch 23/999
Epoch 24/999
Epoch 25/999
Epoch 26/999
Epoch 27/999
Epoch 28/999
Epoch 29/999
Epoch 30/999
Epoch 31/999
Epoch 32/999
Epoch 33/999
Epoch 34/999
Epoch 35/999
Epoch 36/999
Epoch 37/999
Epoch 38/999
Epoch 39/999
Epoch 40/999
Epoch 41/999
Epoch 42/999
Epoch 43/999
Epoch 44/999
Epoch 45/999
Epoch 46/999
Epoch 47/999
Epoch 48/999
Epoch 49/999
Epoch 50/999
Epoch 51/999
Epoch 52/999
Epoch 53/999
Epoch 54/999
Epoch 55/999
Epoch 56/999
Epoch 57/999
Epoch 58/999
Epoch 59/999
Epoch 60/999
Epoch 61/999
Epoch 62/999
Epoch 63/999
Epoch 64/999
Epoch 65/999
Epoch 66/999
Epoch 67/999
Epoch 68/999
Epoch 69/999
Epoch 70/999
Epoch 71/999
Epoch 72/999
Epoch 73/999
Epoch 74/999
Epoch 75/999
Epoch 76/999
Epoch 77/999
Epoch 78

<keras.callbacks.History at 0x7f2a3ccc2dd0>

<h1>Testing the result</h1>

In [18]:
loss, accuracy = model.evaluate(testd, testl, verbose=0)
print("Model loss (test data): {:.3f} \t Model accuracy (test data): {:.2f}%".format(loss, accuracy*100))

Model loss (test data): 0.756 	 Model accuracy (test data): 71.43%


<h1>Save augmented sets</h1>

In [22]:
import os

def save_aug_sets(dataset, path, offset):
  os.makedirs(path, exist_ok=True) # create dir if not exists

  for key in dataset:
    sampleid = offset

    for set in dataset[key]:
      if set.aug is not True:
        continue

      filepath = path + "/#" + str(set.id+1).zfill(2) + "_" + str(sampleid)  + ".txt"
      sampleid += 1
      
      with open(filepath, 'w') as f:

        for k in set.keystrokes:
          f.write("\t{}, \t{}, \t{}".format(k.key, k.down, k.up))
          f.write('\n')

In [23]:
save_aug_sets(dataset, "drive/MyDrive/Keystrokes/Augmented", 3)