In [None]:
import json
import numpy as np
from datetime import datetime
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [None]:
def load_data(json_path):
    """Read a JSON file and return its parsed contents.

    Args:
        json_path: Path (str or path-like) of the JSON file to read.

    Returns:
        The decoded JSON document, exactly as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Pin the encoding so the file is not decoded with a platform-dependent
    # default (e.g. cp1252 on Windows), which corrupts non-ASCII content.
    with open(json_path, 'r', encoding='utf-8') as f:
        return json.load(f)

In [None]:
def extract_features(user_data, win_size=10, step_size=5):
    """
    Extracts fixed-length time-windowed features per user.
    Combines accelerometer, gyroscope, and interaction event counts (tap/swipe/keypress).

    Args:
        user_data: Parsed dataset. Entries are read from ``user_data['unknown']``;
            each entry must provide ``id`` and ``events`` containing the lists
            ``sensor_events``, ``tap_events``, ``swipe_events`` and
            ``keypress_events``.
        win_size: Number of paired sensor samples per window (>= 1).
        step_size: Stride, in samples, between consecutive window starts (>= 1).

    Returns:
        dict mapping user id -> list of 1-D numpy arrays. Each array has
        ``6 * win_size + 3`` values: ``[ax, ay, az, gx, gy, gz]`` for every
        sample in the window, followed by the tap, swipe and keypress counts
        whose timestamps fall inside the window (bounds inclusive). Users with
        no complete window are omitted.

    Raises:
        ValueError: If ``win_size`` or ``step_size`` is smaller than 1, or an
            interaction-event timestamp is not valid ISO format.
        KeyError: If an entry lacks one of the required keys.
    """
    if win_size < 1 or step_size < 1:
        raise ValueError("win_size and step_size must both be >= 1")

    sessions = {}
    for entry in user_data['unknown']:
        user_id = entry['id']
        events = entry['events']
        sensor = events['sensor_events']
        taps = events['tap_events']
        swipes = events['swipe_events']
        keys = events['keypress_events']

        acc = [e for e in sensor if e['type'] == 'accelerometer']
        gyro = [e for e in sensor if e['type'] == 'gyroscope']
        features = []

        # Accelerometer and gyroscope readings are paired by position; a pair
        # is kept only if the two timestamps are within 0.1 s of each other.
        for a, g in zip(acc, gyro):
            try:
                ts_a = datetime.fromisoformat(a['timestamp'])
                ts_g = datetime.fromisoformat(g['timestamp'])
                if abs((ts_a - ts_g).total_seconds()) > 0.1:
                    continue
                f = [a['x'], a['y'], a['z'], g['x'], g['y'], g['z']]
            except (KeyError, TypeError, ValueError):
                # Malformed reading (missing field, bad timestamp): skip just
                # this pair instead of hiding every possible error.
                continue
            features.append((ts_a, f))

        if not features:
            continue

        # Sort on the timestamp only, so ties never fall through to comparing
        # the feature payloads.
        features.sort(key=lambda item: item[0])
        feats = [f for _, f in features]

        taps_ts = [datetime.fromisoformat(e['timestamp']) for e in taps]
        swipes_ts = [datetime.fromisoformat(e['timestamp']) for e in swipes]
        keys_ts = [datetime.fromisoformat(e['timestamp']) for e in keys]

        windows = []
        # "+ 1" so the last full window is included; without it a session with
        # exactly win_size samples would produce no window at all.
        for i in range(0, len(feats) - win_size + 1, step_size):
            window_feats = feats[i:i + win_size]
            start_ts = features[i][0]
            end_ts = features[i + win_size - 1][0]

            tap_count = sum(start_ts <= t <= end_ts for t in taps_ts)
            swipe_count = sum(start_ts <= t <= end_ts for t in swipes_ts)
            key_count = sum(start_ts <= t <= end_ts for t in keys_ts)

            flat_features = np.array(window_feats).flatten()
            full_vector = np.concatenate([flat_features, [tap_count, swipe_count, key_count]])
            windows.append(full_vector)

        if windows:
            # The same user id may appear in several entries; merge their windows.
            sessions.setdefault(user_id, []).extend(windows)

    return sessions

# Load the dataset according to user-imposter format

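For each user:
 - each feature window is paired with the next window from the same user as a positive example (label 1.0)
 - each feature window is also paired with a window from a different user (taken from `neg_users`) as a negative example (label 0.0)
 - both kinds of pairs are stored in the `pairs` list, and the process repeats for every user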

In [None]:
class SiameseDataset(Dataset):
    """Pair dataset for contrastive training of a Siamese network.

    For every user with at least two samples, consecutive samples form a
    positive pair (label 1.0). Each positive pair is accompanied by a negative
    pair (label 0.0) whose second element comes from another user.

    Negatives are chosen deterministically by cycling through the other users
    and through their samples, so the network is not trained against one
    single fixed negative vector and the pair list is reproducible.
    """

    def __init__(self, features_by_user):
        """Build the list of ``(sample_a, sample_b, label)`` pairs.

        Args:
            features_by_user: dict mapping user id -> sequence of equally
                sized feature vectors.
        """
        self.pairs = []
        users = list(features_by_user.keys())
        for uid in users:
            samples = features_by_user[uid]
            if len(samples) < 2:
                continue
            # Loop-invariant, so computed once per user. Users without any
            # sample cannot supply a negative and are left out.
            neg_users = [u for u in users
                         if u != uid and len(features_by_user[u]) > 0]
            for i in range(len(samples) - 1):
                # Positive pair (same user)
                self.pairs.append((samples[i], samples[i + 1], 1.0))
                # Negative pair (different user)
                if not neg_users:
                    continue
                neg_pool = features_by_user[neg_users[i % len(neg_users)]]
                # Advance within a user's pool only after every other user has
                # been visited once, so both users and samples rotate.
                neg_sample = neg_pool[(i // len(neg_users)) % len(neg_pool)]
                self.pairs.append((samples[i], neg_sample, 0.0))

    def __len__(self):
        """Return the number of stored pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return pair ``idx`` as three float32 tensors ``(x1, x2, label)``."""
        x1, x2, label = self.pairs[idx]
        return (
            torch.tensor(x1, dtype=torch.float32),
            torch.tensor(x2, dtype=torch.float32),
            torch.tensor(label, dtype=torch.float32),
        )

In [None]:
class SiameseNetwork(nn.Module):
    """Two-branch network that embeds both inputs with the same weights."""

    def __init__(self, input_size, embedding_dim=128):
        """Create the shared encoder: Linear -> ReLU -> Linear.

        Args:
            input_size: Length of one input feature vector.
            embedding_dim: Length of the produced embedding.
        """
        super().__init__()
        hidden_units = 256
        encoder_layers = [
            nn.Linear(input_size, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, embedding_dim),
        ]
        # Attribute name and layer order define the state_dict keys
        # ("fc.0.*", "fc.2.*") that saved checkpoints rely on.
        self.fc = nn.Sequential(*encoder_layers)

    def forward_once(self, x):
        """Embed a single batch of inputs."""
        embedding = self.fc(x)
        return embedding

    def forward(self, x1, x2):
        """Embed both members of a pair; returns ``(emb1, emb2)``."""
        return self.forward_once(x1), self.forward_once(x2)


In [None]:
def contrastive_loss(emb1, emb2, label, margin=1.0):
    """Mean contrastive loss over a batch of embedding pairs.

    Pairs with ``label == 1`` are penalised by their squared distance; pairs
    with ``label == 0`` are penalised by the squared shortfall of their
    distance below ``margin`` (zero once they are at least ``margin`` apart).

    Args:
        emb1, emb2: Embedding batches to compare row by row.
        label: Per-pair targets, 1 for "same", 0 for "different".
        margin: Distance beyond which dissimilar pairs contribute no loss.

    Returns:
        Scalar tensor with the batch-mean loss.
    """
    gap = F.pairwise_distance(emb1, emb2)
    pull_term = label * (gap ** 2)
    push_term = (1 - label) * (F.relu(margin - gap) ** 2)
    return torch.mean(pull_term + push_term)


In [None]:
def train(model, dataloader, epochs=10, lr=1e-3):
    """Optimise ``model`` with Adam on the contrastive loss.

    Args:
        model: Network called as ``model(x1, x2)`` and returning two embeddings.
        dataloader: Iterable yielding ``(x1, x2, label)`` batches.
        epochs: Number of full passes over ``dataloader``.
        lr: Adam learning rate.

    Prints one line per epoch. The reported loss is the sum of the per-batch
    mean losses, not an average, so it scales with the number of batches.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model.train()
    for epoch_no in range(1, epochs + 1):
        running = 0
        for left, right, target in dataloader:
            optimizer.zero_grad()
            out_left, out_right = model(left, right)
            batch_loss = contrastive_loss(out_left, out_right, target)
            batch_loss.backward()
            optimizer.step()
            running += batch_loss.item()
        print(f"Epoch {epoch_no}/{epochs}, Loss: {running:.4f}")


In [None]:
def verify(model, known_sample, test_sample, threshold=0.5):
    """Decide whether two feature vectors belong to the same user.

    Args:
        model: Network called as ``model(x1, x2)`` and returning two embeddings;
            it is switched to evaluation mode.
        known_sample: Reference feature vector.
        test_sample: Feature vector to check against the reference.
        threshold: Embedding distances strictly below this count as a match.

    Returns:
        Tuple ``(is_match, distance)`` with the distance as a Python float.
    """
    model.eval()
    reference = torch.tensor(known_sample, dtype=torch.float32).unsqueeze(0)
    candidate = torch.tensor(test_sample, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        emb_ref, emb_cand = model(reference, candidate)
        gap = F.pairwise_distance(emb_ref, emb_cand).item()
    is_match = gap < threshold
    return is_match, gap


In [None]:
# Seed PyTorch so weight initialisation and DataLoader shuffling are the same
# on every Restart & Run All; without it each run gives a different model.
torch.manual_seed(42)

json_path = "data_store1.json"  # Change to your path if needed
raw_data = load_data(json_path)
feature_dict = extract_features(raw_data)

# Fail with a clear message instead of a bare StopIteration from next() below.
if not feature_dict:
    raise ValueError(f"No feature windows could be extracted from {json_path}")

# Every window has the same length, so the first one defines the input size.
sample_dim = len(next(iter(feature_dict.values()))[0])
dataset = SiameseDataset(feature_dict)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

model = SiameseNetwork(input_size=sample_dim)
train(model, dataloader, epochs=50)

# Save model (weights only; the architecture is rebuilt when loading)
torch.save(model.state_dict(), "bba_siamese_model.pt")
print("✅ Model trained and saved.")


Epoch 1/50, Loss: 73.4508
Epoch 2/50, Loss: 17.7669
Epoch 3/50, Loss: 9.5836
Epoch 4/50, Loss: 6.4751
Epoch 5/50, Loss: 4.5523
Epoch 6/50, Loss: 3.5535
Epoch 7/50, Loss: 2.7685
Epoch 8/50, Loss: 2.2679
Epoch 9/50, Loss: 2.0403
Epoch 10/50, Loss: 1.6633
Epoch 11/50, Loss: 1.3765
Epoch 12/50, Loss: 1.2404
Epoch 13/50, Loss: 1.0839
Epoch 14/50, Loss: 0.9733
Epoch 15/50, Loss: 0.8985
Epoch 16/50, Loss: 0.7820
Epoch 17/50, Loss: 0.7550
Epoch 18/50, Loss: 0.7315
Epoch 19/50, Loss: 0.6479
Epoch 20/50, Loss: 0.5895
Epoch 21/50, Loss: 0.5705
Epoch 22/50, Loss: 0.5745
Epoch 23/50, Loss: 0.5802
Epoch 24/50, Loss: 0.5096
Epoch 25/50, Loss: 0.4909
Epoch 26/50, Loss: 0.4526
Epoch 27/50, Loss: 0.4456
Epoch 28/50, Loss: 0.5233
Epoch 29/50, Loss: 0.4242
Epoch 30/50, Loss: 0.6567
Epoch 31/50, Loss: 0.5790
Epoch 32/50, Loss: 0.4173
Epoch 33/50, Loss: 0.3159
Epoch 34/50, Loss: 0.3146
Epoch 35/50, Loss: 0.3208
Epoch 36/50, Loss: 0.2933
Epoch 37/50, Loss: 0.2755
Epoch 38/50, Loss: 0.2548
Epoch 39/50, Loss: 

In [None]:
import torch
import torch.nn.functional as F
import numpy as np

def load_model(model_path, input_size):
    """Rebuild a SiameseNetwork and load saved weights into it.

    Args:
        model_path: Path of a state_dict file written with ``torch.save``.
        input_size: Feature-vector length the network was trained with.

    Returns:
        The network in evaluation mode, with its weights on the CPU.
    """
    model = SiameseNetwork(input_size=input_size)
    # map_location="cpu": verify() feeds CPU tensors, so the weights must be on
    # the CPU even if the checkpoint was written from a GPU.
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    model.load_state_dict(torch.load(model_path, map_location="cpu"))
    model.eval()
    return model


def verify(model, known_sample, test_sample, threshold=0.5):
    """Compare two feature vectors through the model's embeddings.

    NOTE(review): this redefines ``verify`` from an earlier cell. Unlike that
    version it does not call ``model.eval()``, so the caller must pass a model
    that is already in evaluation mode.

    Args:
        model: Network called as ``model(x1, x2)`` and returning two embeddings.
        known_sample: Reference feature vector.
        test_sample: Feature vector to check against the reference.
        threshold: Embedding distances strictly below this count as a match.

    Returns:
        Tuple ``(is_match, distance)`` with the distance as a Python float.
    """
    reference = torch.tensor(known_sample, dtype=torch.float32).unsqueeze(0)
    candidate = torch.tensor(test_sample, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        emb_ref, emb_cand = model(reference, candidate)
        gap = F.pairwise_distance(emb_ref, emb_cand).item()
    return gap < threshold, gap


# Smoke test of the saved model on one same-user pair and one different-user
# pair. The features come from the same JSON file that the training cell read,
# so this is not a held-out evaluation.
data = load_data("data_store1.json")
feature_dict = extract_features(data)

user_ids = list(feature_dict.keys())
assert len(user_ids) >= 2, "Need at least 2 users for testing."

# "User 1" and "user 2" are simply the first two keys of feature_dict.
user1_samples = feature_dict[user_ids[0]]
user2_samples = feature_dict[user_ids[1]]

# Reference window that the two test windows are compared against.
known_sample = user1_samples[0]

# Positive test: same user
# (raises IndexError if the first user has only one window)
test_sample_same = user1_samples[1]
# Negative test: different user
test_sample_diff = user2_samples[0]

# The network's input size must equal the feature-vector length.
input_size = len(known_sample)
model = load_model("bba_siamese_model.pt", input_size)

# Run tests
# No threshold is passed, so verify() uses its default (0.5 as defined above).
result_same, dist_same = verify(model, known_sample, test_sample_same)
result_diff, dist_diff = verify(model, known_sample, test_sample_diff)

print("\n📌 TEST RESULTS")
print(f"[SAME USER]   Distance: {dist_same:.4f} | Match: {result_same}")
print(f"[DIFF USER]   Distance: {dist_diff:.4f} | Match: {result_diff}")



📌 TEST RESULTS
[SAME USER]   Distance: 1.0530 | Match: False
[DIFF USER]   Distance: 1.7755 | Match: False


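The output above shows that the default threshold of 0.5 is too strict: the same-user pair (distance 1.0530) is rejected along with the different-user pair (distance 1.7755), so a larger threshold is needed to accept the genuine user while still flagging the impostor.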

In [None]:
import torch
import torch.nn.functional as F
import numpy as np

def load_model(model_path, input_size):
    """Rebuild a SiameseNetwork and load saved weights into it.

    NOTE(review): this repeats the ``load_model`` definition from the previous
    cell; the later definition is the one in effect.

    Args:
        model_path: Path of a state_dict file written with ``torch.save``.
        input_size: Feature-vector length the network was trained with.

    Returns:
        The network in evaluation mode, with its weights on the CPU.
    """
    model = SiameseNetwork(input_size=input_size)
    # map_location="cpu": verify() feeds CPU tensors, so the weights must be on
    # the CPU even if the checkpoint was written from a GPU.
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    model.load_state_dict(torch.load(model_path, map_location="cpu"))
    model.eval()
    return model


def verify(model, known_sample, test_sample, threshold=0.5):
    """Return ``(is_match, distance)`` for a reference/test feature pair.

    NOTE(review): this repeats the ``verify`` definition from the previous
    cell. It does not call ``model.eval()``, so the model must already be in
    evaluation mode.

    Args:
        model: Network called as ``model(x1, x2)`` and returning two embeddings.
        known_sample: Reference feature vector.
        test_sample: Feature vector to check against the reference.
        threshold: Embedding distances strictly below this count as a match.
    """
    batch_known = torch.tensor(known_sample, dtype=torch.float32).unsqueeze(0)
    batch_test = torch.tensor(test_sample, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        emb_known, emb_test = model(batch_known, batch_test)
        separation = F.pairwise_distance(emb_known, emb_test).item()
    accepted = separation < threshold
    return accepted, separation


    # Load data again (same file used during training)
data = load_data("data_store1.json")
feature_dict = extract_features(data)

user_ids = list(feature_dict.keys())
assert len(user_ids) >= 2, "Need at least 2 users for testing."

user1_samples = feature_dict[user_ids[0]]
user2_samples = feature_dict[user_ids[1]]

# Use 1 sample from user 1 as known reference
known_sample = user1_samples[0]

# Positive test: same user
test_sample_same = user1_samples[1]
# Negative test: different user
test_sample_diff = user2_samples[0]

input_size = len(known_sample)
model = load_model("bba_siamese_model.pt", input_size)

# The embedding distances do not depend on the threshold, so run the model
# once per pair instead of once per threshold inside the loop.
d_same = verify(model, known_sample, test_sample_same)[1]
d_diff = verify(model, known_sample, test_sample_diff)[1]

# Sweep candidate thresholds; a pair is a match when its distance is strictly
# below the threshold (the same rule verify() applies).
thresholds = np.arange(0.3, 2.0, 0.1)
for t in thresholds:
    same_match = d_same < t
    diff_match = d_diff < t
    print(f"Threshold {t:.1f} | SAME: {same_match} ({d_same:.3f}) | DIFF: {diff_match} ({d_diff:.3f})")



Threshold 0.3 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 0.4 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 0.5 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 0.6 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 0.7 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 0.8 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 0.9 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 1.0 | SAME: False (1.053) | DIFF: False (1.775)
Threshold 1.1 | SAME: True (1.053) | DIFF: False (1.775)
Threshold 1.2 | SAME: True (1.053) | DIFF: False (1.775)
Threshold 1.3 | SAME: True (1.053) | DIFF: False (1.775)
Threshold 1.4 | SAME: True (1.053) | DIFF: False (1.775)
Threshold 1.5 | SAME: True (1.053) | DIFF: False (1.775)
Threshold 1.6 | SAME: True (1.053) | DIFF: False (1.775)
Threshold 1.7 | SAME: True (1.053) | DIFF: False (1.775)
Threshold 1.8 | SAME: True (1.053) | DIFF: True (1.775)
Threshold 1.9 | SAME: True (1.053) | DIFF: True (1.775)


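Above, we swept thresholds from 0.3 to 1.9 to find values that accept the same-user pair while rejecting the different-user pair as an impostor. Thresholds from 1.1 to 1.7 separate the two pairs correctly.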

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
from itertools import combinations

def load_model(model_path, input_size):
    """Rebuild a SiameseNetwork and load saved weights into it.

    NOTE(review): this repeats the ``load_model`` definition from earlier
    cells; the later definition is the one in effect.

    Args:
        model_path: Path of a state_dict file written with ``torch.save``.
        input_size: Feature-vector length the network was trained with.

    Returns:
        The network in evaluation mode, with its weights on the CPU.
    """
    model = SiameseNetwork(input_size=input_size)
    # map_location="cpu": verify() feeds CPU tensors, so the weights must be on
    # the CPU even if the checkpoint was written from a GPU.
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    model.load_state_dict(torch.load(model_path, map_location="cpu"))
    model.eval()
    return model


def verify(model, known_sample, test_sample, threshold=1.2):
    """Return ``(is_match, distance)`` for a reference/test feature pair.

    NOTE(review): this redefines ``verify`` once more; here the default
    threshold is 1.2 instead of the 0.5 used by the earlier definitions. It
    does not call ``model.eval()``, so the model must already be in
    evaluation mode.

    Args:
        model: Network called as ``model(x1, x2)`` and returning two embeddings.
        known_sample: Reference feature vector.
        test_sample: Feature vector to check against the reference.
        threshold: Embedding distances strictly below this count as a match.
    """
    as_batch = lambda vec: torch.tensor(vec, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        emb_known, emb_test = model(as_batch(known_sample), as_batch(test_sample))
        separation = F.pairwise_distance(emb_known, emb_test).item()
    return separation < threshold, separation


data = load_data("data_store1.json")
feature_dict = extract_features(data)

user_ids = list(feature_dict.keys())
print(f"Found {len(user_ids)} users with data.")

# Filter out users with fewer than 2 samples
user_ids = [u for u in user_ids if len(feature_dict[u]) >= 2]

# Two eligible users are enough for the loops below (user_ids[:3] and
# combinations(..., 2) both work with two entries). Raise instead of calling
# exit(): in a notebook exit() does not stop the cell, so the message was
# printed and the tests ran anyway.
if len(user_ids) < 2:
    raise RuntimeError("Need at least 2 users with enough data to test.")

# Load trained model
sample_size = len(feature_dict[user_ids[0]][0])
model = load_model("bba_siamese_model.pt", input_size=sample_size)

threshold = 1.2
print(f"\n📊 Running multi-user tests with threshold = {threshold}\n")

# SAME-USER TESTS: first window of each user vs. that user's second window
print("🔁 SAME-USER PAIRS")
for uid in user_ids[:3]:
    samples = feature_dict[uid]
    known_sample = samples[0]
    test_sample = samples[1]
    match, dist = verify(model, known_sample, test_sample, threshold)
    print(f"[User: {uid[:6]}] Distance: {dist:.4f} | Match: {match}")

# DIFFERENT-USER TESTS: first window of every pair of distinct users
print("\n⚔️ DIFFERENT-USER PAIRS")
for u1, u2 in combinations(user_ids[:3], 2):
    s1 = feature_dict[u1][0]
    s2 = feature_dict[u2][0]
    match, dist = verify(model, s1, s2, threshold)
    print(f"[Users: {u1[:6]} vs {u2[:6]}] Distance: {dist:.4f} | Match: {match}")


Found 3 users with data.
Need at least 3 users with enough data to test.

📊 Running multi-user tests with threshold = 1.2

🔁 SAME-USER PAIRS
[User: bbc7f6] Distance: 1.0530 | Match: True
[User: c9f16b] Distance: 0.5300 | Match: True

⚔️ DIFFERENT-USER PAIRS
[Users: bbc7f6 vs c9f16b] Distance: 1.7755 | Match: False


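# Above, a threshold of 1.2 correctly separated genuine-user pairs from the impostor pair in this small test (two same-user pairs and one different-user pair, all drawn from the file used for training)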

---

