In [None]:
from tqdm import tqdm
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

from skimage.io import imread
from sklearn.metrics import accuracy_score

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

In [None]:
# Target size passed to transforms.Resize for every image.
IMAGE_HEIGHT = 224
IMAGE_WIDTH = 224
# Number of triplets per mini-batch, shared by the train/valid/test loaders.
BATCH_SIZE = 16

# Seed used for the DataFrame shuffle and for the train/validation split.
RANDOM_SEED = 42
# Number of passes over the training loader performed by train().
EPOCHS = 10

# Checkpoint file: train() writes the model state_dict here and it is loaded back before inference.
PATH = './resnet50.pth'

In [None]:
# Read the triplet lists as string arrays; the entries are later concatenated
# into image file names, so they are kept as text rather than parsed as numbers.
train_triplets = np.loadtxt('./data/train_triplets.txt', dtype='str')
test_triplets = np.loadtxt('./data/test_triplets.txt', dtype='str')
# Display the number of training triplets.
len(train_triplets)

In [None]:
# train_set, valid_set = train_test_split(train_triplets, test_size=0.1, random_state=RANDOM_SEED)
# len(train_set)

In [None]:
# Wrap the raw triplet arrays in DataFrames so columns can be named and manipulated.
df_train = pd.DataFrame(train_triplets)
# NOTE(review): df_test is not referenced again in the visible notebook; the test
# dataset is built directly from test_triplets — confirm whether it is still needed.
df_test = pd.DataFrame(test_triplets)

df_train

In [None]:
# Build the labelled training table straight from the raw triplets. Rebuilding
# from `train_triplets` (instead of mutating the previous `df_train` in place)
# keeps this cell idempotent: re-running it no longer fails on the already
# existing 'y' column, nor does it mirror an already mirrored table.
df_train = pd.DataFrame(train_triplets, columns=['A', 'B', 'C']).assign(y=1)

# Mirror every triplet by swapping its first two images; the label stays 1.
df_train_append = df_train.copy()
df_train_append['A'] = df_train['B'].copy()
df_train_append['B'] = df_train['A'].copy()

# DataFrame.append was removed in pandas 2.0; pd.concat is the supported
# replacement. Shuffle once with a fixed seed so the row order is reproducible.
df_train = (
    pd.concat([df_train, df_train_append], ignore_index=True)
    .sample(frac=1, random_state=RANDOM_SEED)
    .reset_index(drop=True)
)
df_train

In [None]:
# Hold out 10% of the rows of df_train for validation, using the shared seed.
# NOTE(review): df_train already contains the swapped (B, A, C) copies at this
# point, so a triplet and its mirrored copy can end up on opposite sides of the
# split and inflate validation accuracy — consider splitting before mirroring.
train_set, valid_set = train_test_split(df_train.to_numpy(), test_size=0.1, random_state=RANDOM_SEED)
valid_set

In [None]:
# Output size shared by both pipelines.
_TARGET_SIZE = (IMAGE_HEIGHT, IMAGE_WIDTH)
# Per-channel mean / std handed to Normalize.
# NOTE(review): presumably measured on this image collection — confirm.
_CHANNEL_MEAN = (0.608, 0.516, 0.412)
_CHANNEL_STD = (0.264, 0.275, 0.296)

# Training pipeline: tensor conversion, resize, random horizontal flip, normalisation.
transform_train = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(size=_TARGET_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

# Evaluation pipeline: the same steps without the random flip.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(size=_TARGET_SIZE),
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

In [None]:
class MyDataset(torch.utils.data.Dataset):
    """Dataset of image triplets whose files live under ``./food/``.

    Columns 0, 1 and 2 of ``data`` supply the three image ids of each row. In
    training mode column 3 is additionally exposed as the row's label.
    """

    def __init__(self, data, transform=None, trainning=False):
        """Keep the triplet table and the optional per-image transform.

        Args:
            data: 2-D array indexed as ``data[:, k]``; columns 0-2 hold image
                ids and, when ``trainning`` is true, column 3 holds the label.
            transform: Optional callable applied to every loaded image.
            trainning: When true, ``__getitem__`` also returns the label.
        """
        self.imgs = data
        self.img_A, self.img_B, self.img_C = (self.imgs[:, col] for col in range(3))
        self.transform = transform
        self.trainning = trainning

        # The label column is only read in training mode.
        if trainning:
            self.label = self.imgs[:, 3]

    def __getitem__(self, index):
        """Return ``(img_A, img_B, img_C)`` for one row, plus the label in training mode."""
        id_columns = (self.img_A, self.img_B, self.img_C)
        # Read all three files first, then transform them in the same A, B, C order.
        images = [imread('./food/' + ids[index] + '.jpg') for ids in id_columns]

        if self.transform is not None:
            images = [self.transform(image) for image in images]

        if not self.trainning:
            return tuple(images)
        return (*images, self.label[index])

    def __len__(self):
        """Number of triplets, i.e. rows of the table."""
        return self.imgs.shape[0]


# Training and validation rows both carry a label column, so both use
# trainning=True; only the training set gets the pipeline with the random flip.
train_data = MyDataset(data=train_set, transform=transform_train, trainning=True)
valid_data = MyDataset(data=valid_set, transform=transform_test, trainning=True)
# The test triplets are used without labels, hence trainning=False.
test_data = MyDataset(data=test_triplets, transform=transform_test, trainning=False)
train_data


In [None]:
from torch.utils.data import DataLoader 

# Only the training loader shuffles; validation and test keep the dataset order,
# so test predictions come out in the same order as the rows of test_triplets.
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE)
test_loader = DataLoader(dataset=test_data,batch_size=BATCH_SIZE)

In [None]:
import torch.nn as nn
import torchvision.models as models

class ConvNet(nn.Module):
    """Run one shared embedding network over each image batch of a triplet."""

    def __init__(self, net):
        """Register ``net`` as the single sub-module used for all three inputs."""
        super().__init__()
        self.net = net

    def forward(self, img_A, img_B, img_C):
        """Embed the three batches with the same weights, in A, B, C order.

        Returns:
            A 3-tuple ``(anchor, positive, negative)`` holding the outputs of
            ``net`` for ``img_A``, ``img_B`` and ``img_C`` respectively.
        """
        return tuple(self.net(batch) for batch in (img_A, img_B, img_C))

class myLayer(nn.Module):
    """Two-layer projection head: Linear -> BatchNorm -> ReLU -> Dropout -> Linear.

    The input width is taken from ``input.fc.in_features``; the hidden width is
    1000 and the output embedding has 512 features.
    """

    def __init__(self, input):
        """Create the head's layers.

        Args:
            input: Object exposing ``fc.in_features``, used as the width of the
                first linear layer.
        """
        super().__init__()
        in_width = input.fc.in_features

        # Sub-modules are registered in the same order as before so that the
        # state_dict layout is unchanged.
        self.fc1 = nn.Linear(in_width, 1000)
        self.fc2 = nn.Linear(1000, 512)

        self.bn = nn.BatchNorm1d(1000)
        self.drop = nn.Dropout(p=0.4)

        self.relu = nn.ReLU()

    def forward(self, x):
        """Flatten ``x`` to ``(batch, features)`` and project it to 512 features."""
        flat = x.view(x.shape[0], -1)
        hidden = self.fc1(flat)
        hidden = self.bn(hidden)
        hidden = self.relu(hidden)
        hidden = self.drop(hidden)
        return self.fc2(hidden)

# Start from a ResNet-50 with pretrained weights and keep every child module
# except the last one.
model = models.resnet50(pretrained=True)
backbone_layers = list(model.children())[:-1]
pretrained_model = nn.Sequential(*backbone_layers)

# Freeze the backbone; only the head appended below keeps trainable parameters.
for param in pretrained_model.parameters():
    param.requires_grad_(False)

# The head reads its input width from the original model's `fc` layer.
pretrained_model.add_module('myLayer', myLayer(model))

net = ConvNet(pretrained_model).to(device)

In [None]:
import torch.optim as optim

# Triplet loss on (anchor, positive, negative) embeddings, with the library defaults.
criterion = nn.TripletMarginLoss()
# All parameters of `net` are handed to SGD, including the backbone ones whose
# requires_grad was set to False when the model was assembled.
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [None]:
# for A, B, C, label in train_loader:
#     print(A)
#     break
# for i, data in enumerate(train_loader, 0):
#     print(data[3].reshape(-1,1))
#     print(data[3].size())
#     break
# y_pred = [0, 2, 1, 3]
# y_true = [0, 1, 2, 3]
# accuracy_score(y_true, y_pred)

In [None]:
def valid(anchor, positive, negative, label):
    """Count the triplets of a batch whose predicted class equals ``label``.

    The prediction for a triplet is 1 when the anchor is strictly closer (L2
    distance along dim 1) to ``positive`` than to ``negative``, and 0 otherwise.

    Returns:
        The number of matching predictions (``accuracy_score`` with
        ``normalize=False``), not a ratio.
    """
    pos_distance = (anchor - positive).norm(p=2, dim=1)
    neg_distance = (anchor - negative).norm(p=2, dim=1)
    margin = (neg_distance - pos_distance).detach().cpu().numpy()
    # Clip to [0, 1] and round up: any strictly positive margin becomes 1, the rest 0.
    predicted = np.ceil(np.clip(margin, 0, 1))

    return accuracy_score(label, predicted, normalize=False)

In [None]:
def train(model):
    """Train ``model`` for ``EPOCHS`` epochs, checkpointing on validation accuracy.

    Relies on the notebook-level ``train_loader``, ``valid_loader``,
    ``criterion``, ``optimizer``, ``device`` and ``PATH``. After each epoch the
    model is evaluated on the validation loader, and its ``state_dict`` is saved
    to ``PATH`` whenever the accuracy is at least as high as the best seen so far.

    Args:
        model: Module called as ``model(img_A, img_B, img_C)`` that returns the
            three embeddings consumed by ``criterion`` and ``valid``.
    """
    valid_accuracy = 0.0
    for epoch in range(EPOCHS):
        running_loss = 0.0
        valid_loss = 0.0
        correct = 0
        total = 0
        accuracy = 0.0

        print('Training')
        # The validation phase below puts the model in eval mode. Without
        # switching back here, every epoch after the first would train with
        # dropout disabled and batch-norm using its running statistics.
        # NOTE(review): this also keeps batch-norm layers inside frozen
        # sub-modules in training mode (as in the first epoch of the original
        # code), so their running statistics keep updating — confirm intended.
        model.train()
        # Wrap the loader itself so tqdm can read its length and show a total.
        for data in tqdm(train_loader):
            img_A = data[0].to(device)
            img_B = data[1].to(device)
            img_C = data[2].to(device)

            optimizer.zero_grad()

            anchor, positive, negetive = model(img_A, img_B, img_C)

            loss = criterion(anchor, positive, negetive)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f'[{epoch + 1}] average loss per epoch: {running_loss / max(len(train_loader), 1):.3f}')

        print('Validation')
        model.eval()
        with torch.no_grad():
            for data in tqdm(valid_loader):
                img_A = data[0].to(device)
                img_B = data[1].to(device)
                img_C = data[2].to(device)
                labels = data[3].reshape(-1, 1)

                anchor, positive, negetive = model(img_A, img_B, img_C)
                loss = criterion(anchor, positive, negetive)
                # Accumulate a Python float rather than a tensor.
                valid_loss += loss.item()
                correct += valid(anchor, positive, negetive, labels)
                total += labels.size(0)
        # Guard against an empty validation loader.
        if total:
            accuracy = correct / total
        print(f'[{epoch + 1}] average validation loss: {valid_loss / max(len(valid_loader), 1):.3f}')
        print(f'[{epoch + 1}] Accuracy of the network on the {total} valid images: {100 * accuracy} %')

        # ">=" means a tie with the best accuracy overwrites the checkpoint with the later epoch.
        if accuracy >= valid_accuracy:
            valid_accuracy = accuracy
            torch.save(model.state_dict(), PATH)
    print('Finished Training')
    

In [None]:
# Run the full training loop; the checkpoint is written to PATH by train().
train(net)

In [None]:
# Restore the checkpoint written by train(). map_location remaps the stored
# tensors onto the current device, so a checkpoint saved during a GPU run can
# still be loaded in a session that has no CUDA device.
net.load_state_dict(torch.load(PATH, map_location=device))
net.to(device)

In [None]:
def predict(anchor, positive, negative):
    """Predict, per triplet, whether the anchor is closer to ``positive``.

    Distances are L2 norms taken along dim 1 of the embedding differences.

    Returns:
        A NumPy array with one entry per row: 1.0 where the distance to
        ``negative`` is strictly larger than the distance to ``positive``,
        0.0 otherwise.
    """
    pos_distance = (anchor - positive).norm(p=2, dim=1)
    neg_distance = (anchor - negative).norm(p=2, dim=1)
    margin = (neg_distance - pos_distance).detach().to('cpu').numpy()
    # Clip to [0, 1] and round up: any strictly positive margin becomes 1, the rest 0.
    return np.ceil(np.clip(margin, 0, 1))

In [None]:
def test(model):
    """Run ``model`` over the notebook-level ``test_loader`` in eval mode.

    Args:
        model: Module called as ``model(img_A, img_B, img_C)`` that returns
            three embeddings.

    Returns:
        A list with one NumPy array per batch, each produced by ``predict``.
    """
    predictions = []
    model.eval()
    with torch.no_grad():
        # Wrap the loader itself (not enumerate(...)) so tqdm can read its
        # length and display a total; the batch index was never used.
        for data in tqdm(test_loader):
            img_A = data[0].to(device)
            img_B = data[1].to(device)
            img_C = data[2].to(device)

            anchor, positive, negetive = model(img_A, img_B, img_C)

            predictions.append(predict(anchor, positive, negetive))
    return predictions

In [None]:
output = test(net)
# Flatten the per-batch arrays into one integer vector. The previous code called
# `.astype(np.int8)` without keeping the result, so the cast never took effect,
# and flattened element by element in nested Python loops.
if output:
    predictions = np.concatenate(output).astype(np.int8)
else:
    # np.concatenate rejects an empty list; an empty loader yields an empty file.
    predictions = np.empty(0, dtype=np.int8)
np.savetxt('predictions_resnet50.txt', predictions, fmt='%i')