In [1]:
import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import math
import numpy as np
# from DataBundle import *

In [2]:
class BinaryClassification(nn.Module):
  """Feed-forward head that maps a flat feature vector to a single raw score.

  Structure: Linear -> ReLU -> BatchNorm1d, twice (64 units each), then
  Dropout(p=0.1) and a final 1-unit Linear layer. No sigmoid is applied in
  `forward`; the output is the raw value of the last linear layer.
  """

  def __init__(self, embed_size, device):
    """Create the layers.

    Args:
      embed_size: number of input features per sample.
      device: stored on the instance as `self.device`; not otherwise used
        by this class.
    """
    super().__init__()
    hidden_units = 64

    # Linear projections: input -> hidden -> hidden -> single output.
    self.layer_1 = nn.Linear(embed_size, hidden_units)
    self.layer_2 = nn.Linear(hidden_units, hidden_units)
    self.layer_out = nn.Linear(hidden_units, 1)

    # Activation / regularisation modules shared by forward().
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=0.1)
    self.batchnorm1 = nn.BatchNorm1d(hidden_units)
    self.batchnorm2 = nn.BatchNorm1d(hidden_units)
    self.device = device

  def forward(self, inputs):
    """Return the output of the final linear layer for `inputs`.

    Args:
      inputs: tensor whose last dimension equals `embed_size`.

    Returns:
      Tensor produced by `layer_out` (one value per sample).
    """
    hidden = self.batchnorm1(self.relu(self.layer_1(inputs)))
    hidden = self.batchnorm2(self.relu(self.layer_2(hidden)))
    # Dropout is applied only once, right before the output projection.
    return self.layer_out(self.dropout(hidden))

In [3]:
class Classifier(nn.Module):
  """Transformer-encoder classifier that emits one raw score per sequence.

  A learned positional embedding is added to the input, the result goes
  through a stack of `TransformerEncoderLayer`s (batch-first layout), the
  encoder output is flattened to `(N, seq_len_tx * d_model_tx)` and passed
  to a `BinaryClassification` head.
  """

  def __init__(self, d_model_tx, seq_len_tx, nhead_tx, dim_feedforward, nlayers_tx, device, dropout = 0.5):
    """Build the positional embedding, encoder stack and classification head.

    Args:
      d_model_tx: feature size of every sequence element.
      seq_len_tx: sequence length; sizes the positional-embedding table and
        the input width of the classification head.
      nhead_tx: number of attention heads for each encoder layer.
      dim_feedforward: hidden width of each encoder layer's feed-forward part.
      nlayers_tx: number of stacked encoder layers.
      device: stored as `self.device` and forwarded to the head.
      dropout: dropout probability passed to every encoder layer.
    """
    super(Classifier, self).__init__()
    self.d_model_tx = d_model_tx
    self.seq_len_tx = seq_len_tx
    self.nhead_tx = nhead_tx
    self.dim_feedforward = dim_feedforward
    self.nlayers_tx = nlayers_tx
    self.device = device

    # One learned embedding vector per sequence position.
    self.position_embedding_tx = nn.Embedding(seq_len_tx, d_model_tx)

    encoder_layer_tx = TransformerEncoderLayer(d_model_tx, nhead_tx, dim_feedforward, dropout, batch_first=True)
    self.encoder_tx = TransformerEncoder(encoder_layer_tx, nlayers_tx)

    # The head consumes the flattened encoder output.
    self.binary_classifier = BinaryClassification((seq_len_tx*d_model_tx), device)

  def forward(self, src_tx: Tensor) -> Tensor:
    """Score a batch of sequences.

    Args:
      src_tx: tensor of shape `(N, seq_len_tx, d_model_tx)`.

    Returns:
      Output of the classification head for the flattened encoder output.

    Raises:
      ValueError: if `src_tx` is not 3-D or its last two dimensions do not
        match `seq_len_tx` / `d_model_tx` (the head's input width is fixed
        at construction time, so other sizes cannot be processed).
    """
    if src_tx.dim() != 3:
      raise ValueError(f"expected a 3-D input (N, seq, features), got shape {tuple(src_tx.shape)}")
    N, seq_length, embed_size = src_tx.shape
    if seq_length != self.seq_len_tx or embed_size != self.d_model_tx:
      raise ValueError(
        f"expected input of shape (N, {self.seq_len_tx}, {self.d_model_tx}), "
        f"got {tuple(src_tx.shape)}"
      )

    # Build the position indices on the input's own device so the lookup
    # works wherever the batch lives; the (seq, d_model) embedding
    # broadcasts over the batch dimension.
    positions_tx = torch.arange(seq_length, device=src_tx.device)
    src_tx_ = src_tx + self.position_embedding_tx(positions_tx)

    output_tx = self.encoder_tx(src_tx_)

    # Flatten each sequence into a single feature vector for the head.
    output_tx_f = torch.reshape(output_tx, (N, seq_length*embed_size))

    # There is only one stream, so nothing needs concatenating. The previous
    # torch.cat((output_tx_f), dim=1) passed a bare Tensor (parentheses do
    # not make a tuple), which torch.cat rejects.
    return self.binary_classifier(output_tx_f)

In [4]:
class Classifier(nn.Module):
  """Transformer-encoder classifier that emits one raw score per sequence.

  NOTE: a class with the same name is defined in the previous cell; running
  this cell rebinds `Classifier` to this definition.

  Pipeline: input + learned positional embedding -> `TransformerEncoder`
  (batch-first) -> flatten to `(N, seq_len_tx * d_model_tx)` ->
  `BinaryClassification` head.
  """

  def __init__(self, d_model_tx, seq_len_tx, nhead_tx, dim_feedforward, nlayers_tx, device, dropout=0.5):
    """Build the positional embedding, encoder stack and classification head.

    Args:
      d_model_tx: feature size of every sequence element.
      seq_len_tx: sequence length; sizes the positional-embedding table and
        the input width of the classification head.
      nhead_tx: number of attention heads for each encoder layer.
      dim_feedforward: hidden width of each encoder layer's feed-forward part.
      nlayers_tx: number of stacked encoder layers.
      device: stored as `self.device` and forwarded to the head.
      dropout: dropout probability passed to every encoder layer.
    """
    super(Classifier, self).__init__()
    self.d_model_tx = d_model_tx
    self.seq_len_tx = seq_len_tx
    self.nhead_tx = nhead_tx
    # The attribute name was missing here ("self.  = ..."), which made the
    # whole cell a SyntaxError.
    self.dim_feedforward = dim_feedforward
    self.nlayers_tx = nlayers_tx
    self.device = device

    # Embedding layer for positional encoding
    self.position_embedding_tx = nn.Embedding(seq_len_tx, d_model_tx)

    # Transformer Encoder Layer
    encoder_layer_tx = TransformerEncoderLayer(d_model_tx, nhead_tx, dim_feedforward, dropout, batch_first=True)

    # Transformer Encoder
    self.encoder_tx = TransformerEncoder(encoder_layer_tx, nlayers_tx)

    # Binary Classification Layer (BinaryClassification is defined in an
    # earlier cell of this notebook)
    self.binary_classifier = BinaryClassification((seq_len_tx * d_model_tx), device)

  def forward(self, src_tx: Tensor) -> Tensor:
    """Score a batch of sequences.

    Args:
      src_tx: tensor of shape `(N, seq_len_tx, d_model_tx)`.

    Returns:
      Output of the classification head for the flattened encoder output.

    Raises:
      ValueError: if `src_tx` is not 3-D or its last two dimensions do not
        match `seq_len_tx` / `d_model_tx`.
    """
    if src_tx.dim() != 3:
      raise ValueError(f"expected a 3-D input (N, seq, features), got shape {tuple(src_tx.shape)}")
    N, seq_length, embed_size = src_tx.shape
    if seq_length != self.seq_len_tx or embed_size != self.d_model_tx:
      raise ValueError(
        f"expected input of shape (N, {self.seq_len_tx}, {self.d_model_tx}), "
        f"got {tuple(src_tx.shape)}"
      )

    # Calculate positional embeddings. Indices are created on the input's
    # device; the (seq, d_model) result broadcasts across the batch.
    positions_tx = torch.arange(seq_length, device=src_tx.device)
    src_tx_ = src_tx + self.position_embedding_tx(positions_tx)

    # Pass the source through the Transformer Encoder
    output_tx = self.encoder_tx(src_tx_)

    # Flatten the output for binary classification
    output_tx_f = torch.reshape(output_tx, (N, seq_length * embed_size))

    # Single stream: feed the flattened features straight to the head.
    # torch.cat((output_tx_f), dim=1) received a bare Tensor rather than a
    # tuple of tensors and therefore raised a TypeError.
    output = self.binary_classifier(output_tx_f)

    return output


In [5]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [6]:
# Transformer-encoder hyperparameters.
dim_feedforward = 16  # feed-forward width of each encoder layer (32 was also tried)
nlayers_tx = 8        # number of stacked encoder layers


In [7]:
# Build the classifier for sequences of 108 positions x 7 features.
# dim_feedforward comes from the hyperparameter cell above instead of a
# hard-coded 16, so changing it there actually takes effect.
model = Classifier(
    d_model_tx=7,
    seq_len_tx=108,
    nhead_tx=7,
    dim_feedforward=dim_feedforward,
    nlayers_tx=nlayers_tx,
    device=device,
)
# Move the parameters to the selected device; as the last expression this
# also displays the module structure.
model.to(device)



Classifier(
  (position_embedding_tx): Embedding(108, 7)
  (encoder_tx): TransformerEncoder(
    (layers): ModuleList(
      (0-7): 8 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=7, out_features=7, bias=True)
        )
        (linear1): Linear(in_features=7, out_features=16, bias=True)
        (dropout): Dropout(p=0.5, inplace=False)
        (linear2): Linear(in_features=16, out_features=7, bias=True)
        (norm1): LayerNorm((7,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((7,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.5, inplace=False)
        (dropout2): Dropout(p=0.5, inplace=False)
      )
    )
  )
  (binary_classifier): BinaryClassification(
    (layer_1): Linear(in_features=756, out_features=64, bias=True)
    (layer_2): Linear(in_features=64, out_features=64, bias=True)
    (layer_out): Linear(in_features=64, out_features=1, bias=True)
    

In [None]:
#load data
from torch.utils.data import Dataset, DataLoader

In [None]:
# Training hyperparameters.
EPOCHS = 10            # full passes over the training loader
BATCH_SIZE = 8         # samples per training batch
LEARNING_RATE = 1e-3   # step size handed to the optimizer

In [None]:
# NOTE(review): train_data, val_data and test_data are not defined in the
# visible cells of this notebook; they must exist before this cell runs.
# NOTE(review): if len(train_data) % BATCH_SIZE == 1, the final training
# batch holds a single sample, which BatchNorm1d rejects in training mode;
# consider drop_last=True if that can happen. TODO confirm.

# Training batches are shuffled; validation and test are served one sample
# at a time in dataset order.
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_data, batch_size=1)
test_loader = DataLoader(test_data, batch_size=1)

In [None]:
from torch import optim

# The model outputs raw scores (no sigmoid in its forward pass), so the loss
# that takes logits is used here; plain nn.BCELoss was the alternative.
criterion = nn.BCEWithLogitsLoss()

# Adam over every model parameter with the configured learning rate.
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
def binary_acc(y_pred, y_test):
    """Return the accuracy of binary logits as a rounded percentage.

    Logits are turned into 0/1 tags with ``round(sigmoid(logit))``. Both
    tensors are flattened before comparison, so ``(N, 1)`` vs ``(N,)`` inputs
    are compared element by element instead of being silently broadcast to an
    ``N x N`` matrix (which would inflate the count of "correct" results).

    Args:
        y_pred: tensor of raw logits.
        y_test: tensor of 0/1 labels with the same number of elements.

    Returns:
        0-dim float tensor: percentage of matching elements, rounded to the
        nearest integer.

    Raises:
        ValueError: if the two tensors hold a different number of elements.
    """
    y_pred_tag = torch.round(torch.sigmoid(y_pred)).reshape(-1)
    y_true_flat = y_test.reshape(-1)
    if y_pred_tag.numel() != y_true_flat.numel():
        raise ValueError(
            f"y_pred has {y_pred_tag.numel()} elements but y_test has {y_true_flat.numel()}"
        )

    correct_results_sum = (y_pred_tag == y_true_flat).sum().float()
    acc = correct_results_sum / y_true_flat.numel()
    acc = torch.round(acc * 100)

    return acc

In [None]:
# Train for EPOCHS epochs. After every epoch the model is evaluated on the
# validation loader and the per-batch averages of loss and accuracy are printed.
for e in range(1, EPOCHS+1):
    epoch_loss = 0
    epoch_acc = 0
    model.train()
    for X1_batch, y_batch in train_loader:
        X1_batch, y_batch = X1_batch.to(device), y_batch.to(device)
        # Labels as a float column so they line up with the model output.
        targets = y_batch.unsqueeze(1).float()
        optimizer.zero_grad()

        y_pred = model(X1_batch.float())

        loss = criterion(y_pred.float(), targets)
        acc = binary_acc(y_pred.float(), targets)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    val_loss = 0
    val_accuracy = 0
    model.eval()
    # No gradients are needed for validation; disabling autograd avoids
    # building a graph for every validation sample.
    with torch.no_grad():
        for X1_batch, y_batch in val_loader:
            X1_batch, y_batch = X1_batch.to(device), y_batch.to(device)
            targets = y_batch.unsqueeze(1).float()

            y_pred = model(X1_batch.float())
            loss = criterion(y_pred.float(), targets)
            acc = binary_acc(y_pred.float(), targets)
            val_loss += loss.item()
            val_accuracy += acc.item()

    print(f'Epoch {e:03}: | Loss: {epoch_loss/len(train_loader):.5f} | Acc: {epoch_acc/len(train_loader):.3f} | Val_Loss: {val_loss/len(val_loader):.5f} | Val_Acc: {val_accuracy/len(val_loader):.3f}')

In [None]:
# Checkpoint file for this run (transformer layers=8,8).
# File names used for other configurations:
#   ./DeFi_Latest_v2.pth
#   ./DeFi_Latest_v3.pth    transformer layers=4,4
#   ./DeFi_Latest_v4.pth    transformer layers=8,8
#   ./DeFi_Latest_v5.pth    transformer layers=12,12
#   ./DeFi_Latest_v6.pth    transformer layers=8,8, epoch=50
#   ./DeFi_Latest_v7.pth    transformer layers=8,8, ff=32, epoch=50
#   ./DeFi_Latest_v4_2.pth  transformer layers=8,8, ff=32, epoch=10
PATH = './DeFi_Latest_v4_1.pth'
# Saving is currently disabled:
#torch.save(model, PATH)

Performance metrics

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
# import seaborn as sn
import pandas as pd

In [None]:
# Collect hard 0/1 predictions and the true labels over the test set.
y_pred = []
y_true = []
sigmoid = nn.Sigmoid()

# NOTE(review): this cell used `loaded_model`, which is never defined in the
# visible notebook (the torch.save call is commented out), so the in-memory
# `model` is evaluated instead. Swap in a loaded checkpoint here if that was
# the intent.
model.eval()
with torch.no_grad():
    for X1_batch, y_batch in test_loader:
        # The loader yields (features, label) pairs; the stray X2_batch
        # reference raised a NameError and has been removed.
        X1_batch, y_batch = X1_batch.to(device), y_batch.to(device)
        y_batch_pred = torch.round(sigmoid(model(X1_batch.float())))
        # Flatten so both lists hold plain scalars, one per sample.
        y_pred.extend(y_batch_pred.reshape(-1).cpu().numpy())
        y_true.extend(y_batch.reshape(-1).cpu().numpy())

In [None]:
# Test dataset: overall accuracy.
# Flatten both lists so the comparison works whether the collected elements
# are scalars or shape-(1,) arrays.
true_arr = np.asarray(y_true).reshape(-1)
pred_arr = np.asarray(y_pred).reshape(-1)
count_true = int((true_arr == pred_arr).sum())
acc = count_true/len(pred_arr)
# Number of evaluated samples (previously referenced the undefined `y_test`).
print(len(true_arr))
print(acc)

In [None]:
# Fix the label set to [0, 1] so the matrix is always 2x2, even when one
# class is missing from the test predictions or labels; otherwise the
# four-way unpack below fails.
# NOTE(review): assumes labels are encoded as 0/1 -- confirm.
tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
(tn, fp, fn, tp)

In [None]:
# Row-normalised confusion matrix (each row = one true class, summing to 1).
# NOTE(review): assumes labels are encoded as 0/1 -- confirm.
cf_matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])

# keepdims=True keeps the row sums as a column vector, so every row is
# divided by its own total. Dividing by the 1-D sum would broadcast the
# totals across columns instead. Rows without samples stay at 0 rather
# than producing NaN.
row_totals = cf_matrix.sum(axis=1, keepdims=True)
cf_normalized = np.divide(
    cf_matrix,
    row_totals,
    out=np.zeros(cf_matrix.shape, dtype=float),
    where=row_totals != 0,
)
df_cm = pd.DataFrame(cf_normalized, index = ['0','1'],
                     columns = ['0','1'])

# Draw the annotated heatmap with matplotlib only; the seaborn import is
# commented out in this notebook, so `sn` is not available.
fig, ax = plt.subplots(figsize = (12,7))
heatmap = ax.imshow(df_cm.to_numpy(), cmap="viridis", vmin=0.0, vmax=1.0)
fig.colorbar(heatmap, ax=ax)
ax.set_xticks(range(df_cm.shape[1]))
ax.set_xticklabels(df_cm.columns)
ax.set_yticks(range(df_cm.shape[0]))
ax.set_yticklabels(df_cm.index)
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
ax.set_title("Confusion matrix (normalised per true class)")
for (row, col), value in np.ndenumerate(df_cm.to_numpy()):
    # Dark text on bright cells, light text on dark cells.
    ax.text(col, row, f"{value:.2f}", ha="center", va="center",
            color="black" if value > 0.5 else "white")
fig.savefig('output.png')