# CNN Sample

This section implements a simple CNN to serve as a better visualization of the permutation test method described in "Permutation-based Hypothesis Testing for Neural Networks."
The model consists of 3 layers: a CNN layer with 2 out channels and a 15x15 kernel, a ReLU layer, and a linear layer.

The training dataset consists of 60000 samples, each sample is a 28x28 image with only 1 color.

In [1]:
import torch
import torchvision
from torchvision import datasets, transforms
from torch import nn, optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from time import time
import numpy as np

transform = transforms.Compose([transforms.ToTensor(),
                              transforms.Normalize((0.5,), (0.5,)),
                              ])

DATA = "./_Data/"
ORIGINAL = "./_Data/parameters/original/"
PERMUTED = "./_Data/parameters/permuted/"

trainset = datasets.MNIST(DATA + 'train', download=True, train=True, transform=transform)
testset = datasets.MNIST(DATA + 'test', download=True, train=False, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=True)

In [2]:
# model
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 2, 15)
        self.fc1 = nn.Linear(2 * 14 * 14, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = x.view(-1, 2*14*14)
        x = self.fc1(x)
        return x

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNN()
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [None]:
# train
for epoch in range(6):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 200 == 199:  
            print('[epoch %d, batch %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 200))
            running_loss = 0.0

In [3]:
# test
correct = 0
total = 0
with torch.no_grad():
    for data in testloader:
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy: %d %%' % (
    100 * correct / total))

Accuracy: 7 %


In [24]:
# save parameters
torch.save(model.conv1.weight, ORIGINAL+'conv_w.pt')
torch.save(model.fc1.weight, ORIGINAL+'fc_w.pt')
torch.save(model.conv1.bias, ORIGINAL+'conv_b.pt')
torch.save(model.fc1.bias, ORIGINAL+'fc_b.pt')

In [3]:
# load parameters
conv_w = torch.load(ORIGINAL+'conv_w.pt', weights_only=True, map_location=device)
fc_w = torch.load(ORIGINAL+'fc_w.pt', weights_only=True, map_location=device)
conv_b = torch.load(ORIGINAL+'conv_b.pt', weights_only=True, map_location=device)
fc_b = torch.load(ORIGINAL+'fc_b.pt', weights_only=True, map_location=device)

# Permutation test

This section implements the association test featured in "Permutation-based Hypothesis Testing for Neural Networks."

Link: https://arxiv.org/pdf/2301.11354

## Partial Derivatives (no acceleration)

`partialDerivative(X, w, b, f, df, types)`

Calculates the partial derivative of a single sample using Chain Rule.

<b> Parameters: </b>

`X`: a single sample from the training data \
`w`: a list containing the weights obtained in layer of the neural network \
`b`: a list containing the biases obtained in layer of the neural network\
&ensp; (For layers that does not have weights/bias, the corresponding element is set to `None`) \
`f`: a list of functions representing each layer of the neural network \
`df`: a list of functions representing the derivatives of each element in `f` \
&ensp; (For matrix reshaping layers, the corresponding element is set to `None`) \
`types`: a list of integers corresponding to the correct way to calculate partial derivative for each layer

| Value          | Corresponding Functions|
| :-----------  | :------------------------- |
| 0 | reshape function  |
| 1    | ReLU / mask |
| 2 | argmax / argmin  |
| 3    | default  |

derivative function parameters:
`x`: input \
`w`: weight \
`b`: bias \
`y`: output `f(x)` \
`i`: input index (for 1D / 2D array input) \
`j`: input index (for 2D array input)


In [19]:
# currently for for 2D input sample (subject to changes)
def partialDerivative2D(X, w, b, f, df, types):
    pds = []
    for i in range(len(X[0])):
        for j in range(len(X[0][0])):
            pd = 1
            y_0 = X
            for k in range(len(f)):
                y_1 = f[k](y_0)
                if types[k] == 0:
                    pd = f[k](pd)
                elif types[k] == 1:
                    pd = df[k](y_0, w[k], b[k], y_1, i, j) * pd
                elif types[k] == 2:
                    pd = df[k](pd, y_0)
                else:
                    pd = torch.tensordot(df[k](y_0, w[k], b[k], y_1, i, j), pd[0].to(device), dims=1)
                y_0 = y_1
            pds.append(pd)
    return torch.Tensor(pds)

In [None]:
# derivative of convolution layer
def dconv(x, w, b, y, i, j):
    grads = []
    for z in range(2):
        grad = np.zeros((14,14))
        A = min(j, 14)
        for a in range(max(j-14, 0), min(j+1, 14)):
            B = min(i, 14)
            for b in range(max(i-14, 0), min(i+1, 14)):
                grad[a][b] = w[z][0][A][B]
                B -= 1
            A -= 1
        grads.append(grad)
    return torch.Tensor(grads)

# derivative of ReLU layer
def drelu(x, w, b, y, i, j):
    grads = []
    for z in range(2):
        grad = np.zeros((14,14))
        for a in range(14):
            for b in range(14):
                if x[z][a][b] > 0:
                    grad[a][b] = 1
        grads.append(grad)
    return torch.Tensor(grads)

# derivative of linear layer
def dlinear(x, w, b, y, i, j):
    return w

# reshape function
def reshape(x):
    return x.view(-1, 2*14*14)

# derivative of max function
def dmax(grad, y):
    maximum = y[0][0]
    ind = 0
    for k in range(1, y.shape[1]):
        if y[0][k] > maximum:
            maximum = y[0][k]
            ind = k
    return grad[ind]

# def dpool(x, w, b, y, i, j):
#     grads = []
#     for z in range(6):
#         grad = np.zeros((4,4))
#         for a in range(4):
#             for b in range(4):
#                 ind = maxInd(y[z][a * 6: a * 6 + 6][b * 6: b * 6 + 6])
#                 grad[a][b] = w[z][a * 6 + ind[0]][b * 6 + ind[1]]
#         grads.append(grad)
#     return grads

# def maxInd(X):
#     ind = (0, 0)
#     maximum = np.nextafter(0, 1)
#     for i in range(len(X)):
#         for j in range(len(X[0])):
#             if X[i][i] > maximum:
#                 ind = (i, j)
#     return ind

## Partial Derivatives (CUDA)

In [4]:
import sys
import numba 
import numpy
from numba import cuda
import numpy as np
import time
import math

In [5]:
X = trainset[0][0].detach().cpu().numpy()

In [6]:
conv_w = conv_w.detach().cpu().numpy()
fc_w = fc_w.detach().cpu().numpy()
conv_b = conv_b.detach().cpu().numpy()
fc_b = fc_b.detach().cpu().numpy()

In [9]:
def conv2D(X, weights, bias):
    m, n = weights.shape
    if (m == n):
        y, x = X.shape
        y = y - m + 1
        x = x - m + 1
        x_new = numba.cuda.local.array((14, 14), type)
        x_new = np.zeros((y,x))
        for a in range(y):
            for b in range(x):
                x_new[a][b] = np.sum(X[a:a+m, b:b+m]*weights) + bias
    return x_new

#a = conv2D(img, w_1[0][0], b_1)
#b = conv2D(img, w_1[1][0], b_1)

In [48]:
@cuda.jit
def CNN_pd(array):
    img = cuda.const.array_like(X[0])
    w_1 = cuda.const.array_like(conv_w)
    b_1 = cuda.const.array_like(conv_b)
    
    m, n = cuda.grid(2)
    conv2D(img, w_1[0][0], b_1)
        
@cuda.jit(device=True)
def conv2D(X, weights, bias):
    m, n = weights.shape
    if (m == n):
        y, x = X.shape
        y = y - m + 1
        x = x - m + 1
        for a in range(y):
            for b in range(x):
                np.sum(X[a:a+m, b:b+m]*weights) + bias

In [43]:
dy = np.zeros((1, 28*28), np.float32)
threadsperblock = (28, 28)
blockspergrid_x = math.ceil(dy.shape[0] / threadsperblock[0])
blockspergrid_y = math.ceil(dy.shape[1] / threadsperblock[1])
blockspergrid = (blockspergrid_x, blockspergrid_y)

print('Initial array:', dy)

CNN_pd[blockspergrid, threadsperblock](dy)
print('Kernel launch: ', dy)

Initial array: [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 



TypingError: Failed in cuda mode pipeline (step: nopython frontend)
Failed in cuda mode pipeline (step: nopython frontend)
Use of unsupported NumPy function 'numpy.sum' or unsupported use of the function.

File "../../../../../../tmp/ipykernel_19723/365025871.py", line 19:
<source missing, REPL/exec in use?>

During: typing of get attribute at /tmp/ipykernel_19723/365025871.py (19)

File "../../../../../../tmp/ipykernel_19723/365025871.py", line 19:
<source missing, REPL/exec in use?>

During: resolving callee type: type(CUDADispatcher(<function conv2D at 0x2accc8d5c790>))
During: typing of call at /tmp/ipykernel_19723/365025871.py (8)


File "../../../../../../tmp/ipykernel_19723/365025871.py", line 8:
<source missing, REPL/exec in use?>


In [14]:
conv_w.shape

(2, 1, 15, 15)

## Permute

In [None]:
for i in range(100):
    print("permutation", i)
    model = CNN()
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    # train
    for epoch in range(6):  # loop over the dataset multiple times
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            ind = torch.randperm(labels.shape[0])
            labels = labels[ind]
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    # save parameters
    torch.save(model.conv1.weight, PERMUTED+'conv_w.pt')
    torch.save(model.fc1.weight, PERMUTED+'fc_w.pt')
    torch.save(model.conv1.bias, PERMUTED+'conv_b.pt')
    torch.save(model.fc1.bias, PERMUTED+'fc_b.pt')

## Run Test

In [9]:
weights = [conv_w, None, None, fc_w, None]
bias = [conv_b, None, None, fc_b, None]
functions = [model.conv1, F.relu, reshape, model.fc1, torch.max]
derivatives = [dconv, drelu, None, dlinear, dmax]
types = [1,1,0,3,2]

In [22]:
T = torch.zeros(28*28)
for i, data in enumerate(trainloader, 0):
    print("batch", i)
    for d in data[0]:
        d = d.to(device)
        start_time = time.perf_counter()
        pds = partialDerivative(d, weights, bias, functions, derivatives, types)
        end_time = time.perf_counter()
        elapsed_time = end_time - start_time
        print(f"Elapsed time: {elapsed_time:.4f} seconds")
        pds = torch.square(pds)
        T += pds
        break
    break

batch 0
Elapsed time: 8.6745 seconds


In [13]:
bias

[Parameter containing:
 tensor([-0.0139,  0.0155], device='cuda:0', requires_grad=True),
 None,
 None,
 Parameter containing:
 tensor([ 0.0038, -0.0020, -0.0157,  0.0184, -0.0488,  0.0252,  0.0279, -0.0055,
          0.0037,  0.0499], device='cuda:0', requires_grad=True),
 None]