In [3]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as transforms
from datasets import load_dataset
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import math

# Custom Dataset Class for COCO Data
class COCODataset(Dataset):
    """Map-style dataset yielding an image plus fixed-length box/label tensors.

    Each record of ``dataset`` is indexed as ``record['image']`` (an object with
    ``.convert``) and ``record['objects']`` (with ``'bbox'`` and ``'category'``
    lists). Annotations are padded with all-zero boxes / label ``0`` and then
    truncated so every sample carries exactly ``max_objects`` entries.

    Box coordinates are returned as stored in the record; they are not
    adjusted by ``transform``.
    """

    def __init__(self, dataset, transform=None, max_objects=10):
        self.dataset = dataset
        self.transform = transform
        self.max_objects = max_objects

    def __len__(self):
        return len(self.dataset)

    @staticmethod
    def _pad_or_trim(items, filler, length):
        """Return ``items`` padded with ``filler`` and cut to ``length`` entries."""
        missing = length - len(items)
        # A non-positive ``missing`` yields an empty padding list.
        return (items + [filler] * missing)[:length]

    def __getitem__(self, idx):
        record = self.dataset[idx]

        image = record['image'].convert("RGB")
        image = self.transform(image) if self.transform else image

        annotations = record['objects']
        limit = self.max_objects
        fixed_boxes = self._pad_or_trim(annotations['bbox'], [0, 0, 0, 0], limit)
        fixed_labels = self._pad_or_trim(annotations['category'], 0, limit)

        box_tensor = torch.tensor(fixed_boxes, dtype=torch.float32)
        label_tensor = torch.tensor(fixed_labels, dtype=torch.long)
        return image, box_tensor, label_tensor

# Image preprocessing: resize to 224x224, convert to a tensor, then normalize
# each channel with the mean/std below (presumably the ImageNet statistics the
# pretrained ResNet expects — confirm).
# NOTE(review): only the image is resized here; COCODataset returns the box
# coordinates exactly as stored in the record.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the first 1% of the train split of the dataset
dataset = load_dataset("detection-datasets/coco", split='train[:1%]')

# Wrap the loaded records in COCODataset (default max_objects=10); the
# DataLoader itself is created further below.
coco_dataset = COCODataset(dataset, transform=transform)

def custom_collate_fn(batch):
    """Collate ``(image, bboxes, labels)`` samples into three batched tensors.

    Each field is stacked along a new leading batch dimension, so every sample
    in ``batch`` must carry tensors of matching shape.
    """
    # zip(*batch) regroups the samples field by field; unpacking into exactly
    # three names keeps the requirement that samples are 3-tuples.
    images, bboxes, labels = (torch.stack(column) for column in zip(*batch))
    return images, bboxes, labels

# Batches of two samples, shuffled, assembled by custom_collate_fn.
coco_dataloader = DataLoader(coco_dataset, batch_size=2, shuffle=True, collate_fn=custom_collate_fn)

# Load pre-trained ResNet model
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in favour
# of the `weights=` argument — confirm against the installed version.
resnet_model = models.resnet50(pretrained=True)
# Switch to evaluation mode before the model is re-wrapped below.
resnet_model.eval()
# Keep every child module except the last one (presumably the final
# classification layer — TODO confirm), so the network outputs features.
resnet_model = torch.nn.Sequential(*list(resnet_model.children())[:-1])

def extract_features(dataloader, model, max_objects):
    """Run ``model`` over every batch and collect features, boxes and labels.

    Args:
        dataloader: Iterable of ``(images, bboxes, labels)`` batches.
        model: Callable applied to ``images``; its output is flattened to one
            vector per sample.
        max_objects: Number of times each sample's feature vector is repeated
            along a new second dimension.

    Returns:
        Tuple ``(features, bboxes, labels)`` concatenated over all batches,
        with ``features`` shaped ``(num_samples, max_objects, feature_dim)``.
    """
    feature_chunks, bbox_chunks, label_chunks = [], [], []

    # No gradients are recorded while the backbone is evaluated.
    with torch.no_grad():
        for images, bboxes, labels in dataloader:
            pooled = model(images)
            flat = pooled.view(pooled.shape[0], -1)
            # Same image-level vector for every object slot.
            tiled = flat[:, None, :].expand(-1, max_objects, -1)

            feature_chunks.append(tiled)
            bbox_chunks.append(bboxes)
            label_chunks.append(labels)

    return torch.cat(feature_chunks), torch.cat(bbox_chunks), torch.cat(label_chunks)

# Extract features from COCO data: one pass of the ResNet over the whole
# dataloader, with all results kept in memory. max_objects=10 matches the
# default padding length of COCODataset.
features, bboxes, labels = extract_features(coco_dataloader, resnet_model, max_objects=10)

class ResNetFeatureDataset(Dataset):
    """Dataset over precomputed features with their boxes and labels.

    The three containers are indexed with the same ``idx``; the dataset length
    is the length of ``features``.
    """

    def __init__(self, features, bboxes, labels):
        self.features, self.bboxes, self.labels = features, bboxes, labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        # Return the aligned (feature, bbox, label) triple for this index.
        return self.features[idx], self.bboxes[idx], self.labels[idx]

# Dataset and dataloader over the precomputed features; no collate_fn is
# passed here, so the DataLoader's default collation is used.
feature_dataset = ResNetFeatureDataset(features, bboxes, labels)
feature_dataloader = DataLoader(feature_dataset, batch_size=2, shuffle=True)

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    Args:
        d_model: Size of the last (feature) dimension. Both even and odd
            values are supported.
        max_len: Number of positions precomputed in the ``pe`` buffer.

    The buffer ``pe`` has shape ``(max_len, 1, d_model)`` so it broadcasts
    over the batch dimension of an input shaped ``(seq_len, batch, d_model)``.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so drop the surplus frequency for the cosines.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Insert a singleton batch axis: (max_len, 1, d_model).
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(0)`` positions."""
        return x + self.pe[:x.size(0), :]

class TransformerModel(nn.Module):
    """Transformer encoder with linear heads for boxes and class logits.

    Input is ``(batch, seq_len, feature_dim)``; the model returns
    ``(bboxes, classes)`` shaped ``(batch, seq_len, 4)`` and
    ``(batch, seq_len, num_classes)``. ``max_objects`` is stored on the
    instance but not otherwise used by this class.
    """

    def __init__(self, feature_dim, num_classes, num_heads=8, num_layers=6, max_objects=10):
        super().__init__()
        self.feature_dim = feature_dim
        self.max_objects = max_objects
        self.pos_encoder = PositionalEncoding(feature_dim)
        single_layer = nn.TransformerEncoderLayer(d_model=feature_dim, nhead=num_heads)
        self.transformer_encoder = nn.TransformerEncoder(single_layer, num_layers)
        self.fc_bbox = nn.Linear(feature_dim, 4)
        self.fc_class = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        # The encoder is built without batch_first, so move the sequence axis
        # to the front: (seq_len, batch, feature_dim).
        seq_first = x.transpose(0, 1)
        encoded = self.transformer_encoder(self.pos_encoder(seq_first))
        # Back to (batch, seq_len, feature_dim) for the per-slot heads.
        batch_first = encoded.transpose(0, 1)
        return self.fc_bbox(batch_first), self.fc_class(batch_first)

# Define the model
feature_dim = 2048  # This is the dimension of the ResNet features
# Number of classes = number of category names declared in the dataset schema.
num_classes = len(dataset.features['objects'].feature['category'].names)
model = TransformerModel(feature_dim, num_classes)

# Box regression uses MSE, classification uses cross-entropy.
# NOTE(review): padded slots (all-zero boxes, label 0) are included in both
# losses, and box targets are the coordinates as stored in the dataset —
# confirm this is intended.
criterion_bbox = nn.MSELoss()
criterion_class = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train_model(model, dataloader, criterion_bbox, criterion_class, optimizer, num_epochs=10):
    """Train ``model`` with a summed bounding-box and classification loss.

    Args:
        model: Module called as ``model(features)`` that returns
            ``(pred_bboxes, class_logits)``. The number of classes is read
            from the last dimension of ``class_logits``.
        dataloader: Iterable of ``(features, bboxes, labels)`` batches.
        criterion_bbox: Loss applied to ``(pred_bboxes, bboxes)``.
        criterion_class: Loss applied to the flattened logits and labels.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over ``dataloader``.

    Prints the average losses per epoch; returns nothing. Batches containing
    a label that is not smaller than the class count are reported and skipped.
    """
    model.train()
    for epoch in range(num_epochs):
        running_loss_bbox = 0.0
        running_loss_class = 0.0
        batches_used = 0
        for features, bboxes, labels in dataloader:
            optimizer.zero_grad()
            outputs_bboxes, outputs_classes = model(features)

            # Take the class count from the logits themselves so the check and
            # the reshape below always agree with the model that was passed in.
            class_count = outputs_classes.size(-1)
            flattened_labels = labels.reshape(-1)

            # Check if all label values are within the valid range
            out_of_range = flattened_labels >= class_count
            if torch.any(out_of_range):
                print(f"Invalid label found: {flattened_labels[out_of_range]}")
                continue  # Skip this batch if invalid labels are found

            loss_bbox = criterion_bbox(outputs_bboxes, bboxes)
            loss_class = criterion_class(outputs_classes.reshape(-1, class_count), flattened_labels)
            loss = loss_bbox + loss_class
            loss.backward()
            optimizer.step()

            running_loss_bbox += loss_bbox.item()
            running_loss_class += loss_class.item()
            batches_used += 1

        # Average over the batches that actually contributed; the max() guard
        # keeps an empty or fully skipped epoch from dividing by zero.
        denominator = max(batches_used, 1)
        print(f"Epoch [{epoch + 1}/{num_epochs}], BBox Loss: {running_loss_bbox / denominator}, Class Loss: {running_loss_class / denominator}")

# Train on the precomputed features; num_epochs is left at train_model's default.
train_model(model, feature_dataloader, criterion_bbox, criterion_class, optimizer)

# Save the trained model's weights (state_dict only) under a relative path
torch.save(model.state_dict(), "transformer_object_detection.pth")

Epoch [1/10], BBox Loss: 33534.72290080654, Class Loss: 3.450539509726139
Epoch [2/10], BBox Loss: 32253.073559944634, Class Loss: 3.385037877592155
Epoch [3/10], BBox Loss: 32157.05880503354, Class Loss: 3.4155356373250787
Epoch [4/10], BBox Loss: 32182.724289954218, Class Loss: 3.402547035651881
Epoch [5/10], BBox Loss: 32175.39709826182, Class Loss: 3.343142723001451
Epoch [6/10], BBox Loss: 32211.49145383039, Class Loss: 3.3637689298218723
Epoch [7/10], BBox Loss: 32110.958244050787, Class Loss: 3.3236998879117494
Epoch [8/10], BBox Loss: 32145.606372111903, Class Loss: 3.267943659165852
Epoch [9/10], BBox Loss: 32149.192091673765, Class Loss: 3.3338012232284937
Epoch [10/10], BBox Loss: 32190.174590076662, Class Loss: 3.2974150052874824


# v2

In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as transforms
from datasets import load_dataset
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import math

# Custom Dataset Class for COCO Data
class COCODataset(Dataset):
    """Map-style dataset yielding an image plus fixed-length box/label tensors.

    Each record of ``dataset`` is indexed as ``record['image']`` (an object with
    ``.convert``) and ``record['objects']`` (with ``'bbox'`` and ``'category'``
    lists). Annotations are padded with all-zero boxes / label ``0`` and then
    truncated so every sample carries exactly ``max_objects`` entries.

    Box coordinates are returned as stored in the record; they are not
    adjusted by ``transform``.
    """

    def __init__(self, dataset, transform=None, max_objects=10):
        self.dataset = dataset
        self.transform = transform
        self.max_objects = max_objects

    def __len__(self):
        return len(self.dataset)

    @staticmethod
    def _pad_or_trim(items, filler, length):
        """Return ``items`` padded with ``filler`` and cut to ``length`` entries."""
        missing = length - len(items)
        # A non-positive ``missing`` yields an empty padding list.
        return (items + [filler] * missing)[:length]

    def __getitem__(self, idx):
        record = self.dataset[idx]

        image = record['image'].convert("RGB")
        image = self.transform(image) if self.transform else image

        annotations = record['objects']
        limit = self.max_objects
        fixed_boxes = self._pad_or_trim(annotations['bbox'], [0, 0, 0, 0], limit)
        fixed_labels = self._pad_or_trim(annotations['category'], 0, limit)

        box_tensor = torch.tensor(fixed_boxes, dtype=torch.float32)
        label_tensor = torch.tensor(fixed_labels, dtype=torch.long)
        return image, box_tensor, label_tensor

# Image preprocessing: resize to 224x224, convert to a tensor, then normalize
# each channel with the mean/std below (presumably the ImageNet statistics the
# pretrained ResNet expects — confirm).
# NOTE(review): only the image is resized here; COCODataset returns the box
# coordinates exactly as stored in the record.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the first 1% of the train split of the dataset
dataset = load_dataset("detection-datasets/coco", split='train[:1%]')

# Wrap the loaded records in COCODataset (default max_objects=10); the
# DataLoader itself is created further below.
coco_dataset = COCODataset(dataset, transform=transform)

def custom_collate_fn(batch):
    """Collate ``(image, bboxes, labels)`` samples into three batched tensors.

    Each field is stacked along a new leading batch dimension, so every sample
    in ``batch`` must carry tensors of matching shape.
    """
    # zip(*batch) regroups the samples field by field; unpacking into exactly
    # three names keeps the requirement that samples are 3-tuples.
    images, bboxes, labels = (torch.stack(column) for column in zip(*batch))
    return images, bboxes, labels

# Batches of two samples, shuffled, assembled by custom_collate_fn.
coco_dataloader = DataLoader(coco_dataset, batch_size=2, shuffle=True, collate_fn=custom_collate_fn)

# Load pre-trained ResNet model
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in favour
# of the `weights=` argument — confirm against the installed version.
resnet_model = models.resnet50(pretrained=True)
# Switch to evaluation mode before the model is re-wrapped below.
resnet_model.eval()
# Keep every child module except the last one (presumably the final
# classification layer — TODO confirm), so the network outputs features.
resnet_model = torch.nn.Sequential(*list(resnet_model.children())[:-1])

def extract_features(dataloader, model, max_objects):
    """Run ``model`` over every batch and collect features, boxes and labels.

    Args:
        dataloader: Iterable of ``(images, bboxes, labels)`` batches.
        model: Callable applied to ``images``; its output is flattened to one
            vector per sample.
        max_objects: Number of times each sample's feature vector is repeated
            along a new second dimension.

    Returns:
        Tuple ``(features, bboxes, labels)`` concatenated over all batches,
        with ``features`` shaped ``(num_samples, max_objects, feature_dim)``.
    """
    feature_chunks, bbox_chunks, label_chunks = [], [], []

    # No gradients are recorded while the backbone is evaluated.
    with torch.no_grad():
        for images, bboxes, labels in dataloader:
            pooled = model(images)
            flat = pooled.view(pooled.shape[0], -1)
            # Same image-level vector for every object slot.
            tiled = flat[:, None, :].expand(-1, max_objects, -1)

            feature_chunks.append(tiled)
            bbox_chunks.append(bboxes)
            label_chunks.append(labels)

    return torch.cat(feature_chunks), torch.cat(bbox_chunks), torch.cat(label_chunks)

# Extract features from COCO data: one pass of the ResNet over the whole
# dataloader, with all results kept in memory. max_objects=10 matches the
# default padding length of COCODataset.
features, bboxes, labels = extract_features(coco_dataloader, resnet_model, max_objects=10)

class ResNetFeatureDataset(Dataset):
    """Dataset over precomputed features with their boxes and labels.

    The three containers are indexed with the same ``idx``; the dataset length
    is the length of ``features``.
    """

    def __init__(self, features, bboxes, labels):
        self.features, self.bboxes, self.labels = features, bboxes, labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        # Return the aligned (feature, bbox, label) triple for this index.
        return self.features[idx], self.bboxes[idx], self.labels[idx]

# Dataset and dataloader over the precomputed features; no collate_fn is
# passed here, so the DataLoader's default collation is used.
feature_dataset = ResNetFeatureDataset(features, bboxes, labels)
feature_dataloader = DataLoader(feature_dataset, batch_size=2, shuffle=True)

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    Args:
        d_model: Size of the last (feature) dimension. Both even and odd
            values are supported.
        max_len: Number of positions precomputed in the ``pe`` buffer.

    The buffer ``pe`` has shape ``(max_len, 1, d_model)`` so it broadcasts
    over the batch dimension of an input shaped ``(seq_len, batch, d_model)``.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so drop the surplus frequency for the cosines.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Insert a singleton batch axis: (max_len, 1, d_model).
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(0)`` positions."""
        return x + self.pe[:x.size(0), :]

class MLPBlock(nn.Module):
    """Two-layer perceptron: linear -> ReLU -> dropout (p=0.1) -> linear.

    Maps the last dimension from ``input_dim`` through ``hidden_dim`` to
    ``output_dim``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        # Dropout sits between the activation and the output projection.
        hidden = self.dropout(self.relu(self.fc1(x)))
        return self.fc2(hidden)

class TransformerModel(nn.Module):
    """Transformer encoder-decoder with MLP heads for boxes and class logits.

    ``forward(src, tgt)`` takes two ``(batch, seq_len, feature_dim)`` tensors
    and returns ``(bboxes, classes)`` with last dimensions ``4`` and
    ``num_classes``. ``max_objects`` is stored on the instance but not
    otherwise used by this class.
    """

    def __init__(self, feature_dim, num_classes, num_heads=8, num_layers=6, max_objects=10):
        super().__init__()
        self.feature_dim = feature_dim
        self.max_objects = max_objects
        self.pos_encoder = PositionalEncoding(feature_dim)
        self.pos_decoder = PositionalEncoding(feature_dim)

        enc_layer = nn.TransformerEncoderLayer(d_model=feature_dim, nhead=num_heads)
        self.transformer_encoder = nn.TransformerEncoder(enc_layer, num_layers)

        dec_layer = nn.TransformerDecoderLayer(d_model=feature_dim, nhead=num_heads)
        self.transformer_decoder = nn.TransformerDecoder(dec_layer, num_layers)

        # Width of the hidden layer shared by both prediction heads.
        head_width = 512
        self.mlp_bbox = MLPBlock(feature_dim, head_width, 4)
        self.mlp_class = MLPBlock(feature_dim, head_width, num_classes)

    def forward(self, src, tgt):
        # Encoder: sequence axis first, add positions, encode.
        src_seq_first = self.pos_encoder(src.transpose(0, 1))
        memory = self.transformer_encoder(src_seq_first)

        # Decoder: same layout for the target, attending to the encoder memory.
        tgt_seq_first = self.pos_decoder(tgt.transpose(0, 1))
        decoded = self.transformer_decoder(tgt_seq_first, memory)

        # Back to (batch, seq_len, feature_dim) before the prediction heads.
        decoded = decoded.transpose(0, 1)
        return self.mlp_bbox(decoded), self.mlp_class(decoded)

# Define the model
feature_dim = 2048  # This is the dimension of the ResNet features
# Number of classes = number of category names declared in the dataset schema.
num_classes = len(dataset.features['objects'].feature['category'].names)
model = TransformerModel(feature_dim, num_classes)

# Box regression uses MSE, classification uses cross-entropy.
# NOTE(review): padded slots (all-zero boxes, label 0) are included in both
# losses, and box targets are the coordinates as stored in the dataset —
# confirm this is intended.
criterion_bbox = nn.MSELoss()
criterion_class = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train_model(model, dataloader, criterion_bbox, criterion_class, optimizer, num_epochs=10):
    """Train an encoder-decoder ``model`` with a summed box + class loss.

    Args:
        model: Module called as ``model(features, tgt)`` that returns
            ``(pred_bboxes, class_logits)``. The number of classes is read
            from the last dimension of ``class_logits``.
        dataloader: Iterable of ``(features, bboxes, labels)`` batches.
        criterion_bbox: Loss applied to ``(pred_bboxes, bboxes)``.
        criterion_class: Loss applied to the flattened logits and labels.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over ``dataloader``.

    Prints the average losses per epoch; returns nothing. Batches containing
    a label that is not smaller than the class count are reported and skipped.
    """
    model.train()
    for epoch in range(num_epochs):
        running_loss_bbox = 0.0
        running_loss_class = 0.0
        batches_used = 0
        for features, bboxes, labels in dataloader:
            # The decoder input is the same tensor as the encoder input
            # (could be a shifted version of the features).
            tgt = features

            optimizer.zero_grad()
            outputs_bboxes, outputs_classes = model(features, tgt)

            # Take the class count from the logits themselves so the check and
            # the reshape below always agree with the model that was passed in.
            class_count = outputs_classes.size(-1)
            flattened_labels = labels.reshape(-1)

            # Check if all label values are within the valid range
            out_of_range = flattened_labels >= class_count
            if torch.any(out_of_range):
                print(f"Invalid label found: {flattened_labels[out_of_range]}")
                continue  # Skip this batch if invalid labels are found

            loss_bbox = criterion_bbox(outputs_bboxes, bboxes)
            loss_class = criterion_class(outputs_classes.reshape(-1, class_count), flattened_labels)
            loss = loss_bbox + loss_class
            loss.backward()
            optimizer.step()

            running_loss_bbox += loss_bbox.item()
            running_loss_class += loss_class.item()
            batches_used += 1

        # Average over the batches that actually contributed; the max() guard
        # keeps an empty or fully skipped epoch from dividing by zero.
        denominator = max(batches_used, 1)
        print(f"Epoch [{epoch + 1}/{num_epochs}], BBox Loss: {running_loss_bbox / denominator}, Class Loss: {running_loss_class / denominator}")

# Train on the precomputed features; num_epochs is left at train_model's default.
train_model(model, feature_dataloader, criterion_bbox, criterion_class, optimizer)

# Save the trained model (saving is disabled in this version)
#torch.save(model.state_dict(), "transformer_object_detection.pth")

Epoch [1/10], BBox Loss: 33160.591650473805, Class Loss: 2.3762212528686915


KeyboardInterrupt: 