In [None]:
# Install the Google auth client libraries and PyDrive into the notebook environment.
# NOTE(review): none of these packages is imported in the cells visible here - confirm they are still needed.
pip install google-auth google-auth-oauthlib google-auth-httplib2 pydrive


In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision.datasets import CocoDetection
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.datasets import ImageFolder
from torch.utils.data import Dataset
from torchvision import transforms as T
from torch.utils.data.dataloader import default_collate
from PIL import Image
import json



In [None]:
# --- Dataset locations (all under /kaggle/input) ---
TRAIN_IMG_FOLDER_PATH = '/kaggle/input/icpr-vistac-dataset-for-object-detection/Combined-VISTAC-Challenge-Dataset/Combined-VISTAC-Challenge-Dataset/train'  # training images
VALID_IMG_FOLDER_PATH = '/kaggle/input/icpr-vistac-dataset-for-object-detection/Combined-VISTAC-Challenge-Dataset/Combined-VISTAC-Challenge-Dataset/validation'  # validation images
TRAIN_ANNOTATION_FILE = '/kaggle/input/icpr-vistac-dataset-for-object-detection/train.json'
VAL_ANNOTATION_FILE = '/kaggle/input/icpr-vistac-dataset-for-object-detection/validation.json'

# --- Optimisation hyper-parameters ---
LR = 0.001  # learning rate
BATCH_SIZE = 8
EPOCHS = 10

# --- Run identification and compute device ---
MODEL_NAME = 'object_detection_using_faster_RCNN'

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print(DEVICE)

In [None]:
import json

# Parse the training and validation annotation files (JSON) into Python objects.
with open(TRAIN_ANNOTATION_FILE, 'r') as train_fh:
    train_annotations = json.load(train_fh)

with open(VAL_ANNOTATION_FILE, 'r') as val_fh:
    validation_annotations = json.load(val_fh)


In [None]:
# Data transformations
#
# Only photometric augmentation is used for training. These pipelines receive
# the image alone (the dataset does not pass the target through them), so
# geometric transforms such as random flips or rotations would move the object
# in the image while its bounding box stays where it was, silently corrupting
# the ground truth. Colour jitter changes pixel values only, so boxes stay valid.
train_augs = T.Compose([
    T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    T.ToTensor(),  # PIL image (height, width, channel) -> tensor (channel, height, width)
])
valid_augs = T.Compose([
    T.ToTensor(),
])

In [None]:
# Load annotations from a JSON file
def load_annotations(file_path):
    """Read a JSON annotation file and return the parsed content.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode as UTF-8 explicitly instead of relying on the platform's locale
    # encoding, so non-ASCII names parse the same way on every machine.
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

class VistaDataset(Dataset):
    """Single-class detection dataset built from per-video annotation records.

    ``annotations`` maps a video directory name to a record with two lists
    that are indexed in parallel: ``'img_names'`` (image file names) and
    ``'gt_rect'`` (one rectangle per image, read as ``[x, y, width, height]``).
    Every entry of every ``'img_names'`` list becomes one sample.

    Args:
        img_folder: Directory that image names are joined onto.
        annotations: Mapping described above.
        transforms: Optional callable applied to the PIL image only. The
            target is not passed through it, so it must not move pixels
            (flip, rotate, crop, resize) or the boxes will stop matching.
    """

    def __init__(self, img_folder, annotations, transforms=None):
        self.img_folder = img_folder
        self.annotations = annotations
        self.transforms = transforms
        self.imgs = []
        # (video_dir, img_name) -> positions of that name inside the video's
        # 'img_names' list. Built once here so __getitem__ does not have to
        # rescan the whole list for every sample.
        self._positions = {}
        for video_dir, data in annotations.items():
            for position, img_name in enumerate(data['img_names']):
                self.imgs.append((video_dir, img_name))
                self._positions.setdefault((video_dir, img_name), []).append(position)

    def __len__(self):
        """Return the number of (video, image) samples."""
        return len(self.imgs)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a dict with ``"boxes"`` (float32, one
        ``[x1, y1, x2, y2]`` row per annotated rectangle) and ``"labels"``
        (int64, all ones).

        Raises:
            FileNotFoundError: If the image file does not exist.
        """
        video_dir, img_name = self.imgs[idx]
        # NOTE(review): the path is built from img_folder and img_name only;
        # presumably img_name already carries the video sub-directory - confirm.
        img_path = os.path.join(self.img_folder, img_name)

        # Fail early with the resolved path in the message.
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"File not found: {img_path}")

        img = Image.open(img_path).convert("RGB")

        # Convert each [x, y, w, h] rectangle to corner format [x1, y1, x2, y2].
        gt_rects = self.annotations[video_dir]['gt_rect']
        boxes = []
        for position in self._positions[(video_dir, img_name)]:
            box = gt_rects[position]
            boxes.append([box[0], box[1], box[0] + box[2], box[1] + box[3]])
        # Assuming label 1 for now; adjust according to class mapping
        labels = [1] * len(boxes)

        target = {
            "boxes": torch.as_tensor(boxes, dtype=torch.float32),
            "labels": torch.as_tensor(labels, dtype=torch.int64),
        }

        if self.transforms:
            img = self.transforms(img)

        return img, target

In [None]:
def collate_fn(batch):
    """Transpose a list of samples into one tuple per sample field.

    A batch of ``(image, target)`` pairs becomes ``(images, targets)`` where
    each element is a tuple; nothing is stacked into a single tensor.
    """
    columns = zip(*batch)
    return tuple(columns)

In [None]:
# Read the annotation files for both splits
train_annotations = load_annotations(TRAIN_ANNOTATION_FILE)
valid_annotations = load_annotations(VAL_ANNOTATION_FILE)

# Wrap each split in a dataset, pairing it with its own transform pipeline
trainset = VistaDataset(
    TRAIN_IMG_FOLDER_PATH,
    train_annotations,
    transforms=train_augs,
)
validset = VistaDataset(
    VALID_IMG_FOLDER_PATH,
    valid_annotations,
    transforms=valid_augs,
)

# Report split sizes as a quick sanity check
print(f"Total no. of examples in trainset : {len(trainset)}")
print(f"Total no. of examples in validset : {len(validset)}")

In [None]:
# Only the training split is shuffled. Both loaders use collate_fn, so a batch
# is a tuple of images plus a tuple of targets rather than stacked tensors.
trainloader = DataLoader(
    trainset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)
validloader = DataLoader(
    validset,
    batch_size=BATCH_SIZE,
    collate_fn=collate_fn,
)

print(f"Total no. of batches in trainloader : {len(trainloader)}")
print(f"Total no. of batches in validloader : {len(validloader)}")

In [None]:
# Define the model
def get_instance_segmentation_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a new box predictor.

    Despite the name, the returned network is a detection model: the
    pretrained torchvision detector is created and its box predictor is
    replaced by a ``FastRCNNPredictor`` sized for ``num_classes``.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    roi_heads = detector.roi_heads
    # Keep the input width of the existing classification layer for the new head.
    head_in_features = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector


# Initialize the model and load it to the device
# NOTE(review): 71 classes are requested while the dataset in this notebook only emits label 1 - confirm the intended class count.
model = get_instance_segmentation_model(num_classes=71)
model.to(DEVICE)

In [None]:
# Hand the optimizer only the parameters that still require gradients.
params = list(filter(lambda param: param.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=LR,
    momentum=0.9,
    weight_decay=0.0005,
)

In [None]:
# Define the function to train the model

def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq):
    """Run one optimisation pass over ``data_loader``.

    Args:
        model: Module called as ``model(images, targets)`` that returns a
            dict of loss tensors.
        optimizer: Optimizer stepped once per batch.
        data_loader: Iterable of ``(images, targets)`` batches, where
            ``images`` is a sequence of tensors and ``targets`` a sequence of
            dicts of tensors.
        device: Device every image and target tensor is moved to.
        epoch: Index of the current epoch; used only in the log lines.
        print_freq: A progress line is printed every ``print_freq`` batches;
            a falsy value disables the per-batch lines.

    Returns:
        The mean total loss over the processed batches, or ``None`` when the
        loader yielded no batch.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    for i, (images, targets) in enumerate(data_loader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        loss_value = losses.item()
        running_loss += loss_value
        num_batches += 1
        # Guard against print_freq == 0, which would raise on the modulo.
        if print_freq and i % print_freq == 0:
            print(f"Epoch [{epoch}/{EPOCHS}], Step [{i}/{len(data_loader)}], Loss: {loss_value:.4f}")

    # An empty loader used to end in a ZeroDivisionError; report it instead.
    if num_batches == 0:
        print(f"Epoch [{epoch}] No batches processed; loss not computed.")
        return None

    avg_loss = running_loss / num_batches
    print(f"Epoch [{epoch}] Loss: {avg_loss:.4f}")
    return avg_loss


In [None]:
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def evaluate(model, data_loader, device):
    """Compute the mean total loss of ``model`` over ``data_loader``.

    torchvision detection models return their loss dict only in training
    mode; in eval mode they return predictions, so no loss can be read from
    the output. The forward passes therefore run in training mode with
    gradients disabled, and the model is switched to eval mode before
    returning.

    Args:
        model: Module called as ``model(images, targets)`` that returns a
            dict of loss tensors while in training mode.
        data_loader: Iterable of ``(images, targets)`` batches.
        device: Device every image and target tensor is moved to.

    Returns:
        The mean of the per-batch summed losses, or ``None`` when no batch
        produced a loss.
    """
    # NOTE(review): training mode also enables train-time behaviour of layers
    # such as dropout or non-frozen BatchNorm if the model contains any -
    # confirm this is acceptable for the model in use.
    model.train()
    running_loss = 0.0
    num_batches = 0

    try:
        with torch.no_grad():
            for images, targets in data_loader:
                try:
                    images = [image.to(device) for image in images]
                    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                    loss_dict = model(images, targets)
                    losses = sum(loss for loss in loss_dict.values())
                    running_loss += losses.item()
                    num_batches += 1
                except Exception as e:
                    # Best effort: skip the failing batch, but keep the
                    # traceback so a systematic failure is visible.
                    logger.exception(f"Error during evaluation: {e}")
                    continue
    finally:
        # Leave the model in eval mode, whatever happened above.
        model.eval()

    if num_batches == 0:
        print("No loss computed during evaluation.")
        return None

    avg_loss = running_loss / num_batches
    print(f"Validation Loss: {avg_loss:.4f}")
    return avg_loss


In [None]:
import matplotlib.patches as patches

def visualize_results(image, results):
    """Draw every box in ``results`` on top of ``image`` and show the figure.

    Args:
        image: Image accepted by ``Axes.imshow``.
        results: Mapping with ``'boxes'`` (tensor of ``[x1, y1, x2, y2]``
            rows) and ``'labels'`` (one label per box).

    NOTE(review): a later cell defines another ``visualize_results`` that
    replaces this one once it runs.
    """
    fig, ax = plt.subplots(figsize=(12, 12))
    ax.imshow(image)

    # Move the boxes to host memory first: .numpy() raises on CUDA tensors.
    boxes = results['boxes'].detach().cpu().numpy()
    for box, label in zip(boxes, results['labels']):
        x1, y1, x2, y2 = box
        rect = patches.Rectangle((x1, y1), x2 - x1, y2 - y1, linewidth=2, edgecolor='green', facecolor="none")
        ax.add_patch(rect)
        # Show the plain value rather than the tensor repr (e.g. "tensor(1)").
        label_text = str(label.item()) if hasattr(label, "item") else str(label)
        ax.text(x1, y1, label_text, bbox=dict(facecolor='white', alpha=0.5), fontsize=12)

    plt.show()

In [None]:
for epoch in range(EPOCHS):
    train_one_epoch(
        model,
        optimizer,
        trainloader,
        device=DEVICE,
        epoch=epoch,
        print_freq=50,
    )
    eval_loss = evaluate(model, validloader, device=DEVICE)

    # Write a checkpoint of the weights after every epoch
    model_save_path = f'/kaggle/working/faster_rcnn_model_epoch_{epoch}.pth'
    torch.save(model.state_dict(), model_save_path)

    if eval_loss is not None:
        print(f"Epoch [{epoch}] Evaluation Loss: {eval_loss:.4f}")
    else:
        print(f"No valid evaluation loss for epoch {epoch}, please check the validation process.")


In [None]:
#Load the Model
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_instance_segmentation_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a new box predictor.

    Despite the name, the returned network is a detection model: the
    pretrained torchvision detector is created and its box predictor is
    replaced by a ``FastRCNNPredictor`` sized for ``num_classes``.

    NOTE(review): this repeats a definition of the same name from the
    model-definition cell earlier in the notebook.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    roi_heads = detector.roi_heads
    # Keep the input width of the existing classification layer for the new head.
    head_in_features = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector

def load_model(model_path, num_classes):
    """Rebuild the detector and restore weights from a saved state dict.

    Args:
        model_path: Path of a file holding a ``state_dict`` for the model.
        num_classes: Class count used to size the box predictor; it has to
            match the checkpoint for ``load_state_dict`` to succeed.

    Returns:
        The model on ``DEVICE``, in evaluation mode.
    """
    model = get_instance_segmentation_model(num_classes=num_classes)
    # map_location remaps the stored tensors onto the current device, so a
    # checkpoint written on a GPU can still be restored on a CPU-only runtime.
    state_dict = torch.load(model_path, map_location=DEVICE)
    model.load_state_dict(state_dict)
    model.to(DEVICE)
    model.eval()  # Set the model to evaluation mode
    return model

# The training loop writes one checkpoint per epoch, named
# faster_rcnn_model_epoch_{epoch}.pth; a plain faster_rcnn_model.pth is never
# created. Load the checkpoint of the last epoch.
model_path = f'/kaggle/working/faster_rcnn_model_epoch_{EPOCHS - 1}.pth'
loaded_model = load_model(model_path, num_classes=71)


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

def visualize_results(image, predictions, score_threshold=0.5):
    """Draw the predicted boxes scoring above a threshold and show the figure.

    Args:
        image: Image accepted by ``Axes.imshow``.
        predictions: Mapping with tensors ``'boxes'`` (``[x1, y1, x2, y2]``
            rows), ``'labels'`` and ``'scores'``, aligned by index.
        score_threshold: Only predictions whose score is strictly greater
            than this value are drawn. Defaults to 0.5.
    """
    fig, ax = plt.subplots(figsize=(12, 12))
    ax.imshow(image)

    # detach() keeps this usable on outputs that still track gradients.
    boxes = predictions['boxes'].detach().cpu().numpy()
    labels = predictions['labels'].detach().cpu().numpy()
    scores = predictions['scores'].detach().cpu().numpy()

    for box, label, score in zip(boxes, labels, scores):
        if score <= score_threshold:
            continue
        x1, y1, x2, y2 = box
        rect = patches.Rectangle((x1, y1), x2 - x1, y2 - y1, linewidth=2, edgecolor='green', facecolor="none")
        ax.add_patch(rect)
        ax.text(x1, y1, f"{label}: {score:.2f}", bbox=dict(facecolor='white', alpha=0.5), fontsize=12)

    plt.show()

# Run inference and visualize results for a few samples
for images, targets in validloader:
    images = [image.to(DEVICE) for image in images]

    # Inference only: no gradients are needed
    with torch.no_grad():
        predictions = loaded_model(images)

    for image, prediction in zip(images, predictions):
        # (channel, height, width) tensor -> (height, width, channel) array for plotting
        image_np = image.permute(1, 2, 0).cpu().numpy()
        visualize_results(image_np, prediction)

    break  # Remove this to run on the entire validation set
