In [None]:
import RingDataset
from Models import CNNModel, CNNModelWide, CNNModelDeep, RNNGenerator, Distiller, MLP, RNNModel, QGRU2
import os
import numpy as np
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import re
import time

In [None]:
testset =  RingDataset.RingDataset('core4ToSlice3_test.pkl', threshold=42)

testloader = DataLoader(testset, batch_size=256, num_workers=4)

studentdim = 10
gen=QGRU2(42, scale=0.25, dim=studentdim, drop=0.0)
assert os.path.isfile('./models/best_{}_{}.pth'.format('qgru', studentdim))
gen.load_state_dict(torch.load('./models/best_{}_{}.pth'.format('qgru', studentdim)))



In [None]:
def shifter(arr, window=32):
    dup = arr[:,None,:].expand(arr.size(0), arr.size(1)+1, arr.size(1))
    dup2 = dup.reshape(arr.size(0), arr.size(1), arr.size(1)+1)
    shifted = dup2[:,:window,:-window]
    return shifted

In [None]:
model = gen
model.eval()

model_int8 = torch.quantization.quantize_dynamic(
    model, {nn.GRUCell, nn.Linear}, dtype=torch.qint8
)
print(model_int8)

input_fp32, _ = next(iter(testloader))
shifted = shifter(input_fp32)

'''
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')

# Prepare the model for static quantization. This inserts observers in
# the model that will observe activation tensors during calibration.
model_fp32_prepared = torch.quantization.prepare(model)

# calibrate the prepared model to determine quantization parameters for activations
# in a real world setting, the calibration would be done with a representative dataset

model_fp32_prepared(shifted, hidden_fp32)

# Convert the observed model to a quantized model. This does several things:
# quantizes the weights, computes and stores the scale and bias value to be
# used with each activation tensor, and replaces key operators with quantized
# implementations.
model_int8 = torch.quantization.convert(model_fp32_prepared)

# run the model, relevant calculations will happen in int8
'''


# run the model, relevant calculations will happen in int8
res = model_int8(shifted)
res2 = model(shifted)
print(res-res2)

In [None]:
def quantizer(arr, std=8):
    return torch.round(arr*std)/std

testset =  RingDataset.RingDataset('core4ToSlice3_test.pkl', threshold=42)
valset =  RingDataset.RingDataset('core4ToSlice3_valid.pkl', threshold=42)

testloader = DataLoader(testset, batch_size=128, num_workers=4)
valloader = DataLoader(valset, batch_size=128, num_workers=4)



halfstudent = model_int8
cooldown = 100
#halfstudent.eval()
for _ in range(3):
    classifier_test = CNNModelWide(42, dim=256).cuda()
    #classifier_test = RNNModel(42, dim=256).cuda()
    criterion = nn.CrossEntropyLoss()

    lastacc = 0.0
    lastnorm = 0.0
    optim_c2 = torch.optim.Adam(classifier_test.parameters(), lr=1e-4)
    for e in range(cooldown):
        classifier_test.train()
        for x,y in valloader:
            xdata, ydata = x.cuda(), y.cuda()
            shifted = shifter(xdata.cpu())
            #train classifier
            optim_c2.zero_grad()
            perturb = halfstudent(shifted).view(shifted.size(0),-1).cuda()
            #perturb = gen(xdata[:,31:])
            #interleaving?
            output = classifier_test(xdata[:,31:]+perturb.detach().float())
            loss_c = criterion(output, ydata)
            loss_c.backward()
            optim_c2.step()


        mloss = 0.0
        totcorrect = 0
        totcount = 0
        mnorm = 0.0
        zerocorrect = 0
        zerocount = 0
        onecorrect = 0
        onecount = 0
        #evaluate classifier

        with torch.no_grad():
            classifier_test.eval()
            for x,y in testloader:
                xdata, ydata = x.cuda(), y.cuda()
                shifted = shifter(xdata.cpu())
                perturb = halfstudent(shifted).view(shifted.size(0),-1).cuda()
                perturb = quantizer(perturb)
                #perturb = gen(xdata[:,31:])
                norm = torch.mean(perturb)
                output = classifier_test(xdata[:,31:]+perturb.float())
                loss_c = criterion(output, ydata)
                pred = output.argmax(axis=-1)
                mnorm += norm.item()/len(testloader)
                mloss += loss_c.item()/len(testloader)
                #macc += ((pred==ydata).sum().float()/pred.nelement()).item()/len(testloader)
                totcorrect += (pred==ydata).sum().item()
                totcount += y.size(0)
                zerocorrect += ((pred==0)*(ydata==0)).sum().item()
                zerocount += (ydata==0).sum().item()
                onecorrect += ((pred==1)*(ydata==1)).sum().item()
                onecount += (ydata==1).sum().item()
            macc = float(totcorrect)/totcount
            zacc = float(zerocorrect)/zerocount
            oacc = float(onecorrect)/onecount
            if (e+1)%10 == 0:
                print("epoch {} \t zacc {:.6f}\t oneacc {:.6f}\t loss {:.6f}\t Avg perturb {:.6f}\n".format(e+1, zacc, oacc, mloss, mnorm))
            if cooldown - e <= 10:
                lastacc += macc/10
                lastnorm += mnorm/10
            
    print("Last 10 acc: {:.6f}\t perturb: {:.6f}".format(lastacc,lastnorm))



In [None]:
train_x = []
train_y = []
test_x = []
test_y = []
with torch.no_grad():
    for x,y in valloader:
        shifted = shifter(x)
        #train classifier
        perturb = halfstudent(shifted).view(shifted.size(0),-1)
        perturbed_x = x[:,31:]+perturb
        for p in perturbed_x:
            train_x.append(p.numpy())
        for y_i in y:
            train_y.append(y_i.item())

    for x,y in testloader:
        shifted = shifter(x)
        #train classifier
        perturb = halfstudent(shifted).view(shifted.size(0),-1)
        perturbed_x = x[:,31:]+perturb
        for p in perturbed_x:
            test_x.append(p.numpy())
        for y_i in y:
            test_y.append(y_i.item())
print(len(train_y))

In [None]:
from sklearn import svm
clf = svm.SVC()
clf.fit(train_x, train_y)
pred_y = clf.predict(test_x)
#tpred_y = clf.predict(train_x)
(pred_y == test_y).sum()/len(pred_y)
#(tpred_y == train_y).sum()/len(train_y)

In [None]:
from sklearn.neighbors import KNeighborsClassifier
clf = KNeighborsClassifier(n_neighbors=25)
clf.fit(train_x, train_y)
pred_y = clf.predict(test_x)
#tpred_y = clf.predict(train_x)
(pred_y == test_y).sum()/len(pred_y)
#(tpred_y == train_y).sum()/len(train_y)

In [None]:
clf.n_support_