#  Import Library

In [18]:
import json
import os
import numpy as np
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
import PIL
from PIL import Image
import torchvision
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import time
import random
import shutil
import xml.etree.ElementTree as ET
from tqdm import tqdm
import utils_objectdetection as utils
from utils_objectdetection import bbox_iou
import torch
!pip install pydrive2 python-telegram-bot
!pip install pydrive2
import torch.optim
import pkg_resources
from datetime import date
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from oauth2client.service_account import ServiceAccountCredentials
from telegram import Bot



#  Check GPU Availability

In [19]:
if torch.cuda.is_available():
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

There are 1 GPU(s) available.
We will use the GPU: Tesla P100-PCIE-16GB


# Load Dataset

In [20]:
source_path = '/kaggle/input/xml-and-coco-json-dataset'

destination_path = '/kaggle/working/xml-and-coco-json-dataset'

shutil.copytree(source_path, destination_path)

print("Dataset Loaded Successfully")

FileExistsError: [Errno 17] File exists: '/kaggle/working/xml-and-coco-json-dataset'

# Test-Train Separation

In [None]:
random.seed(384)

FAIR1M2 = '/kaggle/working/xml-and-coco-json-dataset/'

coco_json_path = os.path.join(FAIR1M2, 'Refined COCO2.json')

with open(coco_json_path, 'r') as f:
    data = json.load(f)

print("Total images:", len(data['images']))

# Calculate total objects
total_objects = len(data['annotations'])
print("Total objects:", total_objects)

test_dir = os.path.join(FAIR1M2, 'test_images')
train_dir = os.path.join(FAIR1M2, 'train_images')

os.makedirs(test_dir, exist_ok=True)

# Splitting ratio
test_ratio = 0.2  

num_images = len(data['images'])
num_images_test = int(num_images * test_ratio)

test_indices = random.sample(range(num_images), num_images_test)

# Initialize lists for train and test data
train_images = []
test_images = []
train_annotations = []
test_annotations = []

# Iterate through images and annotations
for img in data['images']:
    if img['id'] in test_indices:
        test_images.append(img)
    else:
        train_images.append(img)

# Filter annotations based on the selected images
for annotation in data['annotations']:
    if annotation['image_id'] in test_indices:
        test_annotations.append(annotation)
    else:
        train_annotations.append(annotation)

# Save test images and annotations to a JSON file
test_data = {'images': test_images, 'annotations': test_annotations, 'categories': data['categories']}
test_json_path = os.path.join(FAIR1M2, 'test.json')

with open(test_json_path, 'w') as test_file:
    json.dump(test_data, test_file, indent=4)

for img in test_images:
    img_name = img['file_name']
    shutil.move(os.path.join(FAIR1M2, 'images', img_name), os.path.join(test_dir, img_name))

train_data = {'images': train_images, 'annotations': train_annotations, 'categories': data['categories']}
train_json_path = os.path.join(FAIR1M2, 'train.json')

with open(train_json_path, 'w') as train_file:
    json.dump(train_data, train_file, indent=4)

# Calculate train and test objects
train_objects = len(train_annotations)
test_objects = len(test_annotations)

print("Train images:", len(train_images))
print("Train objects:", train_objects)
print("Test images:", len(test_images))
print("Test objects:", test_objects)

# Defining the Dataset Class

In [None]:
label_colors = {
1: '#FF0000', # red
2: '#008000', # green
3: '#0000FF', # blue
4: '#00FFFF', # cyan
5: '#FF00FF', # magenta
6: '#FFFF00', # yellow
7: '#000000', # black
8: '#FFFFFF', # white
9: '#FFC080', # red-orange
10: '#008080', # teal
11: '#000080', # navy blue
12: '#FFA500', # orange
13: '#800080', # purple
14: '#008040', # dark green
15: '#FF00BF', # pink
16: '#808000', # brown
17: '#0080FF', # sky blue
18: '#FFC0CB', # peach
19: '#8000FF', # violet
20: '#00FFFF', # cyan
21: '#FFD700', # gold
22: '#008000', # green
23: '#4B0082', # indigo
24: '#FF69B4', # hot pink
25: '#20B2AA', # light sea green
26: '#FFA07A', # light orange
27: '#008000', # green
28: '#FF00FF', # magenta
29: '#00FF00', # lime
30: '#FFC080', # red-orange
31: '#008080', # teal
32: '#000080', # navy blue
33: '#FFA500', # orange
34: '#800080', # purple
35: '#008040', # dark green
36: '#FF00BF', # pink
37: '#808000', # brown
}
# label_colors = {i: '#2fc557' for i in range(1, 38)}


def generate_box(obj):
    return [
        float(obj['bbox'][0]),
        float(obj['bbox'][1]),
        float(obj['bbox'][0] + obj['bbox'][2]),
        float(obj['bbox'][1] + obj['bbox'][3])
    ]

def generate_label(obj):
    name = obj['category_id']
    return list(label_colors.keys()).index(name) + 1




def generate_target(image_id, json_data):
    target = {
        'boxes': [],
        'labels': []
    }
    for annotation in json_data['annotations']:
        if annotation['image_id'] == image_id:
            xmin, ymin, xmax, ymax = annotation['bbox']
            target['boxes'].append([xmin, ymin, xmax, ymax])
            target['labels'].append(generate_label(annotation))
    target['boxes'] = torch.as_tensor(target['boxes'], dtype=torch.float32)
    target['labels'] = torch.as_tensor(target['labels'], dtype=torch.int64)
    return target

def plot_image_from_output(img, annotation):
    img = img.cpu().permute(1, 2, 0)
    fig, ax = plt.subplots(1)
    ax.imshow(img)

    for idx, label in enumerate(annotation["labels"]):
        xmin, ymin, xmax, ymax = annotation["boxes"][idx]
        rect = patches.Rectangle(
            (xmin, ymin), (xmax - xmin), (ymax - ymin),
            linewidth=1, edgecolor=label_colors[label], facecolor='none'
        )
        ax.add_patch(rect)

    plt.show()

In [None]:
class MaskDataset(object):
    def __init__(self, transforms, path, json_path):
        self.transforms = transforms
        self.path = path
        self.imgs = list(sorted(os.listdir(self.path)))
        self.json_path = json_path
        self.json_data = json.load(open(json_path, 'r'))

    def __getitem__(self, idx):
        file_image = self.imgs[idx]
        img_path = os.path.join(self.path, file_image)
        img = Image.open(img_path).convert("RGB")

        # Generate Label
        file_label = file_image[:-3] + 'jpg'
        image_id = [img['id'] for img in self.json_data['images'] if img['file_name'] == file_label][0]
        target = generate_target(image_id, self.json_data)

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.imgs)

data_transform = transforms.Compose([
        transforms.ToTensor()
    ])

train_dataset = MaskDataset(data_transform, '/kaggle/working/xml-and-coco-json-dataset/images', '/kaggle/working/xml-and-coco-json-dataset/train.json')
test_dataset = MaskDataset(data_transform, '/kaggle/working/xml-and-coco-json-dataset/test_images', '/kaggle/working/xml-and-coco-json-dataset/test.json')

def collate_fn(batch):
    return tuple(zip(*batch))

train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=3, collate_fn=collate_fn)
test_data_loader = torch.utils.data.DataLoader(test_dataset, batch_size=2, collate_fn=collate_fn)

# Import Model

In [None]:
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
!pip install efficientnet_pytorch
from efficientnet_pytorch import EfficientNet

class EfficientNetBackbone(torch.nn.Module):
    def __init__(self, efficientnet_model):
        super(EfficientNetBackbone, self).__init__()
        self.features = efficientnet_model
        self.out_channels = efficientnet_model._conv_head.out_channels

    def forward(self, x):
        x = self.features.extract_features(x)
        return x

def get_model_instance_segmentation(num_classes):
    # Load pre-trained EfficientNet-B2 model
    efficientnet_model = EfficientNet.from_name('efficientnet-b2', num_classes=38)

    # Define the Faster R-CNN model with EfficientNet backbone
    backbone = EfficientNetBackbone(efficientnet_model)
    model = FasterRCNN(
        backbone,
        num_classes=num_classes,
        rpn_anchor_generator=AnchorGenerator(
            sizes=((32, 64, 128, 256, 512),),
            aspect_ratios=((0.5, 1.0, 2.0),)
        ),
    )

    # Replace the box predictor with FastRCNNPredictor
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

In [None]:
model = get_model_instance_segmentation(38)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

In [None]:
torch.cuda.is_available()

# GDrive-Telegram API Configuration

In [None]:
telegram_token = "6753078681:AAEf5Q6L9Dlbs32axb7vvZxqhK_niaKjdwE"
channel_name = "-1002087711269"  
bot = Bot(token=telegram_token)

async def send_telegram_message(message):
    await bot.send_message(chat_id=channel_name, text=message)

def upload_file_to_gdrive(file_from, file_to, folder_name):
    gauth = GoogleAuth()
    gauth.credentials = ServiceAccountCredentials.from_json_keyfile_name(
        "/kaggle/input/drive-credential/credentials.json", scopes=['https://www.googleapis.com/auth/drive'])

    drive = GoogleDrive(gauth)
    today = date.today().strftime("%m/%d/%y")

    parent_directory_id = '1HDTRIbB9yF918oBtN5zWmFQHgFOiU-II'

    folder_meta = {
        "title":  folder_name,
        "parents": [{'id': parent_directory_id}],
        'mimeType': 'application/vnd.google-apps.folder'
    }

    # check if folder already exists or not
    folder_id = None
    foldered_list = drive.ListFile(
        {'q':  "'"+parent_directory_id+"' in parents and trashed=false"}).GetList()

    for file in foldered_list:
        if (file['title'] == folder_name):
            folder_id = file['id']

    if folder_id is None:
        folder = drive.CreateFile(folder_meta)
        folder.Upload()
        folder_id = folder.get("id")

    file1 = drive.CreateFile(
        {'parents': [{"id": folder_id}], 'title': file_to})
    
    file1.SetContentFile(file_from)
    file1.Upload()
    
    # Get the file ID
    file_id = file1['id']
    
    file_link = f"https://drive.google.com/uc?export=download&id={file_id}"
    
    return file_link

# Training

In [None]:
#Copy Saved Model
source_path = '/kaggle/input/fair1m-model-checkpoints/pytorch/efficientnet-b2/7'

destination_path = '/kaggle/working/checkpoint/'
shutil.copytree(source_path, destination_path)

print("Checkpoint Copied Successfully")

In [None]:
folder_name = "EfficientNet-B2" 
start_epoch = 31
num_epochs = 100
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                                momentum=0.9, weight_decay=0.0005)

In [None]:
checkpoint_dir = '/kaggle/working/checkpoint/'
if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)

checkpoint_path = f'/kaggle/working/checkpoint/checkpoint{start_epoch}.pt'

if os.path.exists(checkpoint_path):
    model.load_state_dict(torch.load(checkpoint_path))
    print("Checkpoint loaded successfully.")

In [None]:
PIL.Image.MAX_IMAGE_PIXELS = None

In [None]:
print('----------------------train start--------------------------')

cls_losses = []
box_losses = []
total_losses = []

async def main():
    for epoch in range(start_epoch, num_epochs):
        start = time.time()
        model.train()
        
        total_cls_loss = 0.0
        total_box_loss = 0.0
        total_epoch_loss = 0.0

        for imgs, annotations in train_data_loader:
            imgs = [img.to(device) for img in imgs]
            annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
            loss_dict = model(imgs, annotations)
            losses_batch = sum(loss for loss in loss_dict.values())
            cls_loss = loss_dict['loss_classifier']
            box_loss = loss_dict['loss_box_reg']

            optimizer.zero_grad()
            losses_batch.backward()
            optimizer.step()

            total_cls_loss += cls_loss.item()
            total_box_loss += box_loss.item()
            total_epoch_loss += losses_batch.item()


        epoch_file_name = f'{folder_name}_checkpoint_epoch{epoch+1}.pt'
        checkpoint_path = os.path.join(checkpoint_dir, epoch_file_name)
        torch.save(model.state_dict(), checkpoint_path)

        # Upload the file
        file_link = upload_file_to_gdrive(checkpoint_path, epoch_file_name, folder_name)

        # Send message to Telegram
        message = f"{folder_name} | epoch : {epoch+1}, cls_loss : {total_cls_loss}, box_loss : {total_box_loss}, total_loss : {total_epoch_loss}\n{file_link}"
        await send_telegram_message(message)

        cls_losses.append(total_cls_loss)
        box_losses.append(total_box_loss)
        total_losses.append(total_epoch_loss)

        print(f'epoch : {epoch+1}, cls_loss : {total_cls_loss}, box_loss : {total_box_loss}, total_loss : {total_epoch_loss}, time : {time.time() - start}')

await main()


In [None]:
# Plotting the loss
plt.figure(figsize=(10, 6)) 
plt.plot(range(start_epoch, epoch + 1), losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title(f'Training Loss on {folder_name} ({start_epoch+1}-{num_epochs} Epoch)')
# plt.legend()
plt.show()



# # Plotting Manual  loss
# losses = [4219.66845703125,
#           3294.255126953125,
#           2996.263916015625,
#           2855.10693359375,
#           2750.4365234375,
#           2686.026123046875,
#           2617.944580078125,
#           2569.9990234375,
#           2538.9892578125,
#           2485.15966796875,
#           2440.21533203125, 
#           2415.9052734375, 
#           2396.2783203125, 
#           2362.054443359375, 
#           2346.248046875, 
#           2315.4921875, 
#           2288.581298828125, 
#           2289.169921875, 
#           2254.004150390625, 
#           2246.160888671875,
#           2244.7666015625,
#           2210.5478515625,
#           2205.06689453125,
#           2182.842529296875,
#           2185.711181640625,
#           2174.51708984375,
#           2141.72119140625,
#           2134.73974609375,
#           2135.479736328125,
#           2104.87109375,
#           2103.5703125,
#           2091.90087890625,
#           2093.547119140625,
#           2053.4951171875,
#           2050.3564453125,
#           2047.2796630859375,
#           2064.235107421875,
#           2016.824951171875,
#           2016.7198486328125,
#           2016.572021484375,
#           2005.3057861328125,
#           2007.4322509765625,
#           1999.246826171875,
#           1979.5877685546875,
#           1989.076416015625,
#           1983.7818603515625,
#           1936.830078125,
#           1960.492431640625,
#           1949.1075439453125,
#           1944.2293701171875,
#           1950.8743896484375,
#           1945.679931640625,
#           1937.458740234375,
#           1939.6971435546875,
#           1915.462890625,
#           1913.741455078125,
#           1915.2259521484375,
#           1922.7769775390625,
#           1905.7222900390625,
#           1886.150390625
#          ]

# num_epochs = len(losses)


# import matplotlib.pyplot as plt

# plt.figure(figsize=(10, 6))
# plt.plot(range(1, len(losses) + 1), losses, label='Training Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.title(f'Training Loss on ResNet-50 (1-60 Epoch)')
# # plt.legend()
# plt.show()

# Testing

In [None]:
# def make_prediction(model, img, threshold):
#     model.eval()
#     preds = model(img)
#     for id in range(len(preds)) :
#         idx_list = []

#         for idx, score in enumerate(preds[id]['scores']) :
#             if score > threshold :
#                 idx_list.append(idx)

#         preds[id]['boxes'] = preds[id]['boxes'][idx_list]
#         preds[id]['labels'] = preds[id]['labels'][idx_list]
#         preds[id]['scores'] = preds[id]['scores'][idx_list]

#     return preds

In [None]:
# def plot_image_from_output(img, annotation):
#     img = img.cpu().numpy().transpose(1, 2, 0)
#     fig, ax = plt.subplots(1, figsize=(14, 14))
#     ax.imshow(img)

#     for idx, label in enumerate(annotation["labels"]):
#         xmin, ymin, xmax, ymax = annotation["boxes"][idx].cpu().numpy()
#         rect = patches.Rectangle(
#             (xmin, ymin), (xmax - xmin), (ymax - ymin),
#             linewidth=1, edgecolor=label_colors[label.item()], facecolor='none'
#         )
#         ax.add_patch(rect)

#     plt.show()


# random.seed(384)  

# with torch.no_grad():
#     total_images = len(test_data_loader.dataset)
    
#     # Select 10 random indices
#     random_indices = random.sample(range(total_images), 10)
    
#     for idx in random_indices:
#         imgs, annotations = test_data_loader.dataset[idx]
#         imgs = [imgs.to(device)]

#         pred = make_prediction(model, imgs, 0.5)

#         print("Target : ", annotations['labels'])
#         plot_image_from_output(imgs[0], annotations)
#         print("Prediction : ", pred[0]['labels'])
#         plot_image_from_output(imgs[0], pred[0])

# Plot

In [None]:
# labels = []
# preds_adj_all = []
# annot_all = []

# for im, annot in tqdm(test_data_loader, position = 0, leave = True):
#     im = list(img.to(device) for img in im)

#     for t in annot:
#         labels += t['labels']

#     with torch.no_grad():
#         preds_adj = make_prediction(model, im, 0.5)
#         preds_adj = [{k: v.to(torch.device('cpu')) for k, v in t.items()} for t in preds_adj]
#         preds_adj_all.append(preds_adj)
#         annot_all.append(annot)

In [None]:
# sample_metrics = []
# for batch_i in range(len(preds_adj_all)):
#     sample_metrics += utils.get_batch_statistics(preds_adj_all[batch_i], annot_all[batch_i], iou_threshold=0.5) 

# true_positives, pred_scores, pred_labels = [torch.cat(x, 0) for x in list(zip(*sample_metrics))]  # all the batches get concatenated
# precision, recall, AP, f1, ap_class = utils.ap_per_class(true_positives, pred_scores, pred_labels, torch.tensor(labels))
# mAP = torch.mean(AP)
# print(f'mAP : {mAP}')
# print(f'AP : {AP}')

In [None]:
# def plot_iou_curve(outputs, targets):
#     iou_scores = []
#     total_objects = 0
#     for output, annotations in zip(outputs, targets):
#         if output is None:
#             continue

#         for sample_i in range(len(output)):
#             true_positives = torch.zeros(output[sample_i]['boxes'].shape[0])

#             target_labels = annotations[sample_i]['labels'] if len(annotations[sample_i]) else []
#             total_objects += len(target_labels)
#             if len(annotations[sample_i]):
#                 detected_boxes = []
#                 target_boxes = annotations[sample_i]['boxes']

#                 for pred_i, (pred_box, pred_label) in enumerate(zip(output[sample_i]['boxes'], output[sample_i]['labels'])):
#                     if len(detected_boxes) == len(target_labels):
#                         break

#                     if pred_label not in target_labels:
#                         continue

#                     iou, _ = bbox_iou(pred_box.unsqueeze(0), target_boxes).max(0)
#                     if iou >= 0:
#                         true_positives[pred_i] = 1
#                         detected_boxes.append(_)
#                         iou_scores.append(iou.item()) 

#     plt.figure(figsize=(10, 6))
#     plt.hist(iou_scores, bins=50, density=True, alpha=0.75, edgecolor='black')
#     plt.xlabel('IOU Score')
#     plt.ylabel('Frequency')
#     plt.title('IOU Distribution')
#     plt.grid(True)
#     plt.show()
    
#     print("Total number of objects used:", total_objects)

# plot_iou_curve(preds_adj_all, annot_all)

In [None]:
# def plot_curves(true_positives, pred_scores, pred_labels, target_labels):
#     # Sorting by confidence
#     sorted_indices = torch.argsort(-pred_scores)
#     true_positives = true_positives[sorted_indices]
#     pred_scores = pred_scores[sorted_indices]
#     pred_labels = pred_labels[sorted_indices]

#     # Compute cumulative true positives and false positives
#     cum_tp = torch.cumsum(true_positives, dim=0)
#     cum_fp = torch.cumsum(1 - true_positives, dim=0)

#     precision = cum_tp / (cum_tp + cum_fp + 1e-16)
#     recall = cum_tp / (len(target_labels) + 1e-16)
#     f1 = 2 * precision * recall / (precision + recall + 1e-16)

#     # Precision-Confidence Curve
#     plt.figure(figsize=(10, 6))
#     plt.plot(pred_scores.cpu().numpy(), precision.cpu().numpy(), label="Precision")
#     plt.xlabel('Confidence')
#     plt.ylabel('Precision')
#     plt.title('Precision-Confidence Curve')
#     plt.legend()
#     plt.grid(True)
#     plt.show()

#     # Recall-Confidence Curve
#     plt.figure(figsize=(10, 6))
#     plt.plot(pred_scores.cpu().numpy(), recall.cpu().numpy(), label="Recall")
#     plt.xlabel('Confidence')
#     plt.ylabel('Recall')
#     plt.title('Recall-Confidence Curve')
#     plt.legend()
#     plt.grid(True)
#     plt.show()

#     # F1-Confidence Curve
#     plt.figure(figsize=(10, 6))
#     plt.plot(pred_scores.cpu().numpy(), f1.cpu().numpy(), label="F1 Score")
#     plt.xlabel('Confidence')
#     plt.ylabel('F1 Score')
#     plt.title('F1-Confidence Curve')
#     plt.legend()
#     plt.grid(True)
#     plt.show()

#     # Precision-Recall Curve
#     plt.figure(figsize=(10, 6))
#     plt.plot(recall.cpu().numpy(), precision.cpu().numpy(), label="Precision-Recall")
#     plt.xlabel('Recall')
#     plt.ylabel('Precision')
#     plt.title('Precision-Recall Curve')
#     plt.legend()
#     plt.grid(True)
#     plt.show()

# plot_curves(true_positives, pred_scores, pred_labels, torch.tensor(labels))