# Libraries

In [1]:
import os
import json
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torch.nn as nn
import torchvision.models as models
from tqdm import tqdm


In [2]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
# The availability check is evaluated once, inside the expression that uses it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


# Dataset

In [3]:
class CocoCountingDataset(Dataset):
    """Dataset that pairs each image with the number of annotations it has.

    Args:
        img_dir: Directory holding the image files named in the annotation file.
        annotation_file: Path to a JSON file containing ``images`` and
            ``annotations`` lists (e.g. instances_train2017.json).
        transform: Optional callable applied to the loaded RGB image.
    """

    def __init__(self, img_dir, annotation_file, transform=None):
        self.img_dir = img_dir
        self.transform = transform

        # Parse the annotation JSON once, up front.
        with open(annotation_file, 'r') as handle:
            coco = json.load(handle)

        self.images = coco['images']
        self.annotations = coco['annotations']

        # Tally annotations per image id. Images that have no annotation never
        # get an entry here and are reported as 0 by __getitem__.
        tally = {}
        for record in self.annotations:
            key = record['image_id']
            if key in tally:
                tally[key] += 1
            else:
                tally[key] = 1
        self.image_to_counts = tally

    def __len__(self):
        """Return the number of image entries listed in the annotation file."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, total_count)`` for the image entry at ``idx``."""
        meta = self.images[idx]

        # Number of annotations recorded for this image (0 when there are none).
        total_count = self.image_to_counts.get(meta['id'], 0)

        # Load as RGB, then apply the optional transform.
        picture = Image.open(os.path.join(self.img_dir, meta['file_name'])).convert("RGB")
        if self.transform:
            picture = self.transform(picture)

        return picture, total_count


In [4]:
# Configuration: the values a reader is most likely to tune, named in one place
# instead of being repeated or buried in the calls below.
COCO_ROOT = "data/COCO"
IMAGE_SIZE = (224, 224)   # every image is resized to this (height, width)
BATCH_SIZE = 32
NUM_WORKERS = 4           # DataLoader worker processes per loader
# Per-channel normalisation statistics.
# NOTE(review): these look like the usual ImageNet mean/std expected by
# torchvision's pretrained models -- confirm.
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

# Define paths
train_images_dir = f"{COCO_ROOT}/train2017"
val_images_dir = f"{COCO_ROOT}/val2017"
train_annotations = f"{COCO_ROOT}/annotations/instances_train2017.json"
val_annotations = f"{COCO_ROOT}/annotations/instances_val2017.json"

# Define transformations (shared by the training and validation splits)
transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
])

# Create datasets and dataloaders
train_dataset = CocoCountingDataset(train_images_dir, train_annotations, transform=transform)
val_dataset = CocoCountingDataset(val_images_dir, val_annotations, transform=transform)

# Only the training split is shuffled; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)


In [5]:
# Sanity check: report the number of samples in each dataset and the number of
# batches in each loader.
for label, sized in (
    ("train_dataset", train_dataset),
    ("val_dataset", val_dataset),
    ("train_loader", train_loader),
    ("val_loader", val_loader),
):
    print(f"{label} size: {len(sized)}")

train_dataset size: 118287
val_dataset size: 5000
train_loader size: 3697
val_loader size: 157


# Model

In [6]:
class ObjectCounterModel(nn.Module):
    """ResNet-50 backbone with a single-output linear head for count regression.

    Every backbone parameter is frozen; only the replacement ``fc`` layer is
    trainable.

    Args:
        pretrained: Forwarded to ``models.resnet50``. Defaults to ``True``
            (the previous hard-coded behaviour); pass ``False`` to skip loading
            pretrained weights, e.g. before restoring a saved ``state_dict``.
    """

    def __init__(self, pretrained=True):
        super(ObjectCounterModel, self).__init__()
        # Use a pre-trained ResNet model.
        # NOTE(review): newer torchvision releases prefer ``weights=...`` over
        # ``pretrained=...``; the keyword is kept here for compatibility with
        # the version this notebook was written against -- confirm.
        self.resnet = models.resnet50(pretrained=pretrained)

        # Freeze every parameter of the backbone.
        for param in self.resnet.parameters():
            param.requires_grad = False

        # Replace the final layer for regression (object counting). The new
        # layer is created after the freeze, so its parameters keep the default
        # requires_grad=True and no explicit "unfreeze" step is needed.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, 1)

    def train(self, mode=True):
        """Set training mode while keeping batch-norm layers in eval mode.

        ``requires_grad = False`` only stops gradient updates. A batch-norm
        layer left in training mode would still normalise with per-batch
        statistics and keep updating its running mean/variance, so the
        "frozen" backbone would drift. Forcing those layers back to eval mode
        keeps the backbone fixed.

        Args:
            mode: ``True`` for training mode, ``False`` for evaluation mode.

        Returns:
            ``self``, matching ``nn.Module.train``.
        """
        super(ObjectCounterModel, self).train(mode)
        for module in self.resnet.modules():
            if isinstance(module, nn.modules.batchnorm._BatchNorm):
                module.eval()
        return self

    def forward(self, x):
        """Return the backbone's output for ``x`` (one value per sample)."""
        return self.resnet(x)

In [7]:
def train_model(model, dataloader, optimizer, criterion, device):
    """Run one training epoch and return the mean loss per sample.

    Args:
        model: Module mapping a batch of images to one prediction per sample.
        dataloader: Iterable yielding ``(images, counts)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, counts)``. It is assumed
            to return the mean over the batch (as ``nn.MSELoss()`` does by
            default) -- confirm if a different reduction is used.
        device: Device the batches are moved to.

    Returns:
        The epoch loss averaged over samples. Each batch's mean loss is
        weighted by its size, so a shorter final batch does not count as much
        as a full one.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model.train()
    loss_sum = 0.0
    num_samples = 0

    for images, counts in tqdm(dataloader, desc="Training"):
        images = images.to(device)
        # Targets become float column vectors to line up with the model output.
        counts = counts.to(device, dtype=torch.float32).unsqueeze(1)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, counts)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Turn the batch mean back into a batch total before accumulating.
        batch_size = images.size(0)
        loss_sum += loss.item() * batch_size
        num_samples += batch_size

    return loss_sum / num_samples


In [8]:
def evaluate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` and return ``(mean loss, mean absolute error)``.

    Both metrics are averaged over the samples actually seen, so they share
    the same denominator and a shorter final batch is weighted correctly.

    Args:
        model: Module mapping a batch of images to one prediction per sample.
        dataloader: Iterable yielding ``(images, counts)`` batches.
        criterion: Loss called as ``criterion(outputs, counts)``. It is assumed
            to return the mean over the batch (as ``nn.MSELoss()`` does by
            default) -- confirm if a different reduction is used.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, avg_absolute_error)``, both per sample.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model.eval()
    loss_sum = 0.0
    total_absolute_error = 0.0
    num_samples = 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for images, counts in tqdm(dataloader, desc="Evaluating"):
            images = images.to(device)
            # Targets become float column vectors to line up with the output.
            counts = counts.to(device, dtype=torch.float32).unsqueeze(1)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, counts)

            # Turn the batch mean back into a batch total before accumulating.
            batch_size = images.size(0)
            loss_sum += loss.item() * batch_size
            total_absolute_error += torch.sum(torch.abs(outputs - counts)).item()
            num_samples += batch_size

    avg_loss = loss_sum / num_samples
    avg_absolute_error = total_absolute_error / num_samples
    return avg_loss, avg_absolute_error


# Training

In [None]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyper-parameters and output location, named so they are easy to find and tune.
LEARNING_RATE = 0.001
CHECKPOINT_PATH = "object_counter_model.pth"

# Initialize model, optimizer, and loss function
model = ObjectCounterModel().to(device)
# Register only the parameters that are actually trained with the optimizer;
# the frozen ones (requires_grad=False) never receive gradients.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=LEARNING_RATE)
criterion = nn.MSELoss()  # Mean Squared Error for regression

# Training loop: one pass over the training split, then one over validation.
num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")

    train_loss = train_model(model, train_loader, optimizer, criterion, device)
    val_loss, val_mae = evaluate_model(model, val_loader, criterion, device)

    print(f"Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val MAE: {val_mae:.4f}")

# Save the trained model (weights only, from the final epoch)
torch.save(model.state_dict(), CHECKPOINT_PATH)



Epoch 1/10


Training:   0%|          | 0/3697 [00:00<?, ?it/s]

# Testing

In [None]:
def predict(model, image_path, transform, device):
    """Predict the object count for a single image file.

    Args:
        model: Module that returns one value for a one-image batch.
        image_path: Path of the image to load.
        transform: Callable turning the loaded RGB image into a tensor.
        device: Device the input tensor is moved to.

    Returns:
        The model output for the image as a Python number.
    """
    # Switch to evaluation mode; the model is left in that mode afterwards.
    model.eval()

    # Load as RGB, preprocess, and add a leading batch dimension of size 1.
    rgb = Image.open(image_path).convert("RGB")
    batch = transform(rgb).unsqueeze(0).to(device)

    # Inference only: no gradient tracking needed.
    with torch.no_grad():
        return model(batch).item()

# Example usage
# NOTE(review): this path looks like a placeholder -- point it at a real image.
test_image_path = "data/test2017/example.jpg"
if os.path.isfile(test_image_path):
    predicted_count = predict(model, test_image_path, transform, device)
    print(f"Predicted object count: {predicted_count:.2f}")
else:
    # Report the missing file instead of raising, so the notebook still runs
    # top to bottom when the example image is absent.
    predicted_count = None
    print(f"Test image not found, skipping prediction: {test_image_path}")
