<a href="https://colab.research.google.com/github/Rumeysakeskin/Custom-Object-Detection-PyTorch/blob/main/object_tracking_on_videos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Install and unzip dataset
import zipfile, urllib.request, shutil
url = "https:YOUR_DATASET_LINK/DATASET_FILE.zip" 
file_name = 'DATASET_FILE.zip'

with urllib.request.urlopen(url) as response, open(file_name, 'wb') as out_file:
    shutil.copyfileobj(response, out_file)
    with zipfile.ZipFile(file_name) as zf:
        zf.extractall()

In [3]:
import json
import cv2
import os
from torch.utils.data import Dataset, DataLoader
import torch
import torchvision.transforms as T
import torch.nn as nn
import glob as glob
import numpy as np
from tqdm import tqdm

In [4]:
def create_image_dataset(data_dir, data_path, annotations):

    frame_counter = 0
    ret = True
    
    # Load the video and extract the frames
    capture = cv2.VideoCapture(data_dir)
    
    # Get the total number of frames in the video
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) #test: 1800 frames

    # Calculate the interval between frames to keep
    # Determine which frames to keep and which to discard based on the interval value
    # interval = int(capture.get(cv2.CAP_PROP_FPS) / FPS) # FPS = 30

    while ret: # Break the loop if the video has ended
        ret, frame = capture.read() # frame shape 640x640x3
        if not ret:
          break
        frame_name = annotations["images"][frame_counter]['file_name']
        cv2.imwrite(f'{data_path}/{frame_name}', frame)
        frame_counter +=1

    print(f"{total_frames} frames saved to {data_path}")

        # plt.imshow(frame[:,:,::-1])  # Plot frames
        # plt.show()        

In [5]:
# Load video dataset and annotation files
def load_annotations(file_path):
  with open(file_path, 'r') as f:
        annotations = json.load(f)
  return annotations
  
annotations = [("test", "/content/challenge/annotations/instances_test.json"),
               ("val", "/content/challenge/annotations/instances_val.json"),
               ("train", "/content/challenge/annotations/instances_train.json")]

videos = [("test", "/content/challenge/images/test/test.mp4"),
               ("val", "/content/challenge/images/val/val.mp4"),
               ("train", "/content/challenge/images/train/train.mp4")]

for annotation in annotations:
    name, path = annotation
    locals()[f"{name}_annotations"] = load_annotations(path)

for video in videos:
    name, path = video
    locals()[f"{name}_data"] = path

In [6]:
# Create image data folders from videos
data_dirs = ["train_data","test_data","val_data"]
if not os.path.exists('./frames'):
    os.mkdir('./frames/')
    for data_dir in data_dirs:
      os.mkdir(f'./frames/{data_dir}/')  

# Create images from video frames
create_image_dataset(train_data, "./frames/train_data", train_annotations) # data, data_path, annotations
create_image_dataset(val_data, "./frames/val_data", val_annotations) 
create_image_dataset(test_data, "./frames/test_data", test_annotations) 

7200 frames saved to ./frames/train_data
1800 frames saved to ./frames/val_data
1800 frames saved to ./frames/test_data


In [7]:
class VideoDataset(Dataset):
    def __init__(self, data_dir, annotations, transform=None):
        self.data_dir = data_dir
        self.annotations = annotations
        self.transform = transform
        
        # get all the image paths in sorted order
        self.image_paths = glob.glob(f"{self.data_dir}/*.jpg")
        self.all_images = [image_path.split('/')[-1] for image_path in self.image_paths]
        self.all_images = sorted(self.all_images)
        
    def __len__(self):
        # return len(self.annotations)
        return len(self.all_images)
        

# loads a video and extracts frames from it
    def __getitem__(self, idx):

        image_name = self.all_images[idx]
        image_path = os.path.join(self.data_dir, image_name)

        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32) 
        image /= 255.0
        image = torch.as_tensor(image)
        image = np.transpose(image, (2, 0, 1))
        
        # Get the annotations for each frame
        target = {}

        # [x, y, width, height] --> [x_min, y_min, x_max, y_max]
        x, y, w, h = self.annotations["annotations"][idx]['bbox']

        target["boxes"] = torch.as_tensor([x, y, x + w, y + h], dtype=torch.float32).unsqueeze(0)  # Add a batch dimension
        target["labels"] = torch.as_tensor([self.annotations["annotations"][idx]['category_id']], dtype=torch.int64)  
        target["image_id"] = torch.as_tensor([self.annotations["annotations"][idx]['image_id']])
        target["area"] = torch.as_tensor([self.annotations["annotations"][idx]['area']], dtype=torch.float32)
        target["iscrowd"] = torch.as_tensor([self.annotations["annotations"][idx]['iscrowd']], dtype=torch.int64)
        # target["track_id"] = torch.as_tensor([self.annotations["annotations"][idx]['track_id']])
       
        # Apply the transformations if provided
        # if self.transform is not None:
        #   image = self.transform(image)
        #   target = self.transform(target)
        # print(f"image:{image}, target:{target}")
       
        return image, target

In [14]:
# Define the data transforms to be applied to the video frames
# preprocess the input images in a dataset before feeding them into a neural network for training.
# converts the input image from its original format to a PyTorch tensor.

# the transform pipeline makes it possible to perform operations such as 
# normalization, data augmentation, and other preprocessing steps that can 
# help to improve the performance of a neural network during training.

def get_transform(train):
    transforms = []
    transforms.append(T.ToTensor())
    # transforms.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(transforms)

def collate_fn(batch):
    """
    To handle the data loading as different images may have different number 
    of objects and to handle varying size tensors as well.
    """
    return tuple(zip(*batch))

In [15]:
# Create the dataset object for videos and their corresponding annotations

train_dataset = VideoDataset("./frames/train_data", train_annotations, get_transform(True))
test_dataset = VideoDataset("./frames/test_data", test_annotations, get_transform(False))
val_dataset = VideoDataset("./frames/val_data", val_annotations, get_transform(False))
# Define the dataloader load the data in batches during training and inference
train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=0,collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=2, shuffle=True, num_workers=0, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=False, num_workers=0, collate_fn=collate_fn)

In [16]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def fasterrcnn_model(num_classes):
  # load a model pre-trained on COCO
  model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
  # get number of input features for the classifier
  in_features = model.roi_heads.box_predictor.cls_score.in_features
  # replace the pre-trained head with a new one
  model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
  return model

In [17]:
# function for running training iterations

def train(train_data_loader, model):
    global train_itr
    global train_loss_list

    model.train()
    
     # initialize tqdm progress bar
    prog_bar = tqdm(train_data_loader, total=len(train_data_loader))
    
    for i, data in enumerate(prog_bar):
        optimizer.zero_grad()
        images, targets = data
        images = list(image.to(DEVICE) for image in images)
        targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()
        train_loss_list.append(loss_value)
        losses.backward()
        optimizer.step()
        train_itr += 1
    
        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
    return train_loss_list

# function for running validation iterations
def validate(valid_data_loader, model):
    
    global val_itr
    global val_loss_list
    
    # initialize tqdm progress bar
    prog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))
    
    for i, data in enumerate(prog_bar):
        images, targets = data
        
        images = list(image.to(DEVICE) for image in images)
        targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]
        
        with torch.no_grad():
            loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()
        val_loss_list.append(loss_value)
        val_itr += 1
        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
    return val_loss_list

In [18]:
NUM_CLASSES = 2
NUM_EPOCHS = 20

DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print("DEVICE:", DEVICE)

model = fasterrcnn_model(num_classes=NUM_CLASSES)

model = model.to(DEVICE)

params = [p for p in model.parameters() if p.requires_grad]
# Define criterion, optimizer, and scheduler
criterion = nn.CrossEntropyLoss()  # standard crossentropy loss for classification
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1) # the scheduler divides the lr by 10 every 10 epochs

train_itr = 1
val_itr = 1
train_loss_list = []
val_loss_list = []


for epoch in range(NUM_EPOCHS):
    print(f"\nEPOCH {epoch+1} of {NUM_EPOCHS}")
    # train for one epoch, printing every 10 iterations
    train_loss = train(train_dataloader, model)
    val_loss = validate(val_dataloader, model)

    # update the learning rate
    
    # evaluate on the test dataset
    # evaluate(model, data_loader_test, device=device)

DEVICE: cpu

EPOCH 1 of 20
Training


Loss: 1.1327:   0%|          | 1/3600 [00:49<49:26:15, 49.45s/it]


IndexError: ignored