### imports

In [None]:
# import libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

import numpy as np

import matplotlib.pyplot as plt
import matplotlib_inline.backend_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')

### Create and prepare the data

In [None]:
# create data

nPerClust = 300
blur = 1

A = [1, 1]
B = [5, 1]
C = [4, 3]

# generate data
a = [A[0] + np.random.randn(nPerClust) * blur, A[1] + np.random.randn(nPerClust) * blur]
b = [B[0] + np.random.randn(nPerClust) * blur, B[1] + np.random.randn(nPerClust) * blur]
c = [C[0] + np.random.randn(nPerClust) * blur, C[1] + np.random.randn(nPerClust) * blur]

# true labels
labels_np = np.hstack(
    (np.zeros((nPerClust)), np.ones((nPerClust)), 1 + np.ones((nPerClust)))
)

# concatanate into a matrix
data_np = np.hstack((a, b, c)).T

# convert to a pytorch tensor
data = torch.tensor(data_np).float()
labels = torch.tensor(labels_np).long()  # note: "long" format for CCE

# show the data
fig = plt.figure(figsize=(8, 8))
# draw distance to origin
color = "bkr"
for i in range(len(data)):
    plt.plot([0, data[i, 0]], [0, data[i, 1]], color=color[labels[i]], alpha=0.2)

plt.plot(
    data[np.where(labels == 0)[0], 0],
    data[np.where(labels == 0)[0], 1],
    "bs",
    alpha=0.5,
)
plt.plot(
    data[np.where(labels == 1)[0], 0],
    data[np.where(labels == 1)[0], 1],
    "ko",
    alpha=0.5,
)
plt.plot(
    data[np.where(labels == 2)[0], 0],
    data[np.where(labels == 2)[0], 1],
    "r^",
    alpha=0.5,
)

plt.grid(color=[0.9, 0.9, 0.9])
plt.title("The qwerties!")
plt.xlabel("qwerty dimension 1")
plt.ylabel("qwerty dimension 2")
plt.show()

In [None]:
# compute Euclidean distance to the origin
dist2orig = torch.sqrt( data[:,0]**2 + data[:,1]**2 )

plt.plot(labels+torch.randn(900)/10,dist2orig,'o')
plt.xticks([0,1,2],labels=['blue','black','red'])
plt.ylabel('Euclidean distance (a.u.)')
plt.title('Distance to origin')
plt.show()

In [None]:
# And add that to the data matrix
dataAug = torch.cat((data,dist2orig.view(len(data),1)),axis=1)

# check data sizes
print(data.shape)
print(dataAug.shape)
print(' ')

# look at some of the data
print(dataAug)

In [None]:
# use scikitlearn to split the data
train_data,test_data, train_labels,test_labels = train_test_split(dataAug, labels, test_size=.1)

# then convert them into PyTorch Datasets (note: already converted to tensors)
train_data = torch.utils.data.TensorDataset(train_data,train_labels)
test_data  = torch.utils.data.TensorDataset(test_data,test_labels)

# finally, translate into dataloader objects
batchsize    = 16
train_loader = DataLoader(train_data,batch_size=batchsize,shuffle=True,drop_last=True)
test_loader  = DataLoader(test_data,batch_size=test_data.tensors[0].shape[0])

### Create the model

In [None]:
class qwertyNet(nn.Module):
    def __init__(self, useExtraFeature=False):
        super().__init__()

        # input layer
        if useExtraFeature:
            self.input = nn.Linear(3, 8)
        else:
            self.input = nn.Linear(2, 8)

        self.fc1 = nn.Linear(8, 8)
        self.output = nn.Linear(8, 3)
        self.useExtraFeature = useExtraFeature

    # forward pass
    def forward(self, x):

        # by request, only use XY features
        if not self.useExtraFeature:
            x = x[:, :2]

        x = F.relu(self.input(x))
        x = F.relu(self.fc1(x))
        return self.output(x)


def create_model(useExtraFeature=False):

    net = qwertyNet(useExtraFeature)
    lossfun = nn.CrossEntropyLoss()

    optimizer = torch.optim.SGD(net.parameters(), lr=0.001)

    return net, lossfun, optimizer

In [None]:
# test the model
print('Using augmented feature:')
net = create_model(True)[0]
net(next(iter(train_loader))[0]);

print('\nNot using augmented feature:')
net = create_model(False)[0]
net(next(iter(train_loader))[0]);


### training   

In [None]:
def function2trainTheModel(useExtraFeature=False):

    numepochs = 200
    net, lossfun, optimizer = create_model(useExtraFeature)
    losses = torch.zeros(numepochs)
    trainAcc = []
    testAcc = []

    for epochi in range(numepochs):
        batchAcc = []
        batchLoss = []
        for X, y in train_loader:

            # forward pass and loss
            yHat = net(X)
            loss = lossfun(yHat, y)

            # backprop
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # loss from this batch
            batchLoss.append(loss.item())

            # compute accuracy
            batchAcc.append(100 * (torch.argmax(yHat, axis=1) == y).float().mean())

        trainAcc.append(np.mean(batchAcc))
        losses[epochi] = np.mean(batchLoss)

        # test accuracy
        X, y = next(iter(test_loader))  
        with torch.no_grad():  
            yHat = net(X)

        # compare the following really long line of code to the training accuracy lines
        testAcc.append(100 * torch.mean((torch.argmax(yHat, axis=1) == y).float()))

    return trainAcc, testAcc, losses, net

### run experiment and plot the results

In [None]:
#
def run_experiment():

    # compute accuracy over entire dataset (train+test)
    yHat = net(dataAug)
    predictions = torch.argmax(yHat, axis=1)
    accuracy = (predictions == labels).float()

    # and accuracy by group
    accuracyByGroup = np.zeros(3)
    for i in range(3):
        accuracyByGroup[i] = 100 * torch.mean(accuracy[labels == i])

    # create the figure
    fig, ax = plt.subplots(2, 2, figsize=(10, 6))

    # plot the loss function
    ax[0, 0].plot(losses.detach())
    ax[0, 0].set_ylabel("Loss")
    ax[0, 0].set_xlabel("epoch")
    ax[0, 0].set_title("Losses")

    # plot the accuracy functions
    ax[0, 1].plot(trainAcc, label="Train")
    ax[0, 1].plot(testAcc, label="Test")
    ax[0, 1].set_ylabel("Accuracy (%)")
    ax[0, 1].set_xlabel("Epoch")
    ax[0, 1].set_title("Accuracy")
    ax[0, 1].legend()

    # plot overall accuracy by group
    ax[1, 0].bar(range(3), accuracyByGroup)
    ax[1, 0].set_ylim([np.min(accuracyByGroup) - 5, np.max(accuracyByGroup) + 5])
    ax[1, 0].set_xticks([0, 1, 2])
    ax[1, 0].set_xlabel("Group")
    ax[1, 0].set_ylabel("Accuracy (%)")
    ax[1, 0].set_title("Accuracy by group")

    # scatterplot of correct and incorrect labeled data
    colorShapes = ["bs", "ko", "g^"]  # data markers
    for i in range(3):
        # plot all data points
        ax[1, 1].plot(
            dataAug[labels == i, 0],
            dataAug[labels == i, 1],
            colorShapes[i],
            alpha=0.3,
            label=f"Group {i}",
        )

        # cross-out the incorrect ones
        idxErr = (accuracy == 0) & (labels == i)
        ax[1, 1].plot(dataAug[idxErr, 0], dataAug[idxErr, 1], "rx")

    ax[1, 1].set_title("All groups")
    ax[1, 1].set_xlabel("qwerty dimension 1")
    ax[1, 1].set_ylabel("qwerty dimension 2")
    ax[1, 1].legend()

    plt.tight_layout()
    plt.show()

# Test the model with and without the additional feature

In [None]:
# run the model and visualize the results
trainAcc,testAcc,losses,net = function2trainTheModel(False)
print('Final accuracy: %.2f%%' %testAcc[-1].item())
run_experiment()

In [None]:
# run the model and visualize the results
trainAcc,testAcc,losses,net = function2trainTheModel(True)
print('Final accuracy: %.2f%%' %testAcc[-1].item())
run_experiment()

### t-test


In [None]:
n=30

finalacc2 = np.zeros(n)
finalacc3 = np.zeros(n)

for i in range(n):
  finalacc2[i] = np.mean(function2trainTheModel(False)[1][-10:])
  finalacc3[i] = np.mean(function2trainTheModel(True)[1][-10:])
  print(f"Finished iteration {i+1}/{n}")

# run the t-test and print the results
from scipy import stats
t,p = stats.ttest_ind(finalacc3,finalacc2)
print('\n\nt=%.2f, p=%.2f' %(t,p))

In [None]:
# plot
plt.plot(finalacc2,'bo-',label='2 features')
plt.plot(finalacc3,'ro-',label='3 features')
plt.xlabel('Simulation')
plt.ylabel('Final accuracy')
plt.legend()
plt.show()