In [2]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from sklearn.preprocessing import StandardScaler
import math

In [None]:
#distance beteen 2 GPS coordinates

def calculate_distance(lat1, lon1, lat2, lon2):
    """
    Calculate the great-circle distance between two GPS points using the Haversine formula.

    Parameters:
    lat1, lon1 -- latitude and longitude of first point (in decimal degrees)
    lat2, lon2 -- latitude and longitude of second point (in decimal degrees)

    Returns:
    Distance in kilometers
    """
    # Earth radius in kilometers
    R = 6371.0

    # Convert degrees to radians
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    # Haversine formula
    a = math.sin(delta_phi / 2) ** 2 + \
        math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2

    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    distance = R * c

    return distance


In [None]:
# Example feature names: keystroke timing (ms), mouse speed (px/s), click rate (clicks/min)
feature_names = ['keystroke_mean', 'keystroke_std', 'mouse_speed', 'click_rate']

# Assume at registration you captured these for the user:
D_mean = np.array([120, 30, 500, 20])   # Example mean template
D_std = np.array([10, 5, 100, 5])       # Example std dev template

# Live session feature vector (captured every 30s)
v = np.array([150, 40, 600, 25])  # Example observed features

# Compute z-scores
z = np.abs((v - D_mean) / D_std)

# Anomaly score = average z-score (or could be weighted sum)
anomaly_score = np.mean(z)

print(f"Z-scores: {z}")
print(f"Anomaly score: {anomaly_score:.2f}")

# Rule-based checks
rule_flags = []

# Example: abnormal mouse speed
if v[2] > 1000:
    rule_flags.append("Abnormally high mouse speed")

# Example: keystroke mean outside realistic range
if v[0] > 300 or v[0] < 50:
    rule_flags.append("Keystroke timing abnormal")

loc_distance = calculate_distance() # values to be added
if loc_distance>10 and not travel_flag{
    rule_flags.append("Suspicion in location")
}

# Decision thresholds
THRESHOLD_PASS = 1.5
THRESHOLD_ESCALATE_T2 = 2.5

# Combine score + rules
if anomaly_score < THRESHOLD_PASS and not rule_flags:
    decision = "PASS"
elif anomaly_score < THRESHOLD_ESCALATE_T2 and not rule_flags:
    decision = "ESCALATE TO T2"
else:
    decision = "ESCALATE TO T3"

print(f"Decision: {decision}")
if rule_flags:
    print("Rule flags triggered:")
    for flag in rule_flags:
        print(f" - {flag}")


In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
from sklearn.preprocessing import StandardScaler

# === 1️⃣ Synthetic data generator ===
def generate_user_profile():
    accuracy = np.random.uniform(70, 95)
    flight_time = np.random.uniform(250, 400)
    hold_time = np.random.uniform(7, 12)
    tap_rhythm = np.random.uniform(250, 320)
    correct_chars = int(accuracy / 100 * 100)
    error_rate = 100 - accuracy
    total_time = np.random.uniform(30, 50)
    total_words = int(correct_chars / 5)
    cpm = correct_chars / (total_time / 60)
    wpm = total_words / (total_time / 60)
    return np.array([accuracy, flight_time, hold_time, tap_rhythm,
                     correct_chars, error_rate, total_time, total_words, cpm, wpm])

def generate_sample(user_base):
    jitter = np.array([
        np.random.normal(0, 2),
        np.random.normal(0, 10),
        np.random.normal(0, 0.5),
        np.random.normal(0, 10),
        np.random.normal(0, 2),
        np.random.normal(0, 1),
        np.random.normal(0, 1),
        np.random.normal(0, 1),
        np.random.normal(0, 5),
        np.random.normal(0, 1)
    ])
    return user_base + jitter

num_users = 20
samples_per_user = 15
input_dim = 10

users_data = {}
for u in range(num_users):
    base = generate_user_profile()
    users_data[u] = np.array([generate_sample(base) for _ in range(samples_per_user)])

# Normalize
all_samples = np.vstack([users_data[u] for u in users_data])
scaler = StandardScaler().fit(all_samples)
for u in users_data:
    users_data[u] = scaler.transform(users_data[u])

def create_triplets(users_data):
    triplets = []
    users = list(users_data.keys())
    for u in users:
        positives = users_data[u]
        for i in range(len(positives)):
            anchor = positives[i]
            pos = positives[np.random.choice([j for j in range(len(positives)) if j != i])]
            neg_user = np.random.choice([x for x in users if x != u])
            neg_pool = users_data[neg_user]
            distances = np.linalg.norm(neg_pool - anchor, axis=1)
            hard_neg = neg_pool[np.argmin(distances)]
            triplets.append((anchor, pos, hard_neg))
    return triplets

triplets = create_triplets(users_data)

# === 2️⃣ Complex model ===
class ComplexSiameseNet(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.block1 = nn.Sequential(
            nn.utils.weight_norm(nn.Linear(input_dim, 256)),
            nn.LayerNorm(256),
            nn.GELU(),
            nn.Dropout(0.3)
        )
        self.block2 = nn.Sequential(
            nn.utils.weight_norm(nn.Linear(256, 128)),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(0.3)
        )
        self.block3 = nn.Sequential(
            nn.utils.weight_norm(nn.Linear(128, 128)),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(0.3)
        )
        self.block4 = nn.Sequential(
            nn.utils.weight_norm(nn.Linear(128, 64)),
            nn.LayerNorm(64),
            nn.GELU(),
            nn.Dropout(0.3)
        )
        self.block5 = nn.Sequential(
            nn.utils.weight_norm(nn.Linear(64, 32)),
            nn.LayerNorm(32),
            nn.GELU()
        )
        self.block6 = nn.Linear(32, 8)

    def forward_once(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = x + self.block3(x)  # residual
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        return x

    def forward(self, a, p, n):
        return self.forward_once(a), self.forward_once(p), self.forward_once(n)

class TripletLoss(nn.Module):
    def __init__(self, margin=0.5):
        super().__init__()
        self.margin = margin

    def forward(self, a, p, n):
        pos_dist = F.pairwise_distance(a, p)
        neg_dist = F.pairwise_distance(a, n)
        return F.relu(pos_dist - neg_dist + self.margin).mean()

# === 3️⃣ Train ===
model = ComplexSiameseNet(input_dim)
criterion = TripletLoss()
optimizer = optim.AdamW(model.parameters(), lr=1e-3)

epochs = 50
batch_size = 16
for epoch in range(epochs):
    random.shuffle(triplets)
    total_loss = 0
    model.train()
    for i in range(0, len(triplets), batch_size):
        batch = triplets[i:i+batch_size]
        anchors = torch.tensor([x[0] for x in batch], dtype=torch.float32)
        pos = torch.tensor([x[1] for x in batch], dtype=torch.float32)
        neg = torch.tensor([x[2] for x in batch], dtype=torch.float32)

        optimizer.zero_grad()
        a_out, p_out, n_out = model(anchors, pos, neg)
        loss = criterion(a_out, p_out, n_out)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * len(batch)
    avg_loss = total_loss / len(triplets)
    print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f}")

def compute_embedding(model, v):
    with torch.no_grad():
        v = torch.tensor(v, dtype=torch.float32).unsqueeze(0)
        emb = model.forward_once(v)
        return emb.squeeze().numpy()

# Example: compare user 0's sample with user 1's sample
emb1 = compute_embedding(model, users_data[0][0])
emb2 = compute_embedding(model, users_data[1][0])
distance = np.linalg.norm(emb1 - emb2)
print(f"Distance between user 0 and 1 sample: {distance:.4f}")

  WeightNorm.apply(module, name, dim)
  anchors = torch.tensor([x[0] for x in batch], dtype=torch.float32)


Epoch 1/50 | Loss: 0.3190
Epoch 2/50 | Loss: 0.1512
Epoch 3/50 | Loss: 0.1062
Epoch 4/50 | Loss: 0.0978
Epoch 5/50 | Loss: 0.0806
Epoch 6/50 | Loss: 0.0845
Epoch 7/50 | Loss: 0.0841
Epoch 8/50 | Loss: 0.0952
Epoch 9/50 | Loss: 0.0770
Epoch 10/50 | Loss: 0.0828
Epoch 11/50 | Loss: 0.0664
Epoch 12/50 | Loss: 0.0647
Epoch 13/50 | Loss: 0.0633
Epoch 14/50 | Loss: 0.0617
Epoch 15/50 | Loss: 0.0666
Epoch 16/50 | Loss: 0.0589
Epoch 17/50 | Loss: 0.0499
Epoch 18/50 | Loss: 0.0453
Epoch 19/50 | Loss: 0.0491
Epoch 20/50 | Loss: 0.0374
Epoch 21/50 | Loss: 0.0486
Epoch 22/50 | Loss: 0.0401
Epoch 23/50 | Loss: 0.0435
Epoch 24/50 | Loss: 0.0495
Epoch 25/50 | Loss: 0.0444
Epoch 26/50 | Loss: 0.0373
Epoch 27/50 | Loss: 0.0293
Epoch 28/50 | Loss: 0.0459
Epoch 29/50 | Loss: 0.0335
Epoch 30/50 | Loss: 0.0275
Epoch 31/50 | Loss: 0.0306
Epoch 32/50 | Loss: 0.0424
Epoch 33/50 | Loss: 0.0332
Epoch 34/50 | Loss: 0.0491
Epoch 35/50 | Loss: 0.0279
Epoch 36/50 | Loss: 0.0365
Epoch 37/50 | Loss: 0.0279
Epoch 38/5

In [12]:
def compute_embedding(model, v):
    with torch.no_grad():
        v = torch.tensor(v, dtype=torch.float32).unsqueeze(0)
        emb = model.forward_once(v)
        return emb.squeeze().numpy()

# Example: compare user 0's sample with user 1's sample
emb1 = compute_embedding(model, users_data[0][0])
emb2 = compute_embedding(model, users_data[1][0])
distance = np.linalg.norm(emb1 - emb2)
print(f"Distance between user 0 and 1 sample: {distance:.4f}")

Distance between user 0 and 1 sample: 3.4323
