<a href="https://colab.research.google.com/github/chipzm/NUIG_Thesis/blob/main/Project_Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#installing the super-gradients package to get the YOLO NAS trainers and dataloaders
!pip install super-gradients

In [None]:
!pip install -q split-folders

In [None]:
# importing the required libraries
import glob
import os
import shutil
import json
import random
import numpy as np
import tqdm
import cv2
import sys
import math
import numpy as np
import torch
import splitfolders

from pycocotools.coco import COCO

In [None]:
# Force UTF-8 as the preferred encoding by swapping the locale lookup for a stub
import locale


def getpreferredencoding(do_setlocale=True):
    """Return "UTF-8" unconditionally; ``do_setlocale`` is accepted but ignored."""
    forced_encoding = "UTF-8"
    return forced_encoding


# Monkeypatch: later calls to locale.getpreferredencoding() are answered by the stub
setattr(locale, "getpreferredencoding", getpreferredencoding)

# Model 1 - YOLO NAS (num classes: 1, epochs: 20)
### Leopard Dataset

In [None]:
# List the datasets published under the "wild me" foundation in the public LILA BC bucket
!gsutil ls gs://public-datasets-lila/wild-me/

In [None]:
# Copy the leopard dataset tar.gz archive from the bucket into the current working directory
!gsutil -m cp -r gs://public-datasets-lila/wild-me/leopard.coco.tar.gz .

In [None]:
# Extract the downloaded archive into /content/ (verbose: every extracted path is printed)
!tar -xzvf "/content/leopard.coco.tar.gz" -C "/content/"

In [None]:
#reading the COCO .json file
data_source = COCO(annotation_file='/content/leopard.coco/annotations/instances_train2022.json')

In [None]:
# extracting the classes and the resective class_ids and creating dictionaries
img_ids = data_source.getImgIds()
catIds = data_source.getCatIds()

categories = data_source.loadCats(catIds)
categories.sort(key=lambda x: x['id'])

classes = {}
coco_labels = {}
coco_labels_inverse = {}

for c in categories:
    coco_labels[len(classes)] = c['id']
    coco_labels_inverse[c['id']] = len(classes)
    classes[c['name']] = len(classes)

class_num = {}

In [None]:
# Create the folders that receive the converted label .txt files and the copied images.
# os.makedirs(..., exist_ok=True) is the pure-Python equivalent of `mkdir -p`, so the cell
# can be re-run safely and does not depend on a shell being available.
save_base_path  = 'dataset/labels/'
save_image_path = 'dataset/images/'

for _out_dir in (save_base_path, save_image_path):
    os.makedirs(_out_dir, exist_ok=True)

# Class indices to export. The conversion loop only checks membership of the keys;
# the single leopard class keeps index 0.
label_transfer = {0: 0}

In [None]:
# Convert the COCO json annotations into one YOLO-format .txt file per image.
# The annotations of each image are looked up through its image id; every box
# [x_min, y_min, width, height] is turned into a line
# "class x_center y_center width height" with the coordinates divided by the image size.
# The label files go to the "labels" folder and a copy of each image to the "images" folder.
#
# Changes compared with the previous version of this cell:
#   * an image without annotations no longer leaves an orphan empty .txt behind
#     (the early `continue` used to skip the clean-up), so images/ and labels/ stay paired;
#   * the bbox list stored inside the COCO object is no longer overwritten in place, so
#     re-running the cell does not re-normalise (and then discard) already converted boxes;
#   * the extension is removed with os.path.splitext, so file names containing extra dots
#     keep a label name that matches the image name.
for img_id in tqdm.tqdm(img_ids, desc='change .json file to .txt file'):
    img_info = data_source.loadImgs(img_id)[0]

    # Flatten a possible sub-folder in the file name, then drop the extension for the label name
    save_name = img_info['file_name'].replace('/', '_')
    file_name = os.path.splitext(save_name)[0]

    # Image size used to normalise the box coordinates
    height = img_info['height']
    width = img_info['width']

    save_path = save_base_path + file_name + '.txt'
    is_exist = False  # becomes True once the image has an annotation of an exported class
    lines = []

    for annotation in data_source.loadAnns(data_source.getAnnIds(img_id)):
        # contiguous class index of the annotated object
        label = coco_labels_inverse[annotation['category_id']]
        if label not in label_transfer:
            continue
        is_exist = True

        x_min, y_min, box_w, box_h = annotation['bbox'][:4]
        # skip degenerate boxes narrower or shorter than one pixel
        if box_w < 1 or box_h < 1:
            continue

        yolo_box = (
            round((x_min + box_w / 2) / width, 6),
            round((y_min + box_h / 2) / height, 6),
            round(box_w / width, 6),
            round(box_h / height, 6),
        )

        class_num[label] = class_num.get(label, 0) + 1
        # NOTE(review): the id written is `label`, not label_transfer[label]; the two are
        # identical while label_transfer is an identity mapping — confirm the intent.
        lines.append(' '.join([str(label)] + [str(value) for value in yolo_box]) + '\n')

    if not is_exist:
        # Nothing to export for this image: keep no label file and copy no image
        if os.path.exists(save_path):
            os.remove(save_path)
        continue

    with open(save_path, mode='w') as fp:
        fp.writelines(lines)

    shutil.copy('/content/leopard.coco/images/train2022/{}'.format(img_info['file_name']), os.path.join(save_image_path, save_name))

In [None]:
# Count the files found (recursively) in the images and labels folders
img = r'/content/dataset/images'
ann = r'/content/dataset/labels'

img_src = [name for _root, _dirs, names in os.walk(img) for name in names]
ann_src = [name for _root, _dirs, names in os.walk(ann) for name in names]

# (number of images, number of labels, total)
len(img_src), len(ann_src), len(img_src) + len(ann_src)

In [None]:
# Remove the listed directories together with their sub directories.
# The loop variable is no longer called `dir`, which shadowed the builtin dir() for the
# rest of the notebook session, and a directory is only removed when it is present so
# that re-running the cell does not raise FileNotFoundError.
directory = ['/content/leopard.coco']

for stale_dir in directory:
  if os.path.isdir(stale_dir):
    shutil.rmtree(stale_dir)

In [None]:
# Split the 'dataset' folder into the 'leopard_dataset' output folder with an 80% / 10% / 10% ratio
# (the later cells read the train, val and test sub-folders of the output).
# NOTE(review): the 'images' and 'labels' sub-folders are presumably shuffled independently with the
# same seed, so image/label pairs stay aligned only while both folders hold matching files — verify
# against the split-folders documentation.
import splitfolders
splitfolders.ratio('dataset', output="leopard_dataset", seed=1337, ratio=(.8, 0.1,0.1))

In [None]:
# Count the files found (recursively) in the train images and labels folders
img = r'/content/leopard_dataset/train/images'
ann = r'/content/leopard_dataset/train/labels'

train_img = [name for _root, _dirs, names in os.walk(img) for name in names]
train_ann = [name for _root, _dirs, names in os.walk(ann) for name in names]

# (number of images, number of labels, total)
len(train_img), len(train_ann), len(train_img) + len(train_ann)

In [None]:
# Count the files found (recursively) in the test images and labels folders
img = r'/content/leopard_dataset/test/images'
ann = r'/content/leopard_dataset/test/labels'

test_img, test_ann = [], []

for _root, _dirs, names in os.walk(img):
  test_img.extend(names)

for _root, _dirs, names in os.walk(ann):
  test_ann.extend(names)

# (number of images, number of labels, total)
len(test_img), len(test_ann), len(test_img) + len(test_ann)

In [None]:
# Count the files found (recursively) in the val images and labels folders
img = r'/content/leopard_dataset/val/images'
ann = r'/content/leopard_dataset/val/labels'

val_img = []
for walk_entry in os.walk(img):
  val_img += walk_entry[2]  # third element of each os.walk entry is the list of file names

val_ann = []
for walk_entry in os.walk(ann):
  val_ann += walk_entry[2]

# (number of images, number of labels, total)
len(val_img), len(val_ann), len(val_img) + len(val_ann)

In [None]:
#importing the required libraries and packages
from PIL import Image
import torch

import random
import pathlib
import requests

from torch.utils.data import Dataset
import json
from torchvision import transforms, utils

from pathlib import Path
import cv2
import matplotlib.pyplot as plt

from super_gradients.training import Trainer, dataloaders, models
from super_gradients.training.dataloaders.dataloaders import coco_detection_yolo_format_train, coco_detection_yolo_format_val
from super_gradients.training.losses import PPYoloELoss
from super_gradients.training.metrics import DetectionMetrics_050
from super_gradients.training.models.detection_models.pp_yolo_e import PPYoloEPostPredictionCallback

In [None]:
# Class that stores the trainer, dataset, dataloader and model parameters in one place
class config:
  """Constants shared by the training, validation, test and inference cells."""

  # trainer: experiment name and the checkpoint folder holding the log files and weights
  exp_name = 'leopard_id'
  chkpt = 'checkpoint'

  # dataset: root folder and the image/label sub-folders of every split
  data_dir = '/content/leopard_dataset'
  train_images, train_labels = 'train/images', 'train/labels'
  val_images, val_labels = 'val/images', 'val/labels'
  test_images, test_labels = 'test/images', 'test/labels'

  # names of the classes available in the dataset; the class count follows from the list
  classes = ['leopard']
  num_classes = len(classes)

  # dataloader keyword arguments, shared here by the train, val and test loaders
  dataloader_params = dict(batch_size=8, num_workers=2)

  # model: the large YOLO NAS variant, started from pretrained coco weights
  model_name = 'yolo_nas_l'
  pretrained_weights = 'coco'

In [None]:
# Instantiate the trainer; the experiment name and the checkpoint root folder are taken from `config`
trainer = Trainer(experiment_name = config.exp_name, ckpt_root_dir = config.chkpt)

In [None]:
# Build the train, validation and test dataloaders from the YOLO-format folders
def _split_dataset_params(images_dir, labels_dir):
  """Assemble the dataset_params dict for one split; root folder and classes come from config."""
  return {'data_dir': config.data_dir,
          'images_dir': images_dir,
          'labels_dir': labels_dir,
          'classes': config.classes}


train_data = coco_detection_yolo_format_train(
    dataset_params=_split_dataset_params(config.train_images, config.train_labels),
    dataloader_params=config.dataloader_params)

val_data = coco_detection_yolo_format_val(
    dataset_params=_split_dataset_params(config.val_images, config.val_labels),
    dataloader_params=config.dataloader_params)

# the test split goes through the validation-style factory as well
test_data = coco_detection_yolo_format_val(
    dataset_params=_split_dataset_params(config.test_images, config.test_labels),
    dataloader_params=config.dataloader_params)

In [None]:
# Plot some images of the training dataset
train_data.dataset.plot()

In [None]:
# Instantiate the model named in config.model_name with config.num_classes classes,
# starting from the pretrained weights named in config.pretrained_weights
model = models.get(config.model_name,
                   num_classes=config.num_classes,
                   pretrained_weights=config.pretrained_weights)

In [None]:
# Training parameters handed to trainer.train: schedule, optimizer, EMA, loss and validation metric
train_params = {# NOTE: no silent-mode key is set in this dict
    "average_best_models":True,
    # learning-rate settings: warm-up keys followed by a cosine mode (per the key names)
    "warmup_mode": "linear_epoch_step",
    "warmup_initial_lr": 1e-5,
    "lr_warmup_epochs": 5,
    "initial_lr": 3e-4,
    "lr_mode": "cosine",
    "cosine_final_lr_ratio": 0.1,
    # optimizer settings
    "optimizer": "Adam",
    "optimizer_params": {"weight_decay": 0.0001},
    "zero_weight_decay_on_bias_and_bn": True,
    # exponential moving average settings
    "ema": True,
    "ema_params": {"decay": 0.9, "decay_type": "threshold"},
    "max_epochs": 20, # 20 epochs for this run
    "mixed_precision": True,
    # loss sized for the number of classes in config
    "loss": PPYoloELoss(
        use_static_assigner=False,
        num_classes=config.num_classes,
        reg_max=16
    ),
    # validation metric; its post-prediction callback carries the score / NMS thresholds
    "valid_metrics_list": [
        DetectionMetrics_050(
            score_thres=0.1,
            top_k_predictions=300,
            num_cls=config.num_classes,
            normalize_targets=True,
            post_prediction_callback=PPYoloEPostPredictionCallback(
                score_threshold=0.01,
                nms_top_k=1000,
                max_predictions=300,
                nms_threshold=0.7
            )
        )
    ],
    # name of the metric that is watched during training
    "metric_to_watch": 'mAP@0.50'
}

In [None]:
# Train the model with the parameters above, using the train loader for training
# and the val loader for validation
trainer.train(model = model,
              training_params = train_params,
              train_loader = train_data,
              valid_loader = val_data)

In [None]:
# Analyze training metrics using tensorboard; the log dir is <config.chkpt>/<config.exp_name>
%load_ext tensorboard
%tensorboard --logdir {config.chkpt}/{config.exp_name}

In [None]:
#to copy the entore dorectory to gdrive
%cp -av "/content/checkpoint" "/content/drive/MyDrive"

#to zip the checkpoint directory to download or copy
!zip -r /content/leopard_chekpt.zip /content/checkpoint

In [None]:
# Load the trained weights from <config.chkpt>/<config.exp_name>/average_model.pth into a fresh model.
# NOTE(review): presumably this file is produced because "average_best_models" is True in train_params,
# and its exact location may depend on the super-gradients version — confirm the path exists.
best_model = models.get(config.model_name,
                        num_classes = config.num_classes,
                        checkpoint_path = os.path.join(config.chkpt, config.exp_name, 'average_model.pth'))

In [None]:
# Evaluate the best model on the test dataloader
# post-processing of the raw predictions (score filter + NMS) used by the metric
test_post_processing = PPYoloEPostPredictionCallback(score_threshold=0.01,
                                                     nms_top_k=1000,
                                                     max_predictions=300,
                                                     nms_threshold=0.7)

test_metric = DetectionMetrics_050(score_thres=0.1,
                                   top_k_predictions=300,
                                   num_cls=config.num_classes,
                                   normalize_targets=True,
                                   post_prediction_callback=test_post_processing)

trainer.test(model=best_model, test_loader=test_data, test_metrics_list=test_metric)

In [None]:
# Analyse the best model and run inference on the test data

In [None]:
# Inference with trained model
import supervision as sv

# Load the test split (images + YOLO labels) as a supervision detection dataset.
# NOTE(review): '/content/leopard.yaml' is not created by any cell visible in this notebook —
# presumably it has to be provided beforehand; confirm.
ds = sv.DetectionDataset.from_yolo(
    images_directory_path = '/content/leopard_dataset/test/images',
    annotations_directory_path = '/content/leopard_dataset/test/labels',
    data_yaml_path = '/content/leopard.yaml',  # fixed: the closing quote did not match the opening quote (SyntaxError)
    force_masks = False
)

# minimum confidence for a prediction to be kept (name kept as-is: later cells refer to it)
CONFIDENCE_TRESHOLD = 0.5

# image name -> sv.Detections predicted by the best model
predictions = {}

for image_name, image in ds.images.items():
    # predict on a single image and take the first (only) result
    result = list(best_model.predict(image, conf=CONFIDENCE_TRESHOLD))[0]
    detections = sv.Detections(xyxy = result.prediction.bboxes_xyxy,
                               confidence=result.prediction.confidence,
                               class_id=result.prediction.labels.astype(int)
                               )
    predictions[image_name] = detections

In [None]:
# Visualize inference results
random.seed(10)

MAX_IMAGE_COUNT = 5

n = min(MAX_IMAGE_COUNT, len(ds.images))

keys = list(ds.images.keys())
keys = random.sample(keys, n)

box_annotator = sv.BoxAnnotator()

images = []
titles = []

for key in keys:
    frame_with_annotations = box_annotator.annotate(
        scene=ds.images[key].copy(),
        detections=ds.annotations[key],
        skip_label=True
    )
    images.append(frame_with_annotations)
    titles.append('annotations')
    frame_with_predictions = box_annotator.annotate(
        scene=ds.images[key].copy(),
        detections=predictions[key],
        skip_label=True
    )
    images.append(frame_with_predictions)
    titles.append('predictions')

%matplotlib inline
sv.plot_images_grid(images=images, titles=titles, grid_size=(n, 2), size=(2 * 4, n * 4))

In [None]:
# Calculate confusion matrix
!pip install -q onemetric

from onemetric.cv.object_detection import ConfusionMatrix

keys = list(ds.images.keys())

annotation_batches, prediction_batches = [], []

for key in keys:
    annotation=ds.annotations[key]
    annotation_batch = np.column_stack((
        annotation.xyxy,
        annotation.class_id
    ))
    annotation_batches.append(annotation_batch)

    prediction=predictions[key]
    prediction_batch = np.column_stack((
        prediction.xyxy,
        prediction.class_id,
        prediction.confidence
    ))
    prediction_batches.append(prediction_batch)

confusion_matrix = ConfusionMatrix.from_detections(
    true_batches=annotation_batches,
    detection_batches=prediction_batches,
    num_classes=len(ds.classes),
    conf_threshold=CONFIDENCE_TRESHOLD
)

confusion_matrix.plot(os.path.join(HOME, "confusion_matrix.png"), class_names=ds.classes)

In [None]:
# Predict on a new image (given by URL) with the model trained for 20 epochs and show the result
url = 'https://cdn.britannica.com/40/146340-050-297B2798/Leopard-tree-Kenya-Masai-Mara-National-Reserve.jpg'
best_model.predict(url).show()

In [None]:
#implementing the counter into the images to determine how many of each species is available in the image
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
%matplotlib inline

from google.colab.patches import cv2_imshow
from collections import Counter

# pred = best_model.predict('https://img.theweek.in/content/dam/week/news/india/images/2020/12/22/leopard-pti.jpg')
pred = best_model.predict('https://cdn.britannica.com/40/146340-050-297B2798/Leopard-tree-Kenya-Masai-Mara-National-Reserve.jpg')
# pred = best_model.predict('https://i.pinimg.com/1200x/da/8f/72/da8f729a05ad064830f30cc757d2d277.jpg')
# pred = best_model.predict('https://media.istockphoto.com/id/1314203068/photo/leopard.jpg?s=2048x2048&w=is&k=20&c=zHRm1lyeITCZBP02sFaGvqoF0D8w1ngBVYrs6R9oR_c=')

result = list(pred)[0]

class_names = list(result.class_names)

counter = Counter(list(result.prediction.labels.astype("int")))
output = list(counter.items())
output = list(map(lambda x: (class_names[x[0]],x[1]),output))

# saving the predicted image
pred.save(output_folder="/content/output")

print(output) # To get the number of objects in each class


# path
path = r'/content/output/pred_0.jpg'

# Reading an image in default mode
image = cv2.imread(path)

for i in range(len(output)):
  org = (50, 50*i+50) #assigning the position in the image where the cout has to be displayed
  image = cv2.putText(image, str(output[i][0]) + ': '+ str(output[i][1]), org, cv2.FONT_HERSHEY_SIMPLEX, fontScale=1, color=(0, 0, 255), thickness =2, lineType = cv2.LINE_AA)

# Displaying the image with the count
cv2_imshow(image)

# Model 2 - YOLO NAS (num classes: 81, epochs: 25)
## Kaggle Dataset and LILA BC Hyena Dataset

In [None]:
#downloading the kaggle data
!pip install -q kaggle

from google.colab import files
files.upload()

!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/

!chmod 600 ~/.kaggle/kaggle.json

!kaggle datasets download -d antoreepjana/animals-detection-images-dataset

!unzip /content/animals-detection-images-dataset.zip

In [None]:
# Count the class sub-folders of the train and test folders under data_dir
data_dir="/content"
train_dir, test_dir = (os.path.join(data_dir, split) for split in ("train", "test"))

all_train_subdir = glob.glob(train_dir+"/*")
all_test_subdir = glob.glob(test_dir+"/*")

# the class name is the last path component of every sub-folder
train_classes = list(map(os.path.basename, all_train_subdir))
test_classes = list(map(os.path.basename, all_test_subdir))

print("There is %d classes in train dataset, and %d classes in test dataset"%(len(train_classes), len(test_classes)))

There is 80 classes in train dataset, and 80 classes in test dataset


In [None]:
# Copy the hyena dataset tar.gz archive from the public LILA BC bucket into the current working directory
!gsutil -m cp -r gs://public-datasets-lila/wild-me/hyena.coco.tar.gz .

Copying gs://public-datasets-lila/wild-me/hyena.coco.tar.gz...
/ [0/1 files][    0.0 B/  3.2 GiB]   0% Done                                    ==> NOTE: You are downloading one or more large file(s), which would
run significantly faster if you enabled sliced object downloads. This
feature is enabled by default but requires that compiled crcmod be
installed (see "gsutil help crcmod").

- [1/1 files][  3.2 GiB/  3.2 GiB] 100% Done  37.4 MiB/s ETA 00:00:00           
Operation completed over 1 objects/3.2 GiB.                                      


In [None]:
# Extract the hyena archive into /content/ (verbose: every extracted path is printed)
!tar -xzvf "/content/hyena.coco.tar.gz" -C "/content/"

In [None]:
# Read the hyena COCO annotation file and show its top-level keys
file_path = '/content/hyena.coco/annotations/instances_train2022.json'

with open(file_path, mode='r', encoding='utf-8') as annotation_fh:
    data = json.load(annotation_fh)

data.keys()

dict_keys(['info', 'licenses', 'categories', 'images', 'annotations', 'parts'])

In [None]:
# Re-label the hyena annotations: every category id and every annotation category_id is set to 80,
# then the annotation file is overwritten in place
with open(file_path, mode='r', encoding='utf-8') as annotation_fh:
    data = json.load(annotation_fh)

# (top-level key, id field inside each record of that key)
for record_key, id_field in (('categories', 'id'), ('annotations', 'category_id')):
    for record in data[record_key]:
        record[id_field] = 80

with open(file_path, mode='w', encoding='utf-8') as annotation_fh:
    json.dump(data, annotation_fh)

In [None]:
# Re-read the annotation file and display its categories
file_path = '/content/hyena.coco/annotations/instances_train2022.json'

with open(file_path, mode='r', encoding='utf-8') as annotation_fh:
    data = json.load(annotation_fh)

data['categories']

[{'id': 80, 'name': 'hyena', 'supercategory': 'animal'}]

In [None]:
# Load the hyena annotation file and build the lookup tables, with every category mapped to class index 80
data_source = COCO(annotation_file='/content/hyena.coco/annotations/instances_train2022.json')

img_ids = data_source.getImgIds()
catIds = data_source.getCatIds()

# categories ordered by ascending COCO id
categories = sorted(data_source.loadCats(catIds), key=lambda cat: cat['id'])

classes, coco_labels, coco_labels_inverse = {}, {}, {}

for c in categories:
  print(c)
  category_id, category_name = c['id'], c['name']
  coco_labels[80] = category_id          # class index -> COCO category id
  coco_labels_inverse[category_id] = 80  # COCO category id -> class index
  classes[category_name] = 80            # class name -> class index

# per-class annotation counter, filled by the conversion loop
class_num = {}

loading annotations into memory...
Done (t=0.09s)
creating index...
index created!
{'id': 80, 'name': 'hyena', 'supercategory': 'animal'}


In [None]:
# Display the lookup tables and the per-class counter for inspection
classes, coco_labels, coco_labels_inverse, class_num

({'hyena': 80}, {80: 80}, {80: 80}, {})

In [None]:
# Start from an empty dataset/ folder. The leopard conversion earlier in this notebook writes
# into the same dataset/images and dataset/labels folders, and the prefixing step further down
# moves every .jpg / .txt it finds there, so leftover files would be mixed into the hyena set.
shutil.rmtree('dataset', ignore_errors=True)

save_base_path  = 'dataset/labels/'
save_image_path = 'dataset/images/'

# pure-Python equivalent of `mkdir -p`
for _out_dir in (save_base_path, save_image_path):
    os.makedirs(_out_dir, exist_ok=True)

# Class indices to export. The conversion loop only checks membership of the keys;
# the hyena class uses index 80.
label_transfer = {80:80}

In [None]:
# Write one YOLO-format .txt label file per image, named after the image.
# The annotations of each image are looked up through its image id; every box
# [x_min, y_min, width, height] is turned into a line
# "class x_center y_center width height" with the coordinates divided by the image size.
#
# Changes compared with the previous version of this cell:
#   * an image without annotations no longer leaves an orphan empty .txt behind
#     (the early `continue` used to skip the clean-up), so images/ and labels/ stay paired;
#   * the bbox list stored inside the COCO object is no longer overwritten in place, so
#     re-running the cell does not re-normalise (and then discard) already converted boxes;
#   * the extension is removed with os.path.splitext, so file names containing extra dots
#     keep a label name that matches the image name.
for img_id in tqdm.tqdm(img_ids, desc='change .json file to .txt file'):
    img_info = data_source.loadImgs(img_id)[0]

    # Flatten a possible sub-folder in the file name, then drop the extension for the label name
    save_name = img_info['file_name'].replace('/', '_')
    file_name = os.path.splitext(save_name)[0]

    # Image size used to normalise the box coordinates
    height = img_info['height']
    width = img_info['width']

    save_path = save_base_path + file_name + '.txt'
    is_exist = False  # becomes True once the image has an annotation of an exported class
    lines = []

    for annotation in data_source.loadAnns(data_source.getAnnIds(img_id)):
        # class index of the annotated object
        label = coco_labels_inverse[annotation['category_id']]
        if label not in label_transfer:
            continue
        is_exist = True

        x_min, y_min, box_w, box_h = annotation['bbox'][:4]
        # skip degenerate boxes narrower or shorter than one pixel
        if box_w < 1 or box_h < 1:
            continue

        yolo_box = (
            round((x_min + box_w / 2) / width, 6),
            round((y_min + box_h / 2) / height, 6),
            round(box_w / width, 6),
            round(box_h / height, 6),
        )

        class_num[label] = class_num.get(label, 0) + 1
        # NOTE(review): the id written is `label`, not label_transfer[label]; the two are
        # identical while label_transfer is an identity mapping — confirm the intent.
        lines.append(' '.join([str(label)] + [str(value) for value in yolo_box]) + '\n')

    if not is_exist:
        # Nothing to export for this image: keep no label file and copy no image
        if os.path.exists(save_path):
            os.remove(save_path)
        continue

    with open(save_path, mode='w') as fp:
        fp.writelines(lines)

    shutil.copy('/content/hyena.coco/images/train2022/{}'.format(img_info['file_name']), os.path.join(save_image_path, save_name))

change .json file to .txt file: 3104it [00:43, 71.39it/s] 


In [None]:
!mkdir -p copy_dataset/images copy_dataset/labels

#adding the prefix 'Hyena_ to the image names and label names and copying them into a new folder
img_src = r'/content/dataset/images'
labels_src = r'/content/dataset/labels'

img_tgt = '/content/copy_dataset/images'
labels_tgt = '/content/copy_dataset/labels'

new_pre = 'Hyena_'

for filename in os.listdir(img_src):
  if filename.endswith('.jpg'):
    # print(filename)
    os.rename(os.path.join(img_src, filename), os.path.join(img_tgt, ''.join([new_pre, filename])))


for filename in os.listdir(labels_src):
  if filename.endswith('.txt'):
    # print(filename)
    os.rename(os.path.join(labels_src, filename), os.path.join(labels_tgt, ''.join([new_pre, filename])))

In [None]:
# Remove the listed directories together with their sub directories (presumably the output of
# an earlier split, so that the split cell starts clean).
# On a fresh runtime '/content/hyena_dataset' does not exist yet, and an unconditional
# shutil.rmtree raised FileNotFoundError here, so a directory is only removed when present.
# The loop variable is no longer called `dir`, which shadowed the builtin dir().
directory = ['/content/hyena_dataset']

for stale_dir in directory:
  if os.path.isdir(stale_dir):
    shutil.rmtree(stale_dir)

In [None]:
# Split the prefixed hyena files 80% / 20% into the 'hyena_dataset' output folder.
# NOTE(review): with a two-value ratio split-folders presumably creates 'train' and 'val'
# sub-folders (not 'test'), whereas the Kaggle dataset uses train/test — confirm and rename if needed.
splitfolders.ratio('/content/copy_dataset', output="hyena_dataset", seed=1337, ratio=(.8, 0.2))

Copying files: 0 files [00:00, ? files/s]
