# Keystroke Dynamics - BiLSTM + Embedding Model Training

This notebook implements the training pipeline for a Multi-Input BiLSTM model used for keystroke dynamics authentication.

### Architecture:
1.  **Input A (Timing)**: `(N, 50, 5)` Float32 - per-keystroke dwell time, flight time, inter-key delay, simulated pressure, and down-down latency.
2.  **Input B (Key ID)**: `(N, 50)` Int32 - stable key IDs for the Embedding layer (character code + 2; `0` = padding, `1` = unknown key).
3.  **Layers**: Masking -> BiLSTM (Timing) + Embedding -> LSTM (Keys) -> Concatenate -> Dense.

In [None]:
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers, optimizers, callbacks
from pathlib import Path
import glob
import random

# Print the installed TensorFlow version so it is recorded in the notebook output.
print("TensorFlow Version:", tf.__version__)

## 1. Robust Key Encoding (Shared Logic)

This logic MUST match `web-app/app/services.py` exactly to ensure consistent inputs.

In [None]:
def get_stable_key_id(key: str) -> int:
    """Translate a key name into the integer id fed to the key embedding.

    Ids 0 and 1 are reserved (1 is the "unknown key" id), so every real key
    code is shifted up by 2:

    * a single-character key maps to its code point + 2;
    * a known named key (e.g. ``"Enter"``) maps to its table code + 2;
    * anything else (including the empty string) maps to 1.
    """
    if len(key) != 1:
        # Raw (unshifted) codes for the named, non-printable keys.
        named_key_codes = {
            "Backspace": 8,
            "Tab": 9,
            "Enter": 13,
            "Shift": 16,
            "Control": 17,
            "Alt": 18,
            "CapsLock": 20,
            "Escape": 27,
            "Space": 32,
            "PageUp": 33,
            "PageDown": 34,
            "End": 35,
            "Home": 36,
            "ArrowLeft": 37,
            "ArrowUp": 38,
            "ArrowRight": 39,
            "ArrowDown": 40,
            "Insert": 45,
            "Delete": 127,
        }
        raw_code = named_key_codes.get(key)
        if raw_code is None:
            return 1  # 1 = UNK
        return raw_code + 2

    # Printable key: shift the code point by 2 to keep 0 and 1 reserved.
    return ord(key) + 2


## 2. Data Loading & Sequence Generation (Clean Split)

In [None]:
TARGET_LENGTH = 50

def process_json_file(filepath):
    """Convert one recorded typing session into fixed-length model inputs.

    The file must contain a JSON object with a ``'keystrokes'`` list (events
    carrying ``'key'``, ``'event_type'`` and ``'timestamp'``) and, optionally,
    a ``'username'``. One feature row is produced per ``keyup`` event of a
    key other than Shift/Backspace:

        [dwell, flight, inter-event delay, simulated pressure, down-down latency]

    Intervals are clamped to ``[0, MAX_LATENCY]`` and scaled with
    ``log1p(x) / TIME_SCALE`` (timestamps are presumably milliseconds --
    TODO confirm against the recorder).

    Rows are then cut into windows of ``TARGET_LENGTH`` keystrokes: a short
    session gives one zero-padded window, a longer one gives 50%-overlapping
    windows plus a final window aligned to the end of the session when the
    stride does not land there.

    Args:
        filepath: Path of the session JSON file.

    Returns:
        ``(time_windows, key_windows, username)`` where ``time_windows`` has
        shape ``(W, TARGET_LENGTH, 5)`` and ``key_windows`` has shape
        ``(W, TARGET_LENGTH)``; or ``(None, None, None)`` when the session
        yields no usable keystroke.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)

    keystrokes = data.get('keystrokes', [])
    username = data.get('username')

    if not keystrokes:
        return None, None, None

    # Feature engineering constants
    TIME_SCALE = 7.0
    MAX_LATENCY = 3000.0
    NUM_TIME_FEATURES = 5
    IGNORED_KEYS = {"Backspace", "Shift"}

    def clamp(interval):
        # Floor at 0 as well as capping: a negative interval (overlapping keys,
        # out-of-order events) would otherwise turn into NaN inside log1p.
        return min(max(interval, 0.0), MAX_LATENCY)

    def normalize(interval):
        return float(np.log1p(clamp(interval)) / TIME_SCALE)

    # Separate lists for the two model inputs
    sequence_time = []
    sequence_keys = []

    keydown_events = {}  # key -> timestamp of its pending keydown
    previous_keyup_time = None
    previous_keydown_time = None

    for idx, event in enumerate(keystrokes):
        key = event['key']
        event_type = event['event_type']
        timestamp = event['timestamp']

        if event_type == "keydown":
            keydown_events[key] = timestamp
            previous_keydown_time = timestamp
            continue
        if event_type != "keyup":
            continue
        if key in IGNORED_KEYS:
            # Ignored keys produce no feature row; just forget their keydown.
            keydown_events.pop(key, None)
            continue

        # Compare against None explicitly: a timestamp of 0 is a valid time.
        keydown_time = keydown_events.pop(key, None)
        has_keydown = keydown_time is not None

        dwell_time = clamp(timestamp - keydown_time) if has_keydown else 0.0

        if has_keydown and previous_keyup_time is not None:
            flight_time = keydown_time - previous_keyup_time
        else:
            flight_time = 0.0

        # Delay relative to the immediately preceding event of any type.
        if idx > 0:
            inter_key_delay = timestamp - keystrokes[idx - 1]['timestamp']
        else:
            inter_key_delay = 0.0

        # Feature 5: down-down latency.
        # NOTE(review): previous_keydown_time is overwritten by every keydown,
        # including this key's own, so at keyup it usually equals keydown_time
        # (latency 0) or is later than it when keys overlap (negative, now
        # floored at 0 instead of becoming NaN). Measuring against the
        # *preceding* key's keydown would change the feature and presumably
        # has to be mirrored in the inference code -- confirm before changing.
        if has_keydown and previous_keydown_time is not None:
            dd_lat = keydown_time - previous_keydown_time
        else:
            dd_lat = 0.0

        sequence_time.append([
            normalize(dwell_time),
            normalize(flight_time),
            normalize(inter_key_delay),
            min(dwell_time / 200.0, 1.0),  # Feature 4: pressure (simulated)
            normalize(dd_lat),
        ])
        # Key id is stored separately for the embedding branch.
        sequence_keys.append(get_stable_key_id(key))

        previous_keyup_time = timestamp

    total = len(sequence_time)
    if total == 0:
        # Only ignored keys / unmatched events: an all-padding window would be
        # a meaningless sample, so report "no data" like an empty session.
        return None, None, None

    if total < TARGET_LENGTH:
        missing = TARGET_LENGTH - total
        final_time_sequences = [
            sequence_time + [[0.0] * NUM_TIME_FEATURES for _ in range(missing)]
        ]
        final_key_sequences = [sequence_keys + [0] * missing]
    else:
        step = TARGET_LENGTH // 2
        starts = list(range(0, total - TARGET_LENGTH + 1, step))
        # Add an end-aligned window when the stride leaves a tail uncovered.
        if (total - TARGET_LENGTH) % step != 0:
            starts.append(total - TARGET_LENGTH)
        final_time_sequences = [sequence_time[s:s + TARGET_LENGTH] for s in starts]
        final_key_sequences = [sequence_keys[s:s + TARGET_LENGTH] for s in starts]

    return np.array(final_time_sequences), np.array(final_key_sequences), username


## 3. Load All Data

In [None]:
# Set path to your raw json files
# Adjust this path to match your Colab or Local structure
# If using Colab, upload files to a folder named 'keystroke_data'
data_path = Path("../web-app/keystroke_data")
# Sort the files: directory traversal order is not guaranteed, and class
# indices are handed out in first-seen order below, so an unsorted listing
# could give the same user a different label on another run or machine.
json_files = sorted(data_path.rglob("*.json"))

all_X_time = []
all_X_key = []
all_y = []

# username -> integer class index, assigned in first-seen order
label_map = {}
current_label = 0

for f in json_files:
    xt, xk, user = process_json_file(f)
    # Skip sessions without usable keystrokes or without a username.
    if xt is None or user is None:
        continue

    if user not in label_map:
        label_map[user] = current_label
        current_label += 1
    label_id = label_map[user]

    # One training sample per window; every window inherits the file's label.
    all_X_time.extend(xt)
    all_X_key.extend(xk)
    all_y.extend([label_id] * len(xt))

if not all_y:
    # Fail here with a clear message instead of a shape error at fit time.
    raise RuntimeError(f"No usable keystroke sessions found under {data_path.resolve()}")

X_time_train = np.array(all_X_time, dtype=np.float32)
X_key_train = np.array(all_X_key, dtype=np.int32)
y_train = np.array(all_y, dtype=np.int32)

print("Classes:", label_map)
print("X_time shape:", X_time_train.shape)
print("X_key shape:", X_key_train.shape)
print("y shape:", y_train.shape)

## 4. Build Multi-Input Model

In [None]:
# One output class per user discovered while loading the data.
num_classes = len(label_map)
if num_classes < 2:
    # With a single class the softmax head below would have one unit and
    # always output 1.0, so training would silently learn nothing.
    raise ValueError(
        f"Need keystroke data from at least 2 users to train a classifier, got {num_classes}"
    )

# Inputs
# The sequence length must equal the window length produced by the data cell.
input_time = layers.Input(shape=(TARGET_LENGTH, 5), name="input_time")
input_key = layers.Input(shape=(TARGET_LENGTH,), dtype="int32", name="input_key")

# Branch A: Timing (Float)
# Masking for 0.0 values (padding): timesteps whose five features are all 0.0
# are skipped by the recurrent layer.
masked_time = layers.Masking(mask_value=0.0)(input_time)
bilstm_time = layers.Bidirectional(layers.LSTM(64, return_sequences=False))(masked_time)
bilstm_time = layers.Dropout(0.3)(bilstm_time)

# Branch B: Key IDs (Embedding)
# Vocab Size = 1000 (Safe for Turkish/Unicode), Dim = 16; id 0 is the padding id (mask_zero).
# NOTE(review): get_stable_key_id returns ord(key) + 2 with no upper bound, so a
# character with code point >= 998 yields an id outside this vocabulary -- confirm
# such keys cannot occur, or map them to the UNK id (1) in training AND inference.
embedding = layers.Embedding(input_dim=1000, output_dim=16, mask_zero=True)(input_key)
lstm_key = layers.LSTM(32, return_sequences=False)(embedding)
lstm_key = layers.Dropout(0.3)(lstm_key)

# Merge the two branch summaries into one feature vector.
merged = layers.Concatenate()([bilstm_time, lstm_key])
dense = layers.Dense(32, activation="relu")(merged)

# Output: a single sigmoid unit for two users, otherwise one softmax unit per user.
if num_classes == 2:
    output = layers.Dense(1, activation="sigmoid")(dense)
    loss = "binary_crossentropy"
else:
    output = layers.Dense(num_classes, activation="softmax")(dense)
    loss = "sparse_categorical_crossentropy"

model = models.Model(inputs=[input_time, input_key], outputs=output)

model.compile(optimizer="adam", loss=loss, metrics=["accuracy"])
model.summary()

## 5. Train

In [None]:
# Keras takes the validation split from the *end* of the arrays, before any
# shuffling. The samples were appended file by file, so without this
# permutation the validation set would consist of the last-loaded sessions
# only (possibly a user that never appears in the training portion).
# A fixed seed keeps the split reproducible.
# NOTE(review): windows from one session overlap by 50%, so a random split can
# place near-duplicate windows on both sides and make validation accuracy
# optimistic; a per-session split would be stricter.
shuffle_order = np.random.default_rng(42).permutation(len(y_train))

history = model.fit(
    [X_time_train[shuffle_order], X_key_train[shuffle_order]],
    y_train[shuffle_order],
    epochs=50,
    batch_size=16,
    validation_split=0.2,
)

## 6. Save Model

In [None]:
# Write the trained model to keystroke_lstm_model.h5 in the current working directory.
# NOTE(review): presumably the web app loads this exact filename -- confirm before renaming.
# NOTE(review): label_map (username -> class index) is not saved by this cell; inference
# presumably needs the same mapping -- confirm it is persisted elsewhere.
model.save("keystroke_lstm_model.h5")
print("Model saved as keystroke_lstm_model.h5")