In [None]:
! pip install torch

In [None]:
%matplotlib inline

In [None]:
import math
import random
import numpy as np

# Experiment configuration.
SEED = 0            # seed for the Python and NumPy global random generators
DATA_SIZE = 8000    # number of 2D points sampled for the data set
RADIUS = 1.0        # radius of the origin-centred circle separating the two classes
EPOCHS = 2000       # number of epochs passed to every training call
CHECKPOINT = 250    # metrics are printed every CHECKPOINT epochs

# Seed the global generators so the sampled data set is identical after a
# kernel restart followed by "Run All".
random.seed(SEED)
np.random.seed(SEED)


In [None]:
def generate_unitary_circle_classif_data(nb_samples):
    """
    Sample a labelled 2D point cloud for the circle classification task.

    Points are drawn from a standard 2D Gaussian. The label of a point is
    True when its Euclidean norm is greater than RADIUS (outside the
    circle) and False otherwise.

    Returns the (nb_samples, 2) coordinates and the (nb_samples,) boolean labels.
    """
    points = np.random.normal(size=(nb_samples, 2))
    # Euclidean distance of every point from the origin.
    distances = np.linalg.norm(points, ord=2, axis=-1)
    return points, distances > RADIUS

def get_circle_segment(data, min_angle, max_angle):
    """
    Build a boolean mask selecting the samples whose angle lies strictly
    between min_angle and max_angle (both bounds excluded).
    """
    theta = get_angles(data)
    above_min = theta > min_angle
    below_max = theta < max_angle
    return above_min & below_max
    
def get_angles(data):
    """
    Compute the polar angle of each point stored in data.

    data is read as data[:, 0] (x coordinate) and data[:, 1] (y coordinate).
    Returns one angle per row, in radians within [-pi, pi], measured
    counter-clockwise from the positive x axis, so angles in (0, pi) are
    the upper half-plane and angles in (-pi, 0) the lower half-plane.
    """
    # np.arctan2 expects the y coordinate first and the x coordinate second.
    return np.arctan2(data[:, 1], data[:, 0])


# Sample the full data set and compute the angle of every point.
data, labels = generate_unitary_circle_classif_data(DATA_SIZE)
angles = get_angles(data)

# Boolean masks over the samples: Q34 keeps angles strictly inside (-pi, 0),
# Q12 keeps angles strictly inside (0, pi).
Q34 = get_circle_segment( data, -1*math.pi, 0 )
Q12 = get_circle_segment( data, 0, math.pi )

# Coordinates of the points of each segment.
data_Q34 = data[Q34,:]
data_Q12 = data[Q12,:]

# Boolean labels converted to int and reshaped into (n, 1) column vectors.
labels_Q34 = labels[Q34].astype('int').reshape(-1,1)
labels_Q12 = labels[Q12].astype('int').reshape(-1,1)

# Angles of the points of each segment.
angles_Q34 = angles[Q34]
angles_Q12 = angles[Q12]



In [None]:
from matplotlib import pyplot as plt

# Show which samples fall in each angular segment: Q12 in green, Q34 in blue.
plt.scatter(data_Q12[:,0],data_Q12[:,1],c="g",label='Q12')
plt.scatter(data_Q34[:,0],data_Q34[:,1],c="b",label='Q34')
plt.legend()
plt.show()

In [None]:
from matplotlib import pyplot as plt

# Same scatter plot, but coloured by the class label of each sample
# (labels are flattened from (n, 1) to (n,) for matplotlib).
plt.scatter(data_Q12[:,0],data_Q12[:,1],c=labels_Q12.reshape((-1,)))
plt.scatter(data_Q34[:,0],data_Q34[:,1],c=labels_Q34.reshape((-1,)))

plt.show()

In [None]:

def generate_test_grid(min_val=-5, max_val=5, nb_points=50):
    """
    Generate a regular square grid for testing on the whole space.

    min_val, max_val: bounds of the grid along both axes.
    nb_points: number of grid values per axis. The default of 50 matches
               np.linspace's own default, which the grid used to rely on
               implicitly.

    Returns:
        grid:   (nb_points**2, 2) array of point coordinates
        labels: (nb_points**2, 1) int array, 1 where the norm of the point
                is greater than RADIUS and 0 otherwise
        angles: (nb_points**2,) array returned by get_angles(grid)
    """
    axis_values = np.linspace(min_val, max_val, num=nb_points)
    x, y = np.meshgrid(axis_values, axis_values)
    # One row per grid node: first column x, second column y.
    grid = np.vstack((x.flatten(), y.flatten())).transpose()

    magnit = np.linalg.norm(grid, ord=2, axis=-1)
    labels = (magnit > RADIUS).astype('int').reshape(-1, 1)

    angles = get_angles(grid)

    return grid, labels, angles

# Build the evaluation grid over [-5, 5] x [-5, 5] and record how many points it has
# (used later as the batch size of the validation loaders).
grid_data, grid_labels, grid_angles = generate_test_grid(min_val=-5,max_val=5)
nb_grid_samples, _ = grid_data.shape


In [None]:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

# batch_size: batch size of the training DataLoaders; the adversarial fit loops
#             also skip any batch smaller than this.
# hidden_size: width of the hidden nn.Linear layers of every model below.
batch_size  = 1024
hidden_size = 64

In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Training set: only the Q34 samples. Validation set: the whole evaluation grid.
simple_train_set = TensorDataset(torch.from_numpy(data_Q34).float(), torch.from_numpy(labels_Q34))
simple_valid_set = TensorDataset(torch.from_numpy(grid_data).float(), torch.from_numpy(grid_labels))

# The validation loader uses batch_size=nb_grid_samples, i.e. the whole grid in one batch.
simple_train_loader = DataLoader(simple_train_set, batch_size=batch_size, shuffle=True)
simple_valid_loader = DataLoader(simple_valid_set, batch_size=nb_grid_samples, shuffle=True)


In [None]:
class SimpleClassifier(nn.Module):
    """
    Feed-forward classifier: three sigmoid-activated hidden layers of
    width hidden_size followed by a linear layer producing 2 class scores.
    """
    def __init__(self):
        super().__init__()
        self.sig = nn.Sigmoid()

        # Hidden stack: 2 -> hidden_size -> hidden_size -> hidden_size
        self.layer1 = nn.Linear(2, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, hidden_size)

        # Output head: one score per class
        self.decision = nn.Linear(hidden_size, 2)

    def forward(self, coord):
        """Return the unnormalised class scores for a batch of 2D coordinates."""
        hidden = coord
        for layer in (self.layer1, self.layer2, self.layer3):
            hidden = self.sig(layer(hidden))
        return self.decision(hidden)

# Instantiate the baseline classifier; the bare expression displays its layer summary.
simple_model = SimpleClassifier()
simple_model


In [None]:

def plot_mesh(model, loader):
    """
    Plot the model predictions on the validation grid.

    Draws two side-by-side scatter plots of every batch yielded by loader:
    the left panel is coloured by the gold labels, the right panel by the
    class predicted by model. A red circle of radius RADIUS is overlaid on
    both panels.

    model:  module mapping a batch of 2D coordinates to per-class scores
    loader: iterable of (coordinates, labels) tensor batches
    """
    fig, ax = plt.subplots(ncols=2)
    model.eval()
    # Pure inference: no autograd graph is needed for plotting.
    with torch.no_grad():
        for x, y in loader:
            y_scores = model(x)
            y_pred = torch.max(y_scores, 1)[1]
            coords = x.numpy()
            ax[0].scatter(coords[:, 0], coords[:, 1], c=y.numpy().reshape(-1,), s=10)
            ax[1].scatter(coords[:, 0], coords[:, 1], c=y_pred.numpy(), s=10)

    # Identical decoration for both panels; each one gets its own circle artist.
    for panel, title in zip(ax, ("Gold Labels", "Predicted Labels")):
        panel.add_artist(plt.Circle((0, 0), RADIUS, color='r', fill=False, lw=3))
        panel.axis('equal')
        panel.set_title(title)
        panel.set_xlabel("x1")
        panel.set_ylabel("x2")

    plt.show()
        
# Predictions of the freshly initialised (not yet trained) model, for reference.
plot_mesh(simple_model, simple_valid_loader)

In [None]:
def perf(model, loader):
    """
    Compute loss and accuracy on a dataset.

    model:  module mapping a batch of inputs to per-class scores
    loader: iterable of (inputs, labels) batches; labels may be (n,) or (n, 1)

    Returns a tuple of Python floats (mean_loss, exp(mean_loss), accuracy),
    where mean_loss is the cross-entropy averaged over all samples.
    """
    criterion = nn.CrossEntropyLoss()
    model.eval()
    total_loss = 0.0
    num = oks = 0
    # Evaluation only: skip building the autograd graph.
    with torch.no_grad():
        for x, y in loader:
            targets = y.view(y.size(0))
            y_scores = model(x).view(y.size(0), -1)
            y_pred = torch.max(y_scores, 1)[1]

            # The criterion returns the mean over the batch. Weight it by the
            # batch size so that dividing by num yields a per-sample mean.
            total_loss += criterion(y_scores, targets).item() * len(y)
            num += len(y)
            oks += (y_pred == targets).sum().item()

    mean_loss = total_loss / num
    return mean_loss, math.exp(mean_loss), oks / num


def fit(model, train_loader, valid_loader, epochs):
    """
    Train the model.

    Optimises model with Adam on the cross-entropy of the batches yielded by
    train_loader for the given number of epochs. Every CHECKPOINT epochs it
    prints the epoch number, the mean training loss per sample of that epoch
    and the values returned by perf(model, valid_loader).
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        num = 0

        for x, y in train_loader:
            optimizer.zero_grad()
            y_scores = model(x)
            loss = criterion(y_scores.view(y.size(0), -1), y.view(y.size(0)))

            loss.backward()
            optimizer.step()
            # loss is a batch mean: weight it by the batch size so the logged
            # value is a per-sample mean over the epoch.
            total_loss += loss.item() * len(y)
            num += len(y)

        if (epoch + 1) % CHECKPOINT == 0:
            # max(num, 1) avoids a division by zero when the loader is empty.
            print(epoch+1, total_loss / max(num, 1), *perf(model, valid_loader))


# Train the baseline on the Q34 samples, then plot its predictions on the full grid.
fit(simple_model, simple_train_loader, simple_valid_loader, EPOCHS)        
plot_mesh(simple_model, simple_valid_loader)


## Adversarial Domain Classifier

In [None]:
from torch.autograd import Function

class GradReverse(Function):
    """
    Gradient reversal: the forward pass returns its input unchanged and
    the backward pass returns the negated incoming gradient.
    """
    @staticmethod
    def forward(ctx, x):
        # Identity on the values; view_as yields a new tensor object.
        identity = x.view_as(x)
        return identity

    @staticmethod
    def backward(ctx, grad_output):
        # Flip the sign of the gradient flowing back to the input.
        return -grad_output

def grad_reverse(x):
    """Apply the GradReverse autograd function to x and return the result."""
    reversed_x = GradReverse.apply(x)
    return reversed_x
  
  

class AdversarialClassifier(nn.Module):
    """
    Three sigmoid hidden layers shared by two linear heads: `decision`
    (2 class scores) and `adversarial` (nb_outs scores), the latter fed
    through grad_reverse.
    """
    def __init__(self, nb_outs=2):
        super().__init__()
        self.sig = nn.Sigmoid()
        self.nb_outs = nb_outs

        # Shared hidden stack: 2 -> hidden_size -> hidden_size -> hidden_size
        self.layer1 = nn.Linear(2, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, hidden_size)

        # Output heads
        self.decision = nn.Linear(hidden_size, 2)
        self.adversarial = nn.Linear(hidden_size, self.nb_outs)

    def forward(self, coord):
        """Return (main task scores, adversarial task scores) for a batch of coordinates."""
        hidden = coord
        for layer in (self.layer1, self.layer2, self.layer3):
            hidden = self.sig(layer(hidden))

        main_scores = self.decision(hidden)
        # The adversarial head sees the shared features through grad_reverse.
        adv_scores = self.adversarial(grad_reverse(hidden))

        return main_scores, adv_scores


# Instantiate the adversarial classifier with the default nb_outs=2;
# the bare expression displays its layer summary.
adversarial_classif_model = AdversarialClassifier()
adversarial_classif_model

In [None]:

def get_simple_bi_domain_small_adv_labels(data):
    """
    Build the binary domain labels used by the adversarial classifier.

    A sample is labelled 1 when its angle lies strictly between -pi/2 and 0.
    Every other sample is labelled 0: those whose angle lies strictly
    between -pi and -pi/2, and also every sample outside both ranges
    (the label array is zero-initialised).

    data: (nb_samples, 2) array of coordinates.
    Returns an int array of shape (nb_samples, 1).
    """
    first_half  = get_circle_segment(data, -1 * math.pi, -1 * math.pi / 2.0)
    second_half = get_circle_segment(data, -1 * math.pi / 2.0, 0.0)

    nb_samples, _ = data.shape
    domain_labels = np.zeros((nb_samples, 1), dtype=int)
    # Redundant with the zero initialisation, kept to make the mapping explicit.
    domain_labels[first_half] = 0
    domain_labels[second_half] = 1

    return domain_labels



# Domain labels for the evaluation grid and for the full sampled data set.
grid_domain = get_simple_bi_domain_small_adv_labels(grid_data)
data_domain = get_simple_bi_domain_small_adv_labels(data)

# Main task: class labels of the Q34 samples only.
# Adversarial task: domain labels of all sampled points.
adv_classif_maintask_train_set = TensorDataset(torch.from_numpy(data_Q34).float(), torch.from_numpy(labels_Q34))
adv_classif_advtask_train_set  = TensorDataset(torch.from_numpy(data).float(), torch.from_numpy(data_domain))

adv_classif_maintask_loader = DataLoader(adv_classif_maintask_train_set, batch_size=batch_size, shuffle=True)
adv_classif_advtask_loader  = DataLoader(adv_classif_advtask_train_set, batch_size=batch_size, shuffle=True)

# Validation: the whole grid in a single batch, with both class and domain labels.
adv_classif_valid_set = TensorDataset(torch.from_numpy(grid_data).float(), torch.from_numpy(grid_labels), torch.from_numpy(grid_domain))
adv_classif_valid_loader = DataLoader(adv_classif_valid_set, batch_size=nb_grid_samples, shuffle=True)


In [None]:

def perf_adv_classif(model, loader):
    """
    Compute loss and accuracy of both heads on a dataset.

    model:  module returning (class scores, domain scores) for a batch
    loader: iterable of (inputs, class labels, domain labels) batches;
            labels may be (n,) or (n, 1)

    Returns a flat tuple of alternating tags and Python floats:
    ("Y_LOSS:", mean class loss, "Y_ACC:", class accuracy,
     "D_LOSS:", mean domain loss, "D_ACC:", domain accuracy),
    where the losses are cross-entropies averaged over all samples.
    """
    criterion = nn.CrossEntropyLoss()
    model.eval()
    y_total_loss = d_total_loss = 0.0
    num = y_oks = d_oks = 0
    # Evaluation only: skip building the autograd graph.
    with torch.no_grad():
        for x, y, d in loader:
            n = y.size(0)
            y_scores, d_scores = model(x)
            y_scores = y_scores.view(n, -1)
            d_scores = d_scores.view(d.size(0), -1)
            y_true = y.view(n)
            d_true = d.view(d.size(0))

            y_pred = torch.max(y_scores, 1)[1]
            d_pred = torch.max(d_scores, 1)[1]

            # The criterion returns batch means. Weight them by the batch size
            # so that the final division by num yields per-sample means.
            y_total_loss += criterion(y_scores, y_true).item() * n
            d_total_loss += criterion(d_scores, d_true).item() * n

            num += n
            y_oks += (y_pred == y_true).sum().item()
            d_oks += (d_pred == d_true).sum().item()

    return "Y_LOSS:", y_total_loss / num, "Y_ACC:", y_oks / num, "D_LOSS:", d_total_loss / num, "D_ACC:", d_oks / num
  


def fit_adv_classif(model, main_loader, adv_loader, valid_loader, epochs):
    """
    Train the adversarial model.

    Each step draws one main-task batch from main_loader and one
    adversarial-task batch from adv_loader, and minimises
    main_loss + lambd * adv_loss with Adam, both losses being
    cross-entropies. Batches smaller than batch_size are skipped on
    either side.

    Every CHECKPOINT epochs it prints the epoch number, lambd, the mean
    combined training loss and the values of perf_adv_classif on valid_loader.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())
    adv_loader_epoch = iter(adv_loader)

    for epoch in range(epochs):

        model.train()
        total_loss = 0.0
        num = 0

        # Weight of the adversarial loss. It only depends on the training
        # progress p, so it is computed once per epoch; it equals 0.5 at
        # p = 0 and grows with p.
        p = float(epoch) / epochs
        lambd = 3. / (1. + np.exp(-5. * p)) - 1

        # iterate over batches
        for (x, y) in main_loader:

            # skip incomplete main-task batches
            if len(x) < batch_size:
                continue

            # Fetch the next adversarial batch, restarting the iterator when
            # it is exhausted (this happens when its last batch is full).
            try:
                xt, dt = next(adv_loader_epoch)
            except StopIteration:
                adv_loader_epoch = iter(adv_loader)
                xt, dt = next(adv_loader_epoch)
            if len(xt) < batch_size:
                # incomplete adversarial batch: restart and skip this step
                adv_loader_epoch = iter(adv_loader)
                continue

            optimizer.zero_grad()
            y_scores, _ = model(x)
            _, adv_dt   = model(xt)

            main_loss = criterion(y_scores.view(y.size(0), -1), y.view(y.size(0)))
            adv_loss  = criterion(adv_dt.view(dt.size(0), -1), dt.view(dt.size(0)))

            loss = main_loss + lambd*(adv_loss)
            loss.backward()
            optimizer.step()
            # loss is built from batch means: weight it by the batch size so
            # the logged value is a per-sample mean over the epoch.
            total_loss += loss.item() * len(y)
            num += len(y)

        if((epoch+1)%CHECKPOINT == 0):
            # max(num, 1) avoids a division by zero when every batch was skipped.
            print(epoch+1, lambd, total_loss / max(num, 1), *perf_adv_classif(model, valid_loader))




# Train the adversarial classifier: main task on the Q34 samples,
# adversarial task on the domain labels of all sampled points.
fit_adv_classif(adversarial_classif_model, adv_classif_maintask_loader, 
                adv_classif_advtask_loader, adv_classif_valid_loader, EPOCHS)



In [None]:
def plot_adv_classif_mesh(model, loader):
    """
    Plot gold and predicted values of both heads of the adversarial classifier.

    Draws a 2x2 grid of scatter plots of every batch yielded by loader:
      top row:    gold class labels | predicted class labels
      bottom row: gold domain labels | predicted domain labels
    Every panel is overlaid with a red circle of radius RADIUS and with
    the x = 0 and y = 0 axis lines.

    model:  module returning (class scores, domain scores) for a batch
    loader: iterable of (coordinates, class labels, domain labels) batches
    """
    fig, ax = plt.subplots(ncols=2,nrows=2)
    model.eval()
    # Pure inference: no autograd graph is needed for plotting.
    with torch.no_grad():
        for x, y, d in loader:
            y_scores, d_scores = model(x)
            y_pred = torch.max(y_scores, 1)[1]
            d_pred = torch.max(d_scores, 1)[1]

            coords = x.numpy()
            ax[0][0].scatter( coords[:,0], coords[:,1], c=y.numpy().reshape(-1), s=10 )
            ax[0][1].scatter( coords[:,0], coords[:,1], c=y_pred.numpy(), s=10 )
            ax[1][0].scatter( coords[:,0], coords[:,1], c=d.numpy().reshape(-1), s=10 )
            ax[1][1].scatter( coords[:,0], coords[:,1], c=d_pred.numpy(), s=10 )

    # Titles laid out like the axes grid; the decoration is the same for every panel.
    titles = (("Gold Labels", "Predicted Labels"),
              ("Gold Adversarial", "Predicted Adversarial"))
    for row in range(2):
        for col in range(2):
            panel = ax[row][col]
            # each panel gets its own circle artist
            panel.add_artist(plt.Circle((0,0), RADIUS, color='r', fill=False, lw=3))
            panel.axis('equal')
            panel.axvline(x=0)
            panel.axhline(y=0)
            panel.set_title(titles[row][col])
            panel.set_xlabel("x1")
            panel.set_ylabel("x2")

    plt.show()
    

# Plot the gold and predicted class / domain labels of the trained adversarial classifier on the grid.
plot_adv_classif_mesh(adversarial_classif_model, adv_classif_valid_loader)

# Adversarial Regression

Here we train a model that adversarially predicts the angle of each data point.

In [None]:

class AdversarialRegressor(nn.Module):
    """
    Three sigmoid hidden layers shared by two linear heads: `decision`
    (2 class scores) and `adversarial` (a single regressed value), the
    latter fed through grad_reverse.
    """
    def __init__(self):
        super().__init__()
        self.sig = nn.Sigmoid()

        # Shared hidden stack: 2 -> hidden_size -> hidden_size -> hidden_size
        self.layer1 = nn.Linear(2, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, hidden_size)

        # Output heads
        self.decision = nn.Linear(hidden_size, 2)
        self.adversarial = nn.Linear(hidden_size, 1)

    def forward(self, coord):
        """Return (class scores, regressed value) for a batch of coordinates."""
        hidden = coord
        for layer in (self.layer1, self.layer2, self.layer3):
            hidden = self.sig(layer(hidden))

        class_scores = self.decision(hidden)
        # The regression head sees the shared features through grad_reverse.
        regressed = self.adversarial(grad_reverse(hidden))

        return class_scores, regressed

# Instantiate the adversarial regressor.
adversarial_regress_model = AdversarialRegressor()

In [None]:

def perf_adv_regressor(model, loader):
    """
    Compute the classification loss/accuracy and the regression error on a dataset.

    model:  module returning (class scores, regressed values) for a batch
    loader: iterable of (inputs, class labels, regression targets) batches;
            labels and targets may be (n,) or (n, 1)

    Returns a flat tuple of alternating tags and Python floats:
    ("Y_LOSS:", mean cross-entropy, "Y_ACC:", accuracy, "D_LOSS:", mean squared error),
    where both means are taken over all samples.
    """
    criterion_classif = nn.CrossEntropyLoss()
    criterion_regress = nn.MSELoss()
    model.eval()
    total_loss = mse = 0.0
    num = oks = 0
    # Evaluation only: skip building the autograd graph.
    with torch.no_grad():
        for x, y, d in loader:
            n = y.size(0)
            y_scores, d_reg = model(x)
            y_scores = y_scores.view(n, -1)
            y_true = y.view(n)
            y_pred = torch.max(y_scores, 1)[1]

            # The criteria return batch means. Weight them by the batch size
            # so that the final division by num yields per-sample means.
            total_loss += criterion_classif(y_scores, y_true).item() * n
            # Bring prediction and target to the same (n, 1) shape: comparing
            # (n, 1) with (n,) would broadcast to (n, n) and give a wrong error.
            mse += criterion_regress(d_reg.reshape(n, -1), d.reshape(n, -1)).item() * n
            num += n
            oks += (y_pred == y_true).sum().item()

    return "Y_LOSS:", total_loss / num, "Y_ACC:", oks / num, "D_LOSS:", mse / num


def fit_adv_regressor(model, main_loader, adv_loader, valid_loader, epochs):
    """
    Train the adversarial regression model.

    Each step draws one main-task batch from main_loader and one
    adversarial-task batch from adv_loader, and minimises
    main_loss + lambd * adv_loss with Adam, where main_loss is a
    cross-entropy and adv_loss a mean squared error. Batches smaller than
    batch_size are skipped on either side.

    Every CHECKPOINT epochs it prints the epoch number, lambd, the mean
    combined training loss and the values of perf_adv_regressor on valid_loader.
    """
    criterion_classif = nn.CrossEntropyLoss()
    criterion_regress = nn.MSELoss()
    optimizer = optim.Adam(model.parameters())
    adv_loader_epoch = iter(adv_loader)

    for epoch in range(epochs):

        model.train()
        total_loss = 0.0
        num = 0

        # Weight of the adversarial loss. It only depends on the epoch, so it
        # is computed once per epoch: the adversarial term is switched off for
        # the first 10 epochs, then follows a sigmoid schedule of the progress p.
        p = float(epoch) / epochs
        if( epoch < 10 ):
            lambd = 0.0
        else:
            lambd = 3. / (1. + np.exp(-1.0 * p)) - 1

        # iterate over batches
        for (x, y) in main_loader:

            # skip incomplete main-task batches
            if len(x) < batch_size:
                continue

            # Fetch the next adversarial batch, restarting the iterator when
            # it is exhausted (this happens when its last batch is full).
            try:
                xt, dt = next(adv_loader_epoch)
            except StopIteration:
                adv_loader_epoch = iter(adv_loader)
                xt, dt = next(adv_loader_epoch)
            if len(xt) < batch_size:
                # incomplete adversarial batch: restart and skip this step
                adv_loader_epoch = iter(adv_loader)
                continue

            optimizer.zero_grad()
            y_scores, _ = model(x)
            _, adv_dt   = model(xt)

            main_loss = criterion_classif(y_scores.view(y.size(0), -1), y.view(y.size(0)))
            # prediction and target are both reshaped to (n, 1) before the MSE
            adv_loss  = criterion_regress(adv_dt.view(adv_dt.size(0), -1), dt.view(adv_dt.size(0), -1))

            loss = main_loss + lambd*(adv_loss)
            loss.backward()
            optimizer.step()
            # loss is built from batch means: weight it by the batch size so
            # the logged value is a per-sample mean over the epoch.
            total_loss += loss.item() * len(dt)
            num += len(dt)

        if((epoch+1)%CHECKPOINT == 0):
            # max(num, 1) avoids a division by zero when every batch was skipped.
            print(epoch+1, lambd, total_loss / max(num, 1), *perf_adv_regressor(model, valid_loader))

    
# Build the data loaders.
# Main task: class labels of the Q34 samples only.
# Adversarial task: angle of every sampled point as the regression target.
adv_regress_maintask_train_set = TensorDataset(torch.from_numpy(data_Q34).float(), torch.from_numpy(labels_Q34))
adv_regress_advtask_train_set  = TensorDataset(torch.from_numpy(data).float(), torch.from_numpy(angles).float())

adv_regress_maintask_loader = DataLoader(adv_regress_maintask_train_set, batch_size=batch_size, shuffle=True)
adv_regress_advtask_loader  = DataLoader(adv_regress_advtask_train_set, batch_size=batch_size, shuffle=True)

# Validation: the whole grid in a single batch, with class labels and angles.
adv_regress_valid_set = TensorDataset(torch.from_numpy(grid_data).float(), torch.from_numpy(grid_labels), torch.from_numpy(grid_angles).float())
adv_regress_valid_loader = DataLoader(adv_regress_valid_set, batch_size=nb_grid_samples, shuffle=True)


# Train the adversarial regressor.
fit_adv_regressor(adversarial_regress_model, adv_regress_maintask_loader,
                  adv_regress_advtask_loader, adv_regress_valid_loader, EPOCHS)




In [None]:
  
def plot_adv_regress_mesh(model, loader):
    """
    Plot gold and predicted values of both heads of the adversarial regressor.

    Draws a 2x2 grid of scatter plots of every batch yielded by loader:
      top row:    gold class labels | predicted class labels
      bottom row: gold angle | predicted angle
    Angles are converted from radians to degrees and share a fixed
    [-180, 180] colour scale with a colour bar. Every panel is overlaid
    with a red circle of radius RADIUS.

    model:  module returning (class scores, regressed angle) for a batch
    loader: iterable of (coordinates, class labels, angles) batches
    """
    fig, ax = plt.subplots(ncols=2,nrows=2)
    model.eval()
    rad_to_deg = 180.0/math.pi
    # Pure inference: no autograd graph is needed for plotting.
    with torch.no_grad():
        for x, y, d in loader:
            y_scores, d_scores = model(x)
            y_pred = torch.max(y_scores, 1)[1]
            d_pred = d_scores.detach().numpy().reshape(-1,)

            coords = x.numpy()
            ax[0][0].scatter( coords[:,0], coords[:,1], c=y.numpy().reshape(-1), s=10 )
            ax[0][1].scatter( coords[:,0], coords[:,1], c=y_pred.numpy(), s=10 )
            im10 = ax[1][0].scatter( coords[:,0], coords[:,1], c=rad_to_deg*d.numpy().reshape(-1), s=10, vmin=-180, vmax=180 )
            im11 = ax[1][1].scatter( coords[:,0], coords[:,1], c=rad_to_deg*d_pred, s=10, vmin=-180, vmax=180 )

    # Titles laid out like the axes grid; the decoration is the same for every panel.
    titles = (("Gold Labels", "Predicted Labels"),
              ("Gold Angle (degrees)", "Predicted Angle (degrees)"))
    for row in range(2):
        for col in range(2):
            panel = ax[row][col]
            # each panel gets its own circle artist
            panel.add_artist(plt.Circle((0,0), RADIUS, color='r', fill=False, lw=3))
            panel.axis('equal')
            panel.set_title(titles[row][col])
            panel.set_xlabel("x1")
            panel.set_ylabel("x2")

    # Colour bars for the two angle panels (scatter handles of the last batch).
    plt.colorbar(im10, ax=ax[1][0])
    plt.colorbar(im11, ax=ax[1][1])

    plt.show()


# Plot the gold and predicted class labels / angles of the trained adversarial regressor on the grid.
plot_adv_regress_mesh(adversarial_regress_model, adv_regress_valid_loader)
