# Phase2 3 Detection

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, Dataset

import torchvision.utils
from torchvision import models

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

# Prefix prepended verbatim (plain string concatenation) to every file name
# this notebook reads or writes; include a trailing separator when non-empty.
folder_path = ''

# Selects which set of .npy files load_concatenate_data reads; that function
# accepts 'shap', 'most', 'path' or 'single'.
approach = ''

In [None]:
def load_concatenate_data(approach='path', alpha=0, layer=''):
  """Load the DeepFool, FGSM and PGD arrays for one approach and stack them.

  For each attack a data file and a label file are read with ``np.load`` from
  ``folder_path`` and concatenated (in the order deepfool, fgsm, pgd) along
  the first axis. File names depend on ``approach``:

  * ``'shap'`` / ``'most'``: ``<attack>_layer_<approach>.npy`` and
    ``<attack>_pred_<approach>.npy``
  * ``'path'``: ``<alpha><attack>_layer_path.npy`` and
    ``<alpha><attack>_pred_path.npy``
  * ``'single'``: ``<attack>_<layer>_shap.npy`` and
    ``<attack>_<layer>_pred.npy``

  Args:
    approach: One of ``'shap'``, ``'most'``, ``'path'`` or ``'single'``.
    alpha: File-name prefix used by the ``'path'`` approach; must not be 0.
    layer: Layer name used by the ``'single'`` approach; must not be ``''``.

  Returns:
    Tuple ``(data, label)`` of concatenated numpy arrays.

  Raises:
    ValueError: If ``approach`` is unknown, or the argument required by the
      chosen approach (``alpha`` / ``layer``) was left at its default. The
      previous behaviour of returning ``0`` made callers that unpack the
      result fail with an unrelated ``TypeError``.
  """
  attacks = ('deepfool', 'fgsm', 'pgd')

  # Every file name has the form prefix + attack + suffix; only the prefix
  # and the two suffixes differ between approaches.
  if approach == 'shap' or approach == 'most':
    prefix = ''
    data_suffix = '_layer_' + str(approach) + '.npy'
    label_suffix = '_pred_' + str(approach) + '.npy'
  elif approach == 'path':
    if alpha == 0:
      raise ValueError("Error alpha: approach 'path' needs a non-zero alpha")
    prefix = str(alpha)
    data_suffix = '_layer_path.npy'
    label_suffix = '_pred_path.npy'
  elif approach == 'single':
    if layer == '':
      raise ValueError("Error layer: approach 'single' needs a layer name")
    prefix = ''
    data_suffix = '_' + str(layer) + '_shap.npy'
    label_suffix = '_' + str(layer) + '_pred.npy'
  else:
    raise ValueError(
        "Error approach: expected 'shap', 'most', 'path' or 'single', got %r"
        % (approach,))

  # Load everything first so a missing file fails before anything is printed.
  data_parts = []
  label_parts = []
  for attack in attacks:
    data_parts.append(np.load(folder_path + prefix + attack + data_suffix))
    label_parts.append(np.load(folder_path + prefix + attack + label_suffix))

  for part_data, part_label in zip(data_parts, label_parts):
    print(part_data.shape, part_label.shape)

  data = np.concatenate(data_parts)
  label = np.concatenate(label_parts)
  print(data.shape, label.shape)

  return data, label

In [None]:
# Read the stacked arrays for the configured approach (alpha only matters for
# the 'path' approach).
detect_data, detect_label = load_concatenate_data(alpha=0.95, approach=approach)

# Features become a tensor; labels are one-hot encoded (one column per
# distinct label value) before conversion.
detect_data = torch.Tensor(detect_data)
detect_label = torch.Tensor(pd.get_dummies(detect_label).to_numpy())
print(detect_data.shape, detect_label.shape)

# Flattened per-sample view; its second dimension is used as the input width
# of the classifier's first Linear layer.
data_shape = detect_data.flatten(start_dim=1)

In [None]:
# Default mini-batch size for all DataLoaders built by shuffle_data.
BATCH_SIZE = 512
# dtype that inputs and labels are cast to when moved to the device.
dtype = torch.float32

def shuffle_data(datas, labels, batch_size=None, random_state=1028):
  """Split the data 8:1:1 into train/validation/test DataLoaders.

  Args:
    datas: Feature tensor, indexed along its first dimension.
    labels: Label tensor with the same first dimension as ``datas``.
    batch_size: Mini-batch size; ``None`` (default) uses the module-level
      ``BATCH_SIZE`` looked up at call time.
    random_state: Seed passed to both ``train_test_split`` calls. The default
      keeps the original fixed split; pass a different value per call to get
      a different partition.

  Returns:
    Tuple ``(train_dataloader, val_dataloader, test_dataloader)``.
  """
  if batch_size is None:
    batch_size = BATCH_SIZE

  # 80% train, then the remaining 20% is halved into validation and test.
  x_train, x_val_test, y_train, y_val_test = train_test_split(
      datas, labels, test_size=0.2, random_state=random_state, shuffle=True)
  x_val, x_test, y_val, y_test = train_test_split(
      x_val_test, y_val_test, test_size=0.5, random_state=random_state)

  train_dataset = TensorDataset(x_train, y_train)
  val_dataset = TensorDataset(x_val, y_val)
  test_dataset = TensorDataset(x_test, y_test)

  print("Training: ", len(train_dataset))
  print("Validation: ", len(val_dataset))
  print("Testing: ", len(test_dataset))

  train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
  # Evaluation loaders are not shuffled: the order does not help evaluation
  # and a fixed order keeps per-batch results reproducible.
  val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
  test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

  return train_dataloader, val_dataloader, test_dataloader

In [None]:
def loss_plot(train, val, time):
  """Plot the training and validation loss curves, then save and show them.

  Args:
    train: Sequence of per-epoch training losses.
    val: Sequence of per-epoch validation losses.
    time: Zero-based run index; the image is saved as
      ``folder_path + str(time+1) + '_loss.png'``.
  """
  # Convert both series to CPU tensors before any drawing happens.
  curves = [torch.tensor(series, device='cpu') for series in (train, val)]
  for curve in curves:
    plt.plot(curve)

  plt.title('Loss')
  plt.xlabel('Epochs')

  out_file = folder_path + str(time + 1) + '_loss.png'
  plt.savefig(out_file)
  plt.show()
  plt.close()

def accuracy_plot(train, val, time):
  """Plot the training and validation accuracy curves, then save and show them.

  Args:
    train: Sequence of per-epoch training accuracies.
    val: Sequence of per-epoch validation accuracies.
    time: Zero-based run index; the image is saved as
      ``folder_path + str(time+1) + '_acc.png'``.
  """
  # Convert both series to CPU tensors before any drawing happens.
  curves = [torch.tensor(series, device='cpu') for series in (train, val)]
  for curve in curves:
    plt.plot(curve)

  plt.title('Accuracy')
  plt.xlabel('Epochs')

  out_file = folder_path + str(time + 1) + '_acc.png'
  plt.savefig(out_file)
  plt.show()
  plt.close()

def matrix_plot(true, pred, time):
    """Draw the confusion matrix of ``pred`` against ``true``; save and show it.

    Args:
        true: Ground-truth class indices; must provide ``.cpu()``.
        pred: Predicted class indices; must provide ``.cpu()``.
        time: Zero-based run index; the image is saved as
            ``folder_path + str(time+1) + '_matrix.png'``.
    """
    actual = true.cpu()
    predicted = pred.cpu()
    counts = confusion_matrix(y_true=actual, y_pred=predicted)

    fig, ax = plt.subplots(figsize=(5, 5))
    ax.matshow(counts, cmap=plt.cm.Oranges, alpha=0.3)

    # Write each cell's count at its centre (x is the column, y is the row).
    for row_idx, row in enumerate(counts):
        for col_idx, cell in enumerate(row):
            ax.text(x=col_idx, y=row_idx, s=cell,
                    va='center', ha='center', size='xx-large')

    plt.xlabel('Predictions', fontsize=16)
    plt.ylabel('Actuals', fontsize=16)
    plt.title('Confusion Matrix', fontsize=16)

    out_file = folder_path + str(time + 1) + '_matrix.png'
    plt.savefig(out_file)
    plt.show()
    plt.close()

In [None]:
def model_eval(model, criterion, dataloader, time):
  """Evaluate ``model`` on every batch of ``dataloader``.

  Args:
    model: Module to evaluate; it is switched to eval mode.
    criterion: Loss callable invoked as ``criterion(outputs, labels)``.
      The reported loss assumes it returns the per-batch mean (the default
      for ``nn.CrossEntropyLoss``).
    dataloader: Iterable of batches where ``batch[0]`` is the input and
      ``batch[1]`` the label tensor; the class is taken as ``argmax`` over
      dim 1 of both the outputs and the labels.
    time: ``'val'`` for a validation pass (no plot); any other value is
      forwarded to ``matrix_plot`` as the run index for a confusion matrix.

  Returns:
    Tuple ``(loss, acc)``: ``loss`` is a 0-d tensor holding the
    sample-weighted mean loss over the whole dataloader, ``acc`` is the
    overall accuracy as a Python float.

  Raises:
    ValueError: If ``dataloader`` yields no samples.
  """
  total = 0
  correct = 0
  loss_sum = 0.0
  all_true = []
  all_pred = []

  model.eval()
  with torch.no_grad():
    for batch in dataloader:
      inputs = batch[0].to(device, dtype=dtype)
      labels = batch[1].to(device, dtype=dtype)
      n_samples = labels.size(0)

      outputs = model(inputs)
      # Weight by batch size so a short final batch does not skew the mean.
      loss_sum = loss_sum + criterion(outputs, labels) * n_samples

      pred = torch.argmax(outputs, dim=1)
      true = torch.argmax(labels, dim=1)

      if time != 'val':
        # Keep every batch so the confusion matrix covers the whole set,
        # not just the last batch.
        all_true.append(true.cpu())
        all_pred.append(pred.cpu())

      total += n_samples
      correct += (pred == true).sum()

  if total == 0:
    raise ValueError("model_eval received an empty dataloader")

  loss = loss_sum / total
  # Single device-to-host sync after the loop instead of one per batch.
  acc = (correct / total).item()

  if time != 'val':
    matrix_plot(torch.cat(all_true), torch.cat(all_pred), time)

  return loss, acc

In [None]:
# Train the detector `times` times from scratch, plot each run's curves,
# evaluate on the test split and save the weights of the best-scoring run.
epochs = 200
times = 10
avg_test = []
best_model = None
# Best test accuracy seen so far. It is tracked separately from avg_test:
# comparing against max(avg_test) after appending can never be true, which
# would always keep the first run's model.
best_test_acc = float('-inf')

for num in range(times):
  print('Times: ', num+1, '/', times)
  train_dataloader, val_dataloader, test_dataloader = shuffle_data(detect_data, detect_label)

  losses = []
  accuracy = []
  val_losses = []
  val_accuracy = []

  # Fresh MLP per run: flattened features -> 256 -> 128 -> 16 -> 2 logits.
  model = nn.Sequential(
      nn.Flatten(),
      nn.Linear(data_shape.shape[1], 256),
      nn.ReLU(),
      nn.Linear(256, 128),
      nn.ReLU(),
      nn.Linear(128, 16),
      nn.ReLU(),
      nn.BatchNorm1d(16),
      nn.Dropout(p=0.5),
      nn.Linear(16, 2),
      )
  model = model.to(device)

  criterion = nn.CrossEntropyLoss()
  optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)

  for epoch in range(epochs):
      model.train()
      total = 0
      correct = 0
      running_loss = 0.0

      for batch in train_dataloader:
          inputs = batch[0].to(device, dtype=dtype)
          labels = batch[1].to(device, dtype=dtype)
          n_samples = labels.size(0)

          # optimizer.zero_grad() clears the grads of every model parameter,
          # so a separate model.zero_grad() is not needed.
          optimizer.zero_grad()
          outputs = model(inputs)
          loss = criterion(outputs, labels)
          loss.backward()
          optimizer.step()

          pred = torch.argmax(outputs, dim=1)
          true = torch.argmax(labels, dim=1)
          total += n_samples
          correct += (pred == true).sum()
          # criterion returns the batch mean; weight it by the batch size so
          # the epoch figure is a true per-sample mean. detach() keeps the
          # running total out of the autograd graph.
          running_loss = running_loss + loss.detach() * n_samples

      train_acc = (correct/total).item()

      # Store plain floats: the epoch-mean loss, not the last batch's tensor.
      losses.append(float(running_loss) / total)
      accuracy.append(train_acc)

      val_loss, val_acc = model_eval(model, criterion, val_dataloader, time='val')
      val_losses.append(float(val_loss))
      val_accuracy.append(val_acc)

  loss_plot(losses, val_losses, num)
  accuracy_plot(accuracy, val_accuracy, num)

  print("Accuracy Max: ", max(accuracy))
  print("Accuracy min: ", min(accuracy))

  _, test_acc = model_eval(model, criterion, test_dataloader, num)

  print("Test accuracy: ", test_acc)
  avg_test.append(test_acc)

  # NOTE(review): the saved model is chosen by test accuracy, so the reported
  # test score of that model is optimistic; confirm this is intended.
  if test_acc > best_test_acc:
      best_test_acc = test_acc
      best_model = model

print('Average test accuracy: %f' % np.average(avg_test))
print(avg_test)

torch.save(best_model.state_dict(), folder_path+'detector_'+str(approach)+'.pt')