# EARTH: **E**nvironmental **A**i **R**ubbish de**T**ection tec**H**nology.

# What the Application can do:
- Segments garbage in images
   - Could be used in an autonomous robot system
- Logs litter consumption/production by geographic location, to help organize citywide cleanups
  - Displays a heatmap of litter in Canadian cities

# Load Drive 

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Download the EfficientDet Library
- Pytorch EfficientDet

In [None]:
%%capture
!wget https://github.com/rwightman/efficientdet-pytorch/archive/refs/heads/master.zip
!unzip ./master.zip
!rm -f ./master.zip

# Download the TACO Dataset:
- Luckily, the TACO dataset is hosted on Kaggle (as of Oct 2020, so it is a bit outdated)
- Pipeline: 
  - YOLOv5 Model
  - Ported to PyTorch Mobile
  - Hosted on Android Studio, for mobile detection(For Future Robotics and Autonomous garbage cleanup robots)
  



In [None]:
%%capture
!kaggle

In [None]:
!cp -f kaggle.json /root/.kaggle/kaggle.json
!chmod 600 /root/.kaggle/kaggle.json

In [None]:
%%capture
!kaggle datasets download -d kneroma/tacotrashdataset -p ../input/tacotrashdataset
!unzip ../input/tacotrashdataset/tacotrashdataset.zip -d ../input/tacotrashdataset/ 

# Install EfficientDet PyTorch Dependencies

In [None]:
%%capture 
!pip install pycocotools
!pip install timm
!pip install omegaconf

In [None]:
%%capture
%cd efficientdet-pytorch-master/
from effdet import get_efficientdet_config, EfficientDet, DetBenchTrain
from effdet.efficientdet import HeadNet
import effdet
%cd ..

# Import Dependencies 

In [None]:
%%capture 
!pip install --upgrade albumentations==0.5.2

In [None]:
import math
import copy
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt 

import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import ShuffleSplit
!pip install pytorch_lightning
import pytorch_lightning as pl

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



# Process the Data 

In [None]:
class DataModule:
  # Namespace of dataset-wide constants. Everything below runs once, when the
  # class body is executed, including the CSV read.
  data_csv = '../input/tacotrashdataset/meta_df.csv' # All the Data inside of TacoTrashDataset
  # Needs to be split.
  data_df = pd.read_csv(data_csv)
  # Prefix every image file name with the base data directory.
  BASE_DATA_DIR = '../input/tacotrashdataset/data/'
  data_df['img_file'] = BASE_DATA_DIR + data_df['img_file']

  # ---------BASIC DATA PREP--------------
  # Distinct image paths; the DataFrame holds one row per annotation.
  all_unique_ids = data_df.img_file.unique()
  # Map class names <-> integer indices (indices follow the sorted class names).
  classes2idx = {}
  idx2classes = {}
  ALL_CLASSES = sorted(data_df.cat_name.unique())
  for idx, class_name in enumerate(ALL_CLASSES):
    idx2classes[idx] = class_name
    classes2idx[class_name] = idx
  NUM_CLASSES = len(classes2idx)
  
  # Label value written into padded (non-existent) box slots.
  # NOTE(review): presumably the detector ignores negative labels - confirm against effdet.
  PAD_BOUNDING_BOXES = -100 # Pad with < -1. This Ignores the Regression Targets.

  # Directory (relative to the working directory) that receives exported artefacts.
  TARGET_DIR = './drive/MyDrive/EARTH_Models/'

# EXPORT THE CLASSES

In [None]:
# Write the index -> class-name table next to the model weights.
# json.dump serialises the integer keys as strings ("0", "1", ...).
import json
with open(f"{DataModule.TARGET_DIR}classes.json", "w") as file:
  json.dump(DataModule.idx2classes, file)

In [None]:
def yolo_encode(x, y, width, height, img_width, img_height, class_idx):
  """Encode a top-left/size box as a normalised (cx, cy, w, h, class) array.

  The box centre is truncated to a whole pixel before it is divided by the
  image size; width and height are divided by the image size directly.
  """
  right = x + width
  bottom = y + height

  # Truncate the centre to an integer pixel, then normalise it.
  center_x = int((x + right) / 2) / img_width
  center_y = int((y + bottom) / 2) / img_height

  rel_width = width / img_width
  rel_height = height / img_height
  return np.array([center_x, center_y, rel_width, rel_height, class_idx])

In [None]:
def xyxy_encode(x, y, width, height, class_idx):
  """Encode a top-left/size box as a corner-format (x0, y0, x1, y1, class) array."""
  corners = (x, y, x + width, y + height)
  return np.array([*corners, class_idx])


In [None]:
# Build the ground-truth lookup: image path -> (N, 5) array with one
# (x0, y0, x1, y1, class_idx) row per annotation, in absolute pixel corners.
# A single groupby pass replaces re-filtering the whole DataFrame once per image.
# sort = False keeps images in first-appearance order (the order unique() gives),
# so `keys` - and therefore the seeded train/val split - is unchanged.

GT = {}
for img_path, img_rows in DataModule.data_df.groupby('img_file', sort = False):
  GT[img_path] = np.stack([
      xyxy_encode(row.x, row.y, row.width, row.height, DataModule.classes2idx[row.cat_name])
      for row in img_rows.itertuples(index = False)
  ])
# Image paths as an array so they can be fancy-indexed by the splitter's indices.
keys = np.array(tuple(GT))

In [None]:
# Largest number of annotations on any single image. GT already holds one row
# per annotation, so the DataFrame does not need to be filtered again per key.
max_num = max((len(boxes) for boxes in GT.values()), default = 0)
print(max_num)
# Every sample is padded to this many box slots so that samples can be stacked.
# Mixup / Mosaic concatenate several padded sets, so collated batches can carry more rows.
MAX_NUMBER = 100 # 100 Bounding Boxes .

90


# Data Splits

In [None]:
# Single seeded 90/10 shuffle split over the image paths.
splitter = ShuffleSplit(n_splits = 1, test_size = 0.1, train_size = 0.9, random_state = 42)
train_idx, test_idx = next(splitter.split(keys))

train_keys = keys[train_idx]
test_keys = keys[test_idx]

# The *_DATA dicts map image path -> box array; the *_KEYS lists hold the
# same paths as plain Python strings, in split order.
TRAIN_DATA = {key: GT[key] for key in train_keys}
TRAIN_KEYS = [key.item() for key in train_keys]
VAL_DATA = {key: GT[key] for key in test_keys}
VAL_KEYS = [key.item() for key in test_keys]
  

In [None]:
class PadBBoxes(object):
  """Pads one sample's boxes and labels up to MAX_NUMBER rows."""

  def __init__(self):
    self.num_bboxes = MAX_NUMBER

  def __call__(self, images, bboxes, classification):
    """Return (images, padded bboxes, padded classification).

    images is passed through untouched. bboxes is indexed as (N, 4) and
    classification as (N,). Box filler rows are all zeros; label filler
    entries carry DataModule.PAD_BOUNDING_BOXES.
    """
    missing = self.num_bboxes - bboxes.shape[0]

    box_filler = torch.zeros((missing, 4), dtype = bboxes.dtype)
    label_filler = torch.zeros((missing), dtype = classification.dtype) + DataModule.PAD_BOUNDING_BOXES

    padded_boxes = torch.cat([bboxes, box_filler], dim = 0)
    padded_labels = torch.cat([classification, label_filler], dim = 0)
    return images, padded_boxes, padded_labels

# Torch Datasets

In [None]:
# Augmentations
# Both pipelines take boxes in 'pascal_voc' format, with the labels passed
# through the `classes` field so they stay aligned with the surviving boxes.
IMG_SIZE = 512 # This will be deployed on a mobile device, we need the smallest models possible.
# Training pipeline: random crop/resize, colour jitter, flips/rotations,
# compression and blur noise, cutout, then conversion to a tensor.
data_augmentations = A.Compose([
      A.RandomResizedCrop(IMG_SIZE, IMG_SIZE),
      # Exactly one colour perturbation is picked when this group fires.
      A.OneOf([
          A.HueSaturationValue(hue_shift_limit=0.2, sat_shift_limit= 0.2, 
                                val_shift_limit=0.2, p=0.9),
          A.RandomBrightnessContrast(brightness_limit=0.2, 
                                      contrast_limit=0.2, p=0.9),
      ],p=0.9),
      A.ToGray(p=0.1),
      # Geometric transforms.
      A.HorizontalFlip(p=0.5),
      A.VerticalFlip(p=0.5),
      A.RandomRotate90(p=0.5),
  A.Transpose(p=0.5),
  # Image-quality degradations.
  A.JpegCompression(quality_lower=85, quality_upper=95, p=0.2),
  A.OneOf([
  A.Blur(blur_limit=3, p=1.0),
  A.MedianBlur(blur_limit=3, p=1.0)
  ],p=0.1),
  A.Cutout(num_holes=8, max_h_size=16, max_w_size=16, fill_value=0, p=0.5),
  ToTensorV2(p=1.0),   
], bbox_params = A.BboxParams('pascal_voc', label_fields = ['classes']))

# Evaluation pipeline: deterministic resize and tensor conversion only.
test_augmentations = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE),
    ToTensorV2()
], bbox_params = A.BboxParams('pascal_voc', label_fields = ['classes']))

In [None]:
class ConvertToBBoxes(object):
  """Splits a box tensor into its coordinate columns and integer class ids."""

  def __init__(self):
    self.num_classes = DataModule.NUM_CLASSES

  def __call__(self, images, bboxes):
    """Return (coordinates, class ids) for a 2-D tensor whose last column is the class.

    images is accepted for signature symmetry and is not used.
    """
    coords, labels = bboxes[:, :-1], bboxes[:, -1]
    return coords, labels.to(torch.int64)

In [None]:
class Mixup(object):
  """Batch-level 1:1 mixup: averages a sample with a randomly chosen batch mate."""

  def __init__(self, p = 0.5):
    self.p = p

  def __call__(self, images, bounding_boxes, classification):
    """Return (images, boxes, labels) with the box/label axis doubled.

    images must be 4-D (batch first). For each sample, with probability
    governed by `p`, the image is averaged with another sample of the batch
    and both box/label sets are concatenated. Otherwise the sample is kept and
    its boxes are duplicated with the duplicate's labels set to
    DataModule.PAD_BOUNDING_BOXES, so every sample ends up the same length.
    """
    batch_size, _, _, _ = images.shape
    mixed_images, mixed_boxes, mixed_labels = [], [], []

    for b in range(batch_size):
      if random.random() > self.p:
        # No mixing: duplicate the boxes, mark the duplicate labels as padding.
        own_boxes = bounding_boxes[b]
        own_labels = classification[b]
        ignore_labels = torch.ones_like(own_labels) * DataModule.PAD_BOUNDING_BOXES

        mixed_images.append(images[b])
        mixed_boxes.append(torch.cat([own_boxes, own_boxes], dim = 0))
        mixed_labels.append(torch.cat([own_labels, ignore_labels], dim = 0))
        continue

      # Mixing: the partner may be any sample of the batch, including b itself.
      partner = random.randint(0, batch_size - 1)

      mixed_images.append((images[b] + images[partner]) / 2)
      mixed_boxes.append(torch.cat([bounding_boxes[b], bounding_boxes[partner]], dim = 0))
      mixed_labels.append(torch.cat([classification[b], classification[partner]], dim = 0))

    return (
        torch.stack(mixed_images, dim = 0),
        torch.stack(mixed_boxes, dim = 0),
        torch.stack(mixed_labels, dim = 0),
    )
class Mosaic(object):
  # Batch-level mosaic: builds an image from four quadrants, each cut from a
  # (possibly different) sample of the batch at the SAME random split point.
  # Each quadrant keeps its original position in the frame.
  # The box/label axis is quadrupled in both branches so outputs can be stacked.
  def __init__(self, p = 0.5):
    self.p = p
  def __call__(self, images, bounding_boxes, classification):
    # Images: Tensor(B, C, H, W)
    # Bounding Boxes: indexed per sample as (N, 4); clip() below treats the
    # columns as (y0, x0, y1, x1).
    # NOTE(review): H is used as the limit for both axes - assumes square images.
    H = images.shape[2]
    B = images.shape[0] 
    new_images = []
    new_bounding_boxes = []
    new_classification = []
    for b in range(B):
      if random.random() > self.p:
        # No mosaic: keep the image, repeat its boxes four times and mark the
        # three repeats' labels as padding.
        base_image = images[b] # (C, H, W) 
        base_bbox = bounding_boxes[b] # (N, 4)
        base_classification = classification[b]
        zero_classification = torch.ones_like(base_classification) * DataModule.PAD_BOUNDING_BOXES

        concatted_bounding_boxes = torch.cat([base_bbox, base_bbox, base_bbox, base_bbox], dim = 0)
        concatted_classification = torch.cat([base_classification, zero_classification, zero_classification, zero_classification])

        new_images += [base_image]
        new_bounding_boxes += [concatted_bounding_boxes]
        new_classification += [concatted_classification]
      else:
        # Select 3 Other indices to use (they may repeat, and may equal b)
        idx1 = b
        idx2 = random.randint(0, B - 1)
        idx3 = random.randint(0, B - 1)
        idx4 = random.randint(0, B - 1)

        # Clone everything: clip() below writes into boxes and labels in place.
        image1 = images[idx1].clone() # (3, H, W)
        image2 = images[idx2].clone() # (3, H, W)
        image3 = images[idx3].clone() # (3, H, W) 
        image4 = images[idx4].clone() # (3, H, W)

        bbox1 = bounding_boxes[idx1].clone() # (N, 4)
        bbox2 = bounding_boxes[idx2].clone() # (N, 4)
        bbox3 = bounding_boxes[idx3].clone() # (N, 4)
        bbox4 = bounding_boxes[idx4].clone() # (N, 4)

        classes1 = classification[idx1].clone()
        classes2 = classification[idx2].clone()
        classes3 = classification[idx3].clone()
        classes4 = classification[idx4].clone()
        # Split point (x1, y1); BORDER caps how close it may get to the far edge.
        BORDER = 50
        x1 = random.randint(0, H - BORDER)
        y1 = random.randint(0, H - BORDER) 

        corner1 = image1[:, y1:, x1:] # Bottom Right of Image
        corner2 = image2[:, :y1, x1:] # Top Right of Image 
        corner3 = image3[:, y1:, :x1] # Bottom Left of Image
        corner4 = image4[:, :y1, :x1] # Top Left of Image
        def compute_area(bboxes):
          # Extent along columns (1, 3) times extent along columns (0, 2),
          # each floored at zero.
          w = torch.clip(bboxes[:, 3] - bboxes[:, 1], min = 0)
          h = torch.clip(bboxes[:, 2] - bboxes[:, 0], min = 0)
          return w * h 
        def add_to(bboxes, shift_x, shift_y):
          # NOTE: defined but never called in this method.
          bboxes[:, [0, 2]] = bboxes[:, [0, 2]] + shift_y
          bboxes[:, [1, 3]] = bboxes[:, [1, 3]] + shift_x
          return bboxes
        def clip(bboxes, min_x, max_x, min_y, max_y, classes):
          # Clamp boxes to one quadrant (in place); boxes left with zero area
          # get their label replaced by the padding value.
          bboxes[:, [0, 2]] = torch.clip(bboxes[:, [0, 2]], min = min_y, max = max_y)
          bboxes[:, [1, 3]] = torch.clip(bboxes[:, [1, 3]], min = min_x, max = max_x)
          area = compute_area(bboxes)
          remove = area <= 0 
          classes[remove] = DataModule.PAD_BOUNDING_BOXES
          return bboxes, classes
        # Clamp each source's boxes to the quadrant its pixels occupy
        # (no shift is applied).
        
        bboxes1, classes1 = clip(bbox1, x1, H, y1, H, classes1)  # Bottom Right
        bboxes2, classes2= clip(bbox2, x1, H, 0, y1, classes2) # Top Right
        bboxes3, classes3 = clip(bbox3, 0, x1, y1, H, classes3) # Bottom Left
        bboxes4, classes4 = clip(bbox4, 0, x1, 0, y1, classes4) # Top Left

        bboxes = torch.cat([bboxes1, bboxes2, bboxes3, bboxes4])
        classes = torch.cat([classes1, classes2, classes3, classes4])

        # Despite the names, these are the RIGHT and LEFT columns of the mosaic:
        # each stacks a top quadrant over a bottom quadrant along the row axis.
        top_half = torch.cat([corner2, corner1], dim = 1)
        bottom_half = torch.cat([corner4, corner3], dim = 1)

        # Join the left column and the right column along the column axis.
        full_image = torch.cat([bottom_half, top_half], dim = 2)
        new_images += [full_image]
        new_bounding_boxes += [bboxes]
        new_classification += [classes]
    return torch.stack(new_images), torch.stack(new_bounding_boxes), torch.stack(new_classification)

In [None]:
class TrainDataset(torch.utils.data.Dataset):
  """Training split: loads an image, augments it and returns padded targets.

  train_data maps image path -> (N, 5) array of (x0, y0, x1, y1, class);
  train_keys lists the image paths in index order.
  """
  def __init__(self, train_data, train_keys):
    self.train_data = train_data
    self.train_keys = train_keys

    self.augments = data_augmentations
    self.convert_bboxes = ConvertToBBoxes()
    self.padding_boxes = PadBBoxes()

  def __len__(self):
    return len(self.train_data)

  def __getitem__(self, idx):
    """Return (image, boxes, labels) as float tensors.

    The image is scaled by 1/255. Boxes are padded by PadBBoxes and reordered
    to (y0, x0, y1, x1); labels are padded alongside them.

    Raises:
      FileNotFoundError: if the image file cannot be read.
    """
    key = self.train_keys[idx]
    data = self.train_data[key] # (N, 5)
    image = cv2.imread(key)
    if image is None:
      # cv2.imread signals failure by returning None rather than raising.
      raise FileNotFoundError(f"Could not read image: {key}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Separate the labels from the coordinates before augmenting.
    classes = data[:, -1] # (N,)
    bbox_reg = data[:, :-1] # (N, 4)

    # Keep only boxes whose four coordinates are all strictly positive.
    # (np.all replaces the column product plus np.bool, which NumPy >= 1.24 no longer has.)
    keep = np.all(bbox_reg > 0.0, axis = 1)

    bbox_reg = bbox_reg[keep].astype(np.float32)
    classes = classes[keep]
    # Clip the Bboxes
    bbox_reg = np.clip(bbox_reg, a_min = 0, a_max = None)
    augmented = self.augments(image = image, bboxes = bbox_reg, classes = classes)
    image = torch.tensor(augmented['image']) / 255.0
    bbox_reg = torch.tensor(augmented['bboxes'])
    classes = torch.tensor(augmented['classes'])

    # view() also gives a well-formed (0, 4) / (0,) result when no box survives.
    bbox_reg = bbox_reg.view(-1, 4)
    classes = classes.view(-1)

    bbox_reg = torch.cat([bbox_reg, torch.unsqueeze(classes, dim = -1)], dim = -1) # (N, 5)
    bbox_reg, classification = self.convert_bboxes(image, bbox_reg)
    # Pad the Bounding Boxes
    image, bbox_reg, classification = self.padding_boxes(image, bbox_reg, classification)

    # Convert XYXY -> YXYX
    bbox_reg = bbox_reg[:, [1, 0, 3, 2]]
    return image.float(), bbox_reg.float(), classification.float()

In [None]:
class ValDataset(torch.utils.data.Dataset):
  """Validation split: loads an image, resizes it and returns padded targets.

  val_data maps image path -> (N, 5) array of (x0, y0, x1, y1, class);
  val_keys lists the image paths in index order.
  """
  def __init__(self, val_data, val_keys):
    self.val_data = val_data
    self.val_keys = val_keys

    self.augmentations = test_augmentations
    self.convert_bboxes = ConvertToBBoxes()
    self.padding_bboxes = PadBBoxes()

  def __len__(self):
    return len(self.val_data)

  def __getitem__(self, idx):
    """Return (image, boxes, labels) as float tensors.

    The image is scaled by 1/255. Boxes are padded by PadBBoxes and reordered
    to (y0, x0, y1, x1); labels are padded alongside them.

    Raises:
      FileNotFoundError: if the image file cannot be read.
    """
    key = self.val_keys[idx]
    data = self.val_data[key]

    image = cv2.imread(key)
    if image is None:
      # cv2.imread signals failure by returning None rather than raising.
      raise FileNotFoundError(f"Could not read image: {key}")
    # Same channel swap as before, now spelled like TrainDataset (imread yields BGR).
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    classes = data[:, -1] # (N, )
    bbox_reg = data[:, :-1] # (N, 4)
    # Keep only boxes whose four coordinates are all strictly positive.
    # (np.all replaces the column product plus np.bool, which NumPy >= 1.24 no longer has.)
    keep = np.all(bbox_reg > 0.0, axis = 1)

    bbox_reg = bbox_reg[keep].astype(np.float32)
    classes = classes[keep]
    # Clip the Bboxes
    bbox_reg = np.clip(bbox_reg, a_min = 0, a_max = None)

    augmented = self.augmentations(image = image, bboxes = bbox_reg, classes = classes)

    images = torch.tensor(augmented['image']) / 255.0
    bbox_reg = torch.tensor(augmented['bboxes'])
    classes = torch.tensor(augmented['classes'])

    # view() also gives a well-formed (0, 4) / (0, 1) result when no box survives.
    bbox_reg = bbox_reg.view(-1, 4)
    classes = classes.view(-1, 1)

    bboxes = torch.cat([bbox_reg, classes], dim = -1)
    bbox_reg, classification = self.convert_bboxes(images, bboxes)
    images, bbox_reg, classification = self.padding_bboxes(images, bbox_reg, classification)
    # Convert XYXY -> YXYX. The helper only swaps each (x, y) pair, so it is
    # its own inverse and works in this direction despite its name.
    bbox_reg = yxyx_to_xyxy(bbox_reg)
    return images.float(), bbox_reg.float(), classification.float()

def get_dfs():
  """Build the (train, validation) dataset pair from the module-level splits."""
  return TrainDataset(TRAIN_DATA, TRAIN_KEYS), ValDataset(VAL_DATA, VAL_KEYS)

# Helper Functions

In [None]:
def select_only_non_padded_bboxes(bboxes, classification):
  """Drop padded entries: keep only rows whose label is non-negative."""
  is_real = classification >= 0.0
  kept_boxes = bboxes[is_real]
  kept_labels = classification[is_real]
  return kept_boxes, kept_labels
def yxyx_to_xyxy(bbox):
  """Swap the two coordinates of each corner: columns (0, 1, 2, 3) -> (1, 0, 3, 2)."""
  swapped_columns = [1, 0, 3, 2]
  return bbox[:, swapped_columns]
def display_bbox_normal(image, bbox):
  """Draw each box on a copy of `image` and show it with matplotlib.

  Boxes are read as (x0, y0, x1, y1, ...). A box whose last element is 0 is
  skipped, which is how zero-filled padding rows are left out.
  The caller's array is never modified.
  """
  # Copy once up front instead of once per box; the drawn result is the same.
  canvas = image.copy()
  for box in bbox:
    if box[-1] == 0:
      continue
    cv2.rectangle(canvas, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), 220, 3)
  plt.imshow(canvas)
  plt.show()


# Collate Function(With Mosaic and Mixup)

In [None]:
class Collator:
  """DataLoader collate functions for the training and validation splits."""
  # Mosaic is instantiated but not applied by train_collate_fn; only Mixup is.
  mosaic = Mosaic(p = 1)
  mixup = Mixup(p = 0.5) 

  @staticmethod
  def _stack_fields(all_boxes):
    # Stack the first three fields (image, boxes, labels) of every sample.
    return tuple(
        torch.stack([sample[field] for sample in all_boxes])
        for field in range(3)
    )

  @classmethod
  def train_collate_fn(cls, all_boxes):
    """Stack the samples into a batch, then apply Mixup to it."""
    images, bboxes, classification = cls._stack_fields(all_boxes)
    images, bboxes, classification = cls.mixup(images, bboxes, classification)
    return images, bboxes, classification

  @classmethod
  def val_collate_fn(cls, all_boxes):
    """Stack the samples into a batch without any batch-level augmentation."""
    images, bboxes, classification = cls._stack_fields(all_boxes)
    return images, bboxes, classification

# Model Used: EfficientDet-D0 - AdvProp

Download Model Files

In [None]:
!wget https://github.com/rwightman/efficientdet-pytorch/releases/download/v0.1/tf_efficientdet_d0_ap-d0cdbd0a.pth

--2021-06-26 01:23:22--  https://github.com/rwightman/efficientdet-pytorch/releases/download/v0.1/tf_efficientdet_d0_ap-d0cdbd0a.pth
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github-releases.githubusercontent.com/250391956/1bd07180-a9cc-11eb-9fcd-e30331ecc639?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20210626%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20210626T012322Z&X-Amz-Expires=300&X-Amz-Signature=fa692e15c529f529fee38b9e5d673931365b937c3996c949663761c5703738e6&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=250391956&response-content-disposition=attachment%3B%20filename%3Dtf_efficientdet_d0_ap-d0cdbd0a.pth&response-content-type=application%2Foctet-stream [following]
--2021-06-26 01:23:22--  https://github-releases.githubusercontent.com/250391956/1bd07180-a9cc-11eb-9fcd-e30331ecc639?X-Amz-Algorithm=

In [None]:
class PyTorchLightningModelWrapper(pl.LightningModule):
  """Thin LightningModule shell around `model`; forward() delegates to it."""
  def __init__(self, model):
    super().__init__()
    self.model = model
  def forward(self, *args, **kwargs):
    # Pass every positional and keyword argument straight through.
    return self.model(*args, **kwargs)
IMG_SIZE = 512
def get_net(checkpoint_path = '/content/tf_efficientdet_d0_ap-d0cdbd0a.pth'):
    """Build the EfficientDet-D0 training bench with a fresh classification head.

    Args:
      checkpoint_path: path of the pretrained `tf_efficientdet_d0` state dict.
        Defaults to the location the notebook downloads it to.

    Returns:
      A PyTorchLightningModelWrapper around a DetBenchTrain model.
    """
    config = get_efficientdet_config('tf_efficientdet_d0') # Smallest Model Possible, to make it runable on CPU(Especially an Android CPU)
    net = EfficientDet(config, pretrained_backbone=False)
    # Load onto the CPU so this also works on hosts without the device the
    # checkpoint was saved from; load_state_dict copies into net's own tensors.
    checkpoint = torch.load(checkpoint_path, map_location = 'cpu')
    net.load_state_dict(checkpoint)
    # The pretrained weights are loaded BEFORE the config is changed, because
    # the class head is replaced below to match this dataset.
    effdet.config.config_utils.set_config_writeable(config)
    config.num_classes = DataModule.NUM_CLASSES
    config.image_size = (IMG_SIZE, IMG_SIZE)
    effdet.config.config_utils.set_config_readonly(config)
    net.class_net = HeadNet(config, num_outputs=config.num_classes)
    model = DetBenchTrain(net, config)
    return PyTorchLightningModelWrapper(model)

# Training Config 

In [None]:
class TrainingConfig:
  """DataLoader keyword arguments and epoch budget for the training split."""
  batch_size, shuffle, num_workers, pin_memory = 16, True, 4, True
  # NOTE(review): training uses the *validation* collate function, so the
  # Mixup in Collator.train_collate_fn is never applied - confirm this is intended.
  collate_fn = Collator.val_collate_fn
  num_epochs = 1000000

  # Passed as **kwargs to torch.utils.data.DataLoader.
  config = dict(
      batch_size = batch_size,
      shuffle = shuffle,
      num_workers = num_workers,
      pin_memory = pin_memory,
      collate_fn = collate_fn,
  )
class ValidationConfig:
  """DataLoader keyword arguments for the validation split (no shuffling)."""
  batch_size, shuffle, num_workers, pin_memory = 16, False, 4, True
  collate_fn = Collator.val_collate_fn

  # Passed as **kwargs to torch.utils.data.DataLoader.
  config = dict(
      batch_size = batch_size,
      shuffle = shuffle,
      num_workers = num_workers,
      pin_memory = pin_memory,
      collate_fn = collate_fn,
  )

# Instantiate Model Object

In [None]:

class TRAININGOBJECT(pl.LightningModule):
  """Bundles the detector with its optimisation hyper-parameters and step counts."""
  def __init__(self):
    super().__init__()
    # The datasets are built here only to measure their sizes.
    train_ds, val_ds = get_dfs()

    # ----------------PARAMETERS----------------
    self.lr, self.weight_decay, self.max_lr = 1e-5, 1e-6, 1e-4
    self.NUM_TRAIN, self.NUM_VAL = len(train_ds), len(val_ds)
    self.num_epochs = TrainingConfig.num_epochs
    # Floor division: a trailing partial batch is not counted.
    self.steps_per_epoch = self.NUM_TRAIN // TrainingConfig.batch_size
    self.total_steps = self.num_epochs * self.steps_per_epoch

    # ----------------DEFINE OBJECTS------------
    self.model = get_net()
    

# Logger Object

In [None]:
class Logger(pl.LightningModule):
  """Tracks the best validation loss and checkpoints the model when it improves."""
  def __init__(self, training_object):
    super().__init__()
    self.training_object = training_object
    self.best_loss = float('inf')
    self.save_path = '/content/drive/MyDrive/EARTH_Models/'
    self.EPOCHS = 0

  def update_states(self, train_loss, val_loss):
    """Save the weights if val_loss ties or beats the best so far, then print an epoch summary."""
    improved = val_loss <= self.best_loss
    if improved:
      print("SAVING STATES")
      checkpoint_file = f"{self.save_path}best.pth"
      # Saves the state dict of the module two levels below training_object.
      weights = self.training_object.model.model.state_dict()
      torch.save(weights, checkpoint_file)
      self.best_loss = val_loss

    print(f"E: {self.EPOCHS}, BL: {self.best_loss}, TL: {train_loss}, VL: {val_loss}")
    self.EPOCHS += 1

# Create Training Loop

Train Loop

In [None]:
def train_step(images, bboxes, classification, training_object):
  """Run one forward pass through training_object.model and return its 'loss' entry.

  The boxes and labels are passed to the model as target['bbox'] and target['cls'].
  """
  target = {'bbox': bboxes, 'cls': classification}
  return training_object.model(images, target = target)['loss']

Val Loop

In [None]:
def val_step(images, bboxes, classification, training_object):
  """Run one gradient-free forward pass and return the model's 'loss' entry.

  images is a 4-D batch (batch first, spatial dims last). Besides the boxes
  and labels, the target carries a per-sample `img_scale` of 1.0 and an
  `img_size` holding the batch's spatial size.

  Both are sized from the batch actually received, not from a configured
  batch size, so a short final batch gets matching shapes. They are created
  on the same device as `images`.
  """
  batch_size = images.shape[0]
  spatial_size = list(images.shape[-2:])
  target_device = images.device

  with torch.no_grad():
    outputs = training_object.model(images, target = {
      'bbox': bboxes.float(),
      'cls': classification,
      'img_scale': torch.tensor([1.0] * batch_size, dtype = torch.float, device = target_device),
      'img_size': torch.tensor([spatial_size] * batch_size, dtype = torch.float, device = target_device),
    })
  loss = outputs['loss']
  return loss


# Pytorch Lightning Based Training Code

In [None]:
class PyTorchLightningModel(pl.LightningModule):
  """Lightning module driving training: holds the model, the logger and running loss sums."""
  def __init__(self):
    super().__init__()
    self.model = self.configure_model()
    self.logger_object = Logger(self.model)

    # Running sums and step counts, averaged at the end of each validation epoch.
    self.train_loss = 0.0
    self.train_steps = 0
    self.val_loss = 0.0
    self.val_steps = 0

  def reset_states(self):
    """Zero all running loss sums and step counters."""
    self.train_loss, self.train_steps, self.val_loss, self.val_steps = [0] * 4 

  def forward(self, images):
    pass # No Need for Forward Method, never used at Train 

  def configure_model(self):
    """Create the TRAININGOBJECT that owns the detector and hyper-parameters."""
    return TRAININGOBJECT()

  def configure_optimizers(self):
    """Return AdamW over the detector's parameters plus a cosine-annealing scheduler."""
    trainee = self.logger_object.training_object
    optimizer = optim.AdamW(
        trainee.model.parameters(),
        lr = trainee.lr,
        weight_decay = trainee.weight_decay,
    )
    lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, 5, eta_min = 1e-13, verbose = True)
    return {'optimizer': optimizer, 'lr_scheduler': lr_scheduler}

  def training_step(self, batch, batch_idx):
    """Compute the loss for one training batch and add it to the running sum."""
    images, bboxes, classification = batch
    half_images = images.half()
    loss = train_step(half_images, bboxes, classification, training_object=self.logger_object.training_object)
    self.train_steps += 1
    self.train_loss += loss.item()
    return loss

  def validation_step(self, batch, batch_idx):
    """Compute the loss for one validation batch and add it to the running sum."""
    images, bboxes, classification = batch
    half_images = images.half()
    loss = val_step(half_images, bboxes, classification, training_object=self.logger_object.training_object)
    self.val_steps += 1
    self.val_loss += loss.item()

  def validation_epoch_end(self, logs):
    """Report the mean train/val losses to the logger, then clear the running sums."""
    # eps keeps the division defined when a counter is still zero.
    eps = 1e-10
    mean_train = (self.train_loss + eps) / (self.train_steps + eps)
    mean_val = (self.val_loss + eps) / (self.val_steps + eps)
    self.logger_object.update_states(mean_train, mean_val)
    self.train_loss, self.train_steps, self.val_loss, self.val_steps = 0., 0., 0., 0.0


# Training Loop. 

In [None]:
def TRAIN_MODEL(model_path):
  """Build the model, optionally resume from `model_path`, and run the trainer.

  A falsy model_path skips loading; otherwise the saved state dict is loaded
  into the module three levels below the Lightning model.
  """
  model = PyTorchLightningModel()
  if model_path:
    load_device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
    saved_weights = torch.load(model_path, map_location = load_device)
    model.model.model.model.load_state_dict(saved_weights)

  train, val = get_dfs()
  train_dataloader = torch.utils.data.DataLoader(dataset = train, **TrainingConfig.config)
  val_dataloader = torch.utils.data.DataLoader(dataset =  val, **ValidationConfig.config)

  # No callbacks and no Lightning checkpointing: Logger saves the weights itself.
  trainer_options = dict(
      gpus = 1,
      precision = 16,
      overfit_batches = 0.0,
      check_val_every_n_epoch = 1,
      callbacks = [],
      gradient_clip_val = 20.0,
      max_epochs =TrainingConfig.num_epochs,
      profiler = None,
      checkpoint_callback = False,
      benchmark = True,
      deterministic = False,
      num_sanity_val_steps = 0,
      logger = None,
  )
  trainer = pl.Trainer(**trainer_options)
  trainer.fit(model, train_dataloader, val_dataloader)


In [None]:
# Resume training from the best checkpoint saved so far (the file must already exist).
TRAIN_MODEL('/content/drive/MyDrive/EARTH_Models/best.pth')

In [None]:
from effdet.bench import *

In [None]:
class InferenceModel(pl.LightningModule):
  """Wraps the trained detector in a DetBenchPredict bench for inference.

  Args:
    prev_model: path of the state dict saved during training.
  """
  def __init__(self, prev_model):
    super().__init__()
    self.prev_model = prev_model
    # Rebuild the training module tree so the checkpoint keys line up, load
    # the weights into the training bench, then re-wrap its inner network.
    training_module = PyTorchLightningModel()
    train_bench = training_module.model.model.model
    train_bench.load_state_dict(torch.load(self.prev_model, map_location = device))
    self.model = DetBenchPredict(train_bench.model)

  def forward(self, x):
    """Return the NMS-filtered detections of the FIRST image in batch `x`.

    Only batch element 0 is returned. NMS is applied across all classes at
    IoU 0.5, reading columns 0-3 as the box and column 4 as the score.
    """
    self.eval()
    B = x.shape[0]
    # Run wherever the weights live: sending the input to a fixed global
    # device breaks when the model itself was never moved there.
    run_device = next(self.model.parameters()).device
    with torch.no_grad():
      x = x.to(run_device)
      img_info = {
          'img_scale': torch.tensor([1.0] * B, dtype=torch.float).to(run_device),
          'img_size': torch.tensor([x[0].shape[-2:]] * B, dtype=torch.float).to(run_device),
      }
      predictions = self.model(x, img_info = img_info)
      # Hard NMS
      idx = torchvision.ops.nms(predictions[0, :, :4], predictions[0, :, 4], iou_threshold = 0.5)
      predictions = predictions[0, idx]
      return predictions
    

In [None]:
# Rebuild the datasets for the visual check below (only `val` is used there).
train, val = get_dfs()

In [None]:
# Load the best saved weights into the inference wrapper.
model = InferenceModel('/content/drive/MyDrive/EARTH_Models/best.pth')

In [None]:
# Visual sanity check: for every validation sample, show the predictions and
# then the ground-truth boxes.
# NOTE(review): display_preds is defined in the cell BELOW - run that cell first.
for images, bboxes, classification in val:
  # The dataset yields single samples, so add a batch axis for the model.
  pred = model(images.unsqueeze(0))
  #print(classification)
  # The two transposes move the leading (channel) axis last: (C, H, W) -> (H, W, C).
  display_preds(images.transpose(0, 1).transpose(1, 2).numpy(), pred.cpu().numpy(), thresh = 0.25)
  # The dataset returns boxes as yxyx; swap each pair back to xyxy for drawing.
  display_bbox_normal(images.transpose(0, 1).transpose(1, 2).numpy(), yxyx_to_xyxy(bboxes.numpy(), ))

In [None]:
def display_preds(image, pred, thresh = 0.2):
  """Draw every prediction scoring above `thresh` on a copy of `image` and show it.

  Each row of `pred` is unpacked as (x1, y1, x2, y2, score, class). The class
  name of every drawn box is printed. The caller's array is never modified.
  """
  # Copy once up front instead of once per drawn box; the drawn result is the same.
  canvas = image.copy()
  for bbox in pred:
    x1, y1, x2, y2, obj, cls = bbox

    if obj > thresh:
      # The class arrives as a float scalar; int() gives the dict's integer key
      # and also accepts plain Python numbers.
      print(DataModule.idx2classes[int(cls)])
      cv2.rectangle(canvas, (int(x1), int(y1)), (int(x2), int(y2)), 220, 3)
  plt.imshow(canvas)
  plt.show()