In [1]:
import pickle
import numpy as np
import random
np.random.seed(42)

In [2]:
X = pickle.load(open('data/X.pkl', 'rb'))

In [3]:
samples = 2000
train_videos = {}
test_videos = {}

for video in X.keys():
    actor = int(video[9:12])
    action = int(video[17:20])
    
    if len(X[video]) == 0: continue
    if action > 60:
        if actor not in test_videos:
            test_videos[actor] = []
        test_videos[actor].append(X[video])    
    else:
        if actor not in train_videos:
            train_videos[actor] = []
        train_videos[actor].append(X[video])

In [4]:
train = []
test = []

# Same
for i in range(samples):
    actor = random.choice(list(train_videos.keys()))
    vid1 = random.choice(train_videos[actor])
    vid2 = random.choice(train_videos[actor])
    train.append([(vid1, vid2), 1])

    actor = random.choice(list(test_videos.keys()))
    vid1 = random.choice(test_videos[actor])
    vid2 = random.choice(test_videos[actor])
    test.append([(vid1, vid2), 1])

# Diff
for i in range(samples):
    actor = random.choice(list(train_videos.keys()))
    vid1 = random.choice(train_videos[actor])
    actor2 = random.choice(list(train_videos.keys()))
    while actor != actor2:
        actor2 = random.choice(list(train_videos.keys()))
    vid2 = random.choice(train_videos[actor2])
    train.append([(vid1, vid2), 0])

    actor = random.choice(list(test_videos.keys()))
    vid1 = random.choice(test_videos[actor])
    actor2 = random.choice(list(test_videos.keys()))
    while actor != actor2:
        actor2 = random.choice(list(test_videos.keys()))
    vid2 = random.choice(test_videos[actor2])
    test.append([(vid1, vid2), 0])


In [5]:
from torch.utils.data import Dataset

class NTUData(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        d = self.data[idx]
        return np.array(d[0][0]).flatten(), np.array(d[0][1]).flatten(), d[1]

In [6]:
train_data = NTUData(train)
test_data = NTUData(test)

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LinkageAttackModel(nn.Module):
    def __init__(self):
        super(LinkageAttackModel, self).__init__()
        self.linear1 = nn.Linear(22500, 512)
        self.linear2 = nn.Linear(2 * 512, 1)

    def forward(self, x1, x2):
        x1 = x1.view(x1.shape[0], -1)
        x2 = x2.view(x2.shape[0], -1)
        x1 = F.relu(self.linear1(x1))
        x2 = F.relu(self.linear1(x2))
        x = torch.cat([x1, x2], 1)
        x = self.linear2(x)
        return torch.sigmoid(x)

In [8]:
def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

In [9]:
from torch.utils.data import DataLoader

model = LinkageAttackModel().cuda()
model.apply(init_weights)

optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
criterion = nn.BCELoss()

train_loader = DataLoader(train_data, batch_size=32, shuffle=True, drop_last=True)
test_loader = DataLoader(test_data, batch_size=32, drop_last=True)

for epoch in range(10):
    model.train()
    losses = []
    accuracy = []
    for x1, x2, y in train_loader:
        x1 = x1.float().cuda()
        x2 = x2.float().cuda()
        y = y.float().cuda()

        optimizer.zero_grad()
        y_hat = model(x1, x2).squeeze()
        y_hat = torch.round(y_hat)
        loss = criterion(y_hat, y)
        accuracy.append((y_hat == y).sum().item() / y.shape[0])
        losses.append(loss.item())
        loss.backward()
        optimizer.step()

        # Gradient monitoring
        for name, param in model.named_parameters():
            if param.grad is not None:
                print(f"{name} gradient: {param.grad.data.norm().item()}")

    
    test_loss = []
    test_accuracy = []
    model.eval()
    with torch.no_grad():
        for x1, x2, y in test_loader:
            x1 = x1.float().cuda()
            x2 = x2.float().cuda()
            y = y.float().cuda()

            y_hat = model(x1, x2).squeeze()
            y_hat = torch.round(y_hat)
            loss = criterion(y_hat, y)
            test_loss.append(loss.item())
            test_accuracy.append((y_hat == y).sum().item() / y.shape[0])


    print('Epoch: {}, Train Loss: {}, Train Accuracy: {}, Test Loss: {}, Test Accuracy: {}'.format(epoch+1, np.mean(losses), np.mean(accuracy), np.mean(test_loss), np.mean(test_accuracy)))
    

linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradient: 0.0
linear1.weight gradient: 0.0
linear1.bias gradient: 0.0
linear2.weight gradient: 0.0
linear2.bias gradie