# **Prerequisites**

## Installing dependencies

Please make a copy of this notebook.


In [1]:
# Installing the dependencies
%pip install geopy > delete.txt
%pip install datasets > delete.txt
%pip install torch torchvision datasets > delete.txt
%pip install huggingface_hub > delete.txt

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

file_path = "delete.txt"

# Remove the scratch log written by the install cell and report the outcome
if not os.path.exists(file_path):
    print(f"{file_path} does not exist. No action taken.")
else:
    os.remove(file_path)
    print(f"{file_path} has been deleted.")

delete.txt has been deleted.


## Huggingface login
You will require your personal token.

In [None]:
# Log in to the Hugging Face Hub from the notebook shell; have your personal token ready.
! huggingface-cli login 

# **Dataset handling**
This section is in charge of:
- Downloading the data from Hugging Face to the local runtime
- Defining the custom Dataset class
- Creating the dataloaders and visualizing the data

## Downloading the train and test dataset from HF

In [None]:
from datasets import load_dataset, Image

# Fetch the training, validation and test splits from the same Hub repository
hf_repo = "cis519/train"
dataset_train, dataset_val, dataset_test = (
    load_dataset(hf_repo, split=split_name)
    for split_name in ("train", "validation", "test")
)

# Report the split sizes in the order: train, test, validation
for split_data in (dataset_train, dataset_test, dataset_val):
    print(len(split_data))

## Custom Dataset Class
- Below section creates a Custom Dataset Class

In [None]:
# Dependencies
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from transformers import AutoImageProcessor, AutoModelForImageClassification
from huggingface_hub import PyTorchModelHubMixin
from PIL import Image
import os
import numpy as np


class GPSImageDataset(Dataset):
    """Pair each image of a dataset with its z-score-normalized GPS coordinates.

    Args:
        hf_dataset: dataset with 'image', 'Latitude' and 'Longitude' fields. It is
            indexed by column name (to compute statistics) and by integer row.
        transform: optional callable applied to the image of every sample.
        lat_mean, lat_std, lon_mean, lon_std: normalization statistics. Any value
            left as ``None`` is computed from the matching column of ``hf_dataset``.
            Pass the training-set values when wrapping validation or test data.

    A standard deviation of exactly zero (single sample, or all coordinates
    identical) is replaced by 1.0 so that normalization never divides by zero;
    ``value * std + mean`` still recovers the original coordinate in that case.
    """

    def __init__(self, hf_dataset, transform=None, lat_mean=None, lat_std=None, lon_mean=None, lon_std=None):
        self.hf_dataset = hf_dataset
        self.transform = transform

        # Read a coordinate column only when one of its statistics is missing
        if lat_mean is None or lat_std is None:
            latitudes = np.array(self.hf_dataset['Latitude'])
            lat_mean = np.mean(latitudes) if lat_mean is None else lat_mean
            lat_std = np.std(latitudes) if lat_std is None else lat_std
        if lon_mean is None or lon_std is None:
            longitudes = np.array(self.hf_dataset['Longitude'])
            lon_mean = np.mean(longitudes) if lon_mean is None else lon_mean
            lon_std = np.std(longitudes) if lon_std is None else lon_std

        self.latitude_mean = lat_mean
        self.latitude_std = self._nonzero_std(lat_std)
        self.longitude_mean = lon_mean
        self.longitude_std = self._nonzero_std(lon_std)

    @staticmethod
    def _nonzero_std(std):
        """Return ``std`` unchanged unless it is zero, in which case return 1.0."""
        return np.float64(1.0) if std == 0 else std

    def __len__(self):
        return len(self.hf_dataset)

    def __getitem__(self, idx):
        """Return ``(image, gps_coords)`` for row ``idx``.

        ``gps_coords`` is a float32 tensor ``[latitude, longitude]`` after
        normalization with the statistics stored on this dataset.
        """
        example = self.hf_dataset[idx]

        image = example['image']
        if self.transform:
            image = self.transform(image)

        # Normalize GPS coordinates
        latitude = (example['Latitude'] - self.latitude_mean) / self.latitude_std
        longitude = (example['Longitude'] - self.longitude_mean) / self.longitude_std
        gps_coords = torch.tensor([latitude, longitude], dtype=torch.float32)

        return image, gps_coords

## Creating dataloaders and visualizing the data
- Compute train mean and std
- Create Train dataset and dataloader
- Create Validation dataset and dataloader
- Create Test dataset and dataloader


In [None]:
# Per-channel statistics shared by the training and inference pipelines
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

# Training pipeline: random augmentations followed by tensor conversion and normalization
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),      # random crop, resized to 224x224
    transforms.RandomHorizontalFlip(),      # random horizontal flip
    transforms.RandomRotation(degrees=15),  # random rotation in [-15, 15] degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=channel_mean, std=channel_std),
])

# Inference pipeline: deterministic resize only, no augmentation
inference_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=channel_mean, std=channel_std),
])

# The training dataset computes the GPS normalization statistics itself
train_dataset = GPSImageDataset(hf_dataset=dataset_train, transform=transform)
lat_mean, lat_std = train_dataset.latitude_mean, train_dataset.latitude_std
lon_mean, lon_std = train_dataset.longitude_mean, train_dataset.longitude_std

# Validation and test data reuse the training statistics
train_stats = dict(lat_mean=lat_mean, lat_std=lat_std, lon_mean=lon_mean, lon_std=lon_std)
val_dataset = GPSImageDataset(hf_dataset=dataset_val, transform=inference_transform, **train_stats)
test_dataset = GPSImageDataset(hf_dataset=dataset_test, transform=inference_transform, **train_stats)

# Only the training loader shuffles
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Verify loading: print the image and target shapes of the first batch of each loader
for loader in (train_dataloader, val_dataloader, test_dataloader):
    for images, gps_coords in loader:
        print(images.size(), gps_coords.size())
        break


In [None]:
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
import numpy as np

def denormalize(tensor, mean, std):
    """Undo per-channel normalization and return an image array for display.

    Args:
        tensor: image tensor laid out as C x H x W.
        mean: per-channel means that were subtracted during normalization.
        std: per-channel standard deviations that were divided by.

    Returns:
        NumPy array laid out as H x W x C with values clipped to [0, 1].
    """
    mean = np.array(mean)
    std = np.array(std)
    # detach()/cpu() let this accept tensors that track gradients or live on the
    # GPU; both are no-ops for a plain CPU tensor.
    image = tensor.detach().cpu().numpy().transpose((1, 2, 0))  # C x H x W -> H x W x C
    image = std * image + mean  # Denormalize
    return np.clip(image, 0, 1)  # Keep pixel values between 0 and 1

# Take one training batch and display every image in it.
# The titles show the targets exactly as the dataset returns them, i.e. normalized.
images, gps_coords = next(iter(train_dataloader))

for sample_image, sample_coords in zip(images, gps_coords):
    image = denormalize(sample_image, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    plt.imshow(image)
    plt.title(f'Latitude: {sample_coords[0].item():.4f}, Longitude: {sample_coords[1].item():.4f}')
    plt.axis('off')
    plt.show()

# **DinoResNetModel Definition and Training Testing Loop**
- The following section defines the Model, Training loop and testing loop.
- The model fuses the logits of a fine-tuned ResNet-50 with features extracted by a frozen DINO ViT.

## Fine Tune Parameters
- @todo Tune the parameters to lower the Loss

In [None]:
# Hyper-parameter configuration
# Add further entries here if you want to tune more parameters
cfg = dict(
    train_bs=32,
    test_bs=32,
    val_bs=32,
    num_epochs=5,
    lr=0.001,
)

## DinoResNetModel Definition

In [None]:
# Import all the dependencies
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from transformers import AutoImageProcessor, AutoModelForImageClassification, AutoModel
from huggingface_hub import PyTorchModelHubMixin
from PIL import Image
import os
import numpy as np


class CustomDinoResNetModel(nn.Module, PyTorchModelHubMixin):
    """Regress GPS targets from an image by fusing DINO and ResNet outputs.

    A frozen DINO backbone contributes the first-token embedding of its last
    hidden state; a ResNet image classifier contributes its logits. The two
    vectors are concatenated and passed through a small MLP ("fusion" network).

    Args:
        resnet_name: Hugging Face id of the ResNet image-classification checkpoint.
        dino_name: Hugging Face id of the DINO checkpoint.
        num_classes: size of the output vector (2 for latitude/longitude).
    """

    def __init__(self, resnet_name="microsoft/resnet-50", dino_name="facebook/dino-vits16", num_classes=2):
        super().__init__()

        # Load pre-trained DINO model from Hugging Face
        self.dino = AutoModel.from_pretrained(dino_name)

        # Load pre-trained ResNet classifier from Hugging Face
        self.resnet = AutoModelForImageClassification.from_pretrained(resnet_name)

        # The ResNet branch contributes its classification logits, one per label
        self.resnet_features = self.resnet.config.num_labels

        # Freeze DINO layers (no gradient updates)
        for param in self.dino.parameters():
            param.requires_grad = False

        # DINO embedding width is read from the checkpoint config (depends on dino_name)
        self.dino_features = self.dino.config.hidden_size
        total_features = self.dino_features + self.resnet_features

        # Fusion network mapping the concatenated features to the output vector
        self.fusion = nn.Sequential(
            nn.Linear(total_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        """Compute the fused prediction for a batch of images.

        Args:
            x: image batch of shape [batch_size, num_channels, 224, 224].

        Returns:
            Tensor of shape [batch_size, num_classes].
        """
        # First token of the last hidden state -> [batch_size, dino_features]
        dino_output = self.dino(x).last_hidden_state[:, 0]

        # ResNet logits -> [batch_size, resnet_features].
        # The input must be `x`, the argument of this call, not a notebook global.
        resnet_features = self.resnet(x).logits

        # Concatenate features -> [batch_size, dino_features + resnet_features]
        combined_features = torch.cat((dino_output, resnet_features), dim=1)

        # Pass through the fusion network
        return self.fusion(combined_features)

    def save_model(self, save_path):
        """
        Save model locally using the Hugging Face format.
        save_path - Defined saving path
        """
        self.save_pretrained(save_path)

    def push_model(self, repo_name):
        """
        Push the model to the Hugging Face Hub.
        repo_name - Target repository on the Hub
        """
        self.push_to_hub(repo_name)

## Training and Validation Loop Definition
- Below section defines the training and validation loop

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from geopy.distance import geodesic
from torch.optim.lr_scheduler import StepLR
import numpy as np
from tqdm import tqdm

# Loss on the normalized GPS targets
criterion = nn.MSELoss()

# Initialize the model
model = CustomDinoResNetModel(
    resnet_name="microsoft/resnet-50",
    dino_name="facebook/dino-vitb16"
)

# Since DINO is frozen, we only need to train ResNet and fusion network parameters
trainable_params = list(model.resnet.parameters()) + list(model.fusion.parameters())
optimizer = torch.optim.Adam(trainable_params, lr=cfg["lr"])

# Learning rate scheduler.
# NOTE(review): with step_size=5 the first decay happens after the 5th epoch, so a
# 5-epoch run never trains at the reduced rate - confirm this is intended.
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')
model = model.to(device)

# Loop-invariant values for converting normalized coordinates back to the original units
norm_std = np.array([lat_std, lon_std])
norm_mean = np.array([lat_mean, lon_mean])

# Baseline predictor: always answer with the training-set mean coordinates
baseline_preds = np.array([lat_mean, lon_mean])
baseline_point = (baseline_preds[0], baseline_preds[1])

# Training loop with validation; the epoch count comes from the shared config
num_epochs = cfg["num_epochs"]
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for images, gps_coords in tqdm(train_dataloader, desc=f"Epoch {epoch + 1} / {num_epochs}"):
        images, gps_coords = images.to(device), gps_coords.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass (call the module itself so that nn.Module hooks run)
        outputs = model(images)
        loss = criterion(outputs, gps_coords)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    scheduler.step()
    epoch_loss = running_loss / len(train_dataloader)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {epoch_loss:.4f}")

    # Validation phase: squared geodesic error of the model and of the baseline
    model.eval()
    val_loss = 0.0
    baseline_loss = 0.0
    total_samples = 0

    with torch.no_grad():
        for images, gps_coords in tqdm(val_dataloader, desc=f"Validating Epoch {epoch + 1}"):
            # Moving all relevant data to the device
            images = images.to(device)
            gps_coords = gps_coords.to(device)

            batch_size = gps_coords.size(0)
            total_samples += batch_size

            # Model predictions
            outputs = model(images)

            # Denormalize predictions and actual GPS coordinates
            preds_denorm = outputs.cpu().numpy() * norm_std + norm_mean
            actuals_denorm = gps_coords.cpu().numpy() * norm_std + norm_mean

            # Accumulate squared geodesic distances (in meters) for model and baseline
            for pred, actual in zip(preds_denorm, actuals_denorm):
                actual_point = (actual[0], actual[1])
                val_loss += geodesic(actual_point, (pred[0], pred[1])).meters ** 2
                baseline_loss += geodesic(actual_point, baseline_point).meters ** 2

    # Compute average losses
    val_loss /= total_samples
    baseline_loss /= total_samples

    # Compute RMSE
    val_rmse = np.sqrt(val_loss)
    baseline_rmse = np.sqrt(baseline_loss)

    print(f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss (meters^2): {val_loss:.2f}, Baseline Loss (meters^2): {baseline_loss:.2f}")
    print(f"Epoch [{epoch + 1}/{num_epochs}], Validation RMSE (meters): {val_rmse:.2f}, Baseline RMSE (meters): {baseline_rmse:.2f}")

print('Training complete')

# Save the trained model together with the optimizer and scheduler state
torch.save({
    'epoch': num_epochs,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'val_rmse': val_rmse,
}, 'dino_resnet_gps_regressor.pth')

## Testing the learnt model

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tqdm import tqdm
# Per-batch predictions and ground truth, both converted back to original coordinate units
all_preds = []  # Prediction values
all_actuals = []  # Actual values

# Loop-invariant denormalization constants, built once instead of once per batch
std_tensor = torch.tensor([lat_std, lon_std])
mean_tensor = torch.tensor([lat_mean, lon_mean])

model.eval()  # set the model to be in evaluation mode

with torch.no_grad():
    for images, gps_coords in tqdm(test_dataloader, desc="Testing Progress"):
        images, gps_coords = images.to(device), gps_coords.to(device)

        outputs = model(images)

        # Denormalize predictions and actual values
        preds = outputs.cpu() * std_tensor + mean_tensor
        actuals = gps_coords.cpu() * std_tensor + mean_tensor

        all_preds.append(preds)
        all_actuals.append(actuals)

# Concatenate all batches
all_preds = torch.cat(all_preds).numpy()
all_actuals = torch.cat(all_actuals).numpy()

# Compute error metrics (in the coordinates' own units, averaged over latitude and longitude)
mae = mean_absolute_error(all_actuals, all_preds)
# `squared=False` was deprecated in scikit-learn 1.4 and removed in 1.6. Taking the
# root of the per-column MSE and averaging gives the same value on every version.
rmse = np.sqrt(mean_squared_error(all_actuals, all_preds, multioutput="raw_values")).mean()

print(f'Mean Absolute Error: {mae}')
print(f'Root Mean Squared Error: {rmse}')

In [None]:
import numpy as np

# Retrieve normalization parameters from the training dataset
lat_mean = train_dataset.latitude_mean
lat_std = train_dataset.latitude_std
lon_mean = train_dataset.longitude_mean
lon_std = train_dataset.longitude_std

# The test loop already converted `all_preds` and `all_actuals` back to original
# coordinate units before concatenating them. Applying `* std + mean` again here
# would denormalize twice and plot wrong coordinates, so the arrays are only
# exposed under the names the plotting cell expects.
all_preds_denorm = np.asarray(all_preds)
all_actuals_denorm = np.asarray(all_actuals)

In [None]:
import matplotlib.pyplot as plt

# Column 1 holds longitude (x axis), column 0 holds latitude (y axis)
actual_lon, actual_lat = all_actuals_denorm[:, 1], all_actuals_denorm[:, 0]
pred_lon, pred_lat = all_preds_denorm[:, 1], all_preds_denorm[:, 0]

plt.figure(figsize=(10, 5))

# Ground truth in blue, predictions in red
plt.scatter(actual_lon, actual_lat, label='Actual', color='blue', alpha=0.6)
plt.scatter(pred_lon, pred_lat, label='Predicted', color='red', alpha=0.6)

# One thin gray segment per sample, joining the actual point to its prediction
for a_lon, a_lat, p_lon, p_lat in zip(actual_lon, actual_lat, pred_lon, pred_lat):
    plt.plot([a_lon, p_lon], [a_lat, p_lat], color='gray', linewidth=0.5)

plt.legend()
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Actual vs. Predicted GPS Coordinates with Error Lines')
plt.show()

# 1. *Pushing the Model to Hugging Face (HF Model)*

Use this code if you loaded the model from Hugging Face.

In [None]:
# Upload the trained model to its repository on the Hugging Face Hub
hub_repo_id = "cis519/dino_resnet_fusion"
model.push_to_hub(hub_repo_id)

You can load this model by running:

In [None]:
# Rebuild the model from the repository it was pushed to (this rebinds `model`)
pretrained_repo_id = "cis519/dino_resnet_fusion"
model = CustomDinoResNetModel.from_pretrained(pretrained_repo_id)

# 2. Pushing the Model to Hugging Face (Torchvision Model)

Use this code if you loaded the model from Torchvision or built the model from scratch using PyTorch. If you built your model from scratch, make sure to follow the guidelines described here - https://huggingface.co/docs/hub/en/models-uploading#upload-a-pytorch-model-using-huggingfacehub


In [None]:
# Serialize the whole model object (not only its state_dict) to a file under /content
path_name = "resnet_gps_regressor_complete.pth"
model_save_path = "/content/" + path_name
torch.save(model, model_save_path)

In [None]:
from huggingface_hub import HfApi, Repository

# Client for the Hugging Face Hub API
api = HfApi()

# modify repo_name if necessary
organization_name = "cis519"
repo_name = "dino_resnet_fusion"

# exist_ok=True makes this cell safe to re-run once the repository exists
full_repo_id = organization_name + "/" + repo_name
repo_url = api.create_repo(repo_id=full_repo_id, exist_ok=True)

In [None]:
# Clone the Hub repository into a local working directory.
# NOTE(review): `Repository` is deprecated in recent huggingface_hub releases - confirm
# the installed version still provides it.
repo_local_dir = "/content/ImageToGPSproject_dino_resnet_fusion"
repo = Repository(clone_from=repo_url, local_dir=repo_local_dir)

In [None]:
# Move the saved model file into the cloned repository so the next push includes it
model_repo_path = f"{repo_local_dir}/resnet_gps_regressor_complete.pth"
os.rename(model_save_path, model_repo_path)

In [None]:
# The README records the statistics needed to convert model outputs back to coordinates
readme_path = f"{repo_local_dir}/README.md"
readme_content = f"""
# Image to GPS Model: DINO-ResNet Fusion

## Training Data Statistics
The following mean and standard deviation values were used to normalize the GPS coordinates:

- **Latitude Mean**: {lat_mean:.6f}
- **Latitude Std**: {lat_std:.6f}
- **Longitude Mean**: {lon_mean:.6f}
- **Longitude Std**: {lon_std:.6f}
"""

# Write (or overwrite) the README inside the cloned repository
with open(readme_path, "w") as readme_handle:
    readme_handle.write(readme_content)

In [None]:
!git config --global user.email "danielzhong2000@gmail.com"
!git config --global user.name "DanioZhong"

In [None]:
# Commit the contents of the local clone and push them to the Hub
commit_message = "Add fine-tuned dino-ResNet50 model for Image to GPS project"
repo.push_to_hub(commit_message=commit_message)

You can load this model by running:

In [None]:
from huggingface_hub import hf_hub_download
import torch

# Specify the repository and the filename of the model you want to load
repo_id = "cis519/dino_resnet_fusion"  # Replace with your repo name
filename = "resnet_gps_regressor_complete.pth"

# Download the file from the Hub and get its local path
model_path = hf_hub_download(repo_id=repo_id, filename=filename)

# The file holds a fully pickled model object, not a state_dict:
#  - weights_only=False is required on PyTorch versions where torch.load defaults to
#    weights-only loading, which rejects pickled model objects;
#  - unpickling can execute arbitrary code, so only load files from a repository you trust;
#  - map_location lets a CPU-only runtime load a model that was saved from the GPU;
#  - the model's class (CustomDinoResNetModel) must be defined before this cell runs.
load_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_test = torch.load(model_path, map_location=load_device, weights_only=False)
model_test.eval()  # Set the model to evaluation mode