<a href="https://colab.research.google.com/github/vlamen/tue-deeplearning/blob/main/assignments/assignment_1/Assignment_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>    


# Student : Mingyao Zhao


In [None]:
import requests
import io
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch import optim
import torch.nn.functional as F
import matplotlib as plt

%pylab inline

### Training data set


For Assignment 1 you need to use a specific data set prepared using images from the [Omniglot dataset](https://github.com/brendenlake/omniglot). The provided training data set contains images of handwritten characters of size (28,28). 



For training data, the dataset contains 10000 sets of 6 images each. Each set consists of 5 support images and 1 query image. In each set, the first five columns are support images and the last one is a query image.

For training labels, the dataset contains 10000 sets of 5 binary flags, one per support image. A flag of 1 indicates that the support image shows the same character as the query image, and 0 indicates that it does not. For example, a label [1,0,0,1,1] means the support images with index 0, 3 and 4 show the same character as the query image.

 
 
The following cell provides code that loads the data from hardcoded URLs. You can use the code in this cell to load the dataset, or download the data set from the given URLs to your local drive (or your Google Drive) and modify the code to load the data from that location. 




In [None]:
def load_numpy_arr_from_url(url, timeout=60):
    """
    Downloads a file over HTTP and loads it as a numpy array.

    Input:
    url: Download link of dataset
    timeout: Seconds to wait for the server before giving up (forwarded to
             requests.get). Pass None to wait indefinitely.

    Outputs:
    dataset: numpy array with input features or labels

    Raises:
    requests.HTTPError when the server answers with an error status.
    requests.Timeout when the server does not respond within `timeout`.
    """

    # Without a timeout, requests blocks forever on a stalled connection.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    # np.load expects a file-like object, so wrap the downloaded bytes.
    return np.load(io.BytesIO(response.content))
    
    
    
# Fetch the training images first, then their labels (downloading may take a while).
train_data, train_label = (
    load_numpy_arr_from_url(url)
    for url in (
        'https://surfdrive.surf.nl/files/index.php/s/4OXkVie05NPjRKK/download',
        'https://surfdrive.surf.nl/files/index.php/s/oMLFw60zpFX82ua/download',
    )
)

print(f"train_data shape: {train_data.shape}")
print(f"train_label shape: {train_label.shape}\n")

Now, we plot the first 5 cases in the training dataset. The last column corresponds to the query image of each case. All other images are support images. Images enclosed in a red box are the target images that your model should recognize as belonging to the same class as the query image. 

In [None]:
def plot_case(caseID,train_data,labels):
    """
    Plots one case: its five support images followed by its query image.

    Support images whose label equals 1 are outlined with a red box.

    Inputs
    caseID: Integer index of the case along the first axis of train_data and labels
    train_data: array indexed as [case, image, ...]; images 0-4 of a case are
                the support images and image 5 is the query image
    labels: array indexed as [case, support image], holding 1 where the support
            image shows the same character as the query image
    """
    # Imported locally so the function does not rely on `%pylab inline`
    # having injected pyplot (as `plt`) and `Rectangle` into the namespace.
    import matplotlib.pyplot as plt
    from matplotlib.patches import Rectangle

    support_images = train_data[caseID, :5]
    query_image = train_data[caseID, 5]

    f, axes = plt.subplots(1, 6, figsize=(20,5))

    # the query image goes in the last column
    axes[5].imshow(query_image)
    axes[5].set_title(f"Query image case {caseID}", fontsize=15)

    # the support images fill the first five columns
    for ax, image in zip(axes[:5], support_images):
        ax.imshow(image)

    # outline every support image flagged as matching the query
    for ind in np.flatnonzero(labels[caseID] == 1):
        axes[ind].add_patch(Rectangle((0,0),27,27,linewidth=2, edgecolor='r',facecolor='none'))


In [None]:
# Show the first five training cases, one figure per case.
# (A bare `expr for x in ...` without brackets is a SyntaxError.)
for caseID in range(5):
    plot_case(caseID,train_data,train_label)

### Query data set

For this task you need to use the following query data set. The dataset contains 1000 sets of 6 images each. The images are also of hand written characters, however these characters are not present in the training data set. The characters in the query data set all come from the Greek alphabet that is not part of the set of alphabets in the training data. 


In [None]:
    
# Fetch the query-set images first, then their labels (downloading may take a while).
test_data, test_label = (
    load_numpy_arr_from_url(url)
    for url in (
        'https://surfdrive.surf.nl/files/index.php/s/06c34QVUr69CxWY/download',
        'https://surfdrive.surf.nl/files/index.php/s/LQIH1CW7lfDXevk/download',
    )
)

print(f"test_data shape: {test_data.shape}")
print(f"test_label shape: {test_label.shape}\n")

In [None]:
# Show the first five query-set cases, one figure per case.
# A plain loop avoids building a throwaway list just for the side effects.
for caseID in range(5):
    plot_case(caseID,test_data,test_label)

### Build PyTorch dataset and dataloader

In [None]:
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader,TensorDataset
import numpy as np
from PIL import Image


class MyDataset(Dataset):
    """Tensor-backed dataset that pairs every entry of `data` with its target."""

    def __init__(self, data, targets):
        # Legacy typed constructors: float tensor for data, long tensor for targets.
        self.data = torch.FloatTensor(data)
        self.targets = torch.LongTensor(targets)

    def __len__(self):
        """Number of entries along the first axis of the data tensor."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the (data, target) pair stored at `index`."""
        return self.data[index], self.targets[index]
    
# Wrap the downloaded arrays so each index yields a (data, target) tensor pair.
train_dataset = MyDataset(train_data, train_label)
test_dataset = MyDataset(test_data, test_label)

In [None]:
# ## Model Definition ##
import torch
import torch.nn as nn
import torch.nn.functional as F

class Lambda(nn.Module):
    """Module wrapper that applies a stored callable in its forward pass."""

    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        """Apply the wrapped callable to `x` and return its result."""
        wrapped = self.func
        return wrapped(x)


def preprocess(x):
    """View `x` as a batch of single-channel 28x28 images."""
    image_shape = (1, 28, 28)
    return x.view(-1, *image_shape)


class Mynetwork(nn.Module):
    """CNN that maps a batch of 1x28x28 images to 30-dimensional embeddings."""

    def __init__(self):
        """Build three convolutional stages followed by a two-layer head."""
        super().__init__()

        self.layer = nn.Sequential(
            # stage 1
            nn.Conv2d(1, 32, 3, padding=1),  # 32@28*28
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),    # 64@28*28
            nn.MaxPool2d(2),   # 64@14*14

            # stage 2
            nn.Conv2d(64, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(), # 128@14*14
            nn.Conv2d(128, 128, 3, padding=1),
            nn.ReLU(),   # 128@14*14
            nn.MaxPool2d(2), # 128@7*7

            # stage 3
            nn.Conv2d(128, 256, 3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(), # 256@7*7
            nn.Conv2d(256, 256, 3, padding=1),
            nn.ReLU(),   # 256@7*7
            nn.MaxPool2d(2), # 256@3*3

            # Keep the batch axis and merge the rest: 256*3*3 = 2304 features.
            # nn.Flatten occupies the same position in the Sequential as the
            # previous hand-written view() wrapper and has no parameters.
            nn.Flatten(),
            nn.Linear(2304, 1024),
            nn.Sigmoid(),
            nn.Linear(1024, 30),
        )

    def forward(self, x):
        """Return the embedding of every image in the batch `x`."""
        return self.layer(x)

In [None]:
# Loss definition(using the triplet loss for training dataset)
class TripletLoss(nn.Module):
    """
    Margin-based triplet loss on squared Euclidean distances.

    For every triplet the loss is max(0, d(a, p) - d(a, n) + margin), where d
    is the squared difference summed over dim 1. The mean over the batch is
    returned.
    """

    def __init__(self, margin):
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        def squared_distance(other):
            # squared L2 distance between the anchor and `other`, per row
            return torch.sum((anchor - other) ** 2, dim=1)

        hinge_input = squared_distance(positive) - squared_distance(negative) + self.margin
        # relu zeroes out triplets whose gap already exceeds the margin
        return torch.mean(F.relu(hinge_input))


In [None]:
## Build Sample Selector ##
class RandomTripletSelector():
    """
    Wraps an indexable collection of (images, labels) samples and serves one
    randomly drawn (anchor, positive, negative) triplet per sample.

    The last image of a sample is the anchor; the positive is drawn from the
    images labelled 1 and the negative from the images labelled 0.
    """
    def __init__(self,input):
        super().__init__()
        self.input=input
        full_slice = self.input[:]
        # NOTE(review): for a dataset that returns a (data, targets) pair on
        # slicing these are all data / all targets; for a plain list they are
        # just its first two elements. Neither attribute is read in this class.
        self.data=full_slice[0]
        self.target=full_slice[1]

    def __len__(self):
        return len(self.input)

    def __getitem__(self, item):
        sample = self.input[item]
        images = sample[0]
        labels = sample[1]

        # positions of the support images that match / do not match the query
        matching = [pos for pos, flag in enumerate(labels) if flag == 1]
        differing = [pos for pos, flag in enumerate(labels) if flag == 0]

        # draw the positive before the negative (order of random draws matters)
        chosen_pos = np.random.choice(matching)
        chosen_neg = np.random.choice(differing)

        # unsqueeze adds a leading axis of size 1 to each image
        anchor = images[-1].unsqueeze(0)
        positive = images[chosen_pos].unsqueeze(0)
        negative = images[chosen_neg].unsqueeze(0)
        return anchor,positive,negative
        

In [None]:
# Training
from sklearn.model_selection import train_test_split

def _triplet_batch_loss(model, loss_fuc, batch, device):
    """Embed one (anchor, positive, negative) batch and return its loss."""
    anchor, positive, negative = (images.to(device) for images in batch)
    return loss_fuc(model(anchor), model(positive), model(negative))


def train_Dou(model, train_dataset, test_dataset, num_epochs, loss_fuc):
    """
    Train the model on random triplets and record the loss of every epoch.

    Inputs
    model: network mapping a batch of images to embeddings
    train_dataset: dataset that is split 70/30 into training and validation cases
    test_dataset: accepted for interface compatibility; not used in this function
    num_epochs: number of passes over the training split
    loss_fuc: callable taking (anchor, positive, negative) embeddings

    Outputs
    train_losses, val_losses: lists with the mean batch loss of each epoch

    Note: parameter updates go through the module-level `optimizer`, which
    must exist before this function is called.
    """
    # Run on whichever device the model's parameters already live on.
    device = next(model.parameters()).device

    # Split once, before the first epoch. Re-drawing the split every epoch
    # would put earlier epochs' training cases into later validation sets,
    # so the validation loss would no longer measure held-out data.
    tra, val = train_test_split(train_dataset, test_size=0.3, shuffle=True)
    train_loader = torch.utils.data.DataLoader(RandomTripletSelector(tra), batch_size=10)
    val_loader = torch.utils.data.DataLoader(RandomTripletSelector(val), batch_size=10)

    train_losses = []
    val_losses = []
    for epoch in range(num_epochs):
        print("Starting epoch " + str(epoch+1))

        model.train()
        running_loss = 0.0
        for batch in train_loader:
            # Forward
            loss = _triplet_batch_loss(model, loss_fuc, batch, device)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_train_loss = running_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # check validation loss after every epoch
        model.eval()
        val_running_loss = 0.0
        with torch.no_grad():
            for batch in val_loader:
                val_running_loss += _triplet_batch_loss(model, loss_fuc, batch, device).item()

        avg_val_loss = val_running_loss / len(val_loader)
        val_losses.append(avg_val_loss)

        print('Epoch [{}/{}],Train Loss: {:.4f}, Valid Loss: {:.8f}'
            .format(epoch+1, num_epochs, avg_train_loss, avg_val_loss))
    print("Finished Training")
    return train_losses, val_losses

def plot_loss(train_loss, val_loss,epoche):
    """
    Draw the per-epoch training and validation loss curves in one figure.

    train_loss, val_loss: sequences with one loss value per epoch
    epoche: number of epochs; the x axis runs from 1 to epoche
    """
    epochs_axis = list(range(1, epoche + 1))
    for curve in (train_loss, val_loss):
        plt.plot(epochs_axis, curve)
    plt.legend(['train loss', 'validation loss'], loc='best')
    plt.title("Train and validation loss")
    plt.xlabel("epoch")
    plt.ylabel("loss")
    plt.show()

In [None]:
#train the model and plot loss
epoches = 30
model = Mynetwork()

Loss_func = TripletLoss(margin=0.5)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Use the GPU when torch can see one, otherwise stay on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

train_losses, val_losses = train_Dou(model, train_dataset, test_dataset, epoches, Loss_func)
plot_loss(train_loss=train_losses, val_loss=val_losses, epoche=epoches)

In [None]:
## Evaluation ##
def similarity_eculide(x,y):
    """
    Pairwise Euclidean (p=2) distance between the vectors of x and of y.

    Despite the name this is a distance: smaller values mean closer inputs.
    """
    return torch.cdist(x1=x, x2=y, p=2.0)
   

def confusion(l1,l2):  # l2: predicted, l1: ground truth
    """
    Count the four confusion-matrix cells for binary (0/1) labels.

    Returns the counts in the order (TP, TN, FP, FN).
    """
    truth = torch.Tensor(l1)
    predicted = torch.Tensor(l2)

    truth_pos, truth_neg = truth == 1, truth == 0
    pred_pos, pred_neg = predicted == 1, predicted == 0

    def count(mask):
        # number of True entries in a boolean mask
        return mask.sum().item()

    TP = count(pred_pos & truth_pos)
    TN = count(pred_neg & truth_neg)
    FP = count(pred_pos & truth_neg)
    FN = count(pred_neg & truth_pos)
    return TP, TN, FP, FN

def compareList(l1,l2):
    """Return 1 when `l1 == l2` is truthy, otherwise 0."""
    return 1 if l1 == l2 else 0

def confusion_cal(threshold, dataset, net=None, batch_size=64):
    """
    Compute TPR, FPR, accuracy and complete accuracy at a distance threshold.

    A support image is predicted as matching (1) unless its embedding lies at
    a Euclidean distance >= threshold from the query embedding.

    Inputs
    threshold: distance at or above which a support image is predicted 0
    dataset: supports len(), slice indexing that returns (images, ...) with
             images indexed as [case, image, 28, 28] (query image last), and
             a `targets` tensor of 0/1 flags per support image
    net: embedding network; defaults to the module-level `model`
    batch_size: number of cases embedded per forward pass

    Outputs
    TPR, FPR, ACC: rates pooled over all support images of all cases
    complete_acc: fraction of cases whose five predictions are all correct
    target: ground-truth flags as nested lists
    result: predicted flags as nested lists
    """
    if net is None:
        net = model  # module-level network

    # Move inputs to the network's device; a parameter-free network gets the
    # data on whatever device it is already on.
    first_param = next(net.parameters(), None)

    # Evaluate with inference behaviour (e.g. BatchNorm running statistics)
    # and restore the previous mode afterwards.
    was_training = net.training
    net.eval()
    result = []
    try:
        # No gradients are needed here; skipping them saves memory and time.
        with torch.no_grad():
            for start in range(0, len(dataset), batch_size):
                images = dataset[start:start + batch_size][0]
                if first_param is not None:
                    images = images.to(first_param.device)
                n_cases, n_images = images.shape[0], images.shape[1]

                # One forward pass embeds every image of every case in the batch,
                # so the query is embedded once instead of once per support image.
                embeddings = net(images.reshape(-1, 1, 28, 28))
                embeddings = embeddings.reshape(n_cases, n_images, -1)
                query = embeddings[:, -1:, :]
                support = embeddings[:, :5, :]

                distances = torch.cdist(query, support, p=2.0).squeeze(1)
                same = ~(distances >= threshold)
                result.extend(same.long().cpu().tolist())
    finally:
        net.train(was_training)

    target = dataset.targets.tolist()
    truth = np.asarray(target)
    predicted = np.asarray(result)

    TP = np.sum((predicted == 1) & (truth == 1))
    FN = np.sum((predicted == 0) & (truth == 1))
    FP = np.sum((predicted == 1) & (truth == 0))
    TN = np.sum((predicted == 0) & (truth == 0))

    # 1e-5 keeps the ratios defined when a denominator would be zero.
    TPR = TP / (TP + FN + 1e-5)
    FPR = FP / (TN + FP + 1e-5)
    ACC = (TP + TN) / (TP + TN + FP + FN + 1e-5)

    if len(result) == 0:
        complete_acc = 0.0
    else:
        exact_matches = sum(1 for t, r in zip(target, result) if t == r)
        complete_acc = exact_matches / len(result)

    return TPR, FPR, ACC, complete_acc,target,result


In [None]:
# determine the best threshold
def range_with_floats(start, stop, step):
    """
    Yield start, start + step, start + 2*step, ... while the value is < stop.

    Each value is computed as start + i * step rather than by repeated
    addition, so rounding error does not accumulate and cannot add a spurious
    extra element (e.g. 0..1 in steps of 0.1 gives exactly 10 values).

    Raises ValueError if the range is non-empty and step is not positive,
    because such a range would never terminate.
    """
    import math

    if stop <= start:
        return
    if step <= 0:
        raise ValueError("step must be positive")

    # Round the quotient first so float noise (3.0 / 0.1 == 30.000000000000004)
    # does not push the count up by one.
    count = math.ceil(round((stop - start) / step, 9))
    for i in range(count):
        yield start + i * step

# Candidate thresholds, then one training-set evaluation per candidate.
threhlod = list(range_with_floats(0,3.0,0.1))
acc_dou = [
    confusion_cal(threshold=candidate, dataset=train_dataset)
    for candidate in range_with_floats(0,3.0,0.1)
]

In [None]:
#best threshold based on ROC
def plot_roc(TPR,FPR):
    """
    Draw the ROC curve: FPR on the x axis against TPR on the y axis.
    """
    plt.plot(FPR, TPR)
    plt.xlabel("FPR")
    plt.ylabel("TPR")
    plt.title("ROC curve")
    plt.show()

# Entries 0 and 1 of every confusion_cal result are TPR and FPR.
a1a1 = np.array([entry[0] for entry in acc_dou])
b1b1 = np.array([entry[1] for entry in acc_dou])

plot_roc(a1a1, b1b1)

# Index of the threshold with the largest TPR - FPR gap.
best_threshold = (a1a1 - b1b1).argmax()
print('The accuarcy with different threshold')
print('The threhlod with the best ROC is :', threhlod[best_threshold])

In [None]:
#best threshold based on ACC
def plot_acc(ACC,threhlod):
    """Draw accuracy (y axis) against the threshold (x axis)."""
    plt.plot(threhlod, ACC)
    plt.xlabel("Threhlod")
    plt.ylabel("ACC")
    plt.title("ACC curve")
    plt.show()

# Entry 2 of every confusion_cal result is the accuracy.
a1a1 = np.array([entry[2] for entry in acc_dou])

plot_acc(a1a1, threhlod)

# Index of the threshold with the highest accuracy.
best_threshold = a1a1.argmax()
print('The accuarcy with different threshold')
print('The threhlod with the best accuarcy is :', threhlod[best_threshold])

In [None]:
#best threshold based on complete ACC
def plot_cacc(Complete_ACC,threhlod):
    """Draw complete accuracy (y axis) against the threshold (x axis)."""
    plt.plot(threhlod, Complete_ACC)
    plt.xlabel("Threhlod")
    plt.ylabel("Complete_ACC")
    plt.title("Complete ACC curve")
    plt.show()

# Entry 3 of every confusion_cal result is the complete accuracy.
a1a1 = np.array([entry[3] for entry in acc_dou])

plot_cacc(a1a1, threhlod)

# Index of the threshold with the highest complete accuracy.
best_threshold = a1a1.argmax()
print('The accuarcy with different threshold')
print('The threhlod with the best accuarcy is :', threhlod[best_threshold])

In [None]:
# therefore, we get the best threshold
best_threshold = 1.5
# evaluation on train set; result entries 0-3 are TPR, FPR, ACC, complete ACC
acc_final = confusion_cal(threshold=best_threshold, dataset=train_dataset)
for caption, position in (
    ('The accuarcy on train set is :', 2),
    ('The complete accuarcy on train set is :', 3),
    ('TPR on train set is:', 0),
    ('FPR on train set is:', 1),
):
    print(caption, acc_final[position])

In [None]:
#evulation on test set
# result entries 0-3 are TPR, FPR, ACC, complete ACC
acc_final = confusion_cal(threshold=best_threshold, dataset=test_dataset)
for caption, position in (
    ('The accuarcy on test set is :', 2),
    ('The complete accuarcy on test set is :', 3),
    ('TPR on test set is:', 0),
    ('FPR on test set is:', 1),
):
    print(caption, acc_final[position])