In [2]:
from PIL import Image
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as f
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score,f1_score,classification_report
import os
import gc



In [7]:
# Read the four pre-built arrays (train/validation data and labels) in one pass.
# Order of the paths matches the order of the names on the left-hand side.
X, Y, X_v, Y_v = (
    np.load(npy_path)
    for npy_path in (
        "data500x50/train_data.npy",
        "data500x50/train_label.npy",
        "data500x50/val_data.npy",
        "data500x50/val_label.npy",
    )
)

In [8]:
# Derive the batch layout from the array shapes: axis 0 is the batch index and
# axis 2 is the number of samples per batch (axis 1 is indexed with 0/1 by the
# later cells to pick the two inputs of a pair).
# The shape is indexed rather than unpacked into `_` placeholders so the cell does
# not rebind `_`, which IPython uses for the previous cell's output.
if X.ndim != 6 or X_v.ndim != 6:
    raise ValueError(
        f"expected 6-D data arrays, got X.ndim={X.ndim} and X_v.ndim={X_v.ndim}"
    )
num_batch_train, batch_size_train = X.shape[0], X.shape[2]
num_batch_val, batch_size_val = X_v.shape[0], X_v.shape[2]

In [9]:
class model_3(nn.Module):
    """Twin-input classifier built on one shared feature extractor.

    Both inputs are passed through the same ``feature_ext`` stack (four
    convolutions with two 2x2 max-pools, then two linear layers producing a
    1000-dimensional vector). The element-wise absolute difference of the two
    vectors is fed to ``out``, which ends in a softmax over 2 classes, so
    ``forward`` returns probabilities rather than logits.

    The first linear layer expects exactly 30000 flattened features, i.e.
    20 channels times the twice-halved spatial size of a single-channel input
    (presumably 500x50 images, given the data directory name -- TODO confirm).
    """

    def __init__(self):
        super().__init__()

        # Layers are instantiated in execution order so the positions inside
        # the Sequential containers (and therefore the state_dict keys) are fixed.
        conv_layers = [
            nn.Conv2d(in_channels=1, out_channels=5, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=5, out_channels=10, kernel_size=5, padding=2),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=10, out_channels=15, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=15, out_channels=20, kernel_size=3, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        embedding_layers = [
            nn.Flatten(),
            nn.Linear(in_features=30000, out_features=5000),
            nn.ReLU(),
            nn.Linear(in_features=5000, out_features=1000),
        ]
        self.feature_ext = nn.Sequential(*conv_layers, *embedding_layers)

        # Classification head applied to the |feature1 - feature2| vector.
        self.out = nn.Sequential(
            nn.Linear(in_features=1000, out_features=50),
            nn.ReLU(),
            nn.Linear(in_features=50, out_features=2),
            nn.Softmax(dim=1),
        )

    def forward(self, X1, X2):
        """Return class probabilities for the pair (X1, X2).

        The result is symmetric in its arguments because only the absolute
        difference of the two embeddings reaches the head.
        """
        embedding_gap = (self.feature_ext(X1) - self.feature_ext(X2)).abs()
        return self.out(embedding_gap)

# Single module-level model instance; the later cells optimise its parameters,
# run it on the validation batches, and reuse `mod.feature_ext` to export features.
mod = model_3()

In [None]:
# Highest validation AUC seen so far; the training cell compares each epoch's AUC
# against this value and overwrites it (and saves the model) on improvement.
best_auc = 0.0

In [None]:
# Train `mod` for 20 epochs with Adam, evaluate on the validation batches after
# every epoch, and checkpoint the model whenever the validation AUC improves.
#
# Loss: `mod` already ends in Softmax, i.e. it returns probabilities, while
# CrossEntropyLoss expects raw logits and applies log-softmax itself. Feeding it
# probabilities applies softmax twice and flattens the gradients, so the
# negative log-likelihood is computed on log(prob) instead (mathematically the
# cross-entropy of the underlying logits).
loss_fn = nn.NLLLoss()
opt = optim.Adam(mod.parameters(), lr=0.0001)
pred_Y = np.zeros(num_batch_val * batch_size_val)
# pred_Y is flat (batch-major); flatten the labels the same way so the metrics
# see matching 1-D arrays. No-op if Y_v is already 1-D.
# NOTE(review): assumes Y_v is stored in the same batch-major order as X_v -- confirm.
val_labels = np.asarray(Y_v).reshape(-1)

for epoch in range(20):
    total_loss = 0
    print(f"Epoch {epoch}")

    # ---- training pass -------------------------------------------------
    mod.train()  # no effect on the current layers; keeps the loop correct if dropout/batch-norm is added
    for i in range(num_batch_train):
        X1 = torch.tensor(X[i, 0]).float()
        X2 = torch.tensor(X[i, 1]).float()
        Y_ = torch.tensor(Y[i]).long()

        opt.zero_grad()
        prob = mod(X1, X2)
        # Clamp before the log so a probability that underflowed to 0 cannot
        # produce an infinite loss / NaN gradients.
        loss = loss_fn(torch.log(prob.clamp(min=1e-12)), Y_)
        loss.backward()
        opt.step()
        total_loss += loss.item()
    print(f"Training \t loss : {total_loss/num_batch_train}")

    # ---- validation pass -----------------------------------------------
    mod.eval()
    with torch.no_grad():
        for i in range(num_batch_val):
            X1 = torch.tensor(X_v[i, 0]).float()
            X2 = torch.tensor(X_v[i, 1]).float()
            prob = mod(X1, X2)
            # Column 1 is the probability of the positive class.
            pred_Y[i*batch_size_val:(i+1)*batch_size_val] = prob.cpu().numpy()[:, 1]

    # Compute each metric once and reuse it for logging and checkpointing.
    val_f1 = f1_score(val_labels, (pred_Y >= 0.5) * 1)
    val_auc = roc_auc_score(val_labels, pred_Y)
    print(f"Validation \t F1 : {val_f1} \t AUC : {val_auc}")
    if best_auc < val_auc:
        best_auc = val_auc
        # Saves the whole pickled module (not just a state_dict), as before.
        torch.save(mod, 'best_auc_model.pth')
    gc.collect()

In [12]:
# Export one row per training pair: the 1000-dimensional absolute difference of
# the two embeddings produced by `mod.feature_ext`, plus the pair's label.
n_train_rows = num_batch_train * batch_size_train
train_X = np.zeros((n_train_rows, 1000))
train_Y = np.zeros((n_train_rows, 1))

with torch.no_grad():  # inference only, no autograd graph needed
    for batch_idx in range(num_batch_train):
        # Rows of the output arrays that belong to this batch.
        rows = slice(batch_idx * batch_size_train, (batch_idx + 1) * batch_size_train)

        left = mod.feature_ext(torch.tensor(X[batch_idx, 0]).float())
        right = mod.feature_ext(torch.tensor(X[batch_idx, 1]).float())

        train_X[rows, :] = torch.abs(left - right).cpu().numpy()
        train_Y[rows] = Y[batch_idx].reshape((-1, 1))

In [None]:
# Persist the exported features and labels as .npy files.
# The paths are handed straight to np.save (they already end in ".npy", so no
# suffix is appended). Opening the files as `f` would rebind the name `f`, which
# the import cell uses as the alias for torch.nn.functional.
np.save('train_X.npy', train_X)
np.save('train_Y.npy', train_Y)
