In [1]:
import pickle
from datetime import datetime

import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader

from utils_classification.Attacker import Attacker

In [2]:
# SECURITY: pickle.load can execute arbitrary code while deserialising;
# only point this at a trusted file.
with open("../data/test/In_Pop.pkl", "rb") as f:
    in_beacon = pickle.load(f)
# Expand every genome's raw bytes into one uint8 0/1 value per bit, one row per genome.
in_beacon = np.array(
    [
        np.unpackbits(np.frombuffer(packed_genome.tobytes(), dtype=np.uint8))
        for packed_genome in in_beacon
    ]
)

In [3]:
# SECURITY: pickle.load can execute arbitrary code while deserialising;
# only point this at a trusted file.
with open("../data/test/Not_In_Pop.pkl", "rb") as f:
    not_in_beacon = pickle.load(f)
# Expand every genome's raw bytes into one uint8 0/1 value per bit, one row per genome.
not_in_beacon = np.array(
    [
        np.unpackbits(np.frombuffer(packed_genome.tobytes(), dtype=np.uint8))
        for packed_genome in not_in_beacon
    ]
)

In [4]:
# Tab-separated numeric table; only its first two columns are used.
# NOTE(review): going by the variable names, column 0 is per-SNP allele presence
# and column 1 is per-SNP allele frequency -- confirm against the data file.
in_population_beacon = np.loadtxt("../data/test/In_Pop_Beacon.txt", delimiter="\t")
beacon_allele_presence, beacon_allele_frequency = (
    in_population_beacon[:, 0],
    in_population_beacon[:, 1],
)

In [5]:
# Dataset dimensions derived from the two loaded genome arrays.
num_genomes_in_beacon = len(in_beacon)
num_genomes_not_in_beacon = len(not_in_beacon)
num_genomes = num_genomes_in_beacon + num_genomes_not_in_beacon
num_snps = len(in_beacon[0])
# Use at most 40000 SNP positions, but never more than the genomes contain:
# slicing past the end would silently yield fewer columns, and the later
# broadcast to (num_genomes, target_num_snps) would then fail.
target_num_snps = min(40000, num_snps)

In [6]:
# Membership targets: 1.0 for every genome in the beacon, 0.0 for every genome outside it.
labels_in_beacon = np.ones(shape=(len(in_beacon),), dtype=np.float64)
labels_not_in_beacon = np.zeros(shape=(len(not_in_beacon),), dtype=np.float64)

In [7]:
# Stack members first, non-members second; labels follow the same order so rows stay aligned.
genomes = np.concatenate([in_beacon, not_in_beacon])
labels = np.concatenate([labels_in_beacon, labels_not_in_beacon])

In [8]:
# Keep only the leading `target_num_snps` positions of the genomes and of both beacon vectors.
snp_window = slice(target_num_snps)
genomes_selected = genomes[:, snp_window]
beacon_allele_presence_selected = beacon_allele_presence[snp_window]
beacon_allele_frequency_selected = beacon_allele_frequency[snp_window]

In [9]:
# Build one feature triple per (genome, SNP): the genome's bit, the beacon presence
# value and the beacon frequency value. The beacon vectors are identical for every
# genome, so they are broadcast (read-only views) along the genome axis.
per_feature_shape = (num_genomes, target_num_snps, 1)
beacon_allele_presence_selected_expanded = np.broadcast_to(
    beacon_allele_presence_selected[np.newaxis, :, np.newaxis], per_feature_shape
)
beacon_allele_frequency_selected_expanded = np.broadcast_to(
    beacon_allele_frequency_selected[np.newaxis, :, np.newaxis], per_feature_shape
)
genomes_selected_expanded = genomes_selected[:, :, np.newaxis]
# Join the three single-channel arrays along the last axis.
x = np.concatenate(
    [
        genomes_selected_expanded,
        beacon_allele_presence_selected_expanded,
        beacon_allele_frequency_selected_expanded,
    ],
    axis=-1,
)
y = labels

In [10]:
# Sanity check: feature and label shapes, one per line.
print(x.shape, y.shape, sep="\n")

(800, 40000, 3)
(800,)


In [11]:
# Copy features and labels into float32 torch tensors.
x_tensor, y_tensor = (
    torch.tensor(array, dtype=torch.float32) for array in (x, y)
)

In [12]:
# Hold out 20% of the genomes; the fixed seed makes the split repeatable.
# NOTE(review): the split is not stratified on the labels -- confirm that is intended.
x_train, x_test, y_train, y_test = train_test_split(
    x_tensor,
    y_tensor,
    test_size=0.2,
    random_state=42,
)

In [13]:
# Wrap both splits in datasets and batch them; only the training batches are shuffled.
batch_size = 32
train_dataset, test_dataset = (
    TensorDataset(x_train, y_train),
    TensorDataset(x_test, y_test),
)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [14]:
# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# input_size=3 matches the three per-SNP features in the last axis of `x`.
model = Attacker(
    input_size=3,
    hidden_size=64,
    num_layers=1,
    bidirectional=False,
    dropout=0.5,
)
# Left as the final expression so the notebook displays the module summary.
model.to(device)

Attacker(
  (lstm): LSTM(3, 64, batch_first=True)
  (linear): Linear(in_features=64, out_features=1, bias=True)
)

In [15]:
# Adam over all model parameters; the loss takes raw logits (sigmoid is applied inside it).
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.BCEWithLogitsLoss()

In [None]:
# Train for up to `num_epochs`, tracking the epoch with the lowest validation loss
# and restoring its weights at the end.
# NOTE(review): validation runs on `test_loader`, so the test set also drives model
# selection; a separate validation split would give an unbiased test metric.
num_epochs = 256
# Optional early stopping: set to an int to stop after that many consecutive
# epochs without improvement. None (default) always runs all epochs.
patience = None
best_epoch = -1
best_val_loss = np.inf
best_state_dict = None
counter = 0  # consecutive epochs without a validation improvement

for epoch in range(num_epochs):
    # --- training pass ---
    model.train()
    train_losses = []
    for x_batch, y_batch in train_loader:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        output = model(x_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        train_losses.append(loss.item())

    # --- validation pass (no gradients, eval mode) ---
    model.eval()
    val_losses = []
    with torch.no_grad():
        for x_batch, y_batch in test_loader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            output = model(x_batch)
            loss = criterion(output, y_batch)
            val_losses.append(loss.item())

    train_loss = np.mean(train_losses)
    val_loss = np.mean(val_losses)
    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    if val_loss < best_val_loss:
        best_epoch = epoch
        best_val_loss = val_loss
        # state_dict() returns references to the live parameter tensors, which later
        # optimizer steps modify in place. Clone them so the snapshot really holds
        # this epoch's weights.
        best_state_dict = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        counter = 0
    else:
        counter += 1
        if patience is not None and counter >= patience:
            print(f"Early stopping at epoch {epoch + 1}: no improvement for {patience} epochs")
            break

# Restore the best snapshot before evaluation and saving.
model.load_state_dict(best_state_dict)
print(f"Best Model found at Epoch {best_epoch + 1}")

In [37]:
# Evaluate on the held-out loader: collect per-batch losses and count correct predictions.
model.eval()
test_losses, correct, total = [], 0, 0
with torch.no_grad():
    for features, targets in test_loader:
        features = features.to(device)
        targets = targets.to(device)
        logits = model(features)
        test_losses.append(criterion(logits, targets).item())
        # A sample is predicted positive when its sigmoid probability is at least 0.5.
        predicted = (torch.sigmoid(logits) >= 0.5).float()
        correct += torch.eq(predicted, targets).sum().item()
        total += len(targets)
# Loss is the unweighted mean of the per-batch losses; accuracy is over all samples.
test_loss = np.mean(test_losses)
test_accuracy = correct / total

In [38]:
# Report the held-out metrics to four decimal places.
test_summary = f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}"
print(test_summary)

Test Loss: 0.6931, Test Accuracy: 0.5000


In [39]:
# Save under a timestamped name (YYYYmmddHHMMSS) so successive runs do not overwrite each other.
# NOTE(review): `save` is provided by the project's Attacker class; its arguments are
# presumably (directory, file stem) -- confirm against utils_classification.Attacker.
model_name = f"attacker_{datetime.now().strftime('%Y%m%d%H%M%S')}"
model.save("../models", model_name)

In [40]:
# Chance-level accuracy: what a trivial classifier scores by always predicting the
# majority class. The mean of the 0/1 labels is only the positive-class rate, so
# take the larger of the two class proportions.
# .item() converts the 0-dim tensor to a float so the value prints as a plain number.
positive_rate = y_test.mean().item()
chance = max(positive_rate, 1 - positive_rate)
print(f"Chance: {chance}")

Chance: 0.5
