# Visual Transformer with Linformer

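Training a Visual Transformer (ViT) on a three-class dataset of `.npy` arrays (`class0`, `class1`, `class2`), adapted from the *Dogs vs Cats* example referenced below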

* Dogs vs. Cats Redux: Kernels Edition - https://www.kaggle.com/c/dogs-vs-cats-redux-kernels-edition
* Base Code - https://www.kaggle.com/reukki/pytorch-cnn-tutorial-with-cats-and-dogs/
* Efficient Attention Implementation - https://github.com/lucidrains/vit-pytorch#efficient-attention

## Import Libraries

In [None]:
from __future__ import print_function

import glob
from itertools import chain
import os
import random
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from linformer import Linformer
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from tqdm.notebook import tqdm

from vit_pytorch.efficient import ViT


In [None]:
# Report the installed PyTorch version so a notebook run can be tied to it.
print(f"Torch: {torch.__version__}")

In [None]:
class Config:
    """Static paths and training hyper-parameters shared by the notebook."""

    # Directory searched for the training ``.npy`` files.
    training_dir = 'data4preTrained/train/'
    # Directory searched for the test ``.npy`` files.
    testing_dir = 'data4preTrained/test/'
    # Output directory, created below (intended for saved model weights).
    weights_dir = "weights/"
    # Mini-batch size used for the data loaders.
    train_batch_size = 64
    # Number of training epochs.
    train_number_epochs = 20


# Create the output directory if it does not exist yet. ``exist_ok=True`` avoids
# the check-then-create race of ``os.path.exists`` followed by ``os.makedirs``.
os.makedirs(Config.weights_dir, exist_ok=True)


In [None]:
# Training settings
batch_size = Config.train_batch_size  # mini-batch size passed to every DataLoader
epochs = Config.train_number_epochs  # number of iterations of the training loop
lr = 3e-5  # initial learning rate passed to the Adam optimizer
gamma = 0.7  # multiplicative decay factor passed to the StepLR scheduler
seed = 42  # seed passed to seed_everything() and to train_test_split()

In [None]:
def seed_everything(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA
    devices), exports ``PYTHONHASHSEED`` and configures cuDNN for
    deterministic behaviour.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    # Exported for completeness; hash randomization of the running interpreter
    # is fixed at start-up, so this only matters for processes spawned later.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds every CUDA device, the current one included; safe without a GPU.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # With benchmarking enabled cuDNN may auto-select different kernels from
    # run to run, which defeats the deterministic flag above.
    torch.backends.cudnn.benchmark = False

# Seed all RNGs up front, before any sampling, splitting or model creation below.
seed_everything(seed)

In [None]:
# Use the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA instead of failing on `.to(device)`.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

## Load Data

In [None]:
# Short aliases for the dataset directories configured in Config.
train_dir = Config.training_dir
test_dir = Config.testing_dir

In [None]:
# Collect every `class*.npy` file in each directory.
# `glob` returns paths in arbitrary, filesystem-dependent order; sorting makes
# the file lists (and therefore the seeded train/validation split that is
# computed from them) reproducible across machines and runs.
# Without `recursive=True`, the previous `class**.npy` pattern matched exactly
# the same files as `class*.npy`.
train_list = sorted(glob.glob(os.path.join(train_dir, 'class*.npy')))
test_list = sorted(glob.glob(os.path.join(test_dir, 'class*.npy')))

In [None]:
# Display the discovered test file paths as the cell output.
test_list

In [None]:
# Peek at one training sample to check the array shape.
# The array used to be bound to `test_data`, a name that is reused further down
# for the test dataset although this is a *training* file; a dedicated name
# avoids that confusion.
if not train_list:
    raise FileNotFoundError(f"No training files found in {train_dir!r}")
sample_array = np.load(train_list[0])
print(sample_array.shape)

In [None]:
# Number of files found for training (before the validation split) and testing.
print(f"Train Data: {len(train_list)}")
print(f"Test Data: {len(test_list)}")

In [None]:
# Class names recognised inside a file name, checked in this order.
_CLASS_NAMES = ('class0', 'class1', 'class2')


def _label_from_path(path):
    """Return the class name embedded in the file name of *path*.

    The file stem (base name up to the first '.') is searched for each known
    class name in order and the first hit wins. When no class name occurs, the
    raw stem is returned unchanged.
    """
    stem = os.path.basename(path).split('.')[0]
    for class_name in _CLASS_NAMES:
        if class_name in stem:
            return class_name
    return stem


# One label per entry of `train_list`, in the same order.
labels = [_label_from_path(path) for path in train_list]
print(labels)

## Random Plots

In [None]:
# Show a 3x3 grid of randomly chosen training samples with their labels.
# Bug fix: `random_idx` used to be drawn but never used (the loop counter 0..8
# indexed the files instead), so the grid always showed the first nine files.
# The lower bound is now 0 so the first file can be drawn as well.
random_idx = np.random.randint(0, len(train_list), size=9)
fig, axes = plt.subplots(3, 3, figsize=(16, 12))
for ax, sample_idx in zip(axes.ravel(), random_idx):
    img = np.load(train_list[sample_idx])
    ax.set_title(labels[sample_idx])
    ax.imshow(img, origin='lower', aspect='auto')
    


## Split

In [None]:
# Hold out 10% of the training files for validation. The split is stratified by
# `labels` and uses `seed` as the random state.
# NOTE: `train_list` is rebound to the training subset here, while `labels`
# still describes the pre-split list and no longer lines up with it.
train_list, valid_list = train_test_split(train_list, 
                                          test_size=0.1,
                                          stratify=labels,
                                          random_state=seed)

In [None]:
# File counts per subset after the train/validation split.
print(f"Train Data: {len(train_list)}")
print(f"Validation Data: {len(valid_list)}")
print(f"Test Data: {len(test_list)}")

## Image Augmentation

In [None]:
def _eval_pipeline():
    """Build the resize-to-256 then center-crop-to-224 pipeline used for evaluation."""
    return transforms.Compose(
        [
            transforms.Resize(256),
            transforms.CenterCrop(224),
        ]
    )


# None of the pipelines contains a ToTensor step; they are applied to the
# tensors that the dataset class builds from the loaded arrays.

# Training: resize to 224x224, then a random resized crop and a random
# horizontal flip as augmentation.
_train_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
]
train_transforms = transforms.Compose(_train_steps)

# Validation and test share the same steps but get separate Compose objects.
val_transforms = _eval_pipeline()
test_transforms = _eval_pipeline()


## Load Datasets

In [None]:



class stationaryDataset(Dataset):
    """Dataset of ``.npy`` arrays whose class index is encoded in the file path.

    Each file is loaded with ``np.load``, turned into a tensor, permuted with
    ``(2, 0, 1)`` (assumes a 3-D, channels-last array — TODO confirm against
    the data files), optionally transformed and finally cast to float.

    The label is the integer ``k`` for which the text ``'class<k>'`` occurs in
    the path. The whole path is searched, not just the file name, and when
    several indices match the largest one wins.

    Args:
        file_list: Sequence of paths to ``.npy`` files.
        transform: Optional callable applied to the permuted tensor. ``None``
            (the default) leaves the tensor unchanged.
        num_classes: Class indices ``0 .. num_classes - 1`` are searched for in
            each path. Defaults to 3.

    Raises:
        ValueError: From ``__getitem__`` when a path contains no class marker.
    """

    def __init__(self, file_list, transform=None, num_classes=3):
        self.file_list = file_list
        self.transform = transform
        self.num_classes = num_classes

    def __len__(self):
        """Return the number of files in the dataset."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return ``(tensor, label)`` for the file at position *idx*."""
        img_path = self.file_list[idx]
        img = torch.tensor(np.load(img_path)).permute(2, 0, 1)

        # `transform` defaults to None, so it must be optional here as well.
        if self.transform is not None:
            img = self.transform(img)
        img_transformed = img.float()

        label = None
        for class_num in range(self.num_classes):
            if 'class' + str(class_num) in img_path:
                label = class_num  # no break: the last matching index wins

        if label is None:
            # Fail with a clear message instead of an UnboundLocalError.
            raise ValueError(
                f"Cannot derive a class label from path {img_path!r}: expected "
                f"'class<k>' with k in 0..{self.num_classes - 1}"
            )

        return img_transformed, label


In [None]:
# Wrap each file list in a dataset with its matching transform pipeline.
# Validation uses `val_transforms` (the pipeline defined for it) rather than
# borrowing `test_transforms`. The two pipelines are currently identical, so
# results do not change, but a later edit to either one now reaches the right split.
train_data = stationaryDataset(train_list, transform=train_transforms)
valid_data = stationaryDataset(valid_list, transform=val_transforms)
test_data = stationaryDataset(test_list, transform=test_transforms)

# refWGNpath = '/home/guoyiyang/github_repo/vit-pytorch/examples/data4preTrained/test/pureNoise_cqtRGB_SNR=1_3.npy'
# refWGNinTensor = np.load(refWGNpath)
# refWGNinTensor = torch.tensor(refWGNinTensor).permute(2,0,1).float()

# train_data = stationaryDataset(refWGNinTensor,train_list, transform=train_transforms)
# valid_data = stationaryDataset(refWGNinTensor,valid_list, transform=test_transforms)
# test_data = stationaryDataset(refWGNinTensor,test_list, transform=test_transforms)

In [None]:
# Only the training loader is shuffled. Evaluation metrics do not benefit from
# shuffling, and a fixed order keeps validation/test batches identical from
# epoch to epoch and run to run.
train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(dataset=valid_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)

In [None]:
# Number of training samples and number of training batches per epoch.
print(len(train_data), len(train_loader))

In [None]:
# Number of validation samples and number of validation batches.
print(len(valid_data), len(valid_loader))

## Efficient Attention

### Linformer

In [None]:
# Linformer module that is handed to ViT as its `transformer` argument.
efficient_transformer = Linformer(
    dim=128,  # same value as the `dim` passed to ViT
    seq_len=49+1,  # 7x7 patches + 1 cls-token (image_size 224 / patch_size 32 = 7 per side)
    depth=12,
    heads=8,
    k=64  # presumably the Linformer projection length — confirm against the linformer package
)

### Visual Transformer

In [None]:
# Vision Transformer classifier built around the Linformer defined above and
# moved to the configured device.
model = ViT(
    dim=128,  # same value as the Linformer `dim`
    image_size=224,  # matches the 224x224 output of the transform pipelines
    patch_size=32,  # 224 / 32 = 7 patches per side, i.e. 49 patches (see Linformer seq_len)
    num_classes=3,  # class0, class1, class2
    transformer=efficient_transformer,
    channels=3,
).to(device)

### Training

In [None]:
# loss function: cross-entropy between the model output and the integer class labels
criterion = nn.CrossEntropyLoss()
# optimizer: Adam over all model parameters, starting at learning rate `lr`
optimizer = optim.Adam(model.parameters(), lr=lr)
# scheduler: multiplies the learning rate by `gamma` on every call to
# scheduler.step() (step_size=1); it has no effect unless step() is called
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

In [None]:
# Per-epoch histories; the training loop appends one entry to each list per epoch.
# Re-running this cell resets them.
epoch_loss_all = []  # mean training loss per epoch
epoch_accuracy_all = []  # mean training accuracy per epoch
epoch_valAcc_all = []  # mean validation accuracy per epoch
epoch_ValLoss_all = []  # mean validation loss per epoch

### Classification

In [None]:
# Train for `epochs` epochs; after each epoch evaluate on the validation set,
# record the four per-epoch metrics and print a summary line.
# Each metric is the mean of the per-batch means.
for epoch in range(epochs):
    # ---- training pass ----
    # Put layers such as dropout / batch-norm (if the model has any) in training mode.
    model.train()
    # 0-d tensors rather than Python ints, so the `.cpu()` call and the format
    # specs below also work when a loader yields no batches.
    epoch_loss = torch.zeros((), device=device)
    epoch_accuracy = torch.zeros((), device=device)
    num_train_batches = len(train_loader)

    for data, label in tqdm(train_loader):
        data = data.to(device)
        label = label.to(device)

        output = model(data)
        loss = criterion(output, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        acc = (output.argmax(dim=1) == label).float().mean()
        epoch_accuracy += acc / num_train_batches
        # Detach before accumulating so the running sum does not hold on to
        # each batch's autograd graph.
        epoch_loss += loss.detach() / num_train_batches
    epoch_loss_all.append(epoch_loss.cpu().numpy())
    epoch_accuracy_all.append(epoch_accuracy)

    # ---- validation pass ----
    # Evaluation mode, with gradient tracking disabled.
    model.eval()
    with torch.no_grad():
        epoch_val_accuracy = torch.zeros((), device=device)
        epoch_val_loss = torch.zeros((), device=device)
        num_valid_batches = len(valid_loader)
        for data, label in valid_loader:
            data = data.to(device)
            label = label.to(device)

            val_output = model(data)
            val_loss = criterion(val_output, label)

            acc = (val_output.argmax(dim=1) == label).float().mean()
            epoch_val_accuracy += acc / num_valid_batches
            epoch_val_loss += val_loss / num_valid_batches
        epoch_ValLoss_all.append(epoch_val_loss.cpu().numpy())
        epoch_valAcc_all.append(epoch_val_accuracy)

    # Advance the learning-rate schedule once per epoch. The StepLR scheduler
    # was created but never stepped before, so the learning rate stayed at its
    # initial value for the whole run.
    scheduler.step()

    print(
        f"Epoch : {epoch+1} - loss : {epoch_loss:.4f} - acc: {epoch_accuracy:.4f} - val_loss : {epoch_val_loss:.4f} - val_acc: {epoch_val_accuracy:.4f}\n"
    )
# torch.save(model.state_dict(), Config.weights_dir+'epoch_{}.pth'.format(epoch))

### Training Loss Curve

In [None]:
# Plot the mean training loss of every recorded epoch.
# The x-axis follows the length of the history: `range(epochs)` raises a
# shape-mismatch error whenever the history holds a different number of entries,
# for example after the training cell has been run more than once.
plt.plot(range(len(epoch_loss_all)), epoch_loss_all)
plt.title('Loss')

In [None]:

# NOTE(review): both loops assign every element back to itself, so this cell
# leaves the two lists unchanged; it looks like a leftover from an earlier
# conversion step and is a candidate for removal.
for lp in range(len(epoch_accuracy_all)):
    epoch_accuracy_all[lp] =epoch_accuracy_all[lp]
for lp in range(len(epoch_valAcc_all)):
    epoch_valAcc_all[lp] =epoch_valAcc_all[lp]


In [None]:

# Move every recorded accuracy tensor to host memory.
# Slice assignment replaces the contents of the existing list objects in place.
epoch_accuracy_all[:] = [train_acc.cpu() for train_acc in epoch_accuracy_all]
epoch_valAcc_all[:] = [valid_acc.cpu() for valid_acc in epoch_valAcc_all]


In [None]:
# Plot training and validation accuracy per epoch on twin y-axes.
# float() accepts 0-d tensors on any device as well as plain numbers, so the
# histories can be plotted regardless of where their entries currently live.
train_acc = [float(acc) for acc in epoch_accuracy_all]
valid_acc = [float(acc) for acc in epoch_valAcc_all]

fig, ax1 = plt.subplots()
# The left axis gets the training curve again; without it the axis had a label
# and a legend call but no data, and `legend()` warned about missing artists.
ax1.plot(np.arange(len(train_acc)), train_acc, marker='o', color='red', label='train')
ax1.set_xlabel('Epoch')
# These values are measured on the training loader, so the axis is labelled
# 'Train', not 'Test'.
ax1.set_ylabel('Train Accuracy', color='black')
ax1.tick_params('y', colors='black')
ax1.legend(loc='upper left')

ax2 = ax1.twinx()
ax2.plot(np.arange(len(valid_acc)), valid_acc, marker='o', color='blue', label='valid')
ax2.set_ylabel('Valid Accuracy', color='black')
ax2.tick_params('y', colors='black')
# Different corner than the left-axis legend so the two boxes do not overlap.
ax2.legend(loc='lower right')

plt.show()

In [None]:
import os

# Restrict the process to the first GPU, enumerated in PCI bus order.
# Bug fix: this used to set the misspelled variable CUDA_DEVICES_ORDER to the
# misspelled value PCI_BUS_IS, which CUDA does not recognise; the real setting
# is CUDA_DEVICE_ORDER=PCI_BUS_ID.
# The variables are set before the Keras imports so the framework sees them
# when it initialises its GPU runtime.
# NOTE: a CUDA context already created earlier in this process (for example by
# PyTorch) is not affected by changing these variables.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import torch.cuda
from keras.layers import Flatten, Dense, Dropout, Input
from keras.applications import VGG16
from keras.optimizers import SGD
from keras.optimizers import Adam
from keras.models import Model
# Public Keras API instead of the private `tensorflow.python.keras` module path.
from keras.utils import to_categorical

print(torch.cuda.is_available())

dim = 3  # NOTE(review): not used anywhere in this cell


def _loader_to_numpy(loader):
    """Iterate *loader* once and return ``(inputs, labels)`` as NumPy arrays.

    The batches are concatenated along axis 0.
    """
    inputs, targets = [], []
    for data, label in loader:
        inputs.append(data.numpy())
        targets.append(label.numpy())
    return np.concatenate(inputs, axis=0), np.concatenate(targets, axis=0)


print('[INFO] loading dataset......')
# Convert the PyTorch batches to NumPy arrays.
# NOTE: `train_loader` applies random augmentation, so this is a single fixed
# snapshot of the augmented training set, held entirely in memory.
x_train_numpy, y_train_numpy = _loader_to_numpy(train_loader)
x_valid_numpy, y_valid_numpy = _loader_to_numpy(valid_loader)

# Move the channel axis last, (N, C, H, W) -> (N, H, W, C), to match the
# (224, 224, 3) input declared for the Keras model below.
x_train_numpy = x_train_numpy.transpose((0, 2, 3, 1))
x_valid_numpy = x_valid_numpy.transpose((0, 2, 3, 1))
# One-hot encode the integer labels for the categorical cross-entropy loss.
y_train_numpy = to_categorical(y_train_numpy, num_classes=3)
y_valid_numpy = to_categorical(y_valid_numpy, num_classes=3)


print('[INFO] initializing model......')
base_model = VGG16(weights='imagenet', include_top=False, input_tensor=Input(shape=(224, 224, 3)))
# Fine-tuning head: flatten -> dense(512) -> dropout -> dense(64) -> 3-way softmax.
head_model = base_model.output
head_model = Flatten(name="flatten")(head_model)
head_model = Dense(512, activation="relu")(head_model)
head_model = Dropout(0.5)(head_model)
head_model = Dense(64, activation='relu')(head_model)
head_model = Dense(3, activation="softmax")(head_model)
# NOTE: this rebinds the notebook-level name `model`, which until now referred
# to the PyTorch ViT.
model = Model(base_model.input, head_model)
# Freeze the five convolutional groups of the VGG16 base; only the custom
# fully-connected layers are trained.
for layer in base_model.layers:
    layer.trainable = False
print('[INFO] compiling model')
# `learning_rate` replaces the deprecated `lr` keyword, which newer Keras
# releases reject.
sgd = SGD(learning_rate=0.0001, momentum=0.9)  # alternative optimizer; not passed to compile()
adam = Adam(learning_rate=0.0001)
model.compile(loss='categorical_crossentropy', metrics=['accuracy'], optimizer=adam)
print('[INFO] training model')
model.fit(x_train_numpy, y_train_numpy, batch_size=32, epochs=20, validation_data=(x_valid_numpy, y_valid_numpy))
print('[INFO] saving model and weights')
# Save the model architecture (without weights).
model_json = model.to_json()
# A context manager closes the file even if the write fails.
with open('model_architecture.json', 'w') as json_file:
    json_file.write(model_json)
# Save the weights.
model.save_weights('transfer_learning_weights.h5', overwrite=True)