## Connecting Google Drive

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive
# (later cells read the dataset from, and write the model to, paths under it).
from google.colab import drive
drive.mount('/content/drive')

## Creating virtual environment

In [None]:
# Download the latest Miniconda installer for 64-bit Linux and make it executable.
!wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
!chmod +x Miniconda3-latest-Linux-x86_64.sh
# Run the installer non-interactively (-b), tolerating an existing prefix (-f), into /usr/local (-p).
!bash ./Miniconda3-latest-Linux-x86_64.sh -b -f -p /usr/local

import sys
# NOTE(review): the "python3.9" directory is hard-coded while the installer above is "latest";
# if the bundled Python version differs, this path will not exist — confirm.
sys.path.append('/usr/local/lib/python3.9/site-packages')

In [None]:
# Create a conda environment named "py36" pinned to Python 3.6.9 (-y skips the confirmation prompt).
!conda create -y -n py36 python=3.6.9

# NOTE(review): each `!` line runs in its own subshell, so this activation presumably does not
# carry over to later cells; the cells below address the environment explicitly with `-n py36`.
!source activate py36

In [None]:
# Install pinned PyTorch / torchvision builds from the "pytorch" channel into the py36 environment.
!conda install -y -n py36 -c pytorch pytorch=1.10.0 torchvision=0.11.0

# Optional (disabled): install pycocotools into the same environment.
# !conda run -n py36 pip install pycocotools

In [None]:
# Sanity check: print the Python version that the py36 environment resolves to.
!conda run -n py36 python --version

# List the packages installed in the py36 environment.
!conda run -n py36 pip list

## Installs and Imports

In [None]:
# !pip install torchmetrics --quiet

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
from torch.utils.data import DataLoader, Dataset
from pycocotools.coco import COCO
from PIL import Image
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import numpy as np
# from torchmetrics.detection.mean_ap import MeanAveragePrecision   # torch metric will not work with older pytorch versions

## Creating dataset

In [None]:
class DroneDataset(Dataset):
    """Detection dataset of ``.JPEG`` images with YOLO-style ``.txt`` label files.

    Every image ``<name>.JPEG`` in ``root`` is paired with ``<name>.txt`` in the
    same directory. Each non-blank line of the label file has five
    whitespace-separated numbers::

        category_id x_center y_center box_width box_height

    where the four box values are fractions of the image width/height.

    Args:
        root: Directory holding the images and their label files.
        transforms: Optional callable applied to the PIL image only (the boxes
            are not transformed).

    Each item is ``(img, target)`` where ``target['boxes']`` is a float32 tensor
    of shape ``(N, 4)`` in absolute-pixel ``[xmin, ymin, xmax, ymax]`` order and
    ``target['labels']`` is an int64 tensor of shape ``(N,)``. ``N`` is 0 for an
    empty label file.

    NOTE(review): torchvision detection models reserve label 0 for background.
    The category ids are passed through unchanged here, so if the label files
    use 0-based class ids they need a +1 offset — confirm against the dataset.
    """

    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> sample mapping reproducible across runs and machines.
        self.imgs = sorted(img for img in os.listdir(root) if img.endswith(".JPEG"))

    @staticmethod
    def _read_annotations(txt_path, width, height):
        """Parse one label file into pixel-space boxes and integer labels.

        Returns:
            ``(boxes, labels)`` as plain Python lists; both are empty when the
            file contains no annotation lines.

        Raises:
            ValueError: If a non-blank line does not hold exactly five numbers.
        """
        boxes, labels = [], []
        with open(txt_path, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Tolerate blank lines (e.g. a trailing newline).
                    continue
                category_id, x_center, y_center, box_width, box_height = map(float, fields)

                # Centre/size fractions -> absolute corner coordinates.
                xmin = (x_center - box_width / 2) * width
                ymin = (y_center - box_height / 2) * height
                xmax = xmin + box_width * width
                ymax = ymin + box_height * height

                boxes.append([xmin, ymin, xmax, ymax])
                labels.append(int(category_id))
        return boxes, labels

    def __getitem__(self, index):
        img_name = self.imgs[index]
        img_path = os.path.join(self.root, img_name)
        txt_path = os.path.join(self.root, os.path.splitext(img_name)[0] + ".txt")

        img = Image.open(img_path).convert('RGB')
        width, height = img.size

        boxes, labels = self._read_annotations(txt_path, width, height)

        target = {
            # reshape keeps the (N, 4) layout even when there are no boxes.
            'boxes': torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.imgs)

In [None]:
# Image preprocessing: only the PIL -> tensor conversion is active.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        # transforms.Resize([320, 320])
    ]
)


def collate_detection_batch(batch):
    """Turn a list of (image, target) pairs into an (images, targets) pair of tuples."""
    return tuple(zip(*batch))


# Both loaders share the same batching settings.
loader_options = dict(
    batch_size=4,
    shuffle=True,
    collate_fn=collate_detection_batch,
    drop_last=True,
)

train_dataset = DroneDataset(root='/content/drive/MyDrive/drone/train', transforms=transform)
train_loader = DataLoader(train_dataset, **loader_options)

val_dataset = DroneDataset(root='/content/drive/MyDrive/drone/val', transforms=transform)
val_loader = DataLoader(val_dataset, **loader_options)

## Initializing model

In [None]:
from functools import partial

from torch import nn
from torchvision.models.detection import _utils as det_utils
from torchvision.models.detection.ssdlite import SSDLiteClassificationHead

# Number of classes the detector should predict, INCLUDING the background class (index 0).
NUM_CLASSES = 2

# Alternative (no COCO detection weights): build the 2-class model directly.
# model = ssdlite320_mobilenet_v3_large(pretrained=False, num_classes=2)
model = ssdlite320_mobilenet_v3_large(pretrained=True)

# Assigning `model.head.classification_head.num_classes = 2` only adds an attribute: the
# convolution layers of the pretrained head keep their COCO-sized outputs. To really get a
# NUM_CLASSES-way classifier, the classification head is rebuilt (and trained from scratch)
# while the pretrained backbone and box-regression head are kept.
in_channels = det_utils.retrieve_out_channels(model.backbone, (320, 320))
num_anchors = model.anchor_generator.num_anchors_per_location()
# Same BatchNorm settings torchvision uses when it builds the SSDlite head.
norm_layer = partial(nn.BatchNorm2d, eps=0.001, momentum=0.03)
model.head.classification_head = SSDLiteClassificationHead(
    in_channels, num_anchors, NUM_CLASSES, norm_layer
)
print(model)

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Adam over every model parameter.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.0002)
# Scheduler that multiplies the learning rate by 0.1 every 3 calls to its step().
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=3, gamma=0.1)

# metric = MeanAveragePrecision()

## Train Loop

In [None]:
def train(model, optimizer, data_loaders, device, num_epochs, lr_scheduler=None):
  """Train a detection model and report the mean per-sample loss of every epoch.

  Args:
    model: Module that, in training mode, is called as ``model(images, targets)``
      and returns a dict of loss tensors.
    optimizer: Optimizer that updates ``model``'s parameters.
    data_loaders: Mapping whose ``'train'`` entry yields ``(images, targets)``
      batches: a sequence of image tensors and a sequence of dicts of tensors.
      Only the ``'train'`` entry is read.
    device: Device the images and targets are moved to before the forward pass.
    num_epochs: Number of passes over the training loader.
    lr_scheduler: Optional learning-rate scheduler; when given, its ``step()``
      is called once at the end of every epoch.

  Returns:
    A list with one float per epoch: the summed loss weighted by batch size,
    divided by the number of samples actually seen in that epoch.
  """
  loss_hist_train = [0.0] * num_epochs

  for epoch in range(num_epochs):

    print(f"Epoch {epoch}")

    model.train()

    running_loss = 0.0
    samples_seen = 0

    for images, targets in tqdm(data_loaders['train']):
      images = list(image.to(device) for image in images)
      targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

      loss_dict = model(images, targets)
      losses = sum(loss for loss in loss_dict.values())

      # Clear stale gradients before computing new ones.
      optimizer.zero_grad()
      losses.backward()
      optimizer.step()

      running_loss += losses.item() * len(targets)
      samples_seen += len(targets)

    # Normalise by the samples actually processed: a loader may skip samples
    # (e.g. with drop_last=True), so len(dataset) would understate the loss.
    epoch_loss = running_loss / samples_seen if samples_seen else float("nan")
    loss_hist_train[epoch] = epoch_loss
    print(f"Train Loss: {epoch_loss}")

    if lr_scheduler is not None:
      lr_scheduler.step()

  return loss_hist_train

## Evaluating model performance

In [None]:
# model.eval()

# for images, targets in tqdm(val_loader):
#   images = list(image.to(device) for image in images)
#   targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

#   outputs = model(images)

#   for target, output in zip(targets, outputs):
#     # predicted_box = torch.argmax(output['scores'])
#     # p_labels = output['labels'][predicted_box]
#     # p_boxes = output['boxes'][predicted_box].unsqueeze(0)
#     # p_scores = output['scores'][predicted_box]
#     # new_output = [{'boxes': p_boxes, 'scores': p_scores, 'labels': p_labels}]
#     # metric.update(new_output, [target])

#     p_boxes = output['boxes']
#     p_labels = output['labels']
#     p_scores = output['scores']
#     new_output = [{'boxes': p_boxes, 'scores': p_scores, 'labels': p_labels}]
#     metric.update(new_output, [target])

# mAP = metric.compute()
# print(f"Mean Average Precision: {mAP['map']}")
# metric.reset()

In [None]:
# Number of full passes over the training loader.
num_epochs = 10

# Loaders are looked up by key inside train().
data_loaders = dict(train=train_loader, val=val_loader)

train(
    model=model,
    optimizer=optimizer,
    data_loaders=data_loaders,
    device=device,
    num_epochs=num_epochs,
)

In [None]:
# Disabled snippets kept for reference (scheduler step and per-epoch checkpointing):
# lr_scheduler.step()
# torch.save(model, f'model_epoch_{epoch}.pth')
# print(f"Model saved at epoch {epoch}")

# Pickle the whole model object (not just its state_dict) to Google Drive.
checkpoint_path = '/content/drive/MyDrive/drone_detection_ssd_model_updated.pth'
torch.save(obj=model, f=checkpoint_path)
# print("Model training complete and saved.")

## Checking on validation set

In [None]:
# torch.load unpickles the file, which can execute arbitrary code: only load checkpoints you trust.
# map_location remaps the stored tensors onto the current device, so a checkpoint written on a
# GPU runtime can still be loaded on a CPU-only runtime.
model = torch.load('/content/drive/MyDrive/drone_detection_ssd_model_updated.pth', map_location=device)

In [None]:
import torch
from torchvision.utils import draw_bounding_boxes
import matplotlib.pyplot as plt
from tqdm import tqdm
from torchvision import transforms

# For every image of the first validation batch, draw the ground-truth boxes (yellow)
# and the single highest-scoring predicted box (red).
model.eval()

img_dtype_converter = transforms.ConvertImageDtype(torch.uint8)

for images, targets in tqdm(val_loader):

    images = list(image.to(device) for image in images)
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

    for image, target in zip(images, targets):

        # Inference only: no gradients are needed.
        with torch.no_grad():
            p_t = model(image.unsqueeze(0))
        scores = p_t[0]['scores']

        # draw_bounding_boxes converts the image to NumPy, so the tensors must be on the CPU.
        img = img_dtype_converter(image).cpu()
        t_boxes = target['boxes'].cpu()
        # Caption each ground-truth box with its own class, not with the predicted one.
        t_labels = [str(label) for label in target['labels'].tolist()]

        annotated_image = draw_bounding_boxes(img, t_boxes, t_labels, colors="yellow", width=2)

        # The detector can return zero boxes; argmax on an empty tensor would raise.
        if scores.numel() > 0:
            predicted_box = torch.argmax(scores)
            p_labels = p_t[0]['labels'][predicted_box]
            p_boxes = p_t[0]['boxes'][predicted_box].unsqueeze(0).cpu()
            annotated_image = draw_bounding_boxes(annotated_image, p_boxes, [str(p_labels.item())], colors="red", width=2)

        fig, ax = plt.subplots()
        ax.imshow(annotated_image.permute(1, 2, 0).numpy())
        ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
        fig.tight_layout()
        plt.show()
        # Release the figure so repeated runs do not accumulate open figures.
        plt.close(fig)
    # Only the first batch is visualised.
    break


# End

In [None]:
import torch
from torchvision.utils import draw_bounding_boxes
import matplotlib.pyplot as plt
from tqdm import tqdm
from torchvision import transforms

# Debug view: print batch/target details for one validation sample and draw its
# ground-truth boxes (yellow).
model.eval()

img_dtype_converter = transforms.ConvertImageDtype(torch.uint8)

for images, targets in tqdm(val_loader):
    images = list(image.to(device) for image in images)
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
    idx = 0  # which sample of the batch to inspect

    # Inference only: no gradients are needed.
    with torch.no_grad():
        p_t = model(images)
    scores = p_t[idx]['scores']
    # Number of detections above a 0.5 score; not used further in this cell.
    confidence_length = len(torch.where(scores > 0.5)[0])

    # The detector can return zero boxes; argmax on an empty tensor would raise.
    if scores.numel() > 0:
        predicted_box = torch.argmax(scores)
        p_labels = p_t[idx]['labels'][predicted_box]
        p_boxes = p_t[idx]['boxes'][predicted_box].unsqueeze(0)
    else:
        predicted_box = p_labels = p_boxes = None

    print(len(targets))
    print(images[idx].shape)

    # Boxes produced by DroneDataset are already absolute pixel coordinates,
    # so they are drawn as-is (no rescaling by the image size).
    t_boxes = targets[idx]['boxes']
    print(t_boxes)

    # draw_bounding_boxes converts the image to NumPy, so the tensors must be on the CPU.
    img = img_dtype_converter(images[idx]).cpu()
    # Caption each ground-truth box with its own class, not with the predicted one.
    t_labels = [str(label) for label in targets[idx]['labels'].tolist()]

    annotated_image = draw_bounding_boxes(img, t_boxes.cpu(), t_labels, colors="yellow", width=2)
    fig, ax = plt.subplots()
    ax.imshow(annotated_image.permute(1, 2, 0).numpy())
    ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
    fig.tight_layout()
    plt.show()
    plt.close(fig)
    # Only the first batch is inspected.
    break


In [None]:
from torchvision.utils import draw_bounding_boxes

# Draw, for one sample of the first validation batch, the ground-truth boxes (yellow)
# and the highest-scoring predicted box (red).
model.eval()

img_dtype_converter = transforms.ConvertImageDtype(torch.uint8)

for images, targets in tqdm(val_loader):
  images = list(image.to(device) for image in images)
  targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
  idx = 0  # which sample of the batch to show

  # Inference only: no gradients are needed.
  with torch.no_grad():
    p_t = model(images)
  scores = p_t[idx]['scores']
  # Number of detections above a 0.5 score; not used further in this cell.
  confidence_length = len(torch.where(scores > 0.5)[0])

  print(images[idx].shape)

  # DroneDataset already returns boxes in absolute pixel coordinates, so they are used
  # unchanged: multiplying by the image size again (and shifting by a fixed offset)
  # would move the ground-truth box away from the object.
  t_boxes = targets[idx]['boxes'].cpu()
  # Caption each ground-truth box with its own class, not with the predicted one.
  t_labels = [str(label) for label in targets[idx]['labels'].tolist()]

  # draw_bounding_boxes converts the image to NumPy, so the tensors must be on the CPU.
  img = img_dtype_converter(images[idx]).cpu()

  annotated_image = draw_bounding_boxes(img, t_boxes, t_labels, colors="yellow", width=2)

  # The detector can return zero boxes; argmax on an empty tensor would raise.
  if scores.numel() > 0:
    predicted_box = torch.argmax(scores)
    p_labels = p_t[idx]['labels'][predicted_box]
    p_boxes = p_t[idx]['boxes'][predicted_box].unsqueeze(0).cpu()
    annotated_image = draw_bounding_boxes(annotated_image, p_boxes, [str(p_labels.item())], colors="red", width=2)

  fig, ax = plt.subplots()
  ax.imshow(annotated_image.permute(1, 2, 0).numpy())
  ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
  fig.tight_layout()
  # plt.show() renders with the notebook's inline backend, matching the other cells.
  plt.show()
  plt.close(fig)
  # Only the first batch is shown.
  break
  break