In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
from os import walk
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import os
from PIL import Image
import cv2
from torchsummary import summary
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import numpy as np
import random
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF
join = os.path.join

In [3]:
class TrainDataset(Dataset):
    def __init__(self, categories, root_dir, setSize, transform=None):
        self.categories = categories
        self.rootdir = root_dir
        self.transform = transform
        self.setSize = setSize
    def __len__(self):
        return self.setSize
    def __getitem__(self, idx):
        img1 = None
        img2 = None
        label = None
        if idx % 2 == 0: 
            category = random.choice(self.categories)
            img1Name = random.choice(category[1])
            img2Name = random.choice(category[1])
            img1 = Image.open(join(self.rootdir,join(category[0],img1Name)))
            img2 = Image.open(join(self.rootdir,join(category[0],img2Name)))
            label = 1.0
        else: 
            category1, category2 = random.choice(self.categories), random.choice(self.categories)
            while category1[0] == category2[0]:
                category2 = random.choice(self.categories)
            img1Name = random.choice(category1[1])
            img2Name = random.choice(category2[1])
            label = 0.0
            img1 = Image.open(join(self.rootdir,join(category1[0],img1Name)))
            img2 = Image.open(join(self.rootdir,join(category2[0],img2Name)))
        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)
        return img1, img2, torch.from_numpy(np.array([label], dtype=np.float32))     

In [4]:
dataSize = 10000 
TRAIN_PCT = 0.8 
train_size = int(dataSize * TRAIN_PCT)
val_size = dataSize - train_size

transformations = transforms.Compose(
    [transforms.Resize((105,105)),
        transforms.ToTensor()]) 
root_dir = '/content/drive/MyDrive/Task/Egg/oneshot_data'
k= []
for folder in os.listdir(root_dir):
     if not folder.startswith('.'):
        k.append([folder, os.listdir(join(root_dir , folder))])
omniglotDataset = TrainDataset(k, root_dir, dataSize, transformations)
train_set, val_set = random_split(omniglotDataset, [train_size, val_size])
train_loader = torch.utils.data.DataLoader(train_set, batch_size=128, num_workers=16)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=1, num_workers=16, shuffle=True)



In [5]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        
        self.conv1 = nn.Conv2d(3, 64, 10) 
        self.conv2 = nn.Conv2d(64, 128, 7)  
        self.conv3 = nn.Conv2d(128, 128, 4)
        self.conv4 = nn.Conv2d(128, 256, 4)
        self.bn1 = nn.BatchNorm2d(64)
        self.bn2 = nn.BatchNorm2d(128)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(256)
        self.dropout1 = nn.Dropout(0.1)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(256 * 6 * 6, 4096)
        self.fcOut = nn.Linear(4096, 1)
        self.sigmoid = nn.Sigmoid()


    def convs(self, x):

        
        x = F.relu(self.bn1(self.conv1(x)))

        x = F.max_pool2d(x, (2,2))

        x = F.relu(self.bn2(self.conv2(x)))

        x = F.max_pool2d(x, (2,2))

        x = F.relu(self.bn3(self.conv3(x)))

        x = F.max_pool2d(x, (2,2))

        x = F.relu(self.bn4(self.conv4(x)))

        return x


    def forward(self, x1, x2):
        x1 = self.convs(x1)
        x1 = x1.view(-1, 256 * 6 * 6)
        x1 = self.sigmoid(self.fc1(x1))

        
        x2 = self.convs(x2)
        x2 = x2.view(-1, 256 * 6 * 6)
        x2 = self.sigmoid(self.fc1(x2))

        x = torch.abs(x1 - x2)
        x = self.fcOut(x)
        return x

In [6]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
siameseBaseLine = Net()
siameseBaseLine = siameseBaseLine.to(device)


In [7]:
def save_checkpoint(save_path, model, optimizer, val_loss):
    if save_path==None:
        return
    save_path = save_path 
    state_dict = {'model_state_dict': model.state_dict(),
                  'optimizer_state_dict': optimizer.state_dict(),
                  'val_loss': val_loss}

    torch.save(state_dict, save_path)

    print(f'Model saved to ==> {save_path}')

def load_checkpoint(model, optimizer):
    save_path = f'siameseNet-batchnorm50.pt'
    state_dict = torch.load(save_path)
    model.load_state_dict(state_dict['model_state_dict'])
    optimizer.load_state_dict(state_dict['optimizer_state_dict'])
    val_loss = state_dict['val_loss']
    print(f'Model loaded from <== {save_path}')
    
    return val_loss

In [8]:
import torch.optim as optim

optimizer = optim.Adam(siameseBaseLine.parameters(), lr = 0.0006)
num_epochs = 50
criterion = nn.BCEWithLogitsLoss()
save_path = '/content/drive/MyDrive/Task/Egg/model/siameseNet-batchnorm50.pt'

In [9]:
def train(model, train_loader, val_loader, num_epochs, criterion, save_name):
    best_val_loss = float("Inf") 
    train_losses = []
    val_losses = []
    cur_step = 0
    for epoch in range(num_epochs):
        running_loss = 0.0
        model.train()
        print("Starting epoch " + str(epoch+1))
        for img1, img2, labels in train_loader:
            

            img1 = img1.to(device)
            img2 = img2.to(device)
            labels = labels.to(device)
            outputs = model(img1, img2)
            loss = criterion(outputs, labels)
            
        
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_train_loss = running_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        
        val_running_loss = 0.0
        with torch.no_grad():
            model.eval()
            for img1, img2, labels in val_loader:
                img1 = img1.to(device)
                img2 = img2.to(device)
                labels = labels.to(device)
                outputs = model(img1, img2)
                loss = criterion(outputs, labels)
                val_running_loss += loss.item()
        avg_val_loss = val_running_loss / len(val_loader)
        val_losses.append(avg_val_loss)
        
        print('Epoch [{}/{}],Train Loss: {:.4f}, Valid Loss: {:.8f}'
            .format(epoch+1, num_epochs, avg_train_loss, avg_val_loss))
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            save_checkpoint(save_name, model, optimizer, best_val_loss)
    
    print("Finished Training")  
    return train_losses, val_losses  


In [10]:
train_losses, val_losses = train(siameseBaseLine, train_loader, val_loader, num_epochs, criterion, save_path)

Starting epoch 1
Epoch [1/50],Train Loss: 0.2723, Valid Loss: 0.14103082
Model saved to ==> /content/drive/MyDrive/Task/Egg/model/siameseNet-batchnorm50.pt
Starting epoch 2
Epoch [2/50],Train Loss: 0.0972, Valid Loss: 0.08534995
Model saved to ==> /content/drive/MyDrive/Task/Egg/model/siameseNet-batchnorm50.pt
Starting epoch 3
Epoch [3/50],Train Loss: 0.0633, Valid Loss: 0.07765579
Model saved to ==> /content/drive/MyDrive/Task/Egg/model/siameseNet-batchnorm50.pt
Starting epoch 4
Epoch [4/50],Train Loss: 0.0544, Valid Loss: 0.07426170
Model saved to ==> /content/drive/MyDrive/Task/Egg/model/siameseNet-batchnorm50.pt
Starting epoch 5
Epoch [5/50],Train Loss: 0.0451, Valid Loss: 0.06892415
Model saved to ==> /content/drive/MyDrive/Task/Egg/model/siameseNet-batchnorm50.pt
Starting epoch 6
Epoch [6/50],Train Loss: 0.0295, Valid Loss: 0.07679268
Starting epoch 7
Epoch [7/50],Train Loss: 0.0260, Valid Loss: 0.06444442
Model saved to ==> /content/drive/MyDrive/Task/Egg/model/siameseNet-batchn

In [15]:
img = Image.open('/content/drive/MyDrive/Task/Egg/oneshot_data/Vi/531.jpg')
img = TF.resize(img,(105,105))
img = TF.to_tensor(img)

In [19]:
img.unsqueeze(0).size()

torch.Size([1, 3, 105, 105])

In [21]:
def Average(lst):
    return sum(lst) / len(lst)

In [34]:
with torch.no_grad():
  siameseBaseLine.eval()
  mainImg = Image.open('/content/drive/MyDrive/Task/Egg/1/Screenshot 2023-05-17 161142.png')
  mainImg = mainImg.convert('RGB')      
  mainImg = TF.resize(mainImg,(105,105))
  mainImg = TF.to_tensor(mainImg).to(device).unsqueeze(0)
  root = '/content/drive/MyDrive/Task/Egg/oneshot_data'
  result = {
  }
  for champ in os.listdir(root):
    avr = []
    for name in os.listdir(join(root,champ)):
      img2 = Image.open(join(root,join(champ,name)))
      img2 = TF.resize(img2,(105,105))
      img2 = TF.to_tensor(img2)
      img2 = img2.to(device).unsqueeze(0)
      output = siameseBaseLine(mainImg,img2)
      avr.append(output)
    result[champ] = Average(avr)

max_value = max(result, key=result.get)
print(max_value)



Malphite
