In [None]:
# !pip install torch
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from torchvision import datasets, models, transforms
import os
import pandas as pd
from PIL import Image
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

class LandmarkImageDataset(Dataset):
    def __init__(self, annotations_file, img_dir, filenames_dir, transform=None, file_extension='.jpg'):
        """
        Args:
            annotations_file (string): Path to the CSV file with annotations.
            img_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied on a sample.
            file_extension (string, optional): Extension of the image files in the directory.
        """
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.file_extension = file_extension
        # Create a dictionary to map image IDs to labels
        self.id_to_label = {str(row[0]): row[3] for row in self.img_labels.values}
        # List all files in the img_dir
        # self.image_files = [f for f in os.listdir(img_dir) if f.endswith(file_extension)]
        df = pd.read_csv(filenames_dir)
        self.image_files = df['id'].tolist()

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_file = self.image_files[idx]
        img_id = os.path.splitext(img_file)[0]  # Remove extension to get the ID
        label = self.id_to_label[img_id]
        # print(f"label: {label}")
        img_path = os.path.join(self.img_dir, img_file)
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label

# Define transformations for the training data
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

label_csv = "/content/drive/MyDrive/cs444-final-project/project/data/train/selected_train.csv"
training_dir = "/content/drive/MyDrive/cs444-final-project/project/data/selected_train"
filenames = "/content/drive/MyDrive/cs444-final-project/project/data/train_file_names.csv"
# Create an instance of the dataset
train_dataset = LandmarkImageDataset(
    annotations_file=label_csv,
    img_dir=training_dir,
    filenames_dir=filenames,
    transform=train_transform,
    file_extension='.jpg'
)

# Define the DataLoader
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True, num_workers=2)



In [None]:
# # Load a pretrained ResNet-50 model
# model = models.resnet50(pretrained=True)

# # Modify the final layer for the number of classes we have
# num_classes = len(train_dataset.classes)
# model.fc = torch.nn.Linear(model.fc.in_features, num_classes)

from torchvision import models
import torch

def initialize_model(num_classes):
    # Load a pretrained ResNet-18 model
    model = models.resnet18(pretrained=True)
    # Modify the final layer to match the number of classes
    num_ftrs = model.fc.in_features
    model.fc = torch.nn.Linear(num_ftrs, num_classes)
    return model

# Determine the number of unique classes
# num_classes = len(set(train_dataset.id_to_label.values()))
num_classes = len(set(train_dataset.id_to_label.values()))

# num_classes=2000
print(f"number of classes {num_classes}")
model = initialize_model(num_classes=num_classes)



In [None]:
# def train_model(model, dataloader, loss_fn, optimizer, device, epochs=10):
#     model.to(device)
#     for epoch in tqdm.tqdm(range(epochs)):
#         model.train()
#         running_loss = 0.0
#         correct_predictions = 0
#         for inputs, labels in dataloader:
#             inputs, labels = inputs.to(device), labels.to(device)

#             optimizer.zero_grad()
#             outputs = model(inputs)
#             loss = loss_fn(outputs, labels)
#             loss.backward()
#             optimizer.step()

#             running_loss += loss.item()
#             _, preds = torch.max(outputs, 1)
#             correct_predictions += torch.sum(preds == labels.data)

#         epoch_loss = running_loss / len(dataloader)
#         epoch_acc = correct_predictions.double() / len(dataloader.dataset)
#         print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss}, Accuracy: {epoch_acc}')

# def evaluate_model(model, dataloader, loss_fn, device):
#     model.eval()
#     running_loss = 0.0
#     correct_predictions = 0
#     with torch.no_grad():
#         for inputs, labels in dataloader:
#             inputs, labels = inputs.to(device), labels.to(device)
#             outputs = model(inputs)
#             loss = loss_fn(outputs, labels)

#             running_loss += loss.item()
#             _, preds = torch.max(outputs, 1)
#             correct_predictions += torch.sum(preds == labels.data)

#     total_loss = running_loss / len(dataloader)
#     accuracy = correct_predictions.double() / len(dataloader.dataset)
#     print(f'Validation Loss: {total_loss}, Accuracy: {accuracy}')

from tqdm import tqdm

def train_model(model, dataloader, loss_fn, optimizer, device, epochs=10):
    model.to(device)
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        progress_bar = tqdm(enumerate(dataloader), total=len(dataloader), desc=f"Epoch {epoch+1}/{epochs}")
        for i, (inputs, labels) in progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            correct_predictions += torch.sum(preds == labels.data)

            progress_bar.set_postfix({'loss': loss.item(), 'acc': torch.sum(preds == labels.data).item() / len(labels)})

        epoch_loss = running_loss / len(dataloader)
        epoch_acc = correct_predictions.double() / len(dataloader.dataset)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss}, Accuracy: {epoch_acc}')

def evaluate_model(model, dataloader, loss_fn, device):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    with torch.no_grad():
        progress_bar = tqdm(enumerate(dataloader), total=len(dataloader), desc="Evaluating")
        for i, (inputs, labels) in progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)

            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            correct_predictions += torch.sum(preds == labels.data)

            progress_bar.set_postfix({'loss': loss.item(), 'acc': torch.sum(preds == labels.data).item() / len(labels)})

    total_loss = running_loss / len(dataloader)
    accuracy = correct_predictions.double() / len(dataloader.dataset)
    print(f'Validation Loss: {total_loss}, Accuracy: {accuracy}')


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = torch.nn.CrossEntropyLoss()

if torch.cuda.is_available():
  print("CUDA USED")
else:
  print("CPU USED")

In [None]:
# Initialize the model, optimizer, and loss function
model = initialize_model(num_classes)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = torch.nn.CrossEntropyLoss()

# Move the model to the appropriate device
model.to(device)

# Train the model
train_model(model, train_loader, loss_fn, optimizer, device, epochs=10)

# Suppose you have a validation loader set up similarly
# evaluate_model(model, validation_loader, loss_fn, device)
