In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import torchvision
from torchvision import *
from torch.utils.data import Dataset, DataLoader
import torchvision.models as models

from torchvision.datasets import CIFAR10
from collections import Counter
import random

import matplotlib.pyplot as plt
from IPython.display import clear_output
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# 1. Transfer Learning


1.1 Возьмите датасет Cifar10 и ResNet

1.2 Замените у Resnet послежний слой

1.3 Проведите эксперимент: обучите ResNet на Cifar10 без аугментации

In [None]:
batch_size = 128*2

In [None]:
transforms = torchvision.transforms.ToTensor() # не используем аугментацию

cifar_train = CIFAR10(root = "/content", train=True, download = True, transform=transforms)
train_loader = torch.utils.data.DataLoader(cifar_train, batch_size=batch_size, shuffle=True)

cifar_test = CIFAR10(root = "/content", train=False, download = True, transform=transforms)
test_loader = torch.utils.data.DataLoader(cifar_test, batch_size=batch_size, shuffle=True)

cifar = CIFAR10(root = "/content", train=True, download = True)

y = cifar.targets
X = cifar.data

X_mean = torch.Tensor(np.mean(X,0))

print(X.shape)

In [None]:
# Посмотрим на распределение данных в классах
keys = list(cifar.class_to_idx.keys())
#for i in range(len(keys)):
#    print(Counter(y)[i], cifar10.class_to_idx[keys[i]], keys[i])
len(keys)

In [None]:
def show_example(X, y, label, grid = (3,3)):
    y_label = [i for i, tag in enumerate(y) if label == tag]
    random.shuffle(y_label)
    
    rows = grid[0]
    columns = grid[1]
    
    fig, axes = plt.subplots(rows, columns)
    fig.set_figheight(7)
    fig.set_figwidth(7)
 
    for row in axes:
        for col in row: col.imshow(X[y_label.pop()])
    plt.show()

#airplane
show_example(X, y, 0)

## 1.1 Обучение ResNet на Cifar10 без аугментации

In [None]:
net = models.resnet18(pretrained=True).to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.0001, momentum=0.9)

num_ftrs = net.fc.in_features # заменили у ResNet последний слой
net.fc = nn.Linear(num_ftrs, len(keys)).to(device)
#net

In [None]:
n_epochs = 5
print_every = 10
valid_loss_min = np.Inf
val_loss = []
val_acc = []
train_loss = []
train_acc = []
total_step = len(train_loader)

for epoch in range(1, n_epochs+1):
    running_loss = 0.0
    correct = 0
    total=0
    print(f'Epoch {epoch}')
    for batch_idx, (data_, target_) in enumerate(train_loader):
        data_, target_ = data_.to(device), target_.to(device)
        optimizer.zero_grad()
        
        outputs = net(data_)
        loss = criterion(outputs, target_)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _,pred = torch.max(outputs, dim=1)
        correct += torch.sum(pred==target_).item()
        total += target_.size(0)
        #if (batch_idx) % 20 == 0:
        #    print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
        #           .format(epoch, n_epochs, batch_idx, total_step, loss.item()))
    train_acc.append(100 * correct / total)
    train_loss.append(running_loss/total_step)
    print(f'train-loss: {np.mean(train_loss):.4f}, train-acc: {(100 * correct/total):.4f}')
    batch_loss = 0
    total_t=0
    correct_t=0

    with torch.no_grad():
        net.eval()
        for data_t, target_t in (test_loader):
            data_t, target_t = data_t.to(device), target_t.to(device)
            outputs_t = net(data_t)
            loss_t = criterion(outputs_t, target_t)
            batch_loss += loss_t.item()
            _,pred_t = torch.max(outputs_t, dim=1)
            correct_t += torch.sum(pred_t==target_t).item()
            total_t += target_t.size(0)
        val_acc.append(100 * correct_t/total_t)
        val_loss.append(batch_loss/len(test_loader))
        network_learned = batch_loss < valid_loss_min
        print(f'validation loss: {np.mean(val_loss):.4f}, validation acc: {(100 * correct_t/total_t):.4f}\n')

# =============================== пример сохранения лучшей модели ==================
        #if network_learned:
        #    valid_loss_min = batch_loss
        #    torch.save(net.state_dict(), 'resnet.pt')
        #    print('Improvement-Detected, save-model')
    net.train()

In [None]:
fig = plt.figure(figsize=(6,4))
plt.title("Train-Validation Accuracy")
plt.plot(train_acc, label='train')
plt.plot(val_acc, label='validation')
plt.xlabel('num_epochs', fontsize=12)
plt.ylabel('accuracy', fontsize=12)
plt.grid()
plt.legend(loc='best')

## 1.2 Обучение ResNet на Cifar10 с аугментацией

1.1 Возьмите датасет ResNet из предыдущего задания

1.2 Проведите эксперимент: обучите ResNet на Cifar10 с аугментацией

1.3 Сделайте выводы, сравнив обучения в 1м и 2м заданиях

[How Data Augmentation Improves your CNN performance? — An Experiment in PyTorch and Torchvision](https://medium.com/swlh/how-data-augmentation-improves-your-cnn-performance-an-experiment-in-pytorch-and-torchvision-e5fb36d038fb)

Возможные трансформации можно взять в руководстве [TORCHVISION.TRANSFORMS](https://pytorch.org/vision/stable/transforms.html?highlight=augmentation%20examples)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
#normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
#                                 std=[0.229, 0.224, 0.225])

train_transform = transforms.Compose([
    transforms.GaussianBlur(7, sigma=(0.1, 2.0)),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    #normalize,
])

test_transform = transforms.Compose([ # тестовую выборку не аугментируют!!
    transforms.ToTensor(),
    #normalize,
])


In [None]:
# Посмотрим на распределение данных в классах
keys = list(cifar.class_to_idx.keys())
len(keys)

In [None]:
net = models.resnet18(pretrained=True).to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.0001, momentum=0.9)

num_ftrs = net.fc.in_features # заменили у ResNet последний слой
net.fc = nn.Linear(num_ftrs, len(keys)).to(device)
#net

In [None]:
n_epochs = 5
print_every = 10
valid_loss_min = np.Inf
val_loss = []
val_acc = []
train_loss = []
train_acc = []
total_step = len(train_loader)

for epoch in range(1, n_epochs+1):
    running_loss = 0.0
    correct = 0
    total=0
    print(f'Epoch {epoch}')
    for batch_idx, (data_, target_) in enumerate(train_loader):
        data_, target_ = data_.to(device), target_.to(device)
        optimizer.zero_grad()
        
        outputs = net(data_)
        loss = criterion(outputs, target_)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _,pred = torch.max(outputs, dim=1)
        correct += torch.sum(pred==target_).item()
        total += target_.size(0)
        #if (batch_idx) % 20 == 0:
        #    print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
        #           .format(epoch, n_epochs, batch_idx, total_step, loss.item()))
    train_acc.append(100 * correct / total)
    train_loss.append(running_loss/total_step)
    print(f'train-loss: {np.mean(train_loss):.4f}, train-acc: {(100 * correct/total):.4f}')
    batch_loss = 0
    total_t=0
    correct_t=0

    with torch.no_grad():
        net.eval()
        for data_t, target_t in (test_loader):
            data_t, target_t = data_t.to(device), target_t.to(device)
            outputs_t = net(data_t)
            loss_t = criterion(outputs_t, target_t)
            batch_loss += loss_t.item()
            _,pred_t = torch.max(outputs_t, dim=1)
            correct_t += torch.sum(pred_t==target_t).item()
            total_t += target_t.size(0)
        val_acc.append(100 * correct_t/total_t)
        val_loss.append(batch_loss/len(test_loader))
        network_learned = batch_loss < valid_loss_min
        print(f'validation loss: {np.mean(val_loss):.4f}, validation acc: {(100 * correct_t/total_t):.4f}\n')

    net.train()

In [None]:
fig = plt.figure(figsize=(6,4))
plt.title("Train-Validation Accuracy")
plt.plot(train_acc, label='train')
plt.plot(val_acc, label='validation')
plt.xlabel('num_epochs', fontsize=12)
plt.ylabel('accuracy', fontsize=12)
plt.grid()
plt.legend(loc='best')

*Примечание:*
- обратите внимание, что кривые `train` и  `test` после аугментации идут параллельно, а значит модель не переучивается
- если у студентов качество с аугментацией будет хуже, чем без нее, про проблема в том, что Аугментациию нужно применять с умом!! не на каждом датасете она улучшит качество обучения
- впрроченм, сейчас задача просто научиться аугментировать датасет

# 3. Если нужно, то добавлю задание еще для одного датасета с пчелами и муравьями - transfer learning в resnet
---
но, кажется, изза скорости обучения, и так много времени займут первые 2 задания

# 2. One-Shot Learning
Решим задачу с Kaggle:

Необходимо обучить сиамскую можель распознават подписи из [kaggle signature verification dataset](https://www.kaggle.com/robinreni/signature-verification-dataset) - отсюда нужно скачать датасет и перенести на свой гугл диск

[Do More With Less Data! — One-shot Learning with Siamese Neural Networks](https://medium.com/sfu-cspmp/do-more-with-less-data-one-shot-learning-with-siamese-neural-networks-760357a2f5cc) - подписи

[SigNet: Convolutional Siamese Network for Writer Independent Offline Signature Verification](https://arxiv.org/pdf/1707.02131.pdf) - статья

[Siamese-Network-for-Signature-Verification](https://github.com/gskdhiman/Siamese-Network-for-Signature-Verification/blob/master/siamese-network-for-signature-verification.ipynb)

<img src ="http://edunet.kea.su/repo/src/L11_Transfer_learning/img/EX2.png" width="600">

**Siamese Neural Network**

Сиамская нейронная сеть - это класс архитектур нейронных сетей, которые содержат две или более идентичных подсетей.

«Идентичные» здесь означает, что они имеют одинаковую конфигурацию с одинаковыми параметрами и весом.

Обновление параметров зеркально отражается в обеих подсетях и используется для поиска сходства входных данных путем сравнения их векторов признаков.

----
В этом задании Вам надо, скачав готовый датасет, реализовать задачу One-Shot Learning для распознавания оригиналиных и поддельных подписей. По аналогии с примером с лицами в лекции

<img src ="http://edunet.kea.su/repo/src/L11_Transfer_learning/img/EX1.png" width="600">

### План


1.   **Data Preprocessing**
2.   **Define the Siamese Network**
3.   **Feature Vector Extraction**
4.   **Similarity Score Calculation**
5.   **Defininf Loss Function**
6.   **Optimizer**
7.   **Testing using One-Shot Learnig**
8.   **Making Predictions**



### Import and Install all the necessary packages

In [None]:
# Import all the necessary Library 
import torchvision
import torch.utils.data as utils
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
from torch.autograd import Variable
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import time
import copy
from torch.optim import lr_scheduler
import os
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import pandas as pd 

from IPython.display import clear_output
from zipfile import ZipFile

In [None]:
# архив с подписями находится тут:
# hhttps://drive.google.com/file/d/14DzmqKJmSzt1KMn6AB0DZ9yvSzkeo8BK/view?usp=sharing

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### Additional Utility Functions 

In [None]:
def imshow(img,text=None,should_save=False):
    npimg = img.numpy()
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic',fontweight='bold',
            bbox={'facecolor':'white', 'alpha':0.8, 'pad':10})
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()    

def show_plot(iteration,loss):
    plt.plot(iteration,loss)
    plt.show()

In [None]:
class Config():
    training_dir = "/content/sign_data_mini/train"
    testing_dir = "/content/sign_data_mini/test"
    train_batch_size = 32
    train_number_epochs = 5

### Load Dataset :

Datasets может быть загружен по ссылке:  https://drive.google.com/file/d/14DzmqKJmSzt1KMn6AB0DZ9yvSzkeo8BK/view?usp=sharing 


In [None]:
!cp "/content/drive/MyDrive/МГУ/L11 Transfer Learning/data/sign_data_mini.zip" .
!unzip sign_data_mini.zip
clear_output()
!ls /content/sign_data_mini

In [None]:
training_dir="/content/sign_data_mini/train"
training_csv="/content/sign_data_mini/train_data.csv"
testing_csv="/content/sign_data_mini/test_data.csv"
testing_dir="/content/sign_data_mini/test"

### Preprocessing and Loading Dataset

We preprocessed all the images and loaded them as .npy files which is easy to transfer . You can follow your own preprocessing steps .


In [None]:
class SiameseNetworkDataset():
    
    def __init__(self,training_csv=None,training_dir=None,transform=None):
        # used to prepare the labels and images path
        self.training_df=pd.read_csv(training_csv)
        self.training_df.columns =["image1","image2","label"]
        self.training_dir = training_dir    
        self.transform = transform

    def __getitem__(self,index):
        
        # getting the image path
        image1_path=os.path.join(self.training_dir,self.training_df.iat[index,0])
        image2_path=os.path.join(self.training_dir,self.training_df.iat[index,1])
        
        
        # Loading the image
        img0 = Image.open(image1_path)
        img1 = Image.open(image2_path)
        img0 = img0.convert("L")
        img1 = img1.convert("L")
        
        # Apply image transformations
        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
        
        return img0, img1 , torch.from_numpy(np.array([int(self.training_df.iat[index,2])],dtype=np.float32))
    
    def __len__(self):
        return len(self.training_df)

In [None]:
# Load the the dataset from raw image folders
siamese_dataset = SiameseNetworkDataset(training_csv,training_dir,
                                        transform=transforms.Compose([transforms.Resize((105,105)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       )

In [None]:
# Viewing the sample of images and to check whether its loading properly
print('"0" - подписи идентичны, "1" - подделка')
vis_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        batch_size=8)
dataiter = iter(vis_dataloader)


example_batch = next(dataiter)
concatenated = torch.cat((example_batch[0],example_batch[1]),0)
imshow(torchvision.utils.make_grid(concatenated))
print(example_batch[2].numpy())

## Siamese Network Definition

In [None]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        
        # Setting up the Sequential of CNN Layers
        self.cnn1 = nn.Sequential(
            
            nn.Conv2d(1, 96, kernel_size=11,stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(5,alpha=0.0001,beta=0.75,k=2),
            nn.MaxPool2d(3, stride=2),
            
            nn.Conv2d(96, 256, kernel_size=5,stride=1,padding=2),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(5,alpha=0.0001,beta=0.75,k=2),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout2d(p=0.3),

            nn.Conv2d(256,384 , kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384,256 , kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout2d(p=0.3),

        )
        
        # Defining the fully connected layers
        self.fc1 = nn.Sequential(
            nn.Linear(30976, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.5),
            
            nn.Linear(1024, 128),
            nn.ReLU(inplace=True),
            
            nn.Linear(128,2))
        
    def forward_once(self, x):
        # Forward pass 
        output = self.cnn1(x)
        output = output.view(output.size()[0], -1)
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        # forward pass of input 1
        output1 = self.forward_once(input1)
        # forward pass of input 2
        output2 = self.forward_once(input2)
        return output1, output2


### Loss Function

In [None]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))


        return loss_contrastive

### Train the Model

In [None]:
# Load the dataset as pytorch tensors using dataloader
train_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        batch_size=Config.train_batch_size)

In [None]:
# Declare Siamese Network
net = SiameseNetwork().cuda()
# Decalre Loss Function
criterion = ContrastiveLoss()
# Declare Optimizer
optimizer = optim.RMSprop(net.parameters(), lr=1e-4, alpha=0.99, eps=1e-8, weight_decay=0.0005, momentum=0.9)

In [None]:
def train():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    counter = []
    loss_history = [] 
    iteration_number= 0
    
    for epoch in range(0,Config.train_number_epochs):
        for i, data in enumerate(train_dataloader,0):
            img0, img1 , label = data
            img0, img1 , label = img0.cuda(), img1.cuda() , label.cuda()
            optimizer.zero_grad()
            output1,output2 = net(img0,img1)
            loss_contrastive = criterion(output1,output2,label)
            loss_contrastive.backward()
            optimizer.step()
            if i %200 == 0 :
                print("Epoch number {}\n Current loss {}\n".format(epoch,loss_contrastive.item()))
                iteration_number +=10
                counter.append(iteration_number)
                loss_history.append(loss_contrastive.item())
    return net

In [None]:
# Train the model
model = train()
torch.save(model.state_dict(), "/content/model.pt")
print("Model Saved Successfully")

In [None]:
# Load the saved model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SiameseNetwork().to(device)
model.load_state_dict(torch.load("/content/model.pt"))

In [None]:
# Load the test dataset
test_dataset = SiameseNetworkDataset(training_csv=testing_csv,training_dir=testing_dir,
                                        transform=transforms.Compose([transforms.Resize((105,105)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       )

test_dataloader = DataLoader(test_dataset,batch_size=1,shuffle=True)

In [None]:
# Print the sample outputs to view its dissimilarity
counter=0
list_0 = torch.FloatTensor([[0]])
list_1 = torch.FloatTensor([[1]])
for i, data in enumerate(test_dataloader,0): 
  x0, x1 , label = data
  concatenated = torch.cat((x0,x1),0)
  output1,output2 = model(x0.to(device),x1.to(device))
  eucledian_distance = F.pairwise_distance(output1, output2)
  if label==list_0:
    label="Orginial"
  else:
    label="Forged"
  imshow(torchvision.utils.make_grid(concatenated),'Dissimilarity: {:.2f} Label: {}'.format(eucledian_distance.item(),label))
  counter=counter+1
  if counter ==20:
     break

### Accuracy Check

In [None]:
test_dataloader = DataLoader(test_dataset,batch_size=1,shuffle=True)
accuracy=0
counter=0
correct=0
for i, data in enumerate(test_dataloader,0): 
  x0, x1 , label = data
  # onehsot applies in the output of 128 dense vectors which is then converted to 2 dense vectors
  output1,output2 = model(x0.to(device),x1.to(device))
  res=torch.abs(output1.cuda() - output2.cuda())
  label=label[0].tolist()
  label=int(label[0])
  result=torch.max(res,1)[1].data[0].tolist()
  if label == result:
    correct=correct+1
  counter=counter+1
#   if counter ==20:
#      break
    
accuracy=(correct/len(test_dataloader))*100
print("Accuracy:{}%".format(accuracy))

### **ВЫВОД:**
Мы видим эффективность такого полхода, так как даже при низком `accuracy`, мы достигаем высокое качество распознавания поддельных подписей.

Вопрос: можно ли тут верить `accurary`? Или нет, тк у нас не сбалансированный датасет