# IMPORT REQUIRED LIBRARIES

In [3]:
import torch
from torch.utils.data import Dataset,DataLoader
import torch.nn as nn
from torch.optim import Adam,lr_scheduler
import torch.nn.functional as F
from torchvision.datasets import ImageFolder
from torchvision import transforms
from PIL import Image
import os
import random
import time

# CUSTOMIZE THE DATASET INTO TRIPLETS

In [2]:
import random
import os

class Triplet:
    """Random (anchor, positive, negative) sampler over a class-per-folder dataset.

    ``train_folder`` must contain one sub-directory per class, each holding
    that class's image files.

    Attributes:
        train_folder: Root directory passed to the constructor.
        labels: Class names (sub-directory names) in sorted order; a class's
            label number is its position in this list.
        label_to_path: Maps each class name to its directory path.
    """

    def __init__(self, train_folder):
        self.train_folder = train_folder
        # os.listdir returns entries in arbitrary order, so sort to make label
        # numbers reproducible (this is also the ordering torchvision's
        # ImageFolder uses). Non-directory entries are not classes; skip them.
        self.labels = sorted(
            entry for entry in os.listdir(train_folder)
            if os.path.isdir(os.path.join(train_folder, entry))
        )
        self.label_to_path = {label: os.path.join(train_folder, label) for label in self.labels}
        # Cache lookups once instead of hitting the filesystem (three listdir
        # calls) and scanning the label list on every sampled triplet.
        # Files added to the folders after construction are therefore not seen.
        self._label_to_num = {label: num for num, label in enumerate(self.labels)}
        self._label_to_files = {
            label: sorted(os.listdir(path)) for label, path in self.label_to_path.items()
        }

    def get_triplet(self):
        """Draw one random triplet.

        Returns:
            A 9-tuple ``(anchor_image, anchor_label_num, anchor_label,
            positive_image, positive_label_num, positive_label,
            negative_image, negative_label_num, negative_label)`` where the
            ``*_image`` entries are file paths, the positive shares the
            anchor's class and the negative comes from a different class.

        Raises:
            IndexError: If there are fewer than two classes or a sampled class
                directory is empty (raised by ``random.choice``).
        """
        anchor_label = random.choice(self.labels)
        positive_label = anchor_label
        negative_label = random.choice([label for label in self.labels if label != anchor_label])

        anchor_files = self._label_to_files[anchor_label]
        if len(anchor_files) >= 2:
            # Two distinct files: an anchor paired with itself gives a zero
            # positive distance and carries no training signal.
            anchor_path, positive_path = random.sample(anchor_files, 2)
        else:
            anchor_path = positive_path = random.choice(anchor_files)
        negative_path = random.choice(self._label_to_files[negative_label])

        anchor_image = os.path.join(self.label_to_path[anchor_label], anchor_path)
        positive_image = os.path.join(self.label_to_path[positive_label], positive_path)
        negative_image = os.path.join(self.label_to_path[negative_label], negative_path)

        # Label number = position of the class name in the sorted label list.
        anchor_label_num = self._label_to_num[anchor_label]
        positive_label_num = self._label_to_num[positive_label]
        negative_label_num = self._label_to_num[negative_label]

        return (
            anchor_image, anchor_label_num, anchor_label,
            positive_image, positive_label_num, positive_label,
            negative_image, negative_label_num, negative_label,
        )

# Smoke test: build a sampler over the training folder and draw one random
# triplet so the notebook displays the returned tuple.
Triplet("./Datasets/DOCS/train").get_triplet()

('./Datasets/DOCS/train/news_article/augmented_2_905.jpg',
 2,
 'news_article',
 './Datasets/DOCS/train/news_article/augmented_2_916.jpg',
 2,
 'news_article',
 './Datasets/DOCS/train/email/augmented_1_286.jpg',
 0,
 'email')

In [3]:
class Triplet:
    """Random (anchor, positive, negative) image-path sampler.

    ``train_folder`` must contain one sub-directory per class, each holding
    that class's image files.

    Attributes:
        train_folder: Root directory passed to the constructor.
        labels: Class names (sub-directory names) in sorted order.
        label_to_path: Maps each class name to its directory path.
    """

    def __init__(self, train_folder):
        self.train_folder = train_folder
        # os.listdir order is arbitrary; sort for reproducibility and ignore
        # stray files, which are not class directories.
        self.labels = sorted(
            entry for entry in os.listdir(train_folder)
            if os.path.isdir(os.path.join(train_folder, entry))
        )
        self.label_to_path = {label: os.path.join(train_folder, label) for label in self.labels}
        # List every class directory once; the original re-listed three
        # directories for every sampled triplet. Files added to the folders
        # after construction are therefore not seen.
        self._label_to_files = {
            label: sorted(os.listdir(path)) for label, path in self.label_to_path.items()
        }

    def get_triplet(self):
        """Draw one random triplet of image paths.

        Returns:
            ``(anchor_image, positive_image, negative_image)``: the anchor and
            positive come from the same class, the negative from another one.

        Raises:
            IndexError: If there are fewer than two classes or a sampled class
                directory is empty (raised by ``random.choice``).
        """
        anchor_label = random.choice(self.labels)
        negative_label = random.choice([label for label in self.labels if label != anchor_label])

        anchor_files = self._label_to_files[anchor_label]
        if len(anchor_files) >= 2:
            # Two distinct files: an anchor paired with itself gives a zero
            # positive distance and carries no training signal.
            anchor_path, positive_path = random.sample(anchor_files, 2)
        else:
            anchor_path = positive_path = random.choice(anchor_files)
        negative_path = random.choice(self._label_to_files[negative_label])

        anchor_dir = self.label_to_path[anchor_label]
        anchor_image = os.path.join(anchor_dir, anchor_path)
        positive_image = os.path.join(anchor_dir, positive_path)
        negative_image = os.path.join(self.label_to_path[negative_label], negative_path)

        return anchor_image, positive_image, negative_image



In [4]:
class TripletDataset(Dataset):
    """Dataset that yields randomly sampled (anchor, positive, negative) images.

    Every item is a fresh random draw from ``Triplet``; the index only bounds
    the epoch length and does not select a particular triplet.

    Args:
        train_folder: Root folder handed to ``Triplet``.
        transform: Optional callable applied to each loaded RGB PIL image.
        num_triplets: Number of triplets that make up one epoch, i.e. the
            value reported by ``len()``. Defaults to the previously
            hard-coded 10014.
    """

    def __init__(self, train_folder, transform=None, num_triplets=10014):
        self.triplet_generator = Triplet(train_folder)
        self.transform = transform
        self.num_triplets = num_triplets

    def __len__(self):
        return self.num_triplets

    def __getitem__(self, index):
        """Return a random ``(anchor, positive, negative)`` triple of images.

        Raises:
            IndexError: If ``index`` lies outside ``[-len(self), len(self))``.
        """
        # Without this bound check, plain iteration (`for item in dataset`),
        # which stops on IndexError, would never terminate.
        size = len(self)
        if not -size <= index < size:
            raise IndexError(f"index {index} is out of range for {size} triplets")
        anchor_image, positive_image, negative_image = self.triplet_generator.get_triplet()
        anchor = self._load_image(anchor_image)
        positive = self._load_image(positive_image)
        negative = self._load_image(negative_image)
        return anchor, positive, negative

    def _load_image(self, image_path):
        """Load ``image_path`` as RGB and apply ``self.transform`` if set."""
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away instead of waiting for garbage collection.
        with Image.open(image_path) as handle:
            image = handle.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        return image

    def get_triplet_names(self, index):
        """Return the file paths of a freshly sampled random triplet.

        ``index`` is ignored: the result is a new draw and is NOT the triplet
        that ``self[index]`` returned or will return.
        """
        anchor_image, positive_image, negative_image = self.triplet_generator.get_triplet()
        return anchor_image, positive_image, negative_image

# DATALOADER

In [5]:
# Preprocessing shared by the training/test datasets built below.
transform = transforms.Compose([
    transforms.Grayscale(),  # Convert image to grayscale (the network's first conv takes 1 channel)
    # The embedding network's first Linear layer is sized 64 * 53 * 53, which
    # corresponds to 224x224 inputs; resize explicitly (as the inference
    # transform later in this file does) instead of relying on every file on
    # disk already having that size. A no-op for images that are 224x224.
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

In [6]:
# Training data pipeline: random triplets from the training folder, batched.
bs = 128  # batch size; also passed to fit() for its progress output
train_folder = "./Datasets/DOCS/train"
dataset = TripletDataset(train_folder, transform=transform)
# TripletDataset draws a random triplet for every index, so shuffle=True only
# permutes indices; it does not change which images are sampled.
trainloader = DataLoader(dataset, batch_size=bs, shuffle=True)

In [7]:
# Number of batches the training loader yields per epoch.
len(trainloader)

79

# SIMILARITY CHECKING WITH A MODEL

In [8]:
class EmbeddingNet(nn.Module):
    """Small CNN that maps a single-channel image to an embedding vector.

    Two (5x5 conv -> PReLU -> 2x2 max-pool) stages are followed by a
    three-layer fully connected head.

    Args:
        input_size: Side length of the (square) input image, or a
            ``(height, width)`` pair. Defaults to 224, which reproduces the
            original hard-coded ``64 * 53 * 53`` flattened feature size.
        embedding_dim: Length of the output embedding. Defaults to 2, the
            original hard-coded value.

    Raises:
        ValueError: If ``input_size`` is too small to survive both
            conv/pool stages.
    """

    def __init__(self, input_size=224, embedding_dim=2):
        super().__init__()
        self.convnet = nn.Sequential(
            nn.Conv2d(1, 32, 5),
            nn.PReLU(),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(32, 64, 5),
            nn.PReLU(),
            nn.MaxPool2d(2, stride=2)
        )

        self.fc = nn.Sequential(
            nn.Linear(self._flattened_features(input_size), 256),
            nn.PReLU(),
            nn.Linear(256, 256),
            nn.PReLU(),
            nn.Linear(256, embedding_dim)
        )

    @staticmethod
    def _flattened_features(input_size):
        """Return the number of features ``convnet`` emits for ``input_size``."""
        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size
        for _ in range(2):
            # An unpadded 5x5 convolution trims 4 pixels per axis; the 2x2,
            # stride-2 max-pool then floor-halves what is left.
            height = (height - 4) // 2
            width = (width - 4) // 2
        if height <= 0 or width <= 0:
            raise ValueError(f"input_size {input_size!r} is too small for EmbeddingNet")
        return 64 * height * width

    def forward(self, x):
        """Embed a batch ``x`` of shape (batch, 1, height, width)."""
        output = self.convnet(x)
        # Flatten everything except the batch dimension for the FC head.
        output = output.view(output.size()[0], -1)
        output = self.fc(output)
        return output

    def get_embedding(self, x):
        """Alias of ``forward`` kept for API compatibility."""
        return self.forward(x)


In [9]:
# Embedding network instance; it is wrapped by TripletNet in the training
# setup cell. The embedding-extraction cells later rebind the name `emb`.
emb = EmbeddingNet()

# TRIPLET WRAPPER

In [10]:
class TripletNet(nn.Module):
    """Runs one shared embedding network over one input or a full triplet."""

    def __init__(self, embedding_net):
        super().__init__()
        self.embedding_net = embedding_net

    def forward(self, x1, x2=None, x3=None):
        """Embed ``x1`` alone, or ``(x1, x2, x3)`` with the same weights.

        Returns a single embedding when both ``x2`` and ``x3`` are omitted,
        otherwise a 3-tuple of embeddings in input order.
        """
        embed = self.embedding_net
        only_first_given = x2 is None and x3 is None
        if only_first_given:
            return embed(x1)
        return tuple(embed(branch) for branch in (x1, x2, x3))

    def get_embedding(self, x):
        """Embed a single batch with the wrapped network."""
        return self.embedding_net(x)

# TRIPLET LOSS

In [11]:
class TripletLoss(nn.Module):
    """Hinge-style triplet loss on squared Euclidean distances.

    Per sample: ``relu(||a - p||^2 - ||a - n||^2 + margin)``.
    """

    def __init__(self, margin):
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative, size_average=True):
        """Return the mean (``size_average=True``) or the sum of the losses."""
        # Squared distances, summed over dim 1 (no square root is taken).
        positive_gap = anchor - positive
        negative_gap = anchor - negative
        distance_positive = torch.sum(positive_gap.pow(2), dim=1)
        distance_negative = torch.sum(negative_gap.pow(2), dim=1)
        violation = distance_positive - distance_negative + self.margin
        losses = F.relu(violation)
        if size_average:
            return losses.mean()
        return losses.sum()


In [12]:
# Training configuration. fit() below reads `device`, `optimizer` and
# `loss_fn` from these module-level names.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TripletNet(emb)
model = model.to(device)
margin = 1  # margin passed to TripletLoss
lr = 0.0001  # Adam learning rate
n_epochs = 7  # number of training epochs passed to fit()
optimizer = Adam(model.parameters(), lr=lr)
loss_fn = TripletLoss(margin)

# TRAIN

In [13]:
def fit(model, num_epochs, train_loader,bs):
    """Train ``model`` on triplet batches, printing progress as it goes.

    Relies on the module-level ``optimizer``, ``loss_fn`` and ``device``.

    Args:
        model: Network called as ``model(anchor, positive, negative)`` and
            returning the three embeddings.
        num_epochs: Number of passes over ``train_loader``.
        train_loader: Iterable of ``(anchor, positive, negative)`` batches;
            must expose ``.dataset`` for the progress counter.
        bs: Nominal batch size. Kept for backward compatibility; the SEEN
            counter now uses the real size of every batch.
    """
    # Fix: the original looped over (and printed) the global `n_epochs`,
    # silently ignoring the `num_epochs` argument.
    for epoch in range(num_epochs):
        start = time.time()
        model.train()
        train_loss = 0.0
        seen = 0
        for idx, batch in enumerate(train_loader):
            anchor, positive, negative = batch
            anchor = anchor.to(device)
            positive = positive.to(device)
            negative = negative.to(device)
            optimizer.zero_grad()
            # The outputs already take part in autograd through the model's
            # parameters, so no requires_grad_() calls are needed on them.
            anchor_embedding, positive_embedding, negative_embedding = model(anchor, positive, negative)
            loss = loss_fn(anchor_embedding, positive_embedding, negative_embedding)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            # Count real samples: bs * (idx + 1) overshoots the dataset size
            # whenever the last batch is smaller than bs.
            seen += anchor.size(0)
            print(f"({idx + 1}).  LOSS : {loss.item()}  SEEN : {seen}/{len(train_loader.dataset)}")

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss / max(len(train_loader), 1):.4f}, TIME: {time.time()-start}")

In [14]:
# Run training on the triplet loader for n_epochs epochs.
fit(model,n_epochs,trainloader,bs)

In [15]:
def evaluate_model(model, triplet_test_loader):
    """Print triplet accuracy and elapsed seconds for ``model``.

    A triplet counts as correct when the anchor embedding is strictly closer
    (Euclidean distance) to the positive than to the negative.

    Args:
        model: Module called as ``model(anchor, positive, negative)`` and
            returning the three embedding batches.
        triplet_test_loader: Iterable of ``(anchor, positive, negative)``
            tensor batches.

    Prints ``accuracy elapsed_seconds``; accuracy is ``nan`` when the loader
    yields no samples (the original raised ZeroDivisionError).
    """
    model.eval()
    # Fix: feed batches on the same device as the model. The original passed
    # the loader's CPU tensors straight in, which fails for a model on CUDA.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None
    correct = 0
    total = 0
    start = time.time()
    with torch.no_grad():
        for (anchor, positive, negative) in triplet_test_loader:
            if target_device is not None:
                anchor = anchor.to(target_device)
                positive = positive.to(target_device)
                negative = negative.to(target_device)
            anchor_embedding, positive_embedding, negative_embedding = model(anchor, positive, negative)
            distance_positive = torch.norm(anchor_embedding - positive_embedding, dim=1)
            distance_negative = torch.norm(anchor_embedding - negative_embedding, dim=1)
            correct += torch.sum(distance_positive < distance_negative).item()
            total += anchor.size(0)
    accuracy = correct / total if total else float("nan")
    print(accuracy,time.time()-start)

In [16]:
# Triplet loader over the held-out test folder, used by evaluate_model().
test_folder = "./Datasets/DOCS/test"
test_dataset = TripletDataset(test_folder, transform=transform)
testloader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [17]:
# Number of batches the test loader yields.
len(testloader)

7

# TESTING ACCURACY

In [31]:
# Prints "<triplet accuracy> <elapsed seconds>" for the trained model.
evaluate_model(model,testloader)

0.9925925925925926 28.17751383781433


In [33]:
# torch.save does not create missing parent directories, so make sure the
# output folder exists before writing.
os.makedirs("Models", exist_ok=True)
# Saves the whole module object (pickle), not just a state_dict: loading it
# back requires the same class definitions to be importable.
torch.save(model,"Models/tripletDOC.pt")

In [18]:
# Load onto the CPU regardless of the device the model was saved from: the
# cells that follow feed it CPU tensors and hand its outputs to FAISS, which
# would fail with a CUDA-resident model. eval() puts it in inference mode.
# NOTE(review): this unpickles a whole model object, so only load files you
# trust. Newer PyTorch releases may require weights_only=False for
# whole-model pickles - confirm against the installed version.
model_loaded = torch.load("Models/tripletDOC.pt", map_location="cpu").eval()

# EVALUATING WITH FAISS

In [19]:
from torchvision.datasets import ImageFolder
from torchvision.transforms import transforms
from torch.utils.data import DataLoader

train_folder = "./Datasets/DOCS/train"
test_folder = "./Datasets/DOCS/test"

# Create the ImageFolder datasets for train and test folders
# (each item is an (image, class index) pair; `test_dataset` rebinds the name
# previously used for the TripletDataset test set).
train_dataset = ImageFolder(train_folder, transform=transform)
test_dataset = ImageFolder(test_folder, transform=transform)

# No batch_size is given, so DataLoader's default applies; the embedding
# loops below call L.item() per step, i.e. they expect one label per batch.
train_dataloader = DataLoader(train_dataset, shuffle=True)
test_dataloader = DataLoader(test_dataset, shuffle=False)


In [20]:
import faiss

In [21]:
# Embed every training image; labels1[i] is the class index of row i of embs1.
embs1 = None
labels1 = []
_train_chunks = []
# Run inputs on whatever device the loaded model lives on and bring the
# results back to the CPU, where FAISS needs them.
_train_device = next(model_loaded.parameters()).device
with torch.no_grad():  # inference only: do not build autograd graphs
    for idx, i in enumerate(train_dataloader):
        print(idx)  # progress indicator
        I, L = i
        labels1.append(L.item())
        _train_chunks.append(model_loaded(I.to(_train_device)).cpu())
# Concatenate once at the end; growing the tensor with torch.cat on every
# step re-copies all previous rows each time (quadratic in dataset size).
if _train_chunks:
    embs1 = torch.cat(_train_chunks, dim=0)

In [24]:
# Embed every test image; labels2[i] is the label of row i of embs2.
embs2 = None
labels2 = []
_test_chunks = []
_test_device = next(model_loaded.parameters()).device
# Fix: the original ran without no_grad()/detach(), so embs2 kept the whole
# autograd graph of every forward pass alive.
with torch.no_grad():
    for i in test_dataloader:
        I, L = i
        # Labels are deliberately kept as the loader's one-element tensors:
        # evaluatewithfaiss() as written in this file relies on comparing
        # against tensors when it accumulates its CORRECT counter.
        labels2.append(L)
        _test_chunks.append(model_loaded(I.to(_test_device)).cpu())
# Concatenate once instead of re-copying the growing tensor every step.
if _test_chunks:
    embs2 = torch.cat(_test_chunks, dim=0)

In [25]:
import faiss

In [26]:
# Alias for the training embeddings; the FAISS indexes below are built from it.
embs = embs1

In [27]:
# FAISS's Python API works on contiguous float32 NumPy arrays. Convert the
# training embeddings once instead of passing torch tensors to every call and
# relying on implicit conversion.
_index_vectors = embs.detach().cpu().float().contiguous().numpy()
_index_dim = _index_vectors.shape[1]  # dimensionality of the embeddings

# Exact brute-force L2 search.
index1 = faiss.IndexFlatL2(_index_dim)
index1.add(_index_vectors)

# Inverted-file index: vectors are bucketed into nlist cells and must be
# trained before adding.
nlist = 100  # Number of cells/buckets
quantizer = faiss.IndexFlatL2(_index_dim)  # Quantizer index (same as IndexFlatL2)
index2 = faiss.IndexIVFFlat(quantizer, _index_dim, nlist)
index2.train(_index_vectors)
index2.add(_index_vectors)

# Graph-based approximate search.
index3 = faiss.IndexHNSWFlat(_index_dim, 32)  # M = 32 for the HNSW index
index3.add(_index_vectors)

# Locality-sensitive hashing into nbits-bit codes.
nbits = 8  # Number of bits for the LSH hash
index4 = faiss.IndexLSH(_index_dim, nbits)
index4.add(_index_vectors)


In [28]:
def evaluatewithfaiss(embs,index, train_labels=None, test_labels=None):
    """Measure 1-nearest-neighbour accuracy and query time of a FAISS index.

    Each row of ``embs`` is searched on its own (so the timing reflects
    per-query latency) and the label of its nearest indexed vector is
    compared with the row's true label.

    Args:
        embs: 2-D torch tensor of query embeddings, one row per sample.
        index: Object with a FAISS-style ``search(x, k)`` returning
            ``(distances, ids)``.
        train_labels: Labels of the indexed vectors, addressed by the ids the
            index returns. Defaults to the module-level ``labels1``.
        test_labels: True label of each row of ``embs``. Defaults to the
            module-level ``labels2``. Entries may be ints or one-element
            tensors.

    Returns:
        ``(accuracy_percent_as_str, 'TIME = <seconds> SECONDS')``. The
        accuracy is ``'nan'`` for empty input.
    """
    if train_labels is None:
        train_labels = labels1
    if test_labels is None:
        test_labels = labels2
    # Convert once, outside the timed loop, to the contiguous float32 NumPy
    # layout FAISS works on.
    queries = embs.detach().cpu().float().contiguous().numpy()
    total = len(queries)
    correct = 0
    start = time.time()
    for row in range(total):
        neighbour = index.search(queries[row:row + 1], 1)[1][0][0]
        # FAISS reports -1 when it finds no neighbour (e.g. an empty IVF
        # cell). Count that as a miss rather than letting -1 index the last
        # training label.
        if neighbour < 0:
            continue
        # int() accepts plain ints and one-element tensors alike.
        correct += int(train_labels[neighbour]) == int(test_labels[row])
    elapsed = time.time() - start
    accuracy = correct / total * 100 if total else float("nan")
    return f'{accuracy}',f'TIME = {elapsed} SECONDS'
        

In [29]:
# Report accuracy and query time of each FAISS index on the test embeddings.
for _index_name, _faiss_index in (
    ("IndexFlatL2", index1),
    ("IndexIVFFlat", index2),
    ("IndexHNSWFlat", index3),
    ("IndexLSH", index4),
):
    print(f'{_index_name} : {evaluatewithfaiss(embs2,_faiss_index)}')

IndexFlatL2 : ('97.0112075805664', 'TIME = 0.42181873321533203 SECONDS')
IndexIVFFlat : ('96.63760375976562', 'TIME = 0.14630413055419922 SECONDS')
IndexHNSWFlat : ('97.0112075805664', 'TIME = 0.21867966651916504 SECONDS')
IndexLSH : ('66.62515258789062', 'TIME = 0.11479830741882324 SECONDS')


In [41]:
import torch
import torchvision.transforms as transforms
import cv2
from PIL import Image
import numpy as np
# Demo images shown by the interactive viewer below.
imgs = [
    "./Images/0.jpg", "./Images/1.jpg",
    "./Images/2.jpg", "./Images/3.jpg",
    "./Images/4.png", "./Images/5.jpg",
    "./Images/6.png"
]
# Open every file once, close its handle, and normalise to RGB: the viewer
# converts the arrays with COLOR_RGB2BGR, which assumes RGB channel order,
# while PNGs may load as palette, grayscale or RGBA. RGB is also what the
# dataset loaders feed to the transform during training.
y = []  # PIL images, fed to `transform` for the model
for _image_path in imgs:
    with Image.open(_image_path) as _handle:
        y.append(_handle.convert("RGB"))
x = [np.array(_picture) for _picture in y]  # RGB arrays, used for display

# Inference preprocessing: grayscale, resize to 224x224, convert to tensor.
transform = transforms.Compose([
    transforms.Grayscale(),
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])


In [54]:
# Display name for each class index, keyed by the stringified label number
# (the viewer looks it up with str(labels1[...])).
# NOTE(review): confirm these names match the class order of the training
# folder - the sample output earlier in this file shows folders such as
# 'email' and 'news_article'.
labelsd = {'0':"AADHAR",'1':'PAN','2':'NEWS ARTICLE','3':'INVOICE'}

In [59]:
# Interactive viewer: shows each demo image with its predicted document type.
# Keys: 'n' = next image (wraps around), 'p' = previous image, Esc = quit.
th = 0  # index of the image currently shown
_viewer_device = next(model_loaded.parameters()).device
try:
    while True:
        frame = cv2.resize(x[th], (900, 900))
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)  # Convert RGB to BGR for OpenCV display
        rect_height = 100
        # Black banner across the top that holds the prediction text.
        cv2.rectangle(frame, (0, 0), (frame.shape[1], rect_height), (0, 0, 0), -1)

        image_tensor = transform(y[th]).unsqueeze(0)  # add the batch dimension
        with torch.no_grad():  # inference only
            emb = model_loaded(image_tensor.to(_viewer_device))
        # FAISS works on CPU float32 NumPy arrays.
        query = emb.detach().cpu().float().reshape(1, -1).contiguous().numpy()
        label = index2.search(query, 1)[1][0][0]
        # -1 means the index found no neighbour; do not let it index the last
        # training label, and do not crash on a class missing from labelsd.
        if label < 0:
            value = "UNKNOWN"
        else:
            value = labelsd.get(str(labels1[label]), "UNKNOWN")

        text = f"TYPE : {value}"
        text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 2, 5)
        text_x = 10
        # Vertically centre the text inside the banner.
        text_y = int(rect_height / 2) + int(text_size[1] / 2)
        cv2.putText(frame, text, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 5)

        cv2.imshow('DOCUMENT IDENTIFICATION', frame)
        # Mask to the low byte: some platforms set extra high bits.
        k = cv2.waitKey() & 0xFF
        if k == 27:  # Esc
            break
        elif k == ord('n'):
            # Wrap to the first image after the last one. The original also
            # required th < 15, an arbitrary bound unrelated to len(imgs).
            th = (th + 1) % len(imgs)
        elif k == ord('p') and th > 0:
            th -= 1
finally:
    # Close the window even if an image or the model raises.
    cv2.destroyAllWindows()

