# Thermometer encoding
## Some util functions for convert a normal image to a one-hot way coded image

### discription of a one-hot coded image:
* n:batch size
* w:width of a image
* h:height of a image
* k:k-level discretization of a image

In [1]:
import numpy as np
from sklearn.preprocessing import OneHotEncoder
"""
input:natural image arr:n*w*h*c
return: quantisized image n*w*h*c
"""
def quantization(arr,k):
    quant = np.zeros(arr.shape)
    for i in range(1,k):
        quant[arr>1.0*i/k]+=1
    return quant

"""
input:quantisized img shape:n*w*h*c
retun:one-hot coded image shape:n*w*h*c*k
"""
def onehot(arr,k):
    n,w,h = arr.shape
    arr = arr.reshape(n,-1)
    enc=OneHotEncoder(n_values=k,sparse=False)
    arr = enc.fit_transform(arr)
    arr = arr.reshape(n,w,h,k)
    arr = arr.transpose(0,3,1,2)
    return arr

"""
input:one-hot coded img shape:n*w*h*c*k
retun:trmp coded image shape:n*w*h*c*k
"""
def tempcode(arr,k):
    tempcode = np.zeros(arr.shape)
    for i in range(k):
        tempcode[:,i,:,:] = np.sum(arr[:,:i+1,:,:],axis=1)
    return tempcode
    
"""
from a thermometerencoding image to a mormally coded image, for some visulization usage
"""
def temp2img(tempimg,k):
    img = np.sum(tempimg,axis=1)
    img = np.ones(img.shape)*(k+1)-img
    img = img*1.0/k
    return img


### Test util functions

In [2]:
img = np.random.random((2, 2, 2))
print img
print img.shape
quant = quantization(img,4)
print quant
onehotimg=onehot(quant,4)
print onehotimg[0]
tempcod = tempcode(onehotimg.copy(),4)
print tempcod[0]
recoverimg = temp2img(tempcod,4)
print recoverimg

[[[ 0.79451513  0.08269887]
  [ 0.31621181  0.62802222]]

 [[ 0.33544049  0.68535259]
  [ 0.97273507  0.12748406]]]
(2, 2, 2)
[[[ 3.  0.]
  [ 1.  2.]]

 [[ 1.  2.]
  [ 3.  0.]]]
[[[ 0.  1.]
  [ 0.  0.]]

 [[ 0.  0.]
  [ 1.  0.]]

 [[ 0.  0.]
  [ 0.  1.]]

 [[ 1.  0.]
  [ 0.  0.]]]
[[[ 0.  1.]
  [ 0.  0.]]

 [[ 0.  1.]
  [ 1.  0.]]

 [[ 0.  1.]
  [ 1.  1.]]

 [[ 1.  1.]
  [ 1.  1.]]]
[[[ 1.    0.25]
  [ 0.5   0.75]]

 [[ 0.5   0.75]
  [ 1.    0.25]]]


## Function with attacks
### getmask
input image x, random perbutation $\epsilon$, get a mask for {$x-\epsilon$,$x+\epsilon$}

In [3]:
def getMask(x,epsilon,k):
    n,w,h = x.shape
    mask = np.zeros((n,k,w,h))
    low = x - epsilon
    low[low < 0] = 0
    high = x + epsilon
    high[high > 1] = 1
    for i in range(k+1):
        interimg = (i*1./k)*low + (1-i*1./k)*high
        mask+=onehot(quantization(interimg,k),k)
    mask[mask>1] = 1
    return mask

In [4]:
print img
print onehot(quantization(img,4),4)
print getMask(img,np.random.random(img.shape)*0.5,4)

[[[ 0.79451513  0.08269887]
  [ 0.31621181  0.62802222]]

 [[ 0.33544049  0.68535259]
  [ 0.97273507  0.12748406]]]
[[[[ 0.  1.]
   [ 0.  0.]]

  [[ 0.  0.]
   [ 1.  0.]]

  [[ 0.  0.]
   [ 0.  1.]]

  [[ 1.  0.]
   [ 0.  0.]]]


 [[[ 0.  0.]
   [ 0.  1.]]

  [[ 1.  0.]
   [ 0.  0.]]

  [[ 0.  1.]
   [ 0.  0.]]

  [[ 0.  0.]
   [ 1.  0.]]]]
[[[[ 0.  1.]
   [ 1.  0.]]

  [[ 1.  0.]
   [ 1.  0.]]

  [[ 1.  0.]
   [ 1.  1.]]

  [[ 1.  0.]
   [ 1.  0.]]]


 [[[ 1.  0.]
   [ 0.  1.]]

  [[ 1.  1.]
   [ 0.  1.]]

  [[ 1.  1.]
   [ 1.  1.]]

  [[ 0.  1.]
   [ 1.  0.]]]]


In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                   ])),
    batch_size=100, shuffle=True)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, transform=transforms.Compose([
                       transforms.ToTensor(),
                   ])),
    batch_size=100, shuffle=True)

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(15, 32, kernel_size=5)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(4*4*64, 1024)
        self.fc2 = nn.Linear(1024, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 4*4*64)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return x

model = Net()
model = model.cuda()

### LS-PGD attack

In [11]:
def LSPGDonechannel(data,target,epsilon,k,delta,xi,step,criterion):
    datanumpy = data.numpy()
    data0 = datanumpy[:,0,:,:]
    mask = getMask(data0,epsilon,k)
    u = np.random.random(mask.shape)-(1-mask)*1e10
    T = 1.0
    u = Variable(torch.Tensor(u).cuda(),requires_grad=True)
    z = F.softmax(u/T,dim=1)
    z = torch.cumsum(z,dim=1)
    for t in range(step):
        out = model(z)
        loss = criterion(out,target)
        if u.grad!=None:
            u.grad.data._zero()
        loss.backward()
        grad = u.grad
        u = xi*torch.sign(grad) + u
        u = Variable(u.data,requires_grad=True)
        z = F.softmax(u/T,dim=1)
        z = torch.cumsum(z,dim=1)
        T = T*delta
    attackimg = np.argmax(u.data.cpu().numpy(),axis=1)
    themattackimg = tempcode(onehot(attackimg,k),k)
    return themattackimg

## Try on MNIST

In [7]:
criterion = nn.CrossEntropyLoss()
level=15
optimizer = optim.Adam(model.parameters(), lr=1e-4)
def trainnat(epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        target = Variable(target)
        data = data.numpy()[:,0,:,:]
        data = Variable(torch.Tensor(tempcode(onehot(quantization(data,level),level),level)))
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.data[0]))

def test():
    model.eval()
    test_loss = 0
    correct = 0
    for data, target in test_loader:
        target = Variable(target)
        data = data.numpy()[:,0,:,:]
        data = Variable(torch.Tensor(tempcode(onehot(quantization(data,level),level),level)))
        data, target = data.cuda(), target.cuda()
        output = model(data)
        test_loss += criterion(output, target).data[0] # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        correct += pred.eq(target.data.view_as(pred)).cpu().sum()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    
def trainadv(epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        target = Variable(target.cuda())
        data = LSPGDonechannel(data=data,target=target,epsilon=0.3,k=level,delta=1.2,xi=1.0,step=2,criterion=criterion)
        data = Variable(torch.Tensor(data).cuda())
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.data[0]))
    
def testadv():
    model.eval()
    test_loss = 0
    correct = 0
    for data, target in test_loader:
        target = target.cuda()
        target = Variable(target)
        data = LSPGDonechannel(data=data,target=target,epsilon=0.3,k=level,delta=1.2,xi=1.0,step=7,criterion=criterion)
        data = Variable(torch.Tensor(data).cuda())
        output = model(data)
        test_loss += criterion(output, target).data[0] # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        correct += pred.eq(target.data.view_as(pred)).cpu().sum()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))



train for 1 epoh

In [8]:
trainnat(0)



test on natral

In [9]:
test()


Test set: Average loss: 0.0013, Accuracy: 9603/10000 (96%)



test on LS-PGD attack

In [12]:
testadv()


Test set: Average loss: 0.0783, Accuracy: 26/10000 (0%)



advtrain on LS-PGD attack 

In [13]:
trainadv(0)



test after advtrain

In [14]:
testadv()


Test set: Average loss: 0.0069, Accuracy: 7547/10000 (75%)

