In [None]:
# Show the kernel's current working directory (relies on IPython automagic for %pwd).
pwd

In [None]:
%pip install selectivesearch torchsummary

In [None]:
## Python in-built tools 
import os
import pickle
from tqdm import tqdm

## Data Science Tools 
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as patches 
%matplotlib inline
import torch
import torch.nn as nn

## Image Processing Tools 
import cv2
import selectivesearch
from torch.utils.data import Dataset, DataLoader 


## Frameworks
from torchvision import transforms
from torchvision import models
from torchsummary import summary


## Hyperparameters

In [None]:
# Directory holding the .jpg images and the CSV with their box annotations.
image_paths = r"/kaggle/input/open-images-bus-trucks/images/images"
csv_path = r"/kaggle/input/open-images-bus-trucks/df.csv"
# Images per DataLoader batch; every image contributes all of its region crops.
batch_size = 2
n_epochs = 5
learning_rate = 1e-4
# IoU a region proposal must exceed to inherit the class of its best-matching
# ground-truth box (used by the commented-out labelling cells further down).
threshold_iou = 0.3
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

# Loading the Open Images dataset

In [None]:
class OpenImageDataset(Dataset):
    """Dataset yielding one image together with all of its annotated boxes.

    The annotation CSV is expected to contain the columns ``ImageID``,
    ``XMin``, ``YMin``, ``XMax``, ``YMax`` and ``LabelName``; the box columns
    are multiplied by the image width/height, i.e. they are treated as
    normalised coordinates.

    Args:
        image_paths: Directory containing ``<ImageID>.jpg`` files.
        csv_path: Path of the annotation CSV.
    """

    def __init__(self, image_paths, csv_path):
        super().__init__()
        self.image_paths = image_paths
        self.csv_path = csv_path
        self.df = pd.read_csv(csv_path)
        # One dataset item per distinct image, not per annotation row.
        self.unique_images = self.df['ImageID'].unique()

    def __len__(self):
        """Return the number of distinct images in the annotation file."""
        return len(self.unique_images)

    def __getitem__(self, index):
        """Load image ``index`` and its annotations.

        Returns:
            tuple: ``(image, bboxes, classes, image_full_path)`` where
            ``image`` is an RGB ``H x W x 3`` array, ``bboxes`` is an
            ``(n, 4)`` ``uint16`` array of ``(x_min, y_min, x_max, y_max)``
            pixel coordinates and ``classes`` holds the matching labels.

        Raises:
            FileNotFoundError: The image file does not exist.
            OSError: The file exists but OpenCV could not decode it.
        """
        image_id = self.unique_images[index]
        image_full_path = os.path.join(self.image_paths, image_id + ".jpg")

        # cv2.imread signals failure by returning None, which would otherwise
        # only show up as an obscure cvtColor error; fail with a clear message.
        if not os.path.isfile(image_full_path):
            raise FileNotFoundError(f"Image file not found: {image_full_path}")
        image = cv2.imread(image_full_path, cv2.IMREAD_COLOR)
        if image is None:
            raise OSError(f"Could not decode image: {image_full_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        h, w, _ = image.shape
        df = self.df.loc[self.df['ImageID'] == image_id]

        # Scale the normalised box columns to pixel coordinates.
        bboxes = df[['XMin', 'YMin', 'XMax', 'YMax']].values
        bboxes = (bboxes * np.array([w, h, w, h])).astype(np.uint16)

        classes = df['LabelName'].values
        return image, bboxes, classes, image_full_path
    
# Build the dataset from the configured image directory and annotation CSV,
# then fetch the first item as a quick smoke test (the cell displays the tuple).
datasets = OpenImageDataset(image_paths, csv_path)
datasets[0]

In [None]:
# Number of distinct images in the dataset.
len(datasets)

In [None]:
# Visualise one sample with its ground-truth boxes drawn on top of the image.
img, bbx, _, _ = datasets[14]
plt.imshow(img)
plt.axis('off')

for bb in bbx:
    # bb is (x_min, y_min, x_max, y_max); Rectangle takes the top-left corner
    # followed by width and height.
    rect = patches.Rectangle(bb[:2], bb[2]-bb[0], bb[3]-bb[1], edgecolor = 'r', facecolor = 'none', linewidth = 1)
    plt.gca().add_patch(rect)
# The original referenced plt.show without calling it, which does nothing.
plt.show()


### Utility functions

In [None]:
def extract_candidates(img, scale=4, min_size=20, min_area_ratio=0.05):
    """Return de-duplicated Selective Search region proposals for ``img``.

    Args:
        img: Image array; only its first two dimensions (height, width) are
            used to compute the image area.
        scale: ``scale`` argument forwarded to
            ``selectivesearch.selective_search``.
        min_size: ``min_size`` argument forwarded to
            ``selectivesearch.selective_search``.
        min_area_ratio: Regions whose ``size`` is below this fraction of the
            image area are discarded.

    Returns:
        list: The ``rect`` entry of every kept region, in the order the
        regions were produced. Regions larger than the whole image and
        repeated rects are dropped.
    """
    _, regions = selectivesearch.selective_search(img, scale=scale, min_size=min_size)
    img_area = np.prod(img.shape[:2])
    min_region_size = min_area_ratio * img_area

    candidates = []
    seen = set()  # hashed copy of `candidates` so the duplicate test is O(1)
    for region in regions:
        rect = region['rect']
        key = tuple(rect)
        if key in seen:
            continue
        if region['size'] < min_region_size:
            continue
        if region['size'] > img_area:
            continue
        seen.add(key)
        candidates.append(rect)
    return candidates

def extract_iou(bbox1, bbox2, epsilon=1e-5):
    """Compute the intersection-over-union of two axis-aligned boxes.

    Args:
        bbox1: ``(x_min, y_min, x_max, y_max)`` of the first box.
        bbox2: ``(x_min, y_min, x_max, y_max)`` of the second box.
        epsilon: Small constant added to the union so the division is safe
            for zero-area boxes.

    Returns:
        float: IoU in ``[0, 1]``; ``0.0`` when the boxes do not overlap.
    """
    # Work in Python floats. The ground-truth boxes in this notebook are
    # np.uint16, for which `x2 - x1` wraps around instead of going negative and
    # width * height overflows for any box larger than 65535 px^2.
    ax1, ay1, ax2, ay2 = (float(bbox1[i]) for i in range(4))
    bx1, by1, bx2, by2 = (float(bbox2[i]) for i in range(4))

    x1 = max(ax1, bx1)
    y1 = max(ay1, by1)
    x2 = min(ax2, bx2)
    y2 = min(ay2, by2)

    width = x2 - x1
    height = y2 - y1

    if width < 0 or height < 0:
        return 0.0

    intersection_area = width * height
    area_1 = (ax2 - ax1) * (ay2 - ay1)
    area_2 = (bx2 - bx1) * (by2 - by1)
    union_area = area_1 + area_2 - intersection_area
    return intersection_area / (union_area + epsilon)


In [None]:
# FULL_PATHS, GTBBS, CLSS, DELTAS, ROIS, IOUS = [], [], [], [], [], []
# # N = 100
# for i, (image, bboxes, classes, image_full_path) in enumerate(datasets):
#     # if i == N:
#     #     break

#     H, W, _ = image.shape
#     candidates = extract_candidates(image)
#     candidates = np.array([(x, y, x+w, y+h) for x, y, w, h in candidates])

#     clss, deltas, rois = [], [], []
#     ious = np.array([[extract_iou(candidate, bbox) for bbox in bboxes] for candidate in candidates])
    
#     for j, candidate in enumerate (candidates):
#         cx, cy, cX, cY = candidate
#         candidate_ious = ious[j]
#         best_iou_at = np.argmax(candidate_ious)
#         best_iou = candidate_ious[best_iou_at]
#         best_bb = _x, _y, _X, _Y = bboxes[best_iou_at]

#         if best_iou > threshold_iou:
#             clss.append(classes[best_iou_at])
#         else:
#             clss.append('background')

#         delta = np.array([_x - cx, _y - cy, _X - cX, _Y - cY]) / np.array([W, H, W, H])
#         deltas.append(delta)

#         rois.append(candidate / np.array([W, H, W, H]))

#     FULL_PATHS.append(image_full_path)
#     GTBBS.append(bboxes)
#     CLSS.append(clss)
#     DELTAS.append(deltas)
#     ROIS.append(rois)
#     IOUS.append(ious)


#### Multiprocessing: running the two cells below instead of the loop above cuts the job from about 4 hours to about 1 hour

In [None]:
# def extract_all(t):
#     image, bboxes, classes, image_full_path = t
#     H, W, _ = image.shape
#     candidates = extract_candidates(image)
#     candidates = np.array([(x, y, x+w, y+h) for x,y,w,h in candidates])
#     clss, deltas, rois = [], [], []
#     ious = np.array([extract_iou(candidate, bbox) for bbox in bboxes] for candidate in candidates])
#     for j, candidate in enumerate(candidates):
#         cx, cy, CX, CY = candidate
#         candidate_ious = ious[j]
#         best_iou_at = np.argmax(candidate_ious)
#         best_iou = candidate_ious[best_iou_at]
#         best_bb = (_x, _y, _X, _Y) = bboxes[best_iou_at]
        
#         if best_iou > threshold_iou:
#             clss.append(classes[best_iou_at])
#         else:
#             clss.append('background')
        
#         delta = np.array([_x - cx, _y - cy, _X - CX, _Y - CY]) / np.array([W, H, W, H])
#         deltas.append(delta)
#         rois.append(candidate / np.array([W, H, W, H]))
    
#     return image_full_path, bboxes, clss, deltas, rois, ious


In [None]:
# from multiprocessing import Pool
# FULL_PATHS, GTBBS, CLSS, DELTAS, ROIS, IOUS = [], [], [], [], [], []

# with Pool(14) as p:
#     results = p.imap(extract_all, datasets)
#     for result in tqdm(results):
#         image_full_path, bboxes, clss, deltas, rois, ious = result
#         FULL_PATHS.append(image_full_path)
#         GTBBS.append(bboxes)
#         CLSS.append(clss)
#         DELTAS.append(deltas)
#         ROIS.append(rois)
#         IOUS.append(ious)


### Pickle utility functions

In [None]:

def save_pickle(var, path):
    """Serialise ``var`` with pickle and write it to ``path`` (overwriting)."""
    with open(path, mode='wb') as sink:
        pickle.dump(var, sink)
        
def load_pickle(path):
    """Read the file at ``path`` and return the object pickled inside it.

    NOTE: unpickling can execute arbitrary code, so only load files that come
    from a trusted source.
    """
    with open(path, mode='rb') as source:
        restored = pickle.load(source)
    return restored
        
# save_pickle(FULL_PATHS, 'full_paths.pkl')


# save_pickle(GTBBS, 'gtbbs.pkl')
# save_pickle(CLSS, 'clss.pkl')   
# save_pickle(DELTAS, 'deltas.pkl')
# save_pickle(ROIS, 'rois.pkl')
# save_pickle(IOUS, 'ious.pkl')

# F_PATH = load_pickle('paths.pkl')
# F_PATH

### Datasets for R-CNN

In [None]:
# Load the precomputed region-proposal data instead of recomputing it.
# Presumably these files were written by the commented-out extraction cells
# above (one list entry per image) - TODO confirm, the file names differ.
# NOTE: unpickling executes arbitrary code; only load trusted files.
FULL_PATHS = load_pickle('/kaggle/input/pkl-files/fpath.pkl')
GTBBS = load_pickle('/kaggle/input/pkl-files/gtbbs.pkl')
CLSS = load_pickle('/kaggle/input/pkl-files/clss.pkl')
DELTAS = load_pickle('/kaggle/input/pkl-files/deltas.pkl')
ROIS = load_pickle('/kaggle/input/pkl-files/rois.pkl')
IOUS = load_pickle('/kaggle/input/pkl-files/ious.pkl')

In [None]:
# Flatten the per-image label lists into one sequence and keep the distinct labels.
all_labels = [label for image_labels in CLSS for label in image_labels]
unique_labels = np.unique(np.array(all_labels))

# Integer target <-> label name lookups, numbered in np.unique's sorted order.
target2label = dict(enumerate(unique_labels))
label2target = {label: target for target, label in target2label.items()}

print(target2label)
print(label2target)
# Target index of the 'background' class; used to mask the localization loss.
background_class = label2target['background']
print(background_class)

In [None]:
# Number of entries (image paths) loaded from the pickle file.
len(FULL_PATHS)

In [None]:
# # sir ko code but error ayo

# def preprocess(crop_img):
#     crop_img = torch.tensor(crop_img).permute(2, 0, 1)
#     normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
#     crop_img = normalize(crop_img)
#     return crop_img.float()


# class RCNNDataset(Dataset):
#     def __init__(self, fpaths, rois, gtbbs, labels, deltas, ious):
#         super().__init__()
#         self.fpaths = fpaths
#         self.rois = rois
#         self.gtbbs = gtbbs
#         self.labels = labels
#         self.deltas = deltas
#         self.ious = ious
#         self.label2target = {'background':0, 'bus':1}

#     def __len__(self):
#         return len(self.fpaths)

#     def __getitem__(self, index):
#         fpath = self.fpaths[index]
#         image = cv2.imread(fpath, cv2.IMREAD_COLOR)[..., ::-1]
#         H, W, _ = image.shape
        
#         gtbbs = self.gtbbs[index]
        
#         rois = self.rois[index]
#         bbs = (rois * np.array([W, H, W, H])).astype(np.uint8)
        
#         crops = [image[y:Y ,x:X] for x, y, X, Y in bbs]
#         labels = self.labels[index]
#         deltas = self.deltas[index]
#         fpath = self.fpaths[index]
        
#         return image, gtbbs, bbs ,crops , labels, deltas, fpath

        
#     def collate_fn(self, batch):
#         inputs, output_labels, output_deltas = [], [], []
#         for i in range(len(batch)):
#             image, gtbbs, bbs, crops, labels, deltas, fpath = batch[i]
#             crops = [cv2.resize(crop, (224, 224)) for crop in crops]
#             crops = [preprocess(crop/255.0)[None] for crop in crops]
#             inputs.extend(crops)
#             output_labels.extend([label2target[label] for label in labels])
#             output_deltas.extend(deltas)
            
#     # yo tala ko 3 ota line le garda train garna lai feasible banako ho
#         inputs = torch.cat(inputs).to(device)
#         output_labels = torch.tensor(output_labels).long().to(device)
#         output_deltas = torch.tensor(output_deltas).float().to(device)
        
#         return inputs, output_labels, output_deltas

            
            
# # [] = [1,2,4] + [6, 7, 8]-> extend -> [1,2,4, 6, 7, 8]
# # [] = [1,2,4] -> append -> [[1,2,4]]

#         # Further processing (not defined yet)
#         # You can add more code to preprocess inputs and handle labels, deltas, etc.

# # n_train = len(FULL_PATHS) * 8//10

# dataset = RCNNDataset(FULL_PATHS, GTBBS, CLSS, DELTAS , ROIS, IOUS)
# # len(dataset)
# dataset.collate_fn([dataset[10], dataset[11]])

In [None]:
def preprocess(crop_img):
    """Turn an ``H x W x 3`` crop into a normalised ``3 x H x W`` float32 tensor.

    The channel axis is moved to the front and each channel is standardised
    with the mean/std values below. ``collate_fn`` passes crops already divided
    by 255.
    """
    channels_first = torch.tensor(crop_img).permute(2, 0, 1)
    standardise = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    )
    return standardise(channels_first).float()


class RCNNDataset(Dataset):
    """Dataset serving, per image, its region-proposal crops and training targets.

    Args:
        fpaths: Image file paths, one per item.
        rois: Per-image region proposals as ``(x_min, y_min, x_max, y_max)``
            rows normalised by image width/height.
        gtbbs: Per-image ground-truth boxes.
        labels: Per-image list with one class label per proposal.
        deltas: Per-image list with one regression target per proposal.
        ious: Per-image IoU values (stored, not used by this class).
        label_map: Optional ``{label: target_index}`` mapping. When omitted the
            notebook-level ``label2target`` is used, which is what
            ``collate_fn`` always relied on. The previous hard-coded
            ``{'background': 0, 'bus': 1}`` attribute was never read and did not
            match the labels in the data.
    """

    def __init__(self, fpaths, rois, gtbbs, labels, deltas, ious, label_map=None):
        super().__init__()
        self.fpaths = fpaths
        self.rois = rois
        self.gtbbs = gtbbs
        self.labels = labels
        self.deltas = deltas
        self.ious = ious
        self.label2target = label2target if label_map is None else label_map

    def __len__(self):
        """Return the number of images."""
        return len(self.fpaths)

    @staticmethod
    def _to_pixel_boxes(rois, width, height):
        """Convert normalised boxes to integer pixel boxes clipped to the image.

        Returns an ``(n, 4)`` ``int32`` array. The original cast to ``uint8``
        wrapped every coordinate above 255 and clipped only after the cast.
        Values are rounded so that ``candidate / W * W`` recovers ``candidate``.
        """
        scale = np.array([width, height, width, height], dtype=np.float64)
        boxes = np.asarray(rois, dtype=np.float64).reshape(-1, 4) * scale
        # Proposals may extend past the image border; clip before the int cast.
        boxes = np.clip(boxes, 0, scale)
        return np.rint(boxes).astype(np.int32)

    def __getitem__(self, index):
        """Return ``(image, gtbbs, bbs, crops, labels, deltas, fpath)`` for one image.

        ``bbs``, ``crops``, ``labels`` and ``deltas`` stay aligned: proposals
        whose pixel box has no area are dropped from all four.

        Raises:
            FileNotFoundError: The image file does not exist.
            OSError: The file exists but OpenCV could not decode it.
        """
        fpath = self.fpaths[index]
        if not os.path.isfile(fpath):
            raise FileNotFoundError(f"Image file not found: {fpath}")
        image = cv2.imread(fpath, cv2.IMREAD_COLOR)
        if image is None:
            raise OSError(f"Could not decode image: {fpath}")
        # BGR -> RGB as a contiguous array (same conversion OpenImageDataset uses).
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        H, W, _ = image.shape

        gtbbs = self.gtbbs[index]
        bbs = self._to_pixel_boxes(self.rois[index], W, H)
        labels = self.labels[index]
        deltas = self.deltas[index]

        # An empty crop would make cv2.resize fail inside collate_fn.
        keep = [i for i, (x, y, X, Y) in enumerate(bbs) if X > x and Y > y]
        if len(keep) != len(bbs):
            bbs = bbs[np.array(keep, dtype=np.intp)]
            labels = [labels[i] for i in keep]
            deltas = [deltas[i] for i in keep]

        crops = [image[y:Y, x:X] for x, y, X, Y in bbs]

        return image, gtbbs, bbs, crops, labels, deltas, fpath

    def collate_fn(self, batch):
        """Merge dataset items into one batch of crops, class targets and deltas.

        Every crop is resized to 224x224, scaled to [0, 1], normalised by
        ``preprocess`` and stacked along a new batch axis.

        Returns:
            tuple: ``(inputs, output_labels, output_deltas)`` tensors placed on
            the notebook-level ``device``.
        """
        inputs, output_labels, output_deltas = [], [], []
        for image, gtbbs, bbs, crops, labels, deltas, fpath in batch:
            for crop in crops:
                resized = cv2.resize(crop, (224, 224))
                inputs.append(preprocess(resized / 255.0)[None])
            output_labels.extend(self.label2target[label] for label in labels)
            output_deltas.extend(deltas)

        # These three conversions turn the Python lists into tensors the model
        # can train on.
        inputs = torch.cat(inputs).to(device)
        output_labels = torch.tensor(output_labels).long().to(device)
        # Go through one ndarray: torch.tensor on a list of arrays is very slow.
        output_deltas = torch.tensor(np.array(output_deltas)).float().to(device)

        return inputs, output_labels, output_deltas
    


# [] = [1,2,4] + [6, 7, 8]-> extend -> [1,2,4, 6, 7, 8]
# [] = [1,2,4] -> append -> [[1,2,4]]

        # Further processing (not defined yet)
        # You can add more code to preprocess inputs and handle labels, deltas, etc.

# First 80% of the images go to training, the remainder to testing (no shuffling).
n_train = (len(FULL_PATHS) * 8) // 10
train_dataset = RCNNDataset(
    *(seq[:n_train] for seq in (FULL_PATHS, ROIS, GTBBS, CLSS, DELTAS, IOUS))
)
test_dataset = RCNNDataset(
    *(seq[n_train:] for seq in (FULL_PATHS, ROIS, GTBBS, CLSS, DELTAS, IOUS))
)


# Dataloader

In [None]:
# Initialize DataLoaders for the training and test datasets. Each dataset's own
# collate_fn turns a list of items into (crops, labels, deltas) tensors.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=train_dataset.collate_fn)
# Evaluation does not benefit from shuffling; a fixed order keeps runs comparable.
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=test_dataset.collate_fn)


## Model architecture

In [None]:
# ImageNet-pretrained ResNet-50 used purely as a frozen feature extractor.
backbone = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
# An empty Sequential passes the pooled features through unchanged.
backbone.fc = nn.Sequential()

# Freeze every backbone weight so only the heads added later are trained.
backbone.requires_grad_(False)
backbone.to(device=device)

In [None]:
# Print torchsummary's per-layer report for the backbone on a 3x224x224 input.
summary(backbone.to(device), (3, 224, 224))

In [None]:
class RCNN(nn.Module):
    """R-CNN head: a shared backbone feeding a classifier and a box regressor.

    Args:
        backbone: Module mapping a batch of crops to 2048-dimensional features.
        n_classes: Number of output classes, including background.
        background_class: Target index of the background class. When omitted,
            the notebook-level ``background_class`` is looked up at loss time,
            which is what the original code always did.
    """

    def __init__(self, backbone, n_classes, background_class=None):
        super().__init__()
        self.backbone = backbone
        self.n_classes = n_classes
        self.background_class = background_class

        self.classification_head = nn.Linear(2048, n_classes)

        # Tanh bounds the predicted offsets to [-1, 1].
        self.bbox_localization_head = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 4),
            nn.Tanh()
        )

        self.classification_loss = nn.CrossEntropyLoss()
        self.localization_loss = nn.L1Loss()

        self.lmbda = 10.0  # prioritizes localization loss over classification loss

    def train(self, mode=True):
        """Switch train/eval mode, keeping a fully frozen backbone in eval mode.

        Freezing parameters does not freeze BatchNorm running statistics; in
        training mode they would still be updated and the pretrained features
        would drift.
        """
        super().train(mode)
        if mode and not any(p.requires_grad for p in self.backbone.parameters()):
            self.backbone.eval()
        return self

    def forward(self, inputs):
        """Return ``(cls_score, deltas)`` for a batch of crops."""
        feat = self.backbone(inputs)
        cls_score = self.classification_head(feat)
        deltas = self.bbox_localization_head(feat)
        return cls_score, deltas

    def calculate_loss(self, _labels, _deltas, actual_labels, actual_deltas):
        """Compute the combined detection loss.

        Args:
            _labels: Predicted class scores (logits).
            _deltas: Predicted box offsets.
            actual_labels: Ground-truth class indices.
            actual_deltas: Ground-truth box offsets.

        Returns:
            tuple: ``(total_loss, classification_loss, localization_loss)`` with
            ``total_loss = classification_loss + lmbda * localization_loss``.
            The localization term only covers non-background samples and is
            zero when the batch has none.
        """
        classification_loss = self.classification_loss(_labels, actual_labels)

        bg_class = self.background_class
        if bg_class is None:
            bg_class = background_class  # notebook-level fallback

        # Only proposals matched to an object contribute to box regression.
        ix = torch.where(actual_labels != bg_class)[0]

        if len(ix) > 0:
            localization_loss = self.localization_loss(_deltas[ix], actual_deltas[ix])
        else:
            # Scalar zero with the predictions' dtype and device (the original
            # created an int64 CPU tensor here).
            localization_loss = _deltas.new_zeros(())

        total_loss = classification_loss + self.lmbda * localization_loss

        return total_loss, classification_loss, localization_loss
            
        
# inputs, _ , targets_deltas = next(iter(train_dataloader))
# rcnn = RCNN(backbone=backbone, n_classes=len(unique_labels)).to(device=device)
# rcnn(inputs)

In [None]:
# Loss Function
# Train Batch
# validate batch
#validate batch
#test prediction

In [None]:
def train_batch(model, optimizer, inputs, actual_labels, deltas):
    """Run one optimisation step on a single batch.

    Returns:
        tuple: ``(pred_scores, pred_deltas, total_loss, classification_loss,
        localization_loss, acc)`` where ``acc`` is a boolean tensor marking
        which samples were classified correctly.
    """
    model.train()
    optimizer.zero_grad()

    # Forward pass and loss.
    pred_scores, pred_deltas = model(inputs)
    losses = model.calculate_loss(pred_scores, pred_deltas, actual_labels, deltas)
    total_loss, classification_loss, localization_loss = losses

    # Per-sample correctness of the highest-scoring class.
    _, pred_labels = pred_scores.max(-1)
    acc = pred_labels == actual_labels

    # Backward pass and parameter update.
    total_loss.backward()
    optimizer.step()

    return pred_scores, pred_deltas, total_loss, classification_loss, localization_loss, acc


In [None]:
# Call the context manager: a bare `@torch.no_grad` is only accepted as a
# decorator by recent PyTorch releases, while `@torch.no_grad()` works everywhere.
@torch.no_grad()
def validate_batch(model, inputs, actual_labels, deltas):
    """Evaluate one batch without tracking gradients.

    Puts ``model`` into eval mode, runs a forward pass and computes the losses.

    Returns:
        tuple: ``(_labels, _deltas, total_loss, classification_loss,
        localization_loss, acc)`` where ``acc`` is a boolean tensor marking
        which samples were classified correctly.
    """
    model.eval()
    _labels, _deltas = model(inputs)
    total_loss, classification_loss, localization_loss = model.calculate_loss(_labels, _deltas, actual_labels, deltas)

    _, pred_labels = _labels.max(-1)
    acc = pred_labels == actual_labels

    return _labels, _deltas, total_loss, classification_loss, localization_loss, acc


In [None]:
# One output per label found in the data (including 'background') instead of a
# hard-coded 3, so the head cannot silently disagree with label2target.
rcnn = RCNN(backbone, len(unique_labels)).to(device=device)
# Only hand trainable parameters to the optimizer; the backbone is frozen.
optimizer = torch.optim.SGD([p for p in rcnn.parameters() if p.requires_grad], lr=learning_rate)

In [None]:
# training pipeline
def _run_epoch(dataloader, batch_step, desc):
    """Run ``batch_step`` on every batch and return the epoch's averaged metrics.

    Args:
        dataloader: Iterable yielding ``(inputs, labels, deltas)`` batches.
        batch_step: Callable ``(inputs, labels, deltas)`` returning the 6-tuple
            produced by ``train_batch`` / ``validate_batch``.
        desc: Progress-bar description.

    Returns:
        dict: ``total_loss``, ``detection_loss`` and ``localization_loss``
        averaged over batches, and ``accuracy`` averaged over samples.
    """
    loss_sums = {'total_loss': 0.0, 'detection_loss': 0.0, 'localization_loss': 0.0}
    correct = []

    for inputs, labels, deltas in tqdm(dataloader, desc=desc):
        _, _, total_loss, classification_loss, localization_loss, acc = batch_step(inputs, labels, deltas)
        loss_sums['total_loss'] += total_loss.item()
        loss_sums['detection_loss'] += classification_loss.item()
        loss_sums['localization_loss'] += localization_loss.item()
        correct.extend(acc.tolist())

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    n_batches = max(len(dataloader), 1)
    metrics = {name: value / n_batches for name, value in loss_sums.items()}
    # Accuracy is per sample. The original divided the test accuracy by the
    # number of batches, which inflated it by the average batch size.
    metrics['accuracy'] = sum(correct) / max(len(correct), 1)
    return metrics


train_history = {
    'total_loss': [],
    'detection_loss': [],
    'localization_loss': [],
    'accuracy': []
}

test_history = {
    'total_loss': [],
    'detection_loss': [],
    'localization_loss': [],
    'accuracy': []
}

for epoch in range(1, n_epochs + 1):
    train_metrics = _run_epoch(
        train_dataloader,
        lambda inputs, labels, deltas: train_batch(rcnn, optimizer, inputs, labels, deltas),
        desc=f'Training {epoch} of {n_epochs}',
    )
    test_metrics = _run_epoch(
        test_dataloader,
        lambda inputs, labels, deltas: validate_batch(rcnn, inputs, labels, deltas),
        desc=f'Testing ',
    )

    for name in train_history:
        train_history[name].append(train_metrics[name])
        test_history[name].append(test_metrics[name])

    # The second value is the total test loss; the original labelled it
    # "Testing Detection Loss".
    print(f"Epoch {epoch} of {n_epochs}, Training_loss: {train_metrics['total_loss']}, Testing Loss: {test_metrics['total_loss']}, Testing Localization Loss: {test_metrics['localization_loss']}, Testing Accuracy: {test_metrics['accuracy']}")


In [None]:
# Serialise the whole RCNN module object to 'model.pth'.
# NOTE(review): loading a fully pickled module needs the RCNN class definition
# to be available at load time; saving rcnn.state_dict() is the more portable
# option - consider switching once downstream loaders are updated.
torch.save(rcnn,'model.pth')