<a href="https://colab.research.google.com/github/Markmei123/Landmark-recognition-/blob/main/CNN%2BVIT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Download the training metadata CSV into the current directory.
# curl treats every extra positional argument as another URL, so only the real URL is passed.
!curl -o train.csv https://s3.amazonaws.com/google-landmark/metadata/train.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  501M  100  501M    0     0  13.9M      0  0:00:36  0:00:36 --:--:-- 15.4M
curl: (3) URL using bad/illegal format or missing URL


In [None]:
# Go to the Colab working directory explicitly so this cell is idempotent
# (a relative `..` lands somewhere different depending on where the kernel currently is).
%cd /content

/content


In [None]:
# List the working directory to confirm that train.csv is present.
!ls

google-landmark  sample_data  train.csv


In [None]:
# Clone the repository that ships download-dataset.sh, which a later cell runs to fetch the image archives.
!git clone https://github.com/cvdfoundation/google-landmark.git

Cloning into 'google-landmark'...
remote: Enumerating objects: 109, done.[K
remote: Counting objects: 100% (16/16), done.[K
remote: Compressing objects: 100% (10/10), done.[K
remote: Total 109 (delta 6), reused 10 (delta 6), pack-reused 93 (from 1)[K
Receiving objects: 100% (109/109), 30.66 KiB | 10.22 MiB/s, done.
Resolving deltas: 100% (33/33), done.


In [None]:
# Enter the cloned repository.
%cd google-landmark/

/content/google-landmark


In [None]:
# Create the image directory; -p keeps the cell re-runnable when the directory already exists.
!mkdir -p train

In [None]:
# Enter the image directory before running the download script from the parent directory.
%cd train

/content/google-landmark/train


In [None]:
# Run the repository's download script for the train split.
# With the argument 5, the recorded output shows images_000.tar through images_005.tar being downloaded and extracted.
!bash ../download-dataset.sh train 5

Downloading images_000.tar and its md5sum...
Downloading images_001.tar and its md5sum...
Downloading images_002.tar and its md5sum...
Downloading images_003.tar and its md5sum...
Downloading images_004.tar and its md5sum...
Downloading images_005.tar and its md5sum...
images_004.tar extracted!
images_000.tar extracted!
images_003.tar extracted!
images_002.tar extracted!
images_005.tar extracted!
images_001.tar extracted!


In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install einops



Define the hybrid CNN + ViT model, the dataset, and the training loop

In [None]:
import os
import pandas as pd
from PIL import Image
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torch.amp import autocast, GradScaler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from einops import rearrange, repeat
from einops.layers.torch import Rearrange

# Helper function
def pair(t):
    """Return ``t`` unchanged when it is a tuple, otherwise duplicate it as ``(t, t)``."""
    if isinstance(t, tuple):
        return t
    return (t, t)

# Pre-trained CNN for Feature Extraction (ResNet50)
class CNNFeatureExtractor(nn.Module):
    """Frozen, pretrained ResNet-50 backbone that maps an image batch to pooled features.

    The backbone is used purely as a fixed feature extractor: its weights are
    excluded from gradient computation and it is kept in evaluation mode even
    when the parent module is switched to training mode, so BatchNorm running
    statistics are not altered by the training data.

    forward(x) returns a tensor of shape (batch_size, 2048).
    """

    def __init__(self):
        super().__init__()
        backbone = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        # Drop the last two children (the pooling and fully connected layers) and keep the convolutional trunk.
        self.cnn = nn.Sequential(*list(backbone.children())[:-2])
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))  # Collapse the spatial grid to 1x1

        # forward() already runs under no_grad, so these weights can never be trained;
        # mark them as frozen so optimizers and parameter counts reflect that.
        for param in self.cnn.parameters():
            param.requires_grad_(False)
        self.cnn.eval()

    def train(self, mode=True):
        """Set the training mode of the module while keeping the backbone in eval mode.

        no_grad does not stop BatchNorm layers from updating their running
        statistics in training mode, so the backbone is forced back to eval
        after every mode switch.
        """
        super().train(mode)
        self.cnn.eval()
        return self

    def forward(self, x):
        with torch.no_grad():
            features = self.cnn(x)  # Convolutional feature map
            features = self.global_avg_pool(features)  # Global average pooling
            features = torch.flatten(features, 1)  # Flatten to (batch_size, 2048)
        return features

# ViT Model
# --- Transformer encoder -----------------------------------------------------
# ViTWithCNN instantiates `Transformer(dim, depth, heads, dim_head, mlp_dim, dropout)`,
# but no such class is defined or imported in this file, so building the model
# raised NameError. The classes below provide a pre-norm encoder with that
# constructor signature, written in the style of the vit-pytorch reference ViT.

class FeedForward(nn.Module):
    """Pre-norm position-wise MLP: LayerNorm -> Linear -> GELU -> Dropout -> Linear -> Dropout."""

    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)


class Attention(nn.Module):
    """Pre-norm multi-head self-attention over a (batch, tokens, dim) sequence."""

    def __init__(self, dim, heads=8, dim_head=64, dropout=0.):
        super().__init__()
        inner_dim = dim_head * heads
        # A single head whose width already equals dim needs no output projection.
        project_out = not (heads == 1 and dim_head == dim)

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.norm = nn.LayerNorm(dim)
        self.attend = nn.Softmax(dim=-1)
        self.dropout = nn.Dropout(dropout)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout),
        ) if project_out else nn.Identity()

    def forward(self, x):
        x = self.norm(x)

        # One projection produces q, k and v; split them and expose the head axis.
        qkv = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = (rearrange(t, 'b n (h d) -> b h n d', h=self.heads) for t in qkv)

        dots = torch.matmul(q, k.transpose(-1, -2)) * self.scale
        attn = self.dropout(self.attend(dots))

        out = torch.matmul(attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)


class Transformer(nn.Module):
    """Stack of `depth` residual (attention, feed-forward) layers followed by a final LayerNorm."""

    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout=0.):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.layers = nn.ModuleList([
            nn.ModuleList([
                Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout),
                FeedForward(dim, mlp_dim, dropout=dropout),
            ])
            for _ in range(depth)
        ])

    def forward(self, x):
        for attn, ff in self.layers:
            x = attn(x) + x
            x = ff(x) + x
        return self.norm(x)


class ViTWithCNN(nn.Module):
    """Classifier that fuses a Vision Transformer embedding with pooled ResNet-50 features.

    The same image batch is fed to (a) a patch-based transformer encoder and
    (b) `CNNFeatureExtractor`. The two feature vectors are concatenated,
    projected back to `dim` and classified with a linear head.

    Args:
        image_size: int or (height, width) of the input images.
        patch_size: int or (height, width) of one patch; must divide the image size.
        num_classes: number of output logits.
        dim: transformer embedding width.
        depth: number of transformer layers.
        heads: number of attention heads.
        mlp_dim: hidden width of the transformer feed-forward blocks.
        pool: 'cls' to use the class token, 'mean' to average all tokens.
        channels: number of image channels.
        dim_head: width of each attention head.
        dropout: dropout rate inside the transformer.
        emb_dropout: dropout rate applied to the embedded token sequence.
    """

    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool='cls', channels=3, dim_head=64, dropout=0., emb_dropout=0.):
        super().__init__()
        image_height, image_width = pair(image_size)
        patch_height, patch_width = pair(patch_size)

        assert image_height % patch_height == 0 and image_width % patch_width == 0, 'Image dimensions must be divisible by the patch size.'

        num_patches = (image_height // patch_height) * (image_width // patch_width)
        patch_dim = channels * patch_height * patch_width

        assert pool in {'cls', 'mean'}, 'pool type must be either cls (cls token) or mean (mean pooling)'

        # ViT components: cut the image into flattened patches and embed each one.
        self.to_patch_embedding = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=patch_height, p2=patch_width),
            nn.LayerNorm(patch_dim),
            nn.Linear(patch_dim, dim),
            nn.LayerNorm(dim),
        )

        # One learned position per patch plus one for the class token.
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = Transformer(dim, depth, heads, dim_head, mlp_dim, dropout)
        self.pool = pool
        self.to_latent = nn.Identity()

        # Pretrained CNN (ResNet) for feature extraction
        self.cnn_feature_extractor = CNNFeatureExtractor()

        # Feature widths of the two branches
        self.cnn_output_dim = 2048  # After global average pooling, the CNN output is 2048-wide
        self.vit_output_dim = dim  # ViT embedding dimension

        # Projects the concatenated CNN + ViT features back to `dim`
        self.concat_mlp = nn.Linear(self.cnn_output_dim + self.vit_output_dim, dim)

        # Final classification head
        self.mlp_head = nn.Linear(dim, num_classes)

    def forward(self, img):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        # ViT branch: patch tokens, prepended class token, positions, encoder.
        vit_emb = self.to_patch_embedding(img)
        b, n, _ = vit_emb.shape

        cls_tokens = repeat(self.cls_token, '1 1 d -> b 1 d', b=b)
        vit_emb = torch.cat((cls_tokens, vit_emb), dim=1)
        vit_emb = vit_emb + self.pos_embedding[:, :(n + 1)]
        vit_emb = self.dropout(vit_emb)
        vit_emb = self.transformer(vit_emb)

        vit_emb = vit_emb.mean(dim=1) if self.pool == 'mean' else vit_emb[:, 0]

        # CNN branch: pooled ResNet features of the same images.
        cnn_features = self.cnn_feature_extractor(img)

        # Fuse both branches.
        combined_features = torch.cat([vit_emb, cnn_features], dim=1)
        combined_features = self.concat_mlp(combined_features)

        # Final classification
        return self.mlp_head(combined_features)

# Custom Dataset Class
class CustomImageDataset(Dataset):
    """Image classification dataset backed by a metadata CSV and a sharded image folder.

    The first CSV column holds the image id and the 'landmark_id' column holds
    the class. An image with id ``abc...`` is expected at
    ``root_dir/a/b/c/abc....jpg``. Rows whose image file is missing are dropped
    at construction time, and the remaining landmark ids are mapped to
    contiguous class indices in order of first appearance.

    Args:
        csv_file: path to the metadata CSV.
        root_dir: directory containing the sharded image tree.
        transform: optional callable applied to each loaded PIL image.

    Attributes set here: ``labels_df`` (filtered metadata) and
    ``label_to_index`` (landmark id -> class index).
    """

    def __init__(self, csv_file, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform

        labels_df = pd.read_csv(csv_file)
        # Test each id directly instead of a row-wise DataFrame.apply, which builds
        # a Series per row and is very slow on a large metadata file.
        exists_mask = pd.Series(
            [os.path.exists(self._image_path(img_id)) for img_id in labels_df.iloc[:, 0]],
            index=labels_df.index,
            dtype=bool,
        )
        self.labels_df = labels_df.loc[exists_mask].reset_index(drop=True)
        self.label_to_index = {label: idx for idx, label in enumerate(self.labels_df['landmark_id'].unique())}

    def _image_path(self, img_id):
        """Return the on-disk path of the image with the given id."""
        return os.path.join(self.root_dir, img_id[0], img_id[1], img_id[2], f"{img_id}.jpg")

    def _check_image_exists(self, row):
        """Return True if the image referenced by a metadata row exists on disk."""
        return os.path.exists(self._image_path(row.iloc[0]))

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        """Return ``(image, class_index)`` for the row at position ``idx``."""
        img_id = self.labels_df.iloc[idx, 0]
        # Read the label from the same column the index map was built from.
        original_label = self.labels_df['landmark_id'].iloc[idx]
        label = self.label_to_index[original_label]

        # convert() returns a new, fully loaded image, so the file can be closed right away.
        with Image.open(self._image_path(img_id)) as img_file:
            image = img_file.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label

# Data augmentation and preprocessing
train_transforms = transforms.Compose([
    transforms.Resize((160, 160)),  # 160x160 instead of 224x224 for faster training
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Dataset locations, relative to the current working directory.
CSV_FILE = '../train.csv'
IMAGE_ROOT = './train'

# Fail fast on a wrong working directory: a missing image root would otherwise
# only surface as an empty dataset and a zero-class model.
for required_path in (CSV_FILE, IMAGE_ROOT):
    if not os.path.exists(required_path):
        raise FileNotFoundError(
            f"{required_path!r} not found from working directory {os.getcwd()!r}; "
            "run this cell from the directory that contains the 'train' image folder"
        )

# Load dataset and DataLoader
train_dataset = CustomImageDataset(csv_file=CSV_FILE, root_dir=IMAGE_ROOT, transform=train_transforms)
if len(train_dataset) == 0:
    raise RuntimeError(f"No image listed in {CSV_FILE!r} was found under {IMAGE_ROOT!r}")
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4, pin_memory=True)

# Number of distinct labels among the images that are actually present
used_labels = train_dataset.labels_df['landmark_id']
unique_used_labels = used_labels.unique()
num_unique_used_labels = len(unique_used_labels)

# Set up device (GPU or CPU) first so everything below can be configured for it
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
model = ViTWithCNN(
    image_size=160,  # Must match the Resize above
    patch_size=16,
    num_classes=num_unique_used_labels,
    dim=192,  # Balanced dimension
    depth=8,  # Transformer depth
    heads=8,  # Transformer heads
    mlp_dim=384,
    dropout=0.1,
    emb_dropout=0.1
)
model.to(device)

# Loss function and optimizer (created after the model is on its final device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Gradient scaling for mixed precision; only active when training on CUDA
scaler = GradScaler(device.type, enabled=(device.type == 'cuda'))

# Learning rate scheduler: cut the LR by 10x after 3 epochs without loss improvement
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

# Training loop with AMP (mixed precision)
def train(model, train_loader, criterion, optimizer, scheduler, epochs=20):
    """Train `model` for `epochs` epochs and print loss and accuracy after each one.

    Uses the module-level `device` and `scaler`. Mixed precision is enabled
    only when `device` is a CUDA device; on CPU the loop runs in full precision.

    Args:
        model: network mapping an image batch to class logits.
        train_loader: DataLoader yielding (images, labels) batches.
        criterion: loss taking (logits, labels).
        optimizer: optimizer over the model parameters.
        scheduler: scheduler whose step() receives the mean epoch loss.
        epochs: number of passes over `train_loader`.

    Raises:
        ValueError: if `train_loader` yields no batches or its dataset is empty.
    """
    num_batches = len(train_loader)
    num_samples = len(train_loader.dataset)
    if num_batches == 0 or num_samples == 0:
        # Without this guard the per-epoch averages would divide by zero.
        raise ValueError("train_loader is empty; nothing to train on")

    # Follow the selected device rather than assuming CUDA is present.
    use_amp = device.type == 'cuda'

    model.train()
    for epoch in range(epochs):
        print(f"Running Epoch {epoch+1}/{epochs}")
        total_loss = 0
        correct = 0
        for imgs, labels in train_loader:
            imgs, labels = imgs.to(device), labels.to(device).long()

            optimizer.zero_grad()

            with autocast(device_type=device.type, enabled=use_amp):
                outputs = model(imgs)
                loss = criterion(outputs, labels)

            # Scale the loss, backpropagate, then step through the scaler
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            total_loss += loss.item()
            correct += (outputs.argmax(1) == labels).sum().item()

        # Scheduler step on the mean batch loss of this epoch
        scheduler.step(total_loss / num_batches)

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / num_batches}, Accuracy: {correct / num_samples}")

# Launch training with the default number of epochs
train(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
)

