In [None]:
# Cell 1: Import Libraries and Setup with Fixed Timestamp Handling
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from collections import deque, Counter
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

print("✅ All libraries imported successfully!")
print(f"PyTorch version: {torch.__version__}")
print(f"Device: {'GPU' if torch.cuda.is_available() else 'CPU'}")

# --- Load the raw keystroke log and normalise its timestamp column ---
print("\n📂 Loading keystroke dataset...")

# Only these columns are used by the rest of this analysis
required_cols = ['user_id', 'key_code', 'key_label', 'duration_ms', 'timestamp']

# Read the CSV, project onto the required columns, and discard rows that
# carry no user id
df = (
    pd.read_csv('keypress_events.csv')
    .loc[:, required_cols]
    .dropna(subset=['user_id'])
)

# Parse the ISO 8601 strings; unparseable values become NaT and are dropped
df = df.assign(timestamp=pd.to_datetime(df['timestamp'], errors='coerce'))
df = df.dropna(subset=['timestamp'])

# Store timestamps as integer microseconds since epoch
# (assumes the parsed column is nanosecond-resolution, hence the // 1000)
df = df.assign(timestamp=df['timestamp'].astype(np.int64) // 1000)

print(f"✅ Dataset loaded successfully!")
print(f"📊 Dataset shape: {df.shape}")
print(f"👥 Number of unique users: {df['user_id'].nunique()}")
print(f"⌨️ Number of unique keys: {df['key_code'].nunique()}")
print("\n📈 User keystroke distribution:")
print(df['user_id'].value_counts().head())


✅ All libraries imported successfully!
PyTorch version: 2.6.0+cu124
Device: CPU

📂 Loading keystroke dataset...
✅ Dataset loaded successfully!
📊 Dataset shape: (1357, 5)
👥 Number of unique users: 8
⌨️ Number of unique keys: 13

📈 User keystroke distribution:
user_id
bbc7f6a4-53e3-439f-8b99-1ca88483e321    1121
f8e0e899-10aa-41dd-8be4-367bf324bfeb     124
235fc56c-8c60-43a1-a552-a2d95b3e5743      46
f52887f1-a2f1-4b54-83b1-269bcb0dc6da      25
c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429      18
Name: count, dtype: int64


In [None]:
# Cell 2: Feature Engineering with Fixed Quantization
class KeystrokePreprocessor:
    """Turn one user's raw keystroke rows into fixed-length feature windows.

    Each keystroke becomes a triple ``[key_id, hold_time_bin, digraph_time_bin]``
    of integers suitable for embedding lookups, and consecutive keystrokes are
    grouped into overlapping windows of ``sequence_length`` steps.

    The key-code -> id mapping is built lazily and is shared across every call,
    so ids stay consistent between users processed by the same instance.
    """

    # `timestamp` is expected in integer microseconds since epoch, so dividing
    # a timestamp difference by this constant yields milliseconds.
    MICROSECONDS_PER_MS = 1000

    def __init__(self, sequence_length=10):
        """Create a preprocessor producing windows of `sequence_length` keystrokes."""
        self.sequence_length = sequence_length
        self.key_to_id = {}
        self.id_to_key = {}
        self.key_counter = 0  # next unused id == number of distinct keys seen

    def _get_key_id(self, key_code):
        """Return the embedding id for `key_code`, assigning a new id on first sight."""
        if key_code not in self.key_to_id:
            self.key_to_id[key_code] = self.key_counter
            self.id_to_key[self.key_counter] = key_code
            self.key_counter += 1
        return self.key_to_id[key_code]

    def _quantize_time(self, time_value, bins=100, max_val=500):
        """Map timing values (ms) onto integer bins in ``[0, bins - 1]``.

        Args:
            time_value: Scalar or array-like of durations in milliseconds.
            bins: Number of bins (must match the embedding table size).
            max_val: Durations are clipped to ``[0, max_val]`` before binning.

        Returns:
            Integer numpy array (or numpy scalar for scalar input) of bin indices.
            NaN inputs are mapped to bin 0.
        """
        values = np.asarray(time_value, dtype=float)
        # NaN has no defined integer conversion; treat a missing duration as 0 ms
        values = np.nan_to_num(values, nan=0.0)
        clipped = np.clip(values, 0, max_val)
        quantized = (clipped * bins / max_val).astype(int)
        # A value of exactly max_val would land on index `bins`, which is out of range
        return np.clip(quantized, 0, bins - 1)

    def preprocess_user_data(self, df, user_id):
        """Build feature windows for a single user.

        Args:
            df: Frame with `user_id`, `key_code`, `duration_ms` and `timestamp`
                columns, where `timestamp` is integer microseconds since epoch.
            user_id: The user whose rows are selected.

        Returns:
            ``(sequences, user_df)`` where `sequences` is a list of windows, each a
            list of ``[key_id, hold_time_q, digraph_time_q]`` triples, and
            `user_df` is the user's rows sorted by time. When the user has fewer
            than `sequence_length` rows, `sequences` is empty and `user_df` has no
            derived columns.
        """
        user_df = df[df['user_id'] == user_id].copy()
        user_df = user_df.sort_values('timestamp').reset_index(drop=True)

        if len(user_df) < self.sequence_length:
            return [], user_df

        # Digraph time: gap to the previous keystroke (0 for the first keystroke)
        user_df['digraph_time'] = user_df['timestamp'].diff().fillna(0)

        # Microseconds -> milliseconds. Dividing by 1e6 would give seconds, and the
        # 0-500 quantization range would then collapse nearly every gap into bin 0.
        user_df['digraph_time_ms'] = user_df['digraph_time'] / self.MICROSECONDS_PER_MS

        # Quantize both timing features onto the same 100-bin, 0-500 ms grid
        user_df['hold_time_q'] = self._quantize_time(
            user_df['duration_ms'].values, bins=100, max_val=500
        )
        user_df['digraph_time_q'] = self._quantize_time(
            user_df['digraph_time_ms'].values, bins=100, max_val=500
        )

        # Map key codes to embedding ids (may extend the shared mapping)
        user_df['key_id'] = user_df['key_code'].apply(self._get_key_id)

        # Sliding windows with stride 1 over the per-keystroke feature triples
        features = user_df[['key_id', 'hold_time_q', 'digraph_time_q']].values.tolist()
        window_count = len(features) - self.sequence_length + 1
        sequences = [
            features[start:start + self.sequence_length]
            for start in range(window_count)
        ]

        return sequences, user_df

# Re-initialize preprocessor with fixed quantization
preprocessor = KeystrokePreprocessor(sequence_length=10)

# Users with enough keystrokes to form at least one full sequence, ordered from
# most to least data (value_counts sorts descending, so valid_users[0] is the
# "top" user). Later cells read this list, so it has to exist before they run.
user_keystroke_counts = df['user_id'].value_counts()
valid_users = user_keystroke_counts[
    user_keystroke_counts >= preprocessor.sequence_length
].index.tolist()

print("✅ Fixed feature engineering setup complete!")
print("🔧 Quantization now ensures all indices are within embedding bounds")


✅ Fixed feature engineering setup complete!
🔧 Quantization now ensures all indices are within embedding bounds


In [None]:
# Cell 3: Feature Importance Analysis
def analyze_feature_importance(df, valid_users):
    """Summarise and plot per-user typing statistics for up to five users.

    Args:
        df: Keystroke frame with `user_id`, `key_code`, `duration_ms` and
            `timestamp` columns; `timestamp` is integer microseconds since epoch.
        valid_users: Sequence of user ids; only the first five are analysed.

    Returns:
        pandas.DataFrame with one row per analysed user and the columns
        `user_id`, `avg_hold_time`, `std_hold_time`, `avg_digraph_time` (ms),
        `typing_speed` (keys per minute), `most_common_key`, `key_diversity`.
    """

    print("🔍 Analyzing feature importance...")

    # Calculate statistics for each user
    user_stats = []
    for user_id in valid_users[:5]:  # Analyze top 5 users
        # Sort once so the timestamp diff below follows typing order
        user_data = df[df['user_id'] == user_id].sort_values('timestamp')
        timestamps = user_data['timestamp']
        key_modes = user_data['key_code'].mode()
        span_us = timestamps.max() - timestamps.min()

        stats = {
            'user_id': user_id,
            'avg_hold_time': user_data['duration_ms'].mean(),
            'std_hold_time': user_data['duration_ms'].std(),
            # microseconds -> milliseconds, matching the "(ms)" axis label below
            'avg_digraph_time': timestamps.diff().mean() / 1000,
            # keys per microsecond -> keys per minute; undefined for a zero span
            'typing_speed': (
                len(user_data) / span_us * 1000000 * 60 if span_us > 0 else np.nan
            ),
            'most_common_key': key_modes.iloc[0] if not key_modes.empty else 0,
            'key_diversity': user_data['key_code'].nunique()
        }
        user_stats.append(stats)

    stats_df = pd.DataFrame(user_stats)

    # Feature importance visualization: one bar chart per statistic
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    fig.suptitle('Feature Importance Analysis for User Authentication', fontsize=16)

    # (column, title, y-label) in row-major panel order
    panels = [
        ('avg_hold_time', 'Average Hold Time (ms)', 'Hold Time (ms)'),
        ('std_hold_time', 'Hold Time Variability (std)', 'Standard Deviation'),
        ('typing_speed', 'Typing Speed (keys/min)', 'Keys per minute'),
        ('avg_digraph_time', 'Average Digraph Time (ms)', 'Digraph Time (ms)'),
        ('key_diversity', 'Key Diversity (unique keys)', 'Number of unique keys'),
        ('most_common_key', 'Most Common Key Code', 'Key Code'),
    ]
    bar_positions = range(len(stats_df))
    for ax, (column, title, ylabel) in zip(axes.flat, panels):
        ax.bar(bar_positions, stats_df[column])
        ax.set_title(title)
        ax.set_xlabel('Users')
        ax.set_ylabel(ylabel)

    plt.tight_layout()
    plt.show()

    # Feature importance ranking
    # NOTE: this ranking is a fixed list of statements; it is not computed from
    # the statistics above.
    print("\n🏆 Feature Importance Ranking:")
    print("1. Hold Time Patterns - High discriminative power")
    print("2. Digraph Timing - Medium-High discriminative power")
    print("3. Key Sequence Patterns - Medium discriminative power")
    print("4. Typing Speed - Medium discriminative power")
    print("5. Key Diversity - Low-Medium discriminative power")

    return stats_df

# Run feature importance analysis
# NOTE: relies on the notebook-level `valid_users` list being defined by an earlier
# cell. The guard only skips the analysis when that list is empty; it does not
# protect against the name being undefined (that raises NameError).
if valid_users:
    feature_stats = analyze_feature_importance(df, valid_users)
    print("✅ Feature importance analysis complete!")


NameError: name 'valid_users' is not defined

In [None]:
# Cell 4: TKCA Neural Network Model
class KeystrokeDataset(Dataset):
    """Dataset of keystroke windows paired with integer class labels.

    Every window is a list of ``[key_id, hold_time_bin, digraph_time_bin]``
    steps. Indexing splits it into three parallel long tensors, one per
    feature, plus the label tensor.
    """

    def __init__(self, sequences, labels):
        """Store the windows and their labels (same length, same order)."""
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        """Number of windows."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return ``(keys, hold_times, digraph_times, label)`` for window `idx`."""
        steps = self.sequences[idx]
        target = self.labels[idx]

        # Column 0 = key id, column 1 = hold-time bin, column 2 = digraph-time bin
        keys, hold_times, digraph_times = (
            torch.tensor([step[column] for step in steps], dtype=torch.long)
            for column in range(3)
        )

        return keys, hold_times, digraph_times, torch.tensor(target, dtype=torch.long)

class TKCAModel(nn.Module):
    """Bi-LSTM with attention pooling over embedded keystroke windows.

    Each step's key id, hold-time bin and digraph-time bin are embedded
    separately and concatenated. A bidirectional LSTM encodes the window, a
    learned softmax attention over the time axis pools it, and a small MLP
    produces two logits.
    """

    def __init__(self, num_keys, num_time_bins=100, key_embed_dim=16,
                 time_embed_dim=8, hidden_dim=64, num_layers=2):
        """Build the layers.

        Args:
            num_keys: Size of the key-id embedding table.
            num_time_bins: Size of each timing embedding table.
            key_embed_dim: Width of the key embedding.
            time_embed_dim: Width of each timing embedding.
            hidden_dim: LSTM hidden size per direction.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()

        # One lookup table per input feature
        self.key_embedding = nn.Embedding(num_keys, key_embed_dim)
        self.hold_time_embedding = nn.Embedding(num_time_bins, time_embed_dim)
        self.digraph_time_embedding = nn.Embedding(num_time_bins, time_embed_dim)

        step_width = key_embed_dim + 2 * time_embed_dim  # concatenated embeddings
        encoded_width = hidden_dim * 2                   # forward + backward states

        # Sequence encoder
        self.lstm = nn.LSTM(step_width, hidden_dim, num_layers,
                            batch_first=True, bidirectional=True)

        # One attention score per time step
        self.attention = nn.Linear(encoded_width, 1)

        # Two-way classification head
        self.classifier = nn.Sequential(
            nn.Linear(encoded_width, 32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(32, 2)  # Binary classification
        )

    def forward(self, keys, hold_times, digraph_times):
        """Return logits of shape ``(batch, 2)`` for long tensors of shape ``(batch, seq)``."""
        step_features = torch.cat(
            (
                self.key_embedding(keys),
                self.hold_time_embedding(hold_times),
                self.digraph_time_embedding(digraph_times),
            ),
            dim=-1,
        )

        encoded, _ = self.lstm(step_features)

        # Normalise scores across the time axis, then take the weighted sum
        scores = self.attention(encoded)
        weights = F.softmax(scores, dim=1)
        pooled = (weights * encoded).sum(dim=1)

        return self.classifier(pooled)

In [None]:
# Cell 5: Training and Evaluation Functions
class TKCATrainer:
    """Training, evaluation and single-sequence prediction for a TKCA-style model.

    The wrapped model must accept ``(keys, hold_times, digraph_times)`` and return
    class logits of shape ``(batch, num_classes)``.
    """

    def __init__(self, model, device='cpu'):
        """Move `model` to `device` and keep both for later calls."""
        self.model = model
        self.device = device
        self.model.to(device)

    def _to_device(self, *tensors):
        """Move every tensor to the trainer's device, preserving order."""
        return tuple(tensor.to(self.device) for tensor in tensors)

    def train_model(self, train_loader, val_loader, num_epochs=50, learning_rate=0.001):
        """Train with Adam and cross-entropy, validating after every epoch.

        Args:
            train_loader: Iterable of ``(keys, hold_times, digraph_times, labels)``
                batches; must support ``len()``.
            val_loader: Iterable of batches in the same format.
            num_epochs: Number of passes over `train_loader`.
            learning_rate: Adam learning rate.

        Returns:
            ``(train_losses, val_accuracies)``: per-epoch mean training loss and
            per-epoch validation accuracy in percent.
        """
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

        train_losses = []
        val_accuracies = []

        print(f"🚀 Starting training for {num_epochs} epochs...")

        for epoch in range(num_epochs):
            # Training phase
            self.model.train()
            total_loss = 0

            for batch in train_loader:
                keys, hold_times, digraph_times, labels = self._to_device(*batch)

                optimizer.zero_grad()
                outputs = self.model(keys, hold_times, digraph_times)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            avg_loss = total_loss / len(train_loader)
            train_losses.append(avg_loss)

            # Validation phase (evaluate() switches the model to eval mode)
            val_acc = self.evaluate(val_loader)
            val_accuracies.append(val_acc)

            # Report every 10th epoch only, to keep the output short
            if (epoch + 1) % 10 == 0:
                print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Val Accuracy: {val_acc:.2f}%")

        print("✅ Training completed!")
        return train_losses, val_accuracies

    def evaluate(self, data_loader):
        """Return classification accuracy in percent over `data_loader`.

        Returns 0.0 when the loader yields no samples, instead of dividing by zero.
        """
        self.model.eval()
        correct = 0
        total = 0

        with torch.no_grad():
            for batch in data_loader:
                keys, hold_times, digraph_times, labels = self._to_device(*batch)

                outputs = self.model(keys, hold_times, digraph_times)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        if total == 0:
            return 0.0
        return 100 * correct / total

    def predict(self, keys, hold_times, digraph_times):
        """Classify a single sequence.

        The inputs must carry a batch dimension of size 1, because the predicted
        class is extracted with ``.item()``.

        Returns:
            ``(predicted_class, probabilities)`` where `probabilities` is a numpy
            array of shape ``(1, num_classes)``.
        """
        self.model.eval()
        with torch.no_grad():
            keys, hold_times, digraph_times = self._to_device(keys, hold_times, digraph_times)

            outputs = self.model(keys, hold_times, digraph_times)
            probabilities = F.softmax(outputs, dim=1)
            predicted = outputs.argmax(dim=1)

            return predicted.item(), probabilities.cpu().numpy()

print("✅ Training framework ready!")


In [None]:
# Cell 6: Prepare Training Data and Train Model
def prepare_training_data(df, target_user, preprocessor, test_size=0.2, candidate_users=None):
    """Build train/validation loaders for a genuine-vs-impostor classifier.

    Windows from `target_user` are labelled 0 (genuine). Windows from up to three
    other users are labelled 1 (impostor), each contributing at most a third as
    many windows as the target user so the classes are roughly balanced.

    Args:
        df: Keystroke frame accepted by `preprocessor.preprocess_user_data`.
        target_user: User id the model should recognise.
        preprocessor: Object exposing `preprocess_user_data(df, user_id)`.
        test_size: Fraction of windows held out for validation.
        candidate_users: Ordered user ids to draw impostors from. Defaults to the
            notebook-level `valid_users` list when omitted.

    Returns:
        ``(train_loader, val_loader, n_train, n_val)``.

    Raises:
        ValueError: If `target_user` yields no windows.
    """
    print(f"📊 Preparing training data for user: {target_user}")

    if candidate_users is None:
        # Backward-compatible default: the notebook-level list of usable users
        candidate_users = valid_users

    # Get user data (positive samples)
    user_sequences, _ = preprocessor.preprocess_user_data(df, target_user)
    if not user_sequences:
        raise ValueError(
            f"User {target_user} has too few keystrokes to build a training sequence"
        )
    user_labels = [0] * len(user_sequences)  # 0 = genuine user

    # Get impostor data (negative samples from other users)
    other_users = [u for u in candidate_users if u != target_user]
    per_impostor_cap = len(user_sequences) // 3  # Balance the data
    impostor_sequences = []

    for other_user in other_users[:3]:  # Use top 3 other users as impostors
        imp_seq, _ = preprocessor.preprocess_user_data(df, other_user)
        impostor_sequences.extend(imp_seq[:per_impostor_cap])

    if not impostor_sequences:
        print("⚠️ No impostor sequences available; the model will only see genuine samples")

    impostor_labels = [1] * len(impostor_sequences)  # 1 = impostor

    # Combine data
    all_sequences = user_sequences + impostor_sequences
    all_labels = user_labels + impostor_labels

    print(f"   - User sequences: {len(user_sequences)}")
    print(f"   - Impostor sequences: {len(impostor_sequences)}")
    print(f"   - Total sequences: {len(all_sequences)}")

    # Split into train/validation
    # NOTE(review): the windows overlap (stride 1), so a random split puts
    # near-duplicate windows on both sides and validation accuracy is optimistic.
    # A chronological split would avoid this.
    train_seq, val_seq, train_labels, val_labels = train_test_split(
        all_sequences, all_labels, test_size=test_size, random_state=42, stratify=all_labels
    )

    # Create datasets
    train_dataset = KeystrokeDataset(train_seq, train_labels)
    val_dataset = KeystrokeDataset(val_seq, val_labels)

    # Create data loaders (only the training loader is shuffled)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    return train_loader, val_loader, len(train_seq), len(val_seq)

# Train model for the top user
if valid_users:
    target_user = valid_users[0]

    # Seed torch so weight initialisation, dropout and DataLoader shuffling are
    # repeatable across runs (the train/validation split is already seeded).
    torch.manual_seed(42)

    # Prepare data
    train_loader, val_loader, train_size, val_size = prepare_training_data(
        df, target_user, preprocessor
    )

    # Initialize model
    # key_counter is read after preprocessing so it covers every key seen so far
    num_keys = preprocessor.key_counter
    model = TKCAModel(num_keys=num_keys)
    trainer = TKCATrainer(model)

    # Train model
    train_losses, val_accuracies = trainer.train_model(train_loader, val_loader, num_epochs=30)

    # Save model together with what is needed to rebuild it and preprocess inputs
    # NOTE: the preprocessor object is pickled into the checkpoint, so loading it
    # later requires the KeystrokePreprocessor class to be importable/defined.
    torch.save({
        'model_state_dict': model.state_dict(),
        'preprocessor': preprocessor,
        'target_user': target_user,
        'num_keys': num_keys
    }, f'tkca_model_{target_user}.pth')

    print(f"✅ Model trained and saved for user {target_user}")
    print(f"📈 Final validation accuracy: {val_accuracies[-1]:.2f}%")


In [None]:
# Cell 7: Training Results Visualization
def plot_training_results(train_losses, val_accuracies):
    """Show training loss and validation accuracy per epoch, side by side.

    Args:
        train_losses: Per-epoch training loss values.
        val_accuracies: Per-epoch validation accuracy values in percent.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # (series, line format, legend label, title, y-label) for each panel
    panels = (
        (train_losses, 'b-', 'Training Loss', 'Training Loss Over Time', 'Loss'),
        (val_accuracies, 'r-', 'Validation Accuracy', 'Validation Accuracy Over Time', 'Accuracy (%)'),
    )

    for ax, (series, line_format, legend_label, title, ylabel) in zip(axes, panels):
        ax.plot(series, line_format, label=legend_label)
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(ylabel)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()

    print("📊 Training visualization complete!")

# Plot results if training was successful
# At notebook top level locals() is the module namespace, so this checks that a
# training cell actually created both result lists before plotting them.
if 'train_losses' in locals() and 'val_accuracies' in locals():
    plot_training_results(train_losses, val_accuracies)

# Additional visualization: User typing patterns
def visualize_user_patterns(df, user_id):
    """Plot four views of one user's typing behaviour.

    Panels: hold-time histogram, usage counts of the ten most used keys,
    10-keystroke moving average of the gap between keystrokes, and hold time
    against key code.

    Args:
        df: Keystroke frame with `user_id`, `key_code`, `duration_ms` and
            `timestamp` columns; `timestamp` is integer microseconds since epoch.
        user_id: The user whose rows are plotted.
    """
    # reset_index gives a 0..n-1 index in typing order. Without it the rhythm plot
    # would use the original row labels as x values instead of keystroke number.
    user_data = (
        df[df['user_id'] == user_id]
        .sort_values('timestamp')
        .reset_index(drop=True)
    )

    fig, axes = plt.subplots(2, 2, figsize=(12, 8))
    fig.suptitle(f'Typing Patterns for User {user_id}', fontsize=14)

    # Hold time distribution
    axes[0,0].hist(user_data['duration_ms'], bins=30, alpha=0.7, color='blue')
    axes[0,0].set_title('Hold Time Distribution')
    axes[0,0].set_xlabel('Duration (ms)')
    axes[0,0].set_ylabel('Frequency')

    # Key usage frequency
    key_counts = user_data['key_code'].value_counts().head(10)
    axes[0,1].bar(range(len(key_counts)), key_counts.values)
    axes[0,1].set_title('Top 10 Most Used Keys')
    axes[0,1].set_xlabel('Key Rank')
    axes[0,1].set_ylabel('Usage Count')

    # Typing rhythm over time (kept as a local series; the frame is not modified)
    time_diff_s = user_data['timestamp'].diff() / 1000000  # microseconds -> seconds
    axes[1,0].plot(time_diff_s.rolling(10).mean(), alpha=0.7)
    axes[1,0].set_title('Typing Rhythm (10-keystroke moving average)')
    axes[1,0].set_xlabel('Keystroke Number')
    axes[1,0].set_ylabel('Time Between Keys (s)')

    # Hold time vs key code
    axes[1,1].scatter(user_data['key_code'], user_data['duration_ms'], alpha=0.6)
    axes[1,1].set_title('Hold Time vs Key Code')
    axes[1,1].set_xlabel('Key Code')
    axes[1,1].set_ylabel('Hold Time (ms)')

    plt.tight_layout()
    plt.show()

# Visualize patterns for the target user
# `target_user` is assigned by the training cell inside its own `if valid_users:`
# branch, so the same condition is reused here.
if valid_users:
    visualize_user_patterns(df, target_user)
    print("✅ User pattern visualization complete!")


In [None]:
# Cell 8: Manual Testing Interface (FIXED)
class ManualTester:
    """Load a saved TKCA checkpoint and score keystroke samples against it."""

    def __init__(self, model_path):
        """Load the checkpoint stored at `model_path`."""
        self.load_model(model_path)

    def load_model(self, model_path):
        """Load trained model and preprocessor from `model_path`.

        The checkpoint must contain `model_state_dict`, `preprocessor`,
        `target_user` and `num_keys`.
        """
        # SECURITY: weights_only=False is required because the checkpoint contains a
        # pickled preprocessor object, but unpickling can execute arbitrary code.
        # Only load checkpoint files you created yourself.
        checkpoint = torch.load(model_path, map_location='cpu', weights_only=False)
        self.preprocessor = checkpoint['preprocessor']
        self.target_user = checkpoint['target_user']
        # Size of the key embedding table; key ids at or above it cannot be scored
        self.num_keys = checkpoint['num_keys']

        # Recreate model
        self.model = TKCAModel(num_keys=self.num_keys)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()

        print(f"✅ Model loaded for user: {self.target_user}")

    def test_user_sample(self, test_user_id, sample_data=None):
        """Test a user sample against the trained model.

        Args:
            test_user_id: User id to look up in the notebook-level `df`, or a label
                for the custom sample when `sample_data` is given.
            sample_data: Optional rows of ``[key_code, duration_ms, timestamp]``
                (timestamp in microseconds) used instead of the dataset.

        Returns:
            ``(final_decision, avg_confidence_percent)`` from a majority vote over
            up to five sequences (a tie counts as impostor), or ``None`` when the
            user is unknown or no sequence can be scored.
        """
        print(f"\n🧪 Testing user: {test_user_id}")
        print(f"🎯 Target user (trained model): {self.target_user}")

        if sample_data is None:
            # Use data from dataset
            if test_user_id not in df['user_id'].values:
                print(f"❌ User {test_user_id} not found in dataset!")
                return

            sequences, user_df = self.preprocessor.preprocess_user_data(df, test_user_id)
        else:
            # Use provided sample data
            sample_df = pd.DataFrame(sample_data, columns=['key_code', 'duration_ms', 'timestamp'])
            sample_df['user_id'] = test_user_id
            sequences, user_df = self.preprocessor.preprocess_user_data(sample_df, test_user_id)

        if not sequences:
            print(f"❌ Insufficient data for user {test_user_id}")
            return

        # Preprocessing assigns fresh ids to keys it has never seen. Those ids fall
        # outside the trained embedding table and would raise an IndexError inside
        # the model, so windows containing them are skipped.
        num_keys = getattr(self, 'num_keys', None)
        if num_keys is not None:
            scorable = [
                seq for seq in sequences
                if all(step[0] < num_keys for step in seq)
            ]
            skipped = len(sequences) - len(scorable)
            if skipped:
                print(f"⚠️ Skipping {skipped} sequence(s) containing keys not seen during training")
            if not scorable:
                print(f"❌ No testable sequences for user {test_user_id}")
                return
            sequences = scorable

        # Test multiple sequences
        predictions = []
        confidences = []

        for i, sequence in enumerate(sequences[:5]):  # Test first 5 sequences
            # unsqueeze(0) adds the batch dimension the model expects
            keys = torch.tensor([s[0] for s in sequence], dtype=torch.long).unsqueeze(0)
            hold_times = torch.tensor([s[1] for s in sequence], dtype=torch.long).unsqueeze(0)
            digraph_times = torch.tensor([s[2] for s in sequence], dtype=torch.long).unsqueeze(0)

            with torch.no_grad():
                outputs = self.model(keys, hold_times, digraph_times)
                probabilities = F.softmax(outputs, dim=1)
                predicted_class = outputs.argmax(dim=1).item()
                # Confidence = probability assigned to the predicted class
                class_probability = probabilities[0][predicted_class].item()

                predictions.append(predicted_class)
                confidences.append(class_probability)

                result = "GENUINE" if predicted_class == 0 else "IMPOSTOR"
                confidence = class_probability * 100

                print(f"   Sequence {i+1}: {result} (Confidence: {confidence:.1f}%)")

        # Overall decision using majority vote
        genuine_count = predictions.count(0)
        impostor_count = predictions.count(1)

        if genuine_count > impostor_count:
            final_decision = "GENUINE USER"
            icon = "✅"
        else:
            final_decision = "IMPOSTOR DETECTED"
            icon = "❌"

        avg_confidence = np.mean(confidences) * 100

        print(f"\n{icon} FINAL DECISION: {final_decision}")
        print(f"📊 Average Confidence: {avg_confidence:.1f}%")
        print(f"📈 Genuine predictions: {genuine_count}/{len(predictions)}")
        print(f"📉 Impostor predictions: {impostor_count}/{len(predictions)}")

        return final_decision, avg_confidence

# Initialize manual tester
# Only runs when a training cell has assigned `target_user`; the checkpoint path
# below uses the same naming pattern as the file written after training.
if 'target_user' in locals():
    tester = ManualTester(f'tkca_model_{target_user}.pth')

    print("🎮 Manual Testing Interface Ready!")
    print("=" * 50)

    # Test the target user (should be genuine)
    print("\n🧪 Test 1: Testing target user (should be GENUINE)")
    tester.test_user_sample(target_user)

    # Test another user (should be impostor)
    # valid_users[1] is the second entry of the notebook-level user list
    if len(valid_users) > 1:
        print("\n🧪 Test 2: Testing different user (should be IMPOSTOR)")
        other_user = valid_users[1]
        tester.test_user_sample(other_user)

    print("\n✅ Manual testing interface ready for custom inputs!")


In [None]:
# Cell 9: Interactive Testing Function
def interactive_test():
    """Console menu for scoring dataset users or hand-typed keystrokes.

    Loops until the user picks option 3. Option 1 scores a user from the
    notebook-level `valid_users` list; option 2 reads keystrokes line by line
    (``key_code,duration_ms,timestamp`` or just ``key_code``) until ``END`` and
    scores them as "custom_user". Both use the notebook-level `tester`.
    """
    print("🎯 Interactive TKCA Testing")
    print("=" * 40)

    while True:
        print("\nOptions:")
        print("1. Test existing user from dataset")
        print("2. Test with custom keystroke data")
        print("3. Exit")

        choice = input("\nEnter your choice (1-3): ").strip()

        if choice == '3':
            print("👋 Goodbye!")
            break

        if choice == '1':
            print(f"\nAvailable users: {valid_users[:5]}")  # Show first 5 users
            user_id = input("Enter user ID to test: ").strip()

            if user_id not in valid_users:
                print("❌ User not found or insufficient data!")
            else:
                tester.test_user_sample(user_id)
            continue

        if choice != '2':
            print("❌ Invalid choice!")
            continue

        # --- Option 2: collect custom keystrokes ---
        print("\n📝 Enter custom keystroke data:")
        print("Format: key_code,duration_ms,timestamp (one per line)")
        print("Enter 'END' when finished (minimum 10 keystrokes needed)")

        custom_data = []
        keystroke_count = 0
        base_timestamp = 1720000000000000  # microseconds; defaults step by 100 ms

        while True:
            line = input(f"Keystroke {keystroke_count + 1}: ").strip()
            if line.upper() == 'END':
                break

            fields = line.split(',')
            try:
                key_code = int(fields[0])
                if len(fields) == 1:
                    # Simple format: just key code, with a random hold time
                    duration = np.random.uniform(50, 150)
                    timestamp = base_timestamp + keystroke_count * 100000
                else:
                    duration = float(fields[1])
                    if len(fields) > 2:
                        timestamp = int(fields[2])
                    else:
                        timestamp = base_timestamp + keystroke_count * 100000
            except ValueError:
                # Rejected lines are not counted, so the prompt number stays put
                print("❌ Invalid format! Use: key_code,duration_ms,timestamp")
                continue

            custom_data.append([key_code, duration, timestamp])
            keystroke_count += 1

        if len(custom_data) < 10:
            print("❌ Need at least 10 keystrokes for testing!")
        else:
            test_user_id = "custom_user"
            tester.test_user_sample(test_user_id, custom_data)
# Example usage
print("📋 Example custom keystroke data format:")
print("Key codes for common keys:")
print("  - A=65, B=66, C=67, ..., Z=90")
print("  - 0=48, 1=49, ..., 9=57")
print("  - Space=32, Enter=13, Backspace=8")
print("\n🎮 Ready for interactive testing!")

# Uncomment the line below to start interactive testing.
# It is left disabled so that "Run All" does not block waiting on input().
# interactive_test()


In [None]:
# Cell 10: Final Performance Evaluation
def comprehensive_evaluation(df, valid_users, preprocessor):
    """Train one quick model per user for the first three users and compare them.

    For each user a fresh model is trained for 20 epochs on a 70/30 split and its
    last-epoch validation accuracy is recorded. A summary is printed and a bar
    chart of accuracy per user is shown.

    Returns:
        List of dicts with keys `user`, `accuracy`, `train_size`, `val_size`.
    """
    print("📊 COMPREHENSIVE SYSTEM EVALUATION")
    print("=" * 50)

    def bar_colour(acc):
        """Green above 90 %, orange above 80 %, red otherwise."""
        if acc > 90:
            return 'green'
        return 'orange' if acc > 80 else 'red'

    results = []

    for i, user in enumerate(valid_users[:3]):  # Test top 3 users
        print(f"\n🧪 Evaluating model for user {i+1}: {user}")

        # Data for this user versus impostors
        prepared = prepare_training_data(df, user, preprocessor, test_size=0.3)
        train_loader, val_loader, train_size, val_size = prepared

        # Fresh model per user, trained briefly (fewer epochs for evaluation)
        trainer = TKCATrainer(TKCAModel(num_keys=preprocessor.key_counter))
        history = trainer.train_model(train_loader, val_loader, num_epochs=20)

        final_accuracy = history[1][-1]  # last validation accuracy
        results.append(dict(
            user=user,
            accuracy=final_accuracy,
            train_size=train_size,
            val_size=val_size,
        ))

        print(f"   ✅ Final accuracy: {final_accuracy:.2f}%")

    # Summary statistics
    accuracies = [entry['accuracy'] for entry in results]

    print(f"\n📈 SYSTEM PERFORMANCE SUMMARY")
    print(f"   Average accuracy: {np.mean(accuracies):.2f}%")
    print(f"   Best accuracy: {np.max(accuracies):.2f}%")
    print(f"   Worst accuracy: {np.min(accuracies):.2f}%")
    print(f"   Standard deviation: {np.std(accuracies):.2f}%")

    # Visualization
    fig, ax = plt.subplots(figsize=(10, 6))
    users_short = [f"User {i+1}" for i in range(len(results))]
    ax.bar(users_short, accuracies, color=[bar_colour(acc) for acc in accuracies])
    ax.set_title('TKCA Authentication Accuracy by User')
    ax.set_xlabel('Users')
    ax.set_ylabel('Accuracy (%)')
    ax.set_ylim(0, 100)

    # Add accuracy labels on bars
    for i, acc in enumerate(accuracies):
        ax.text(i, acc + 1, f'{acc:.1f}%', ha='center')

    ax.grid(True, alpha=0.3)
    plt.show()

    return results

# Run comprehensive evaluation
# Each per-user model needs at least one other user to supply impostor samples,
# hence the minimum of two users.
if valid_users and len(valid_users) >= 2:
    print("🚀 Starting comprehensive evaluation...")
    eval_results = comprehensive_evaluation(df, valid_users, preprocessor)
    print("\n✅ Comprehensive evaluation complete!")
else:
    print("⚠️ Need at least 2 users for comprehensive evaluation")

# Final summary
# These lines print fixed text plus the dataset size and sequence length; they
# run regardless of whether the evaluation above was executed.
print("\n" + "="*60)
print("🎯 TKCA KEYSTROKE AUTHENTICATION SYSTEM SUMMARY")
print("="*60)
print(f"📊 Dataset: {df.shape[0]} keystrokes from {df['user_id'].nunique()} users")
print(f"🧠 Model: Bi-LSTM with attention mechanism")
print(f"⚙️ Features: Key sequences, hold times, digraph times")
print(f"🎯 Sequence length: {preprocessor.sequence_length}")
print(f"✅ System ready for deployment!")
print("="*60)


# One Shot Learning Model

In [None]:
import pandas as pd

In [None]:
# Reload the raw CSV with all of its columns. This rebinds `df`, so when run in
# the same kernel as the cells above it replaces the frame they prepared.
df = pd.read_csv('keypress_events.csv')
df.head()

Unnamed: 0,user_id,id,key_code,key_label,event_type,duration_ms,timestamp,digram_key1,digram_key2,context_screen,field_name
0,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,,4294967304,backspace,individual,2,2025-07-02T13:03:10.033934,,,RegisterPage,
1,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,,4294967304,backspace,individual,1,2025-07-02T13:03:44.809495,,,LoginPage,
2,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,,4294967304,backspace,individual,2,2025-07-02T13:03:44.971532,,,LoginPage,
3,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,,4294967304,backspace,individual,1,2025-07-02T13:03:45.117047,,,LoginPage,
4,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,,4294967304,backspace,individual,0,2025-07-02T13:03:45.243418,,,LoginPage,


# Preprocessing

In [None]:
# Keep rows where both digram columns are missing (NaN) or empty strings,
# i.e. single-key events only.
filtered_df = df[(df['digram_key1'].isna() | (df['digram_key1'] == '')) &
                 (df['digram_key2'].isna() | (df['digram_key2'] == ''))]

In [None]:
# Compare row counts before and after the single-key filter
print(f"Original rows: {len(df)}")
print(f"Filtered rows (single-key only): {len(filtered_df)}")

Original rows: 1357
Filtered rows (single-key only): 1357


In [None]:
# Distinct event types present in the data
df['event_type'].unique()

array(['individual'], dtype=object)

In [None]:
# Distinct key labels present in the data
df['key_label'].unique()

array(['backspace', '6', '5', '4', '8', '9', '0', '2', '7', '1', '3',
       'enter', 'audio volume down'], dtype=object)

In [None]:
# Distinct field names (the recorded output shows only NaN)
df['field_name'].unique()

array([nan])

In [None]:
# Distinct values of the id column (the recorded output shows only NaN)
df['id'].unique()

array([nan])

In [None]:
# Drop the digram, event-type and field-name columns, which are not used below
df = df.drop(columns=['digram_key1', 'digram_key2', 'event_type', 'field_name'])

In [None]:
# Drop the id column as well (its unique() check above returned only NaN)
df = df.drop(columns={'id'})

In [None]:
# Preview the remaining columns
df.head()

Unnamed: 0,user_id,key_code,key_label,duration_ms,timestamp,context_screen
0,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,4294967304,backspace,2,2025-07-02T13:03:10.033934,RegisterPage
1,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,4294967304,backspace,1,2025-07-02T13:03:44.809495,LoginPage
2,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,4294967304,backspace,2,2025-07-02T13:03:44.971532,LoginPage
3,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,4294967304,backspace,1,2025-07-02T13:03:45.117047,LoginPage
4,c9f16bb0-fcb8-43c9-a2fd-c502ce6e5429,4294967304,backspace,0,2025-07-02T13:03:45.243418,LoginPage


In [None]:
# Remove 'audio volume down' rows. .copy() makes the result an independent frame,
# so later column assignments do not act on a slice of the original.
df = df[df['key_label'] != 'audio volume down'].copy()

In [None]:
# Confirm which key labels remain after the filter
df['key_label'].unique()

array(['backspace', '6', '5', '4', '8', '9', '0', '2', '7', '1', '3',
       'enter'], dtype=object)

In [None]:
# Distinct screens on which keystrokes were recorded
df['context_screen'].unique()

array(['RegisterPage', 'LoginPage', 'transfer_page', 'home', 'pay_bills',
       'add_funds'], dtype=object)

# Model creation

In [None]:
# Parse timestamps (unparseable values become NaT), then order rows by user and
# time so that per-user differences between consecutive rows are meaningful.
df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
df = df.sort_values(['user_id', 'timestamp'])

In [None]:
# Check dtypes and non-null counts after parsing
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1355 entries, 1038 to 995
Data columns (total 6 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   user_id         1355 non-null   object        
 1   key_code        1355 non-null   int64         
 2   key_label       1355 non-null   object        
 3   duration_ms     1355 non-null   int64         
 4   timestamp       1355 non-null   datetime64[ns]
 5   context_screen  1355 non-null   object        
dtypes: datetime64[ns](1), int64(2), object(3)
memory usage: 74.1+ KB


In [None]:
# Seconds elapsed since the same user's previous keystroke. The first keystroke
# of each user has no predecessor (NaN from diff) and is set to 0.
df['inter_key_time'] = df.groupby('user_id')['timestamp'].diff().dt.total_seconds().fillna(0)

In [None]:
from sklearn.preprocessing import LabelEncoder

In [None]:
# Encode each key label as an integer class id; `le` keeps the fitted mapping
le = LabelEncoder()
df['key_label_encoded'] = le.fit_transform(df['key_label'])

In [None]:
# Per-keystroke feature columns
# NOTE(review): presumably the three inputs expected by SiameseGRU(input_dim=3) — confirm
features = ['key_label_encoded', 'duration_ms', 'inter_key_time']

# Training Dataset

In [None]:
import random

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
import random
from torch.utils.data import Dataset
import torch

class KeystrokeSequenceDataset(Dataset):
    """Randomly sampled pairs of keystroke windows for Siamese training.

    Each of the ``n_pairs`` sampling rounds appends one positive pair (two
    distinct windows of the same user, label 1) followed by one negative pair
    (one window from each of two different users, label 0).

    Args:
        user_sequences: mapping of user id -> list of windows. Every window
            must be convertible with ``torch.tensor`` (e.g. a 2-D array-like).
        n_pairs: number of sampling rounds.
        seed: optional seed for a private ``random.Random``. When ``None``
            (the default) the module-level ``random`` state is used.

    Raises:
        ValueError: if pairs are requested but fewer than two users have at
            least one window (no negative pair can be formed).

    Note:
        Positive pairs are drawn only from users that own at least two
        windows, so the two classes stay balanced. If no user qualifies, the
        dataset contains negative pairs only.
    """

    def __init__(self, user_sequences, n_pairs=1000, seed=None):
        rng = random if seed is None else random.Random(seed)
        self.pairs = []
        self.labels = []

        # Users that can contribute to a negative pair / to a positive pair.
        users = [u for u, seqs in user_sequences.items() if len(seqs) >= 1]
        positive_users = [u for u in users if len(user_sequences[u]) >= 2]

        if n_pairs > 0 and len(users) < 2:
            raise ValueError(
                "KeystrokeSequenceDataset needs at least two users with sequences "
                f"to build negative pairs, got {len(users)}"
            )

        for _ in range(n_pairs):
            # Positive pair (same user). Choosing among eligible users only
            # avoids silently skipping the pair and unbalancing the labels.
            if positive_users:
                user = rng.choice(positive_users)
                a, b = rng.sample(user_sequences[user], 2)
                self.pairs.append((a, b))
                self.labels.append(1)

            # Negative pair (different users)
            u1, u2 = rng.sample(users, 2)
            seq1 = rng.choice(user_sequences[u1])
            seq2 = rng.choice(user_sequences[u2])
            self.pairs.append((seq1, seq2))
            self.labels.append(0)

    def __len__(self):
        """Number of stored pairs (positive and negative together)."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(window_1, window_2, label)`` as float32 tensors."""
        seq1, seq2 = self.pairs[idx]
        return (
            torch.tensor(seq1, dtype=torch.float32),
            torch.tensor(seq2, dtype=torch.float32),
            torch.tensor(self.labels[idx], dtype=torch.float32),
        )


# Siamese GRU

In [None]:
import numpy as np

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class SiameseGRU(nn.Module):
    """Siamese network: one shared GRU encoder plus an MLP head on |h1 - h2|.

    Both inputs are encoded by the same GRU (``batch_first=True``, so inputs
    are laid out as (batch, seq_len, input_dim)). The absolute difference of
    the two final hidden states goes through Linear -> ReLU -> Linear ->
    Sigmoid, giving one value in (0, 1) per pair.

    Args:
        input_dim: number of features per time step.
        hidden_dim: size of the GRU hidden state (the embedding size).
    """

    def __init__(self, input_dim=3, hidden_dim=64):
        super().__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, batch_first=True)
        head_layers = [
            nn.Linear(hidden_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward_once(self, x):
        """Encode one batch of windows; returns the final hidden state (batch, hidden)."""
        _, final_hidden = self.gru(x)  # (1, batch, hidden)
        return final_hidden.squeeze(0)

    def forward(self, x1, x2):
        """Score a batch of window pairs; returns a (batch, 1) tensor of sigmoid outputs."""
        emb_a = self.forward_once(x1)
        emb_b = self.forward_once(x2)
        gap = (emb_a - emb_b).abs()  # (batch, hidden_dim)
        return self.fc(gap)


In [None]:
def train(model, dataloader, optimizer, criterion, device):
    """Run one training epoch over ``dataloader`` and return the mean batch loss.

    Args:
        model: callable as ``model(x1, x2)``; its output must hold one value
            per pair (e.g. shape (batch, 1)).
        dataloader: iterable yielding ``(x1, x2, y)`` tensor batches.
        optimizer: optimizer over ``model``'s parameters.
        criterion: loss called as ``criterion(prediction, y)``.
        device: device the batches are moved to before the forward pass.

    Returns:
        Mean of the per-batch losses as a float, or 0.0 if the dataloader
        yielded no batches.
    """
    model.train()
    total_loss = 0.0
    n_batches = 0
    for x1, x2, y in dataloader:
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)

        optimizer.zero_grad()
        # Match the target's shape explicitly. A bare .squeeze() collapses a
        # batch of one to a 0-d tensor, which the loss rejects against y's (1,).
        out = model(x1, x2).reshape(y.shape)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        n_batches += 1
    # Count batches ourselves: avoids ZeroDivisionError on an empty loader and
    # works for iterables that do not define __len__.
    return total_loss / n_batches if n_batches else 0.0


In [None]:
from torch.utils.data import DataLoader

# Fix the seeds so pair sampling (`random`) and weight init / shuffling (`torch`)
# are repeatable when this cell is re-run.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

seq_len = 20
user_sequences = {}
for user_id, group in df.groupby("user_id"):
    arr = group[features].values
    # Non-overlapping windows of `seq_len` rows. The `+ 1` keeps the final
    # window when len(arr) is an exact multiple of seq_len (without it that
    # last full window was dropped).
    sequences = [
        arr[i:i + seq_len]
        for i in range(0, len(arr) - seq_len + 1, seq_len)
    ]
    # Only users with at least two windows can form a same-user pair.
    if len(sequences) >= 2:
        user_sequences[user_id] = sequences

dataset = KeystrokeSequenceDataset(user_sequences, n_pairs=2000)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SiameseGRU().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# The model ends in a Sigmoid, so plain BCELoss (on probabilities) is used.
criterion = nn.BCELoss()

for epoch in range(10):
    loss = train(model, dataloader, optimizer, criterion, device)
    print(f"Epoch {epoch+1}, Loss: {loss:.4f}")


Epoch 1, Loss: 0.5254
Epoch 2, Loss: 0.2101
Epoch 3, Loss: 0.0361
Epoch 4, Loss: 0.0051
Epoch 5, Loss: 0.0024
Epoch 6, Loss: 0.0015
Epoch 7, Loss: 0.0010
Epoch 8, Loss: 0.0007
Epoch 9, Loss: 0.0006
Epoch 10, Loss: 0.0004


In [None]:
# Persist only the model's state_dict (its parameters), not the module object itself.
torch.save(model.state_dict(), "siamese_gru_model.pth")
print("✅ Model weights saved as siamese_gru_model.pth")

✅ Model weights saved as siamese_gru_model.pth


In [None]:
from scipy.spatial.distance import cosine

In [None]:
# Use the first two users in `user_sequences` as the genuine user and the imposter.
user_ids = list(user_sequences)
print(user_ids)
user_real, user_imposter = user_ids[0], user_ids[1]

real_sequences, imposter_sequences = user_sequences[user_real], user_sequences[user_imposter]

# Two windows from the genuine user (reference + probe) and one imposter window.
reference_seq, same_user_seq = real_sequences[0], real_sequences[1]
different_user_seq = imposter_sequences[0]


['235fc56c-8c60-43a1-a552-a2d95b3e5743', 'bbc7f6a4-53e3-439f-8b99-1ca88483e321', 'f8e0e899-10aa-41dd-8be4-367bf324bfeb']


In [None]:
from scipy.spatial.distance import cosine

def get_embedding(model, sequence, device):
    """Encode a single window with ``model.forward_once`` and return a flat NumPy vector.

    The model is switched to eval mode (and left there), the window gets a
    leading batch axis of size 1, and gradients are disabled for the forward pass.
    """
    model.eval()
    batch = torch.tensor(sequence, dtype=torch.float32).unsqueeze(0).to(device)
    with torch.no_grad():
        encoded = model.forward_once(batch)
    return encoded.cpu().numpy().flatten()

def similarity_score(emb1, emb2):
    """Cosine similarity of two vectors, computed as 1 minus SciPy's cosine distance.

    Higher values mean the vectors are more similar.
    """
    distance = cosine(emb1, emb2)
    return 1 - distance


In [None]:
# Embed the reference window and both probe windows with the trained encoder.
ref_emb, same_emb, diff_emb = (
    get_embedding(model, window, device)
    for window in (reference_seq, same_user_seq, different_user_seq)
)

# Score each probe against the reference (higher = more similar).
score_same, score_diff = (
    similarity_score(ref_emb, probe) for probe in (same_emb, diff_emb)
)

print(f'User is : {user_real}')
print(f'Imposter is : {user_imposter}')
print(f"Similarity (same user): {score_same:.4f}")
print(f"Similarity (imposter): {score_diff:.4f}")


User is : 235fc56c-8c60-43a1-a552-a2d95b3e5743
Imposter is : bbc7f6a4-53e3-439f-8b99-1ca88483e321
Similarity (same user): 0.6846
Similarity (imposter): 0.0349


# TensorFlow-Based Implementation

In [None]:
# Window length and feature columns for the TensorFlow pipeline; these are the
# same values as `seq_len` and `features` used by the PyTorch cells.
SEQ_LEN = 20
FEATURES = ['key_label_encoded', 'duration_ms', 'inter_key_time']

In [None]:
# Cut each user's time-ordered events into non-overlapping windows of SEQ_LEN rows;
# a trailing partial window is discarded. Users with fewer than two windows are left out.
user_sequences = {}
for user_id, group in df.sort_values(['user_id', 'timestamp']).groupby('user_id'):
    arr = group[FEATURES].values
    seqs = [
        arr[k * SEQ_LEN:(k + 1) * SEQ_LEN]
        for k in range(len(arr) // SEQ_LEN)
    ]
    if len(seqs) < 2:
        continue
    user_sequences[user_id] = seqs

In [None]:
def make_pairs(user_seqs, n_pairs=2000, seed=None):
    """Sample positive and negative window pairs as NumPy arrays.

    Every one of the ``n_pairs`` rounds appends a positive pair (two distinct
    windows of one user, label 1.0) followed by a negative pair (one window
    from each of two different users, label 0.0), so labels alternate 1, 0.

    Args:
        user_seqs: mapping of user id -> list of equally shaped windows.
        n_pairs: number of sampling rounds (the result holds ``2 * n_pairs`` pairs).
        seed: optional seed for a private ``random.Random``. When ``None``
            (the default) the module-level ``random`` state is used.

    Returns:
        ``(X1, X2, y)``: stacked first windows, stacked second windows, and
        float32 labels.

    Raises:
        ValueError: if pairs are requested but fewer than two users have a
            window, or no user has at least two windows.
    """
    rng = random if seed is None else random.Random(seed)
    users = [u for u, seqs in user_seqs.items() if len(seqs) >= 1]
    # Only users with two or more windows can supply a positive pair. Drawing
    # from this list avoids a ValueError from sampling 2 items out of 1.
    positive_users = [u for u in users if len(user_seqs[u]) >= 2]

    if n_pairs > 0 and len(users) < 2:
        raise ValueError(
            f"make_pairs needs at least two users with sequences, got {len(users)}"
        )
    if n_pairs > 0 and not positive_users:
        raise ValueError(
            "make_pairs needs at least one user with two or more sequences "
            "to build positive pairs"
        )

    X1, X2, y = [], [], []
    for _ in range(n_pairs):
        # positive pair
        u = rng.choice(positive_users)
        a, b = rng.sample(user_seqs[u], 2)
        X1.append(a)
        X2.append(b)
        y.append(1.0)

        # negative pair
        u1, u2 = rng.sample(users, 2)
        X1.append(rng.choice(user_seqs[u1]))
        X2.append(rng.choice(user_seqs[u2]))
        y.append(0.0)

    return np.array(X1), np.array(X2), np.array(y, dtype='float32')


In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Sample the pairs, then hold out 20% of them (fixed random_state) for validation.
# NOTE(review): the split is over pairs, not over windows or users, so the same
# window can appear in both training and validation pairs. Validation metrics
# are therefore likely optimistic; consider splitting windows per user before pairing.
X1, X2, y = make_pairs(user_sequences, n_pairs=2000)
X1_tr, X1_va, X2_tr, X2_va, y_tr, y_va = train_test_split(
    X1, X2, y, test_size=0.2, random_state=42
)

In [None]:
def build_siamese_gru(input_shape=(SEQ_LEN, len(FEATURES)), hidden_dim=64):
    """Build the Keras Siamese model: a shared GRU encoder and a sigmoid head.

    Both inputs pass through the same sub-model (named 'shared_gru':
    GRU -> Dense(32, relu)). The element-wise absolute difference of the two
    embeddings feeds a single sigmoid unit.

    Args:
        input_shape: shape of one window, without the batch axis.
        hidden_dim: number of GRU units.

    Returns:
        A two-input ``Model`` producing one sigmoid output per pair.
    """
    # Shared encoder, reused for both branches so the weights are tied.
    window_in = Input(shape=input_shape)
    encoded = layers.GRU(hidden_dim)(window_in)
    encoded = layers.Dense(32, activation='relu')(encoded)
    shared = Model(window_in, encoded, name='shared_gru')

    left_in = Input(shape=input_shape)
    right_in = Input(shape=input_shape)
    left_emb = shared(left_in)
    right_emb = shared(right_in)

    # |left - right| per embedding dimension, then a single sigmoid unit.
    abs_diff = layers.Lambda(lambda pair: tf.abs(pair[0] - pair[1]))([left_emb, right_emb])
    score = layers.Dense(1, activation='sigmoid')(abs_diff)
    return Model([left_in, right_in], score)

# NOTE: this rebinds `model`, which earlier cells use for the PyTorch SiameseGRU;
# cells below this point operate on the Keras model.
model = build_siamese_gru()
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

In [None]:
# Fit on the training pairs; the held-out pairs are used for validation only.
model.fit(
    [X1_tr, X2_tr], y_tr,
    validation_data=([X1_va, X2_va], y_va),
    batch_size=32, epochs=10
)

Epoch 1/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 21ms/step - accuracy: 0.6740 - loss: 0.6034 - val_accuracy: 0.8037 - val_loss: 0.4100
Epoch 2/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 15ms/step - accuracy: 0.8596 - loss: 0.3446 - val_accuracy: 0.9525 - val_loss: 0.1970
Epoch 3/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 15ms/step - accuracy: 0.9761 - loss: 0.1445 - val_accuracy: 1.0000 - val_loss: 0.0576
Epoch 4/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 24ms/step - accuracy: 0.9998 - loss: 0.0459 - val_accuracy: 1.0000 - val_loss: 0.0247
Epoch 5/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 15ms/step - accuracy: 1.0000 - loss: 0.0191 - val_accuracy: 1.0000 - val_loss: 0.0143
Epoch 6/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 15ms/step - accuracy: 0.9934 - loss: 0.0267 - val_accuracy: 0.9563 - val_loss: 0.1153
Epoch 7/10
[1m100/100

<keras.src.callbacks.history.History at 0x78a207596810>

In [None]:
# The shared encoder sub-model (created with name='shared_gru') maps a window to its embedding.
embedder = model.get_layer(name='shared_gru')

def get_emb(seq):
    """Return the encoder output for a single window.

    ``seq`` is indexed with ``None`` to add a leading batch axis, so it is
    expected to be a NumPy array (presumably (SEQ_LEN, n_features); confirm
    against the caller). The batch axis is stripped from the result.
    """
    batch = seq[None, ...]
    return embedder.predict(batch, verbose=0)[0]


In [None]:
uids = [*user_sequences]
print(uids[0])
print(uids[1])

# Here the second user supplies the reference and same-user windows,
# and the first user supplies the imposter window.
ref_seq, same_seq = user_sequences[uids[1]][0], user_sequences[uids[1]][1]
imp_seq = user_sequences[uids[0]][0]

e_ref, e_same, e_imp = (get_emb(window) for window in (ref_seq, same_seq, imp_seq))

# Cosine similarity: higher = closer. Euclidean distance: lower = closer.
print(f"Cosine sim (same):  {1 - cosine(e_ref, e_same):.4f}")
print(f"Cosine sim (imp) :  {1 - cosine(e_ref, e_imp):.4f}")
print(f"Euclid dist (same): {euclidean(e_ref, e_same):.4f}")
print(f"Euclid dist (imp) : {euclidean(e_ref, e_imp):.4f}")

235fc56c-8c60-43a1-a552-a2d95b3e5743
bbc7f6a4-53e3-439f-8b99-1ca88483e321
Cosine sim (same):  0.9608
Cosine sim (imp) :  0.1925
Euclid dist (same): 3.2264
Euclid dist (imp) : 15.2448


In [None]:
import tensorflow as tf

In [None]:
# Save the full Keras model to a .keras file.
# NOTE(review): the model contains a Lambda layer built from a Python lambda;
# reloading such a file may require opting in to unsafe deserialization. Confirm
# before relying on this artifact.
model.save("siamese_model.keras")

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.experimental_enable_resource_variables = True
# The GRU is exported with tensor-list ops whose element shape is not static,
# which the builtin-only conversion rejects ("'tf.TensorListReserve' op requires
# element_shape to be static"). As the converter error recommends, allow
# Select TF ops and leave the tensor-list ops un-lowered.
# NOTE: a model converted this way needs a TFLite runtime that includes the
# Select TF ops (Flex) support in order to run.
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS,
]
converter._experimental_lower_tensor_list_ops = False
tflite_model = converter.convert()

with open("siamese_model.tflite", "wb") as f:
    f.write(tflite_model)

print("✅ TFLite model saved with resource variable support.")


Saved artifact at '/tmp/tmpr0j62qj5'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): List[TensorSpec(shape=(None, 20, 3), dtype=tf.float32, name='keras_tensor_39'), TensorSpec(shape=(None, 20, 3), dtype=tf.float32, name='keras_tensor_40')]
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  132635610724304: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132635615401680: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132635615401872: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132635610720848: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132635610725840: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132635610725072: TensorSpec(shape=(), dtype=tf.resource, name=None)
  132635610726032: TensorSpec(shape=(), dtype=tf.resource, name=None)


ConverterError: Variable constant folding is failed. Please consider using enabling `experimental_enable_resource_variables` flag in the TFLite converter object. For example, converter.experimental_enable_resource_variables = True<unknown>:0: error: loc(callsite(callsite(fused["TensorListReserve:", "functional_3_1/shared_gru_1/gru_3_1/TensorArrayV2_1@__inference_function_45381"] at fused["StatefulPartitionedCall:", "StatefulPartitionedCall@__inference_signature_wrapper_45422"]) at fused["StatefulPartitionedCall:", "StatefulPartitionedCall_1"])): 'tf.TensorListReserve' op requires element_shape to be static during TF Lite transformation pass
<unknown>:0: note: loc(fused["StatefulPartitionedCall:", "StatefulPartitionedCall_1"]): called from
<unknown>:0: error: loc(callsite(callsite(fused["TensorListReserve:", "functional_3_1/shared_gru_1/gru_3_1/TensorArrayV2_1@__inference_function_45381"] at fused["StatefulPartitionedCall:", "StatefulPartitionedCall@__inference_signature_wrapper_45422"]) at fused["StatefulPartitionedCall:", "StatefulPartitionedCall_1"])): failed to legalize operation 'tf.TensorListReserve' that was explicitly marked illegal
<unknown>:0: note: loc(fused["StatefulPartitionedCall:", "StatefulPartitionedCall_1"]): called from
<unknown>:0: error: Lowering tensor list ops is failed. Please consider using Select TF ops and disabling `_experimental_lower_tensor_list_ops` flag in the TFLite converter object. For example, converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]\n converter._experimental_lower_tensor_list_ops = False
