# This is a sample Jupyter Notebook

Below is an example of a code cell.
Put your cursor into the cell and press Shift+Enter to execute it and select the next one, or click the 'Run Cell' button.

Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.

To learn more about Jupyter Notebooks in PyCharm, see [help](https://www.jetbrains.com/help/pycharm/ipython-notebook-support.html).
For an overview of PyCharm, go to Help -> Learn IDE features or refer to [our documentation](https://www.jetbrains.com/help/pycharm/getting-started.html).

In [65]:
import torch
import torch.nn as nn
from numpy.random.mtrand import permutation

In [66]:
# Report whether PyTorch's MPS backend is available on this machine.
print(torch.backends.mps.is_available())

True


In [67]:
# Device used for every model and tensor in this notebook (hard-coded to CPU).
device = torch.device("cpu") # Defaults to CPU

# Model definitions follow; their layers are moved to `device` on construction.
class TwoBranch(nn.Module):
    """Siamese encoder: one shared LSTM applied to both inputs.

    ``left`` and ``right`` refer to the *same* ``nn.LSTM`` instance, so both
    inputs are encoded with identical weights. The final hidden states of the
    two inputs (8 features each) are concatenated and projected to 9 features.
    """

    def __init__(self):
        super().__init__()
        # ``dropout`` stays at its default of 0: nn.LSTM only applies dropout
        # between stacked layers, so a non-zero value with num_layers=1 has no
        # effect and merely triggers a UserWarning.
        shared = nn.LSTM(input_size=3, hidden_size=8, batch_first=False, num_layers=1).to(device)
        self.left = shared
        self.right = shared
        self.combine = nn.Linear(16, 9).to(device)
        nn.init.xavier_uniform_(shared.weight_ih_l0)
        nn.init.xavier_uniform_(shared.weight_hh_l0)

    def forward(self, x_left, x_right):
        """Encode both inputs and return the combined features, shape (batch, 9)."""
        _, (l_h, _) = self.left(x_left)
        _, (r_h, _) = self.right(x_right)

        l = l_h[-1]  # final hidden state of the last layer, shape: (batch, 8)
        r = r_h[-1]

        cat = torch.cat([l, r], dim=-1)  # shape: (batch, 16)
        return self.combine(cat)  # Linear(16, 9) -> shape: (batch, 9)
class BigModel(nn.Module):
    """Pair classifier: a ``TwoBranch`` encoder followed by a small MLP head.

    The encoder turns the two inputs into a 9-feature vector; the head maps
    that vector to a single output value per pair.
    """

    def __init__(self):
        super().__init__()
        self.branch = TwoBranch()
        head_layers = [
            nn.Linear(9, 9),
            nn.ReLU(),
            nn.Linear(9, 9),
            nn.ReLU(),
            nn.Linear(9, 1),
        ]
        self.rest = nn.Sequential(*head_layers).to(device)

    def forward(self, x_left, x_right):
        """Return the head's output for the pair, one value per batch element."""
        features = self.branch(x_left, x_right)
        scores = self.rest(features)
        return scores
# Instantiate the pair classifier used by the training and evaluation cells below.
model2=BigModel().to(device)





In [68]:
# Disabled example: the code below sits inside a bare string literal, so
# running this cell creates no tensors (the notebook just echoes the string).
"""
batch = 32
seq_len = 64

x_left  = torch.randn(seq_len, batch, 3, device=device)
x_right = torch.randn(seq_len, batch, 3, device=device)

# labels depend on problem:
y = torch.randn(batch, 1, device=device)  # regression
"""

'\nbatch = 32\nseq_len = 64\n\nx_left  = torch.randn(seq_len, batch, 3, device=device)\nx_right = torch.randn(seq_len, batch, 3, device=device)\n\n# labels depend on problem:\ny = torch.randn(batch, 1, device=device)  # regression\n'

In [69]:
import pandas as pd
import glob
def _load_drawings(path):
    """Return one person's drawings read from the JSON file at *path*.

    Each row of the parsed frame holds one drawing (a list of strokes) in its
    first column. A drawing is flattened into a single list of
    ``(x, y, time)`` triples by concatenating the points of its strokes in
    order.
    """
    frame = pd.read_json(path)
    drawings = []
    for row in frame.values:
        strokes = row[0]
        drawings.append([
            (point["x"], point["y"], point["time"])
            for stroke in strokes
            for point in stroke["points"]
        ])
    return drawings


# Sort the paths: glob order is filesystem-dependent, and a file's position in
# this list becomes the person ID that the train/test split is based on.
files = sorted(glob.glob("data/*.json"))
people = [_load_drawings(file) for file in files]


data = people


In [70]:
import json
from pathlib import Path

out_path = Path("output/data.json")

# write_text does not create missing directories, so make sure "output/"
# exists before dumping the nested people -> drawings -> triples structure.
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(data, indent=2))

983106

In [71]:
out_path = Path("output/data.json")
# Read the cached dataset back; the context manager closes the file handle
# instead of leaving it to the garbage collector.
with out_path.open() as fh:
    loaded = json.load(fh)

In [72]:
import numpy as np

# Nested as people -> drawings -> (x, y, time) triples, as read from output/data.json.
training: list[list[list[tuple[int, int, int]]]] = loaded
pairedDeep = [[torch.tensor(drawing, device=device) for drawing in drawings] for drawings in training]


def _normalise_deltas(drawing: torch.Tensor) -> torch.Tensor:
    """Difference consecutive points of *drawing* and rescale every column.

    Each column of the point-to-point differences is min-max scaled and then
    mapped with ``* 2 - 0.5``, which puts the values in [-0.5, 1.5].
    """
    deltas = drawing[:-1] - drawing[1:]
    low = deltas.min(dim=0, keepdim=True).values
    span = deltas.max(dim=0, keepdim=True).values - low
    # A constant column has zero span and would divide to NaN. Its numerator
    # is all zeros, so substituting 1 maps the whole column to -0.5.
    span = torch.where(span == 0, torch.ones_like(span), span)
    # NOTE(review): "* 2 - 0.5" is kept unchanged; "* 2 - 1" would centre the
    # range on zero -- confirm which was intended.
    return (deltas - low) / span * 2 - 0.5


# Flatten to one list of sequences plus a parallel array of person IDs.
paired: list[torch.Tensor] = []
person_id_list: list[int] = []
for personID, person in enumerate(pairedDeep):
    paired.extend(_normalise_deltas(drawing) for drawing in person)
    person_id_list.extend([personID] * len(person))
personIDs = np.array(person_id_list, dtype=int)
lengths = torch.tensor([len(s) for s in paired])


def packPad(pairs: list[torch.Tensor], lengths: torch.Tensor, indeces):
    """Gather the sequences at *indeces* and return them as one packed batch.

    The selected tensors from *pairs* are zero-padded to a common length and
    packed using the matching entries of *lengths*. The packed data is cast
    to float32 and placed on the notebook-wide ``device``.
    """
    selected = [pairs[idx].to(device) for idx in indeces]
    batch = nn.utils.rnn.pad_sequence(selected, batch_first=True).to(device)
    packed = nn.utils.rnn.pack_padded_sequence(
        batch,
        lengths[indeces],
        batch_first=True,
        enforce_sorted=False,  # the batch is not ordered by sequence length
    )
    return packed.float().to(device)



# Split at PERSON level to prevent any leakage: people with an ID below the
# 80% cut-off go to training and the rest to testing, so no person
# contributes samples to both sets.
n_people = len(pairedDeep)
split_person_idx = int(n_people * 0.8)

# Sample indices of each split. One vectorised mask replaces the per-person
# loop; the resulting order is the same because personIDs is built in
# ascending person order. flatnonzero always returns an integer array, so an
# empty split can still be used for indexing.
train_person_indices = np.flatnonzero(personIDs < split_person_idx)
test_person_indices = np.flatnonzero(personIDs >= split_person_idx)

print(f"Training people: 0-{split_person_idx-1}")
print(f"Test people: {split_person_idx}-{n_people-1}")
print(f"Train samples: {len(train_person_indices)}")
print(f"Test samples: {len(test_person_indices)}")


def generate_balanced_pairs(indices, n_pairs, personIDs):
    """Generate 50/50 same-person vs different-person pairs.

    Args:
        indices: 1-D integer array of sample indices to draw from.
        n_pairs: total number of pairs. ``n_pairs // 2`` are same-person
            pairs and the remainder are different-person pairs.
        personIDs: array giving the person ID of every sample index.

    Returns:
        ``(left, right)``: two integer index arrays of length *n_pairs*,
        same-person pairs first.

    Raises:
        ValueError: if pairs are requested from an empty *indices*.

    Fallbacks: when the drawn person has a single sample in *indices*, that
    sample is paired with itself; when *indices* holds only one person, the
    "different" partner is drawn from all of *indices*.
    """
    empty_request = n_pairs <= 0
    if empty_request:
        # Integer dtype so the result can still be used as an index array.
        return np.array([], dtype=np.intp), np.array([], dtype=np.intp)

    indices = np.asarray(indices)
    if indices.size == 0:
        raise ValueError("cannot generate pairs from an empty index set")

    # Person ID of every candidate, computed once instead of once per pair.
    candidate_ids = personIDs[indices]

    left_indices = []
    right_indices = []

    n_same = n_pairs // 2
    n_diff = n_pairs - n_same

    # Same-person pairs
    for _ in range(n_same):
        anchor = np.random.choice(indices)
        same_person = indices[candidate_ids == personIDs[anchor]]
        if len(same_person) > 1:
            first, second = np.random.choice(same_person, size=2, replace=False)
        else:
            first = second = anchor
        left_indices.append(first)
        right_indices.append(second)

    # Different-person pairs
    for _ in range(n_diff):
        anchor = np.random.choice(indices)
        other_people = indices[candidate_ids != personIDs[anchor]]
        pool = other_people if len(other_people) > 0 else indices
        left_indices.append(anchor)
        right_indices.append(np.random.choice(pool))

    return (np.array(left_indices, dtype=np.intp),
            np.array(right_indices, dtype=np.intp))


# Fixed evaluation pairs: one pair per available sample in each split.
n_train_pairs = len(train_person_indices)
n_test_pairs = len(test_person_indices)

pLT, pRT = generate_balanced_pairs(train_person_indices, n_train_pairs, personIDs)
pLTest, pRTest = generate_balanced_pairs(test_person_indices, n_test_pairs, personIDs)

# Aliases under which the later training cells refer to the sequences and lengths.
pairedNP = paired
lengthsNP = lengths

# Training pairs; a label of 1.0 means both sides come from the same person.
leftTrain = packPad(pairedNP, lengthsNP, pLT).to(device)
rightTrain = packPad(pairedNP, lengthsNP, pRT).to(device)
same_person_train = personIDs[pLT] == personIDs[pRT]
yTrain = torch.tensor(same_person_train).reshape((-1, 1)).to(device, dtype=torch.float32)

# Test pairs, built the same way from the held-out people.
leftTest = packPad(pairedNP, lengthsNP, pLTest).to(device)
rightTest = packPad(pairedNP, lengthsNP, pRTest).to(device)
same_person_test = personIDs[pLTest] == personIDs[pRTest]
yTest = torch.tensor(same_person_test).reshape((-1, 1)).to(device, dtype=torch.float32)

print(f"\nTrain: {(yTrain == 1).sum().item() / len(yTrain) * 100:.1f}% same person")
print(f"Test:  {(yTest == 1).sum().item() / len(yTest) * 100:.1f}% same person")


Training people: 0-15
Test people: 16-20
Train samples: 214
Test samples: 63

Train: 50.0% same person
Test:  49.2% same person


In [73]:
# Test-set accuracy: an output > 0 is compared against the label being 1 (same person).
str(((model2(leftTest, rightTest)>0)==(yTest>0.5)).float().mean().item())

'0.4920634925365448'

In [74]:
# Display the weights of the LSTM held by the model's branch encoder.
model2.branch.left.all_weights

[[Parameter containing:
  tensor([[-2.4465e-01,  2.7145e-01,  1.8613e-01],
          [ 1.5868e-01, -9.3852e-02,  3.0997e-01],
          [-3.2976e-01,  1.5481e-01, -4.0240e-01],
          [ 2.6991e-02, -3.4887e-01,  3.0414e-01],
          [ 5.2266e-02, -1.6494e-02, -1.3136e-01],
          [-1.6683e-01,  3.1976e-01,  2.5325e-01],
          [ 1.8633e-01,  2.3193e-01,  1.2269e-01],
          [-3.8317e-01, -1.8812e-01, -1.5951e-01],
          [ 1.6612e-01, -4.7830e-02,  2.2606e-04],
          [ 3.5345e-01,  2.2609e-01, -1.2092e-01],
          [ 2.5655e-01, -1.7458e-01,  3.8707e-01],
          [ 1.1227e-01, -1.8624e-01, -2.3755e-01],
          [ 3.4310e-01,  3.8119e-02, -4.9239e-02],
          [-8.6556e-02,  3.6173e-01,  1.2781e-01],
          [-2.5932e-01,  2.1361e-01,  3.6353e-01],
          [ 1.1038e-01, -1.4720e-01,  3.5833e-01],
          [ 3.2984e-01,  2.7034e-01,  3.6420e-01],
          [ 3.2912e-03, -5.7101e-02, -2.5856e-01],
          [-4.0496e-01, -3.8120e-01, -4.0681e-01],
       

In [75]:
epochs = 400
# Binary cross-entropy on logits; regularisation comes from AdamW's weight
# decay (PyTorch documents it as decoupled weight decay, not an L2 loss term).
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = torch.optim.AdamW(model2.parameters(), lr=0.001, weight_decay=1e-5)


In [76]:
def _pair_accuracy(left, right, target):
    """Fraction of pairs for which (model2 output > 0) agrees with (target > 0.5)."""
    with torch.no_grad():  # evaluation only: no autograd graph needed
        hits = (model2(left, right) > 0) == (target > 0.5)
        return hits.float().mean().item()


annealer = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
for epoch in range(epochs):
    # Draw a fresh set of balanced training pairs for this epoch.
    pLTBatch, pRTBatch = generate_balanced_pairs(train_person_indices, n_train_pairs, personIDs)
    leftTrainBatch = packPad(pairedNP, lengthsNP, pLTBatch).to(device)
    rightTrainBatch = packPad(pairedNP, lengthsNP, pRTBatch).to(device)
    yTrainBatch = (torch.tensor(personIDs[pLTBatch] == personIDs[pRTBatch])
          .reshape((-1, 1))
          .to(device, dtype=torch.float32))

    epoch_loss = 0.0
    for i in range(4):  # four optimiser steps on the same set of pairs
        optimizer.zero_grad()
        outputBatch = model2(leftTrainBatch, rightTrainBatch)
        lossBatch = criterion(outputBatch, yTrainBatch)

        lossBatch.backward()
        optimizer.step()
        epoch_loss += lossBatch.item()
    annealer.step()
    epoch_loss /= 4  # average loss over the four steps

    # Progress report every 5 epochs; every 25th epoch reports the loss on the
    # fixed training pairs instead and stops early once it is low enough.
    if epoch % 5 == 0:
        test_acc = _pair_accuracy(leftTest, rightTest, yTest)
        train_acc = _pair_accuracy(leftTrain, rightTrain, yTrain)
        if epoch % 25 != 0:
            print(f"Epoch {epoch:3d} – loss: {epoch_loss:.4f} - fPC {test_acc:.4f}, {train_acc:.4f}")
        else:
            with torch.no_grad():
                output = model2(leftTrain, rightTrain)
                loss = criterion(output, yTrain)
            print(f"Epoch {epoch:3d} – loss: {loss:.4f} - fPC {test_acc:.4f}, {train_acc:.4f}")
            if loss.item() < 0.59:
                break

with torch.no_grad():
    output = model2(leftTrain, rightTrain)
    loss = criterion(output, yTrain)
# if this assertion fails that means we got stuck and the model was unable to find the right strategy.
assert loss.item() <= 0.6, f"training stalled: loss on the fixed training pairs is {loss.item():.4f} (> 0.6)"
model2

Epoch   0 – loss: 0.6960 - fPC 0.4921, 0.5000


KeyboardInterrupt: 

In [None]:
# Re-evaluate after training: loss on the fixed training pairs plus the
# test and train accuracies, printed in the same format as the training log.
output = model2(leftTrain, rightTrain)
loss = criterion(output, yTrain)
test_hits = (model2(leftTest, rightTest) > 0) == (yTest > 0.5)
train_hits = (model2(leftTrain, rightTrain) > 0) == (yTrain > 0.5)
print(f"Epoch {epoch:3d} – loss: {loss:.4f} - fPC {test_hits.float().mean():.4f}, {train_hits.float().mean():.4f}")

In [None]:
# if "pretrainedModel" in globals().keys():
#     model2 = pretrainedModel
# else:
#     pretrainedModel = model2


In [None]:
# Second training stage: same loss, but SGD with momentum and a smaller
# learning rate than the AdamW stage above.
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = torch.optim.SGD(model2.parameters(), lr=0.0001, weight_decay=1e-4, momentum=0.9)

In [None]:
def _pair_accuracy(left, right, target):
    """Fraction of pairs for which (model2 output > 0) agrees with (target > 0.5)."""
    with torch.no_grad():  # evaluation only: no autograd graph needed
        hits = (model2(left, right) > 0) == (target > 0.5)
        return hits.float().mean().item()


annealer = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=500)
for epoch in range(500):
    # All pairs of this step are drawn from a random subset of 8 training samples.
    # NOTE(review): if a person has only one sample in the subset, the
    # same-person pair is that sample paired with itself -- confirm that
    # sampling from just 8 samples is intended.
    permute = np.random.permutation(train_person_indices.shape[0])[:8]
    pLTBatch, pRTBatch = generate_balanced_pairs(train_person_indices[permute], n_train_pairs*8, personIDs)
    leftTrainBatch = packPad(pairedNP, lengthsNP, pLTBatch).to(device)
    rightTrainBatch = packPad(pairedNP, lengthsNP, pRTBatch).to(device)
    yTrainBatch = (torch.tensor(personIDs[pLTBatch] == personIDs[pRTBatch])
          .reshape((-1, 1))
          .to(device, dtype=torch.float32))

    # One optimiser step per epoch, followed by one scheduler step.
    optimizer.zero_grad()
    outputBatch = model2(leftTrainBatch, rightTrainBatch)
    lossBatch = criterion(outputBatch, yTrainBatch)

    lossBatch.backward()
    optimizer.step()
    annealer.step()

    epoch_loss = lossBatch.item()

    if epoch % 10 == 0:
        print(f"Epoch {epoch:3d} – loss: {epoch_loss:.4f} - fPC {_pair_accuracy(leftTest, rightTest, yTest):.4f}, {_pair_accuracy(leftTrain, rightTrain, yTrain):.4f}")



print(f"Epoch {epoch:3d} – loss: {epoch_loss:.4f} - fPC {_pair_accuracy(leftTest, rightTest, yTest):.4f}, {_pair_accuracy(leftTrain, rightTrain, yTrain):.4f}")

In [None]:
# Alias under which the checkpoint cell below refers to the trained model.
model = model2

In [None]:
# Full training checkpoint: epoch counter plus the model, optimiser and
# (when a scheduler exists) scheduler state.
scheduler_state = annealer.state_dict() if annealer else None
ckpt = dict(
    epoch=epoch,
    model_state=model.state_dict(),
    optimizer_state=optimizer.state_dict(),
    scheduler_state=scheduler_state,
)
torch.save(ckpt, 'model_other.pt')
# Separate file holding only the model parameters.
torch.save(model.state_dict(), 'model_weights_other.pt')


In [None]:
import torch

def confusion_rates(yhat: torch.Tensor, y: torch.Tensor):
    """Count the confusion-matrix cells of binary predictions and derive rates.

    Entries equal to 1 count as positive and entries equal to 0 as negative,
    in both the predictions *yhat* and the labels *y*.

    Returns:
        dict with the integer counts ``TP``, ``TN``, ``FP``, ``FN`` and the
        float rates ``TPR``, ``TNR``, ``FPR``, ``FNR``. A tiny epsilon is
        added to each denominator, so a class with no samples gives a rate
        of 0 rather than a division error.
    """
    def count(pred_value, true_value):
        matches = (yhat == pred_value) & (y == true_value)
        return int(matches.sum().item())

    TP = count(1, 1)
    TN = count(0, 0)
    FP = count(1, 0)
    FN = count(0, 1)

    eps = 1e-12
    positives = TP + FN + eps  # all samples labelled 1
    negatives = TN + FP + eps  # all samples labelled 0

    return {
        "TP": TP,
        "TN": TN,
        "FP": FP,
        "FN": FN,
        "TPR": TP / positives,   # recall / sensitivity
        "TNR": TN / negatives,   # specificity
        "FPR": FP / negatives,   # 1 - specificity
        "FNR": FN / positives,   # 1 - recall
    }

def metrics(model, leftTest, rightTest, yTest):
    """Run *model* on a batch of pairs and return its confusion rates.

    An output greater than 0 counts as a positive prediction. The predictions
    are compared with *yTest* by ``confusion_rates``, whose result dict is
    returned unchanged.
    """
    # Inference only: skip building an autograd graph for the forward pass.
    with torch.no_grad():
        yhat = model(leftTest, rightTest) > 0
    return confusion_rates(yhat, yTest)

# Confusion rates and raw counts of the model trained above, on the fixed test pairs.
res = metrics(model2, leftTest, rightTest, yTest)

print(f"True Positive Rate (Recall) : {res['TPR']:.4f}; TP : {res['TP']}")
print(f"True Negative Rate (Spec.)  : {res['TNR']:.4f}; TN : {res['TN']}")
print(f"False Positive Rate         : {res['FPR']:.4f}; FP : {res['FP']}")
print(f"False Negative Rate         : {res['FNR']:.4f}; FN : {res['FN']}")


In [None]:
# Device used for the reloaded model and its evaluation (hard-coded to CPU).
device = torch.device("cpu") # Defaults to CPU

# Model definitions follow; their layers are moved to `device` on construction.
class TwoBranchFinal(nn.Module):
    """Siamese encoder for inference: one shared LSTM applied to both inputs.

    ``left`` and ``right`` refer to the same ``nn.LSTM`` instance. The final
    hidden states of the two inputs (8 features each) are concatenated into
    16 features and projected to 9.
    """

    def __init__(self):
        super().__init__()
        encoder = nn.LSTM(input_size=3, hidden_size=8, batch_first=False, num_layers=1).to(device)
        self.left = encoder
        self.right = encoder
        self.combine = nn.Linear(16, 9).to(device)
        for weight in (encoder.weight_ih_l0, encoder.weight_hh_l0):
            nn.init.xavier_uniform_(weight)

    def forward(self, x_left, x_right):
        """Encode both inputs and return the combined features, shape (batch, 9)."""
        final_states = []
        for branch, sequence in ((self.left, x_left), (self.right, x_right)):
            _, (hidden, _) = branch(sequence)
            final_states.append(hidden[-1])  # last layer's state: (batch, 8)
        joined = torch.cat(final_states, dim=-1)  # (batch, 16)
        return self.combine(joined)
class BigModelProd(nn.Module):
    """Inference-time pair classifier: ``TwoBranchFinal`` encoder plus MLP head.

    The encoder produces 9 features per pair; the head applies two hidden
    layers of width 9 with ReLU and emits one output value per pair.
    """

    def __init__(self):
        super().__init__()
        self.branch = TwoBranchFinal()
        head = nn.Sequential(
            nn.Linear(9, 9),
            nn.ReLU(),
            nn.Linear(9, 9),
            nn.ReLU(),
            nn.Linear(9, 1),
        )
        self.rest = head.to(device)

    def forward(self, x_left, x_right):
        """Return the head's output for the pair, one value per batch element."""
        return self.rest(self.branch(x_left, x_right))

def _print_rates(title, res):
    """Print *title* followed by the four confusion rates and their counts."""
    print(title)
    print(f"True Positive Rate (Recall) : {res['TPR']:.4f}; TP : {res['TP']}")
    print(f"True Negative Rate (Spec.)  : {res['TNR']:.4f}; TN : {res['TN']}")
    print(f"False Positive Rate         : {res['FPR']:.4f}; FP : {res['FP']}")
    print(f"False Negative Rate         : {res['FNR']:.4f}; FN : {res['FN']}")


# NOTE(review): this loads 'model_weights.pt', whereas the checkpoint cell in
# this notebook writes 'model_weights_other.pt' -- confirm which file is meant.
# NOTE(security): torch.load unpickles the file; only load weights from a
# trusted source (or pass weights_only=True where the installed PyTorch
# supports it).
weights = torch.load('model_weights.pt', map_location=device)
model_new = BigModelProd()          # the same class you used during training
model_new.load_state_dict(weights)
model_new.to(device)

res = metrics(model_new, leftTest, rightTest, yTest)
_print_rates("Testing:", res)
print()
res = metrics(model_new, leftTrain, rightTrain, yTrain)
_print_rates("Training:", res)

# sigmoid(output - label) for every test pair. The model is run once and the
# result split by label, instead of one forward pass per class.
with torch.no_grad():
    scores = torch.sigmoid(model_new(leftTest, rightTest) - yTest).numpy()
positive: np.ndarray = scores[(yTest == 1).numpy()]
negative: np.ndarray = scores[(yTest == 0).numpy()]

# Zero-pad the shorter array to the length of the longer one. np.pad returns
# a new array; the in-place ndarray.resize raises ValueError when the array
# is referenced elsewhere, which can happen in an interactive session.
width = max(positive.size, negative.size)
positive = np.pad(positive, (0, width - positive.size))
negative = np.pad(negative, (0, width - negative.size))
stacked = np.vstack((positive, negative))  # row 0: label 1, row 1: label 0
positive,negative