<h1>Install required libraries</h1>

In [5]:
!pip install albumentations pytorch-metric-learning --quiet

<h1>Library</h1>

In [27]:
import time
import os
import cv2
import torch
import random
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam, lr_scheduler
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
from torch.cuda.amp import GradScaler, autocast
from torchvision import models
from PIL import Image
from tqdm import tqdm
import matplotlib.pyplot as plt
import warnings
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.neighbors import KNeighborsClassifier

warnings.filterwarnings("ignore", category=UserWarning)

<h1>Device</h1>

In [7]:
# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

if device.type == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")

Using device: cuda
GPU: Tesla T4
Memory: 14.74 GB


<h1>Hyperparameters </h1>

In [8]:
# Hyperparameters 
train_dir = "/kaggle/input/data-augmented-model-2/augmented_data_model_2" 
test_dir = "/kaggle/input/logo-verify-test/logo_verify_test"   
INPUT_SIZE = 224
BATCH_SIZE = 64
EMBEDDING_DIM = 256
MARGIN = 1.0  # Margin for contrastive loss
LR = 3e-4
WEIGHT_DECAY = 1e-4
NUM_EPOCHS = 50
ACCUM_STEPS = 2
KEY_IMAGE = "000000"  # Reference image identifier

print(f"Training directory: {train_dir}")
print(f"Test directory: {test_dir}")

Training directory: /kaggle/input/data-augmented-model-2/augmented_data_model_2
Test directory: /kaggle/input/logo-verify-test/logo_verify_test


<h1>Show check data and Visualization</h1>

In [9]:
# # Kiểm tra cấu trúc thư mục
# def analyze_dataset(directory):
#     class_names = sorted([d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))])
#     num_classes = len(class_names)
    
#     class_stats = {}
#     total_images = 0
#     min_images = float('inf')
#     max_images = 0
    
#     for cls in class_names:
#         cls_dir = os.path.join(directory, cls)
#         num_images = len([f for f in os.listdir(cls_dir) if f.lower().endswith(('.jpg','.jpeg','.png'))])
#         class_stats[cls] = num_images
#         total_images += num_images
#         min_images = min(min_images, num_images)
#         max_images = max(max_images, num_images)
    
#     print(f"Dataset Analysis: {directory}")
#     print(f"Number of classes: {num_classes}")
#     print(f"Total images: {total_images}")
#     print(f"Min images per class: {min_images}")
#     print(f"Max images per class: {max_images}")
#     print(f"Average images per class: {total_images/num_classes:.2f}")
    
#     # Vẽ biểu đồ phân bố
#     plt.figure(figsize=(12, 6))
#     plt.bar(class_stats.keys(), class_stats.values())
#     plt.xticks(rotation=90)
#     plt.title("Class Distribution")
#     plt.ylabel("Number of Images")
#     plt.tight_layout()
#     plt.show()
    
#     return class_stats

# train_stats = analyze_dataset(train_dir)
# test_stats = analyze_dataset(test_dir)

<h1>Transform</h1>

In [10]:
def get_transforms():
    train_transform = A.Compose([
        A.Resize(INPUT_SIZE, INPUT_SIZE),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.3),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.1, rotate_limit=15, p=0.5),
        A.OneOf([
            A.GaussNoise(var_limit=(10.0, 50.0)),
            A.GaussianBlur(blur_limit=(3, 5)),
        ], p=0.3),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])
    
    test_transform = A.Compose([
        A.Resize(INPUT_SIZE, INPUT_SIZE),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])
    
    return train_transform, test_transform

train_transform, test_transform = get_transforms()

<h2>Check data Augment</h2>

In [11]:
# # Load một ảnh mẫu để kiểm tra transform
# sample_img_path = next(iter(train_stats.keys())) + '/' + os.listdir(os.path.join(train_dir, next(iter(train_stats.keys()))))[0]
# sample_img = cv2.imread(os.path.join(train_dir, sample_img_path))
# sample_img = cv2.cvtColor(sample_img, cv2.COLOR_BGR2RGB)

# # Áp dụng transform
# transformed = train_transform(image=sample_img)["image"]

# # Hiển thị kết quả
# plt.figure(figsize=(12, 6))
# plt.subplot(1, 2, 1)
# plt.imshow(sample_img)
# plt.title("Original Image")
# plt.axis('off')

# plt.subplot(1, 2, 2)
# plt.imshow(unnormalize(transformed))
# plt.title("Augmented Image")
# plt.axis('off')
# plt.show()

<h1>DataLoader</h1>

In [12]:
from collections import defaultdict
import os
import numpy as np
import matplotlib.pyplot as plt
import random
import time
import cv2
from PIL import Image
import torch
from torch.utils.data import Dataset

class ImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []
        self.class_to_idx = {}
        self.idx_to_class = {}
        self.label_to_indices = defaultdict(list)
        self._build_dataset()
        
    def _build_dataset(self):
        # Find classes
        class_names = sorted([d for d in os.listdir(self.root_dir) 
                             if os.path.isdir(os.path.join(self.root_dir, d))])
        
        # Create class mapping
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(class_names)}
        self.idx_to_class = {idx: cls_name for cls_name, idx in self.class_to_idx.items()}
        
        # Build samples list
        for cls_name in class_names:
            cls_dir = os.path.join(self.root_dir, cls_name)
            for fname in os.listdir(cls_dir):
                if fname.lower().endswith(('.jpg','.jpeg','.png')):
                    img_path = os.path.join(cls_dir, fname)
                    label = self.class_to_idx[cls_name]
                    self.samples.append((img_path, label))
                    self.label_to_indices[label].append(len(self.samples)-1)
        
        print(f"Loaded {len(self.samples)} images from {len(class_names)} classes")
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        
        try:
            img = cv2.imread(img_path)
            if img is None:
                raise Exception("OpenCV failed to load image")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        except:
            img = np.array(Image.open(img_path).convert('RGB'))
        
        if self.transform:
            img = self.transform(image=img)["image"]
            
        return img, label, img_path

<h1>BACBONE</h1>

<h2>RESNET 50</h2>

In [13]:
import torch
import torch.nn as nn
from torchvision import models

class Network(nn.Module):
    def __init__(self, emb_dim=128):
        super(Network, self).__init__()

        base_model = models.resnet50(pretrained=True)

        # Bỏ layer cuối cùng (fc)
        self.backbone = nn.Sequential(*list(base_model.children())[:-1])  # Output: [B, 2048, 1, 1]

        # FC Head
        self.fc = nn.Sequential(
            nn.Linear(2048, 512),
            nn.PReLU(),
            nn.Linear(512, emb_dim)
        )

    def forward(self, x):
        x = self.backbone(x)           # [B, 2048, 1, 1]
        x = torch.flatten(x, 1)        # [B, 2048]
        x = self.fc(x)                 # [B, emb_dim]
        return x

<h2>RESNET 18</h2>

In [17]:
# class Network(nn.Module):
#     def __init__(self, emb_dim=256):
#         super(Network, self).__init__()
#         base_model = models.resnet18(pretrained=True)
        
#         # Remove final classification layer
#         self.backbone = nn.Sequential(*list(base_model.children())[:-1])
        
#         # Embedding head
#         self.fc = nn.Sequential(
#             nn.Linear(512, 512),
#             nn.BatchNorm1d(512),
#             nn.ReLU(),
#             nn.Linear(512, emb_dim)
#         )
        
#     def forward(self, x):
#         x = self.backbone(x)
#         x = torch.flatten(x, 1)
#         x = self.fc(x)
#         return x


<h1>Contrastive Loss</h1>

In [18]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        
    def forward(self, output1, output2, label):
        # Euclidean distance
        euclidean_distance = F.pairwise_distance(output1, output2)
        
        # Contrastive loss
        loss_contrastive = torch.mean(
            (1 - label) * torch.pow(euclidean_distance, 2) +
            label * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2)
        )
        
        return loss_contrastive

<h2>Check Model</h2>

In [20]:
model = Network(EMBEDDING_DIM).to(device)
dummy_input = torch.randn(32, 3, INPUT_SIZE, INPUT_SIZE).to(device)
output = model(dummy_input)
print(f"Model output shape: {output.shape}")

Model output shape: torch.Size([32, 256])


<h1>Test Function</h1>

In [21]:
def evaluate_one_shot(test_dir, model, transform, device, key=KEY_IMAGE):
    model.eval()
    reference_embeddings = {}
    test_results = []
    
    # Process all classes
    for class_name in sorted(os.listdir(test_dir)):
        class_path = os.path.join(test_dir, class_name)
        if not os.path.isdir(class_path):
            continue
            
        # Find reference image
        ref_image = None
        for fname in os.listdir(class_path):
            if key in fname:
                ref_image = os.path.join(class_path, fname)
                break
                
        if ref_image is None:
            print(f"Warning: No reference image found in {class_name}")
            continue
            
        # Load and process reference image
        try:
            img = cv2.imread(ref_image)
            if img is None:
                img = np.array(Image.open(ref_image).convert('RGB'))
            else:
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        except Exception as e:
            print(f"Error loading reference image {ref_image}: {e}")
            continue
            
        with torch.no_grad():
            img_tensor = transform(image=img)["image"].unsqueeze(0).to(device)
            embedding = model(img_tensor).cpu().numpy().squeeze()
            reference_embeddings[class_name] = embedding
    
    # Process test images
    correct = 0
    total = 0
    
    for class_name in sorted(os.listdir(test_dir)):
        class_path = os.path.join(test_dir, class_name)
        if not os.path.isdir(class_path):
            continue
            
        for fname in os.listdir(class_path):
            if key in fname:  # Skip reference images
                continue
                
            img_path = os.path.join(class_path, fname)
            try:
                img = cv2.imread(img_path)
                if img is None:
                    img = np.array(Image.open(img_path).convert('RGB'))
                else:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            except Exception as e:
                print(f"Error loading test image {img_path}: {e}")
                continue
                
            with torch.no_grad():
                img_tensor = transform(image=img)["image"].unsqueeze(0).to(device)
                test_embedding = model(img_tensor).cpu().numpy().squeeze()
            
            # Calculate similarities
            similarities = {}
            for ref_class, ref_emb in reference_embeddings.items():
                sim = cosine_similarity([test_embedding], [ref_emb])[0][0]
                similarities[ref_class] = sim
            
            # Get prediction
            pred_class = max(similarities, key=similarities.get)
            is_correct = (pred_class == class_name)
            
            test_results.append({
                'image_path': img_path,
                'true_class': class_name,
                'pred_class': pred_class,
                'similarity': similarities[pred_class],
                'is_correct': is_correct
            })
            
            if is_correct:
                correct += 1
            total += 1
    
    accuracy = correct / total if total > 0 else 0.0
    print(f"Test Accuracy: {accuracy:.4f} ({correct}/{total})")
    return accuracy, test_results

<h1>Initialize Dataset</h1>

In [22]:
# # 1. Khởi tạo datasets
# raw_ds = OptimizedImageDataset(train_dir)
# pair_ds = FastPairDataset(raw_ds, train_transform, max_pairs=100000)

# # 2. Tạo DataLoader với num_workers cao
# train_loader = DataLoader(
#     pair_ds,
#     batch_size=BATCH_SIZE,
#     shuffle=True,
#     num_workers=4,
#     pin_memory=True,
#     persistent_workers=True
# )

# # Kiểm tra một batch
# batch = next(iter(train_loader))
# imgs1, imgs2, labels = batch
# print(f"Batch shapes: imgs1 {imgs1.shape}, imgs2 {imgs2.shape}, labels {labels.shape}")

# # Hiển thị batch
# plt.figure(figsize=(15, 7))
# for i in range(4):
#     plt.subplot(2, 4, i*2+1)
#     plt.imshow(unnormalize(imgs1[i]))
#     plt.title(f"Label: {labels[i].item()}")
#     plt.axis('off')
    
#     plt.subplot(2, 4, i*2+2)
#     plt.imshow(unnormalize(imgs2[i]))
#     plt.axis('off')
# plt.suptitle("Batch Samples (Left: Anchor, Right: Positive/Negative)")
# plt.show()

<h1>Train Loop</h1>

In [31]:
def train(model, train_loader, criterion, optimizer, scheduler, scaler, num_epochs, test_dir):
    best_acc = 0.0
    history = {'train_loss': [], 'val_acc': []}
    
    for epoch in range(1, num_epochs + 1):
        start_time = time.time()
        model.train()
        epoch_loss = 0.0
        
        # Training phase
        for batch_idx, (images, labels, _) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch}/{num_epochs}")):
            images = images.to(device)
            labels = labels.to(device)
            
            # Generate pairs within batch
            pair_indices = torch.randperm(len(labels))
            pair_images = images[pair_indices]
            pair_labels = labels[pair_indices]
            
            # Create labels for pairs (0 = same class, 1 = different class)
            pair_targets = (labels != pair_labels).float()
            
            optimizer.zero_grad()
            
            with autocast():
                embeddings1 = model(images)
                embeddings2 = model(pair_images)
                loss = criterion(embeddings1, embeddings2, pair_targets) / ACCUM_STEPS
            
            scaler.scale(loss).backward()
            
            # Gradient accumulation
            if (batch_idx + 1) % ACCUM_STEPS == 0 or (batch_idx + 1) == len(train_loader):
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
            
            epoch_loss += loss.item() * ACCUM_STEPS
        
        avg_loss = epoch_loss / len(train_loader)
        history['train_loss'].append(avg_loss)
        
        # Evaluation
        if epoch % 1 == 0 or epoch == num_epochs:
            val_acc, _ = evaluate_one_shot(test_dir, model, test_transform, device)
            history['val_acc'].append(val_acc)
            
            # Save best model
            if val_acc > best_acc:
                best_acc = val_acc
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': avg_loss,
                    'acc': val_acc
                }, 'best_model.pth')
        else:
            val_acc = -1  # Mark as not evaluated
        
        # Update learning rate
        scheduler.step(val_acc if val_acc != -1 else avg_loss)
        
        # Logging
        epoch_time = time.time() - start_time
        lr = optimizer.param_groups[0]['lr']
        acc_str = f"{val_acc:.4f}" if val_acc != -1 else "N/A"
        print(
            f"Epoch {epoch}/{num_epochs} | "
            f"Loss: {avg_loss:.4f} | "
            f"Acc: {acc_str} | "
            f"LR: {lr:.2e} | "
            f"Time: {epoch_time:.1f}s | "
            f"Best Acc: {best_acc:.4f}"
        )
    
    return history

<h1>Main Execution</h1>

In [32]:
import os
import time
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam, lr_scheduler
from torch.cuda.amp import GradScaler, autocast
from torchvision import models
import albumentations as A
from albumentations.pytorch import ToTensorV2
from PIL import Image
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import accuracy_score
import warnings
from collections import defaultdict

In [None]:
if __name__ == "__main__":
    # Initialize dataset and loader
    train_dataset = ImageDataset(train_dir, transform=train_transform)
    train_loader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=4,
        pin_memory=True,
        drop_last=True
    )
    
    # Initialize model, loss, optimizer
    model = Network(emb_dim=EMBEDDING_DIM).to(device)
    criterion = ContrastiveLoss(margin=MARGIN)
    optimizer = Adam(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
    scheduler = lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=3, verbose=True
    )
    scaler = GradScaler()
    
    # Start training
    history = train(
        model=model,
        train_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        scaler=scaler,
        num_epochs=NUM_EPOCHS,
        test_dir=test_dir
    )
    
    # Final evaluation
    print("\n=== Final Evaluation ===")
    checkpoint = torch.load('best_model.pth')
    model.load_state_dict(checkpoint['model_state_dict'])
    final_acc, test_results = evaluate_one_shot(test_dir, model, test_transform, device)
    print(f"Final Accuracy: {final_acc:.4f}")
    
    # Save test results
    with open('test_results.csv', 'w') as f:
        f.write("image_path,true_class,pred_class,similarity,is_correct\n")
        for res in test_results:
            f.write(f"{res['image_path']},{res['true_class']},{res['pred_class']},{res['similarity']},{res['is_correct']}\n")

Loaded 103302 images from 135 classes


  scaler = GradScaler()
  with autocast():
Epoch 1/50: 100%|██████████| 1614/1614 [04:39<00:00,  5.77it/s]


Test Accuracy: 0.0916 (58/633)
Epoch 1/50 | Loss: 0.5835 | Acc: 0.0916 | LR: 3.00e-04 | Time: 290.9s | Best Acc: 0.0916


  with autocast():
Epoch 2/50: 100%|██████████| 1614/1614 [04:37<00:00,  5.82it/s]


Test Accuracy: 0.1374 (87/633)
Epoch 2/50 | Loss: 0.2620 | Acc: 0.1374 | LR: 3.00e-04 | Time: 288.0s | Best Acc: 0.1374


  with autocast():
Epoch 3/50: 100%|██████████| 1614/1614 [04:37<00:00,  5.81it/s]


Test Accuracy: 0.1122 (71/633)
Epoch 3/50 | Loss: 0.3063 | Acc: 0.1122 | LR: 3.00e-04 | Time: 288.0s | Best Acc: 0.1374


  with autocast():
Epoch 4/50: 100%|██████████| 1614/1614 [04:38<00:00,  5.80it/s]


Test Accuracy: 0.1485 (94/633)
Epoch 4/50 | Loss: 0.0386 | Acc: 0.1485 | LR: 3.00e-04 | Time: 289.0s | Best Acc: 0.1485


  with autocast():
Epoch 5/50: 100%|██████████| 1614/1614 [04:38<00:00,  5.80it/s]


Test Accuracy: 0.1422 (90/633)
Epoch 5/50 | Loss: 0.0369 | Acc: 0.1422 | LR: 3.00e-04 | Time: 288.2s | Best Acc: 0.1485


  with autocast():
Epoch 6/50: 100%|██████████| 1614/1614 [04:38<00:00,  5.80it/s]


Test Accuracy: 0.0995 (63/633)
Epoch 6/50 | Loss: 0.1472 | Acc: 0.0995 | LR: 3.00e-04 | Time: 288.4s | Best Acc: 0.1485


  with autocast():
Epoch 7/50: 100%|██████████| 1614/1614 [04:37<00:00,  5.82it/s]
