In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets,transforms
import numpy as np
import matplotlib.pyplot as plt
import os
import pandas as pd
from layer import KernelConv2d, GaussianKernel, PolynomialKernel
from functools import partial # To invoke Kernel objects with input parameters when creating KernelConv2d object (e.g. partial(GaussianKernel, 0.05) for Gaussian OR partial(PolynomialKernel,2,3) for Polynomial)
%matplotlib inline
def mkdirs(path):
    """Create directory ``path`` (and any missing parents) if it does not exist.

    ``exist_ok=True`` makes the call idempotent without a separate existence
    check, so there is no window between "check" and "create" in which a
    concurrently created directory would trigger ``FileExistsError``.

    Args:
        path: directory path to create.

    Raises:
        FileExistsError: if ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

## Kervolution LeNet

In [None]:
class KNNet(nn.Module):
    """LeNet-style classifier whose two convolution stages are ``KernelConv2d`` layers.

    Layout: two (kernel-conv -> 2x2 max-pool -> ReLU -> batch-norm) stages,
    followed by two fully connected layers. ``forward`` returns
    log-probabilities over 10 classes (``log_softmax`` along dim 1).
    """

    def __init__(self):
        super().__init__()
        # First stage: GaussianKernel pre-bound with 0.05 as its first argument.
        # Per the original author's note, KernelConv2d(1,10,5) without a kernel
        # argument selects the default (polynomial) kernel with default parameters.
        gaussian = partial(GaussianKernel, 0.05)
        self.conv1 = KernelConv2d(1, 10, 5, gaussian)
        print(self.conv1)
        self.bn1 = nn.BatchNorm2d(10)
        # Second stage uses KernelConv2d's default kernel.
        self.conv2 = KernelConv2d(10, 20, 5)
        self.bn2 = nn.BatchNorm2d(20)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Map a batch of images to per-class log-probabilities.

        Args:
            x: input batch for ``conv1`` (1 input channel).
               # assumes 28x28 MNIST images so the flattened feature size is
               # 320 (= 20 channels x 4 x 4) -- TODO confirm against
               # KernelConv2d's padding/stride.

        Returns:
            Tensor with 10 columns holding ``log_softmax`` values along dim 1.
        """
        # Stage 1: kernel-conv -> max-pool(2) -> ReLU -> batch-norm.
        feat = self.conv1(x)
        feat = F.max_pool2d(feat, 2)
        feat = self.bn1(F.relu(feat))

        # Stage 2: kernel-conv -> channel dropout -> max-pool(2) -> ReLU -> batch-norm.
        feat = self.conv2_drop(self.conv2(feat))
        feat = F.max_pool2d(feat, 2)
        feat = self.bn2(F.relu(feat))

        # Classifier head on the flattened 320-dimensional features.
        flat = feat.view(-1, 320)
        hidden = F.relu(self.fc1(flat))
        hidden = F.dropout(hidden, training=self.training)
        # NOTE(review): the ReLU below clamps negative class scores to zero
        # before log_softmax; this looks unintended and may limit accuracy --
        # confirm before removing, since existing checkpoints were trained with it.
        scores = F.relu(self.fc2(hidden))
        return F.log_softmax(scores, dim=1)

In [None]:
# All three loaders read MNIST from "data" (downloading it if needed) and apply
# the same preprocessing: a Compose containing only ToTensor.
to_tensor=transforms.Compose([transforms.ToTensor()])

# Training split: shuffled mini-batches of 128.
train_loader=torch.utils.data.DataLoader(
    dataset=datasets.MNIST(root="data",train=True,download=True,transform=to_tensor),
    batch_size=128,
    shuffle=True,
)
# Test split: fixed order, mini-batches of 128.
test_loader=torch.utils.data.DataLoader(
    dataset=datasets.MNIST(root="data",train=False,download=True,transform=to_tensor),
    batch_size=128,
    shuffle=False,
)
# Test split again, fixed order, one image per batch.
attack_test_loader=torch.utils.data.DataLoader(
    dataset=datasets.MNIST(root="data",train=False,download=True,transform=to_tensor),
    batch_size=1,
    shuffle=False,
)
# Number of batches per epoch for the training and test loaders.
print(len(train_loader))
print(len(test_loader))

In [None]:
# Use the GPU when PyTorch reports CUDA as available, otherwise the CPU.
if torch.cuda.is_available():
    device=torch.device("cuda")
else:
    device=torch.device("cpu")

# Build the network, move it to the selected device and switch it to training
# mode; the last expression returns the module so the cell displays it.
knn=KNNet()
knn=knn.to(device)
knn.train()

In [None]:
# Negative log-likelihood loss: KNNet.forward already returns log_softmax
# outputs, which is the input NLLLoss works on.
criterion=nn.NLLLoss()

In [None]:
def compute_accuray(pred, true):
    """Return the fraction of rows of ``pred`` whose arg-max equals ``true``.

    Args:
        pred: tensor of per-class scores with classes along dim 1
            (``(batch, classes)`` as produced by the model in this notebook).
        true: tensor of class labels, one per row of ``pred``. It may live on
            a different device than ``pred``.

    Returns:
        float: number of correctly predicted rows divided by the number of rows.

    Raises:
        ZeroDivisionError: if ``pred`` has no rows.
    """
    pred_idx = pred.argmax(dim=1)
    # Compare on pred's device and transfer only the scalar count, instead of
    # copying both tensors to the host and summing element by element in Python.
    correct = (pred_idx == true.to(pred_idx.device)).sum().item()
    return correct / pred_idx.shape[0]
def train(m, out_dir, nb_epochs=20, lr=0.003, momentum=0.9):
    """Train ``m`` with SGD, evaluate it after every epoch and log the losses.

    Relies on the notebook-level globals ``train_loader``, ``test_loader``,
    ``device`` and ``criterion`` and on the helpers ``mkdirs`` and
    ``compute_accuray``.

    Files written under ``out_dir``:
        * ``models/best_model.tar`` -- ``state_dict`` of the model and of the
          optimizer, overwritten whenever the mean test loss of an epoch is
          lower than every previous epoch's.
        * ``iter_loss.csv`` -- one row per *training* batch
          (columns ``iteration``, ``loss``).
        * ``epoch_loss.csv`` -- one row per epoch
          (columns ``epoch``, ``train_loss``, ``test_loss``).

    Reported losses and accuracies are means of per-batch values, so a smaller
    final batch carries the same weight as a full one.

    Args:
        m: model to optimise. It is not moved here, so it must already live on
            ``device``. It is left in evaluation mode when the function returns.
        out_dir: directory receiving the checkpoint and the CSV logs.
        nb_epochs: number of passes over ``train_loader``.
        lr: SGD learning rate.
        momentum: SGD momentum.

    Returns:
        None
    """
    iter_loss = []
    train_losses = []
    test_losses = []
    iter_loss_path = os.path.join(out_dir, "iter_loss.csv")
    epoch_loss_path = os.path.join(out_dir, "epoch_loss.csv")
    model_dir = os.path.join(out_dir, "models")
    save_model_path = os.path.join(model_dir, "best_model.tar")
    # Creating the nested directory also creates out_dir for the CSV files.
    mkdirs(model_dir)

    best_loss = float("inf")
    optimizer = optim.SGD(m.parameters(), lr=lr, momentum=momentum)

    for epoch in range(nb_epochs):
        # ---- training pass ----
        train_loss = 0.
        train_acc = 0.
        m.train(mode=True)
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = m(data)
            loss = criterion(output, target)
            loss_value = loss.item()
            iter_loss.append(loss_value)
            train_loss += loss_value
            loss.backward()
            optimizer.step()
            train_acc += compute_accuray(output, target)
        mean_train_loss = train_loss / len(train_loader)
        train_losses.append(mean_train_loss)

        # ---- evaluation pass ----
        test_loss = 0.
        test_acc = 0.
        m.train(mode=False)
        # No gradients are needed for evaluation; skipping the autograd graph
        # saves memory and time without changing the computed values.
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = m(data)
                # Test losses are deliberately NOT appended to iter_loss, so
                # iter_loss.csv contains training iterations only.
                test_loss += criterion(output, target).item()
                test_acc += compute_accuray(output, target)
        mean_test_loss = test_loss / len(test_loader)
        test_losses.append(mean_test_loss)

        print("Epoch {}: train loss is {}, train accuracy is {}; test loss is {}, test accuracy is {}".
              format(epoch, round(mean_train_loss, 2),
                     round(train_acc / len(train_loader), 2),
                     round(mean_test_loss, 2),
                     round(test_acc / len(test_loader), 2)))

        # NOTE(review): the checkpoint is selected on test_loader loss; a
        # held-out validation split would avoid selecting on the test set.
        if mean_test_loss < best_loss:
            torch.save({
                    "model": m.state_dict(),
                    "optimizer": optimizer.state_dict()
                }, save_model_path)
            best_loss = mean_test_loss

    pd.DataFrame({
        "iteration": np.arange(0, len(iter_loss)),
        "loss": iter_loss,
    }).to_csv(iter_loss_path, index=False)

    pd.DataFrame({
        "epoch": np.arange(0, nb_epochs),
        "train_loss": train_losses,
        "test_loss": test_losses,
    }).to_csv(epoch_loss_path, index=False)

In [8]:
# Train the kervolution LeNet; the best checkpoint and the CSV loss logs are
# written under "lenet-knn-cp0.3".
train(m=knn,out_dir="lenet-knn-cp0.3")

Epoch 0: train loss is 2.09, train accuracy is 0.26; test loss is 1.48, test accuracy is 0.62
Epoch 1: train loss is 1.38, train accuracy is 0.56; test loss is 0.74, test accuracy is 0.8
Epoch 2: train loss is 0.95, train accuracy is 0.7; test loss is 0.49, test accuracy is 0.87
Epoch 3: train loss is 0.78, train accuracy is 0.76; test loss is 0.39, test accuracy is 0.89
Epoch 4: train loss is 0.69, train accuracy is 0.79; test loss is 0.33, test accuracy is 0.91
Epoch 5: train loss is 0.62, train accuracy is 0.81; test loss is 0.3, test accuracy is 0.91
Epoch 6: train loss is 0.57, train accuracy is 0.83; test loss is 0.27, test accuracy is 0.92
Epoch 7: train loss is 0.53, train accuracy is 0.84; test loss is 0.25, test accuracy is 0.93
Epoch 8: train loss is 0.49, train accuracy is 0.85; test loss is 0.23, test accuracy is 0.93
Epoch 9: train loss is 0.46, train accuracy is 0.86; test loss is 0.21, test accuracy is 0.94
Epoch 10: train loss is 0.43, train accuracy is 0.87; test loss