In [None]:
import os
import numpy as np
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from engine import train_one_epoch, evaluate
import utils
import transforms as T
import albumentations as A
import cv2
import time
from albumentations.pytorch.transforms import ToTensorV2
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from  sklearn.model_selection import KFold
import random

In [None]:
# Record the library versions this notebook was run with (one per line).
print(torch.__version__, torchvision.__version__, sep='\n')

In [None]:
class SmokeDataset(object):
    """Detection dataset that reads images and YOLO-format label files.

    ``root`` must contain an ``images`` and a ``labels`` directory. The two
    sorted directory listings are paired by position, so every image needs a
    label file whose name sorts into the same slot.

    Labels are shifted by one (0 is reserved for background): with
    ``classes = ['Smoke', 'Fire']`` the targets use 1 for smoke and 2 for fire.
    """

    def __init__(self, root, transforms):
        """
        Input:
            root: str
            Dataset directory holding the ``images`` and ``labels`` folders.
            transforms: callable or None
            Called as ``transforms(image=..., bboxes=..., labels=...)`` and
            expected to return a dict with the same three keys.
        """
        self.root = root
        self.transforms = transforms
        # load all image files, sorting them to
        # ensure that they are aligned
        self.imgs = sorted(os.listdir(os.path.join(self.root, "images")))
        self.annots = sorted(os.listdir(os.path.join(self.root, "labels")))
        self.classes = ['Smoke', 'Fire']

    def convert_box_cord(self, bboxs, format_from, format_to, img_shape):
        """Convert YOLO rows to pixel ``[xmin, ymin, xmax, ymax]`` boxes.

        Input:
            bboxs: iterable of rows ``(class, xcenter, ycenter, width, height)``
            with normalized coordinates.
            format_from, format_to: currently ignored; the conversion is always
            normalized-xywh -> pixel xyxy. Kept for interface compatibility.
            img_shape: image shape as ``(height, width, ...)``.
        Output:
            float64 array of shape ``(k, 4)`` holding only the valid boxes
            (``k`` may be 0).
        """
        img_h, img_w = img_shape[0], img_shape[1]
        converted = []
        for bb in bboxs:
            # A zero width or height marks an invalid annotation.
            if bb[3] == 0 or bb[4] == 0:
                continue
            box = yolo2pixel([img_w, img_h], bb[1:])
            # Rounding and clamping can collapse a tiny or out-of-frame box to
            # zero width/height; such boxes are unusable as detection targets.
            if box[2] > box[0] and box[3] > box[1]:
                converted.append(box)
        # reshape keeps the (0, 4) shape when nothing survived the filters
        return np.array(converted, dtype=np.float64).reshape(-1, 4)

    def _read_annotations(self, annot_path, img_shape):
        """Parse one label file into aligned pixel boxes and 1-based labels.

        Each row is pushed through ``convert_box_cord`` on its own, so a label
        is recorded only when its box was accepted. This keeps boxes and labels
        the same length whatever the converter filters out.

        Output:
            (boxes, labels): float64 array of shape ``(k, 4)`` and a list of
            ``k`` ints.
        Raises:
            ValueError: a non-blank line has fewer than five fields.
        """
        boxes, labels = [], []
        with open(annot_path, 'r') as f:
            for line in f:
                parts = line.split()
                if not parts:
                    # tolerate blank lines (e.g. a trailing newline)
                    continue
                if len(parts) < 5:
                    raise ValueError(
                        f"Malformed annotation line in {annot_path}: {line!r}")
                row = [float(p) for p in parts[:5]]
                converted = self.convert_box_cord(
                    [row], 'normxywh', 'xyminmax', img_shape)
                if len(converted) == 1:
                    boxes.append(converted[0])
                    # +1 because 0 is reserved for background
                    labels.append(int(row[0]) + 1)
        return np.array(boxes, dtype=np.float64).reshape(-1, 4), labels

    def __getitem__(self, idx):
        """Return ``(img, target)`` for sample ``idx``.

        ``img`` is the RGB image scaled to [0, 1] as float32 (then whatever the
        transforms turn it into). ``target`` holds ``boxes``, ``labels``,
        ``image_id``, ``area`` and ``iscrowd``; ``area`` and ``iscrowd`` are
        derived from the final (post-transform) boxes.

        Samples without any usable box get a single dummy box labelled 0
        (background): the full image before the transforms, or ``[0, 0, 1, 1]``
        if the transforms removed every box.

        Raises:
            FileNotFoundError: the image could not be read.
        """
        # load images and boxes
        img_path = os.path.join(self.root, "images", self.imgs[idx])
        annot_path = os.path.join(self.root, "labels", self.annots[idx])
        img = cv2.imread(img_path)
        # cv2.imread reports failure by returning None instead of raising
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        img = img / 255.0

        boxes_np, labels_list = self._read_annotations(annot_path, img.shape)
        if len(labels_list) == 0:
            # empty file or no valid rows: fill a dummy box with label 0
            boxes_np = np.array([[0, 0, img.shape[1], img.shape[0]]],
                                dtype=np.float64)
            labels_list = [0]

        boxes = torch.as_tensor(boxes_np, dtype=torch.float32)
        labels = torch.as_tensor(labels_list, dtype=torch.int64)

        if self.transforms is not None:
            # Pass bounding boxes and labels to Albumentations as plain lists
            sample = self.transforms(image=img, bboxes=boxes.tolist(),
                                     labels=labels.tolist())
            img = sample['image']
            boxes = torch.tensor(sample['bboxes'], dtype=torch.float32)
            labels = torch.tensor(sample['labels'], dtype=torch.int64)

        # Ensure boxes are valid after transformation; numel() covers both an
        # empty list result (shape (0,)) and an empty (0, 4) array result.
        if boxes.numel() == 0:
            boxes = torch.as_tensor([[0, 0, 1, 1]], dtype=torch.float32)
            labels = torch.zeros((1,), dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = idx
        # computed last so both match the boxes actually returned
        target["area"] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # suppose all instances are not crowd
        target["iscrowd"] = torch.zeros((boxes.shape[0],), dtype=torch.int64)

        return img, target

    def __len__(self):
        """Number of images found under ``root/images``."""
        return len(self.imgs)



In [None]:
def non_negative(coord, dim):
    """
        Clamps a coordinate to the range [0, dim]. This fixes bugs in some labeling tools.

        Input:
            coord: Int or float
            Any number that represents a coordinate, whether normalized or not.
            dim: Int or float
            Largest value the coordinate may take.
        Output:
            0 when coord is negative, dim when coord exceeds dim, otherwise coord unchanged.
    """
    capped = dim if coord > dim else coord
    return 0 if coord < 0 else capped

def yolo2pixel(dim, yolo_coords):
    """
        Converts a YOLO-format box into pixel corner coordinates.

        Input:
            dim: Tuple or list
            Image size (width, height).
            yolo_coords: List
            Bounding box coordinates in YOLO format (xcenter, ycenter, width, height).
        Output:
            pixel_coords: List
            Bounding box coordinates in pixels (xmin, ymin, xmax, ymax), each
            rounded and then clamped to the image via non_negative.
    """
    img_w, img_h = dim[0], dim[1]
    x_center, y_center = yolo_coords[0], yolo_coords[1]
    half_w = yolo_coords[2]/2
    half_h = yolo_coords[3]/2

    left = non_negative(round(img_w * (x_center - half_w)), img_w)
    right = non_negative(round(img_w * (x_center + half_w)), img_w)
    top = non_negative(round(img_h * (y_center - half_h)), img_h)
    bottom = non_negative(round(img_h * (y_center + half_h)), img_h)

    return [left, top, right, bottom]

In [None]:
def get_model_bbox(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a fresh box head.

    The detector is created with torchvision's default pre-trained weights and
    its box predictor is swapped for a new ``FastRCNNPredictor`` sized for
    ``num_classes`` outputs (background included).

    Other torchvision builders tried here earlier, both with
    ``weights='DEFAULT'``: ``fasterrcnn_resnet50_fpn_v2`` and
    ``fasterrcnn_mobilenet_v3_large_320_fpn``. An EfficientNet-B0 backbone and
    backbone (un)freezing were also explored; none of that is active.

    Input:
        num_classes: int
        Number of output classes of the new box head.
    Output:
        The detector with the replaced box predictor.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='DEFAULT')

    # The new head must accept the feature size the old classifier consumed.
    pretrained_head = detector.roi_heads.box_predictor
    head_in_features = pretrained_head.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector

In [None]:
def get_transform(train):
    """Return the Albumentations pipeline used for training or evaluation.

    Both pipelines resize to 600x600 and convert to a tensor; they differ only
    in the ``min_visibility`` box filter (0.4 for training, 0.5 otherwise).
    Boxes are expected in ``pascal_voc`` format with their classes passed in
    the ``labels`` field. Random augmentations (flip, random resized crop,
    rotate) were tried here before and are currently disabled.

    Input:
        train: bool
        True for the training pipeline, False for the evaluation pipeline.
    """
    steps = [
        A.Resize(height=600, width=600, p=1.0),
        ToTensorV2(p=1.0),
    ]
    visibility = 0.4 if train else 0.5
    box_params = A.BboxParams(format='pascal_voc', min_visibility=visibility, label_fields=['labels'])
    return A.Compose(steps, bbox_params=box_params)

In [None]:
def reset_weights(m):
  '''
    Re-initialise the direct children of ``m`` that expose
    ``reset_parameters``, printing each layer as it is reset.
    Only immediate children are visited; use ``model.apply(reset_weights)``
    to reach every level of a nested model.
  '''
  resettable = (child for child in m.children() if hasattr(child, 'reset_parameters'))
  for child in resettable:
    print(f'Reset trainable parameters of layer = {child}')
    child.reset_parameters()

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import random

# Function to visualize bounding boxes in the image with dynamic labeling
def plot_img_bbox(img, target, class_names):
    """Show an image with its bounding boxes and class captions.

    Input:
        img: tensor laid out channels-first (it is permuted to (1, 2, 0)
        before being passed to ``imshow``).
        target: dict with ``boxes`` (rows of xmin, ymin, xmax, ymax) and
        ``labels`` (one integer class id per box).
        class_names: list of foreground class names; label ``k`` (k >= 1)
        maps to ``class_names[k - 1]`` because 0 is reserved for background.
    """
    # plot the image and bboxes
    fig, a = plt.subplots(1, 1)
    fig.set_size_inches(5, 5)
    a.imshow(img.permute((1, 2, 0)))

    for box, label in zip(target['boxes'], target['labels']):
        # plain floats keep matplotlib independent of the tensor type
        x_min, y_min, x_max, y_max = (float(v) for v in box)
        rect = patches.Rectangle((x_min, y_min),
                                 x_max - x_min, y_max - y_min,
                                 edgecolor='r',
                                 facecolor='none',
                                 clip_on=False)

        label_idx = int(label)
        if 1 <= label_idx <= len(class_names):
            label_name = class_names[label_idx - 1]
        elif label_idx == 0:
            # Label 0 is the background / dummy box. Indexing with
            # label_idx - 1 == -1 would silently pick the LAST class name.
            label_name = 'Background'
        else:
            label_name = f'class {label_idx}'

        # Annotate the bounding box with the corresponding label name
        a.annotate(label_name, (x_min, y_min - 10), color='red', weight='bold',
                   fontsize=10, ha='left', va='top')

        # Draw the bounding box on top of the image
        a.add_patch(rect)
    plt.show()

# Preview a few annotated training samples using the evaluation transform.
dataset = SmokeDataset('DFire/train', get_transform(train=False))
class_names = dataset.classes
print(class_names)
# random.sample raises ValueError when asked for more items than exist,
# so cap the preview at the dataset size.
for i in random.sample(range(len(dataset)), min(3, len(dataset))):
    img, target = dataset[i]
    print(target['labels'])
    print(target['boxes'])
    plot_img_bbox(img, target, class_names)


## Training 

In [None]:
# !mkdir bestEpochs

In [None]:
# Directory that receives the best-epoch checkpoints.
save_dir = 'bestEpochs'
# torch.save does not create missing directories; make sure the folder exists
# up front so a finished training epoch is not lost to a failed save.
os.makedirs(save_dir, exist_ok=True)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
# Release unused memory cached by PyTorch's CUDA allocator (e.g. left over from an earlier run in this kernel).
torch.cuda.empty_cache()

In [None]:
num_epochs = 1

# Label space of the detector: 0 = background, 1 = smoke, 2 = fire
num_classes = 3

# Training and validation splits, each with its own transform pipeline
dataset = SmokeDataset('DFire/train', get_transform(train=True))
dataset_val = SmokeDataset('DFire/test', get_transform(train=False))

# Data loaders. num_workers stays at 0: it was 2, but anything greater
# than 0 doesn't work on windows.
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=5,
    shuffle=True,
    num_workers=0,
    collate_fn=utils.collate_fn,
)

data_loader_val = torch.utils.data.DataLoader(
    dataset_val,
    batch_size=5,
    shuffle=False,
    num_workers=0,
    collate_fn=utils.collate_fn,
)

# Build the detector through the helper and place it on the chosen device.
# To reset all trainable weights first, run: model.apply(reset_weights)
model = get_model_bbox(num_classes)
model.to(device)

# SGD over the trainable parameters only
# (feel free to play with values, e.g. lr=0.005, momentum=0.9, weight_decay=0)
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=0.0025, momentum=0.9, weight_decay=0)

# Step schedule: multiply the learning rate by 0.2 every 20 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.2)

result_mAP = []
best_epoch = None

for epoch in range(num_epochs):
    # one pass over the training data, logging every 50 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=50)
    lr_scheduler.step()

    # validation metrics; stats[1] is the mAP @ IoU = 0.5 entry
    results = evaluate(model, data_loader_val, device=device)
    result_mAP.append(results.coco_eval['bbox'].stats[1])

    # checkpoint whenever this epoch matches the best score so far
    if result_mAP[-1] == max(result_mAP):
        # NOTE: the inference cells load this exact file name pattern, so keep
        # them in sync if it is ever renamed.
        best_save_path = f'{save_dir}/smoke_bestmodel_noaug_sgd(wd=0)_8batch-epoch{epoch}.pth'
        torch.save(model.state_dict(), best_save_path)
        best_epoch = int(epoch)
        print(f'model from epoch number {epoch} saved!\n result is {max(result_mAP)}')

# Always keep the weights of the final epoch as well (written to the working directory)
save_path = f'smoke_noaug_sgd_2batch-lastepoch{num_epochs-1}.pth'
torch.save(model.state_dict(), save_path)
print(f'model from last epoch(no.{num_epochs-1}) saved')

# Inference

### Single Image

In [None]:
# Rebuild the detector and restore the best checkpoint written by the training cell.
# Relies on num_classes and best_epoch from the training section.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = get_model_bbox(num_classes)
best_weights_path = f'bestEpochs/smoke_bestmodel_noaug_sgd(wd=0)_8batch-epoch{best_epoch}.pth'
model.load_state_dict(torch.load(best_weights_path, map_location=device))
model.to(device)
print(device)

In [None]:
# Drawing colours. They are applied to the BGR copy of the image, so
# (0, 0, 255) renders red for predictions and (255, 0, 0) blue for labels.
color_inference = np.array((0.0, 0.0, 255.0))
color_label = np.array((255.0, 0.0, 0.0))

# Detections scoring below this value are discarded
detection_threshold = 0.7

In [None]:
# Detection models only return predictions in evaluation mode
model.eval()

image_path = 'some_picture.jpg'
# image_path = 'smoke/train/images/ck0khubxx5khq0794ja58vgv5_jpeg_jpg.rf.e9a3da4068323430011b1a2300c02074.jpg'

model_image = cv2.imread(image_path)
# cv2.imread reports failure by returning None instead of raising
if model_image is None:
    raise FileNotFoundError(f'Could not read image: {image_path}')
model_image = cv2.cvtColor(model_image, cv2.COLOR_BGR2RGB).astype(np.float32)
model_image = model_image/255.0

# same resize as the dataset transforms, without any box handling
transform = A.Compose([A.Resize(height=600, width=600, p=1.0), ToTensorV2()])
model_image = transform(image=model_image)['image']

# BGR copy scaled back to 0-255, used as the drawing canvas for OpenCV
cv2_image = np.transpose(model_image.numpy()*255,(1, 2, 0)).astype(np.float32)
cv2_image = cv2.cvtColor(cv2_image, cv2.COLOR_RGB2BGR).astype(np.float32)

# add batch dimension
model_image = torch.unsqueeze(model_image, 0)
with torch.no_grad():
    outputs = model(model_image.to(device))

# load all detection to CPU for further operations
outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]

# index = class id produced by the model (0 is background)
label_names = ["Background", "Smoke", "Fire"]

# carry further only if there are detected boxes
if len(outputs[0]['boxes']) != 0:
    boxes = outputs[0]['boxes'].data.numpy()
    scores = outputs[0]['scores'].data.numpy()
    labels = outputs[0]['labels'].data.numpy()
    # Filter boxes, scores AND labels with one shared mask so index j refers
    # to the same detection in all three arrays.
    keep = scores >= detection_threshold
    boxes = boxes[keep].astype(np.int32)
    labels = labels[keep]
    scores = np.round(scores[keep],2)
    draw_boxes = boxes.copy()

    # draw the bounding boxes and write the class name on top of it
    for j,box in enumerate(draw_boxes):
        cv2.rectangle(cv2_image,
                      (int(box[0]), int(box[1])),
                      (int(box[2]), int(box[3])),
                      color_inference, 2)
        cv2.putText(img=cv2_image, text=label_names[int(labels[j])],
                    org=(int(box[0] + 4), int(box[1] + 8)),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,fontScale= 0.3,color= color_inference,
                    thickness=1, lineType=cv2.LINE_AA)
        cv2.putText(img=cv2_image, text=str(scores[j]),
                    org=(int(box[2] - 24), int(box[1] + 8)),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,fontScale= 0.3,color= color_inference,
                    thickness=1, lineType=cv2.LINE_AA)

    # set size
    plt.figure(figsize=(10,10))
    plt.axis("off")

    # convert color from CV2 BGR back to RGB
    plt_image = cv2.cvtColor(cv2_image/255.0, cv2.COLOR_BGR2RGB)
    plt.imshow(plt_image)
    plt.show()

### Multi Image

In [None]:
# Evaluate on the GPU when available, otherwise on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Dataset to evaluate, using the evaluation transform.
# NOTE(review): this reads 'smoke/train' while training used 'DFire/train' and
# 'DFire/test' - confirm this is the intended evaluation set.
dataset_test = SmokeDataset('smoke/train', get_transform(train=False))

# one image per batch, in dataset order (num_workers was 2)
data_loader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,
    shuffle=False,
    num_workers=0,
    collate_fn=utils.collate_fn,
)

# Fresh detector, then restore the best checkpoint from the training section
model = get_model_bbox(num_classes)
best_weights_path = f'bestEpochs/smoke_bestmodel_noaug_sgd(wd=0)_8batch-epoch{best_epoch}.pth'
model.load_state_dict(torch.load(best_weights_path, map_location=device))
model.to(device)

results = evaluate(model, data_loader_test, device=device)
print("Average Precision at IoU 0.50:", results.coco_eval['bbox'].stats[1])

In [None]:
# Drawing colours. They are applied to the BGR copy of each image, so
# (0, 0, 255) renders red for predictions and (255, 0, 0) blue for labels.
color_inference = np.array((0.0, 0.0, 255.0))
color_label = np.array((255.0, 0.0, 0.0))

# Detections scoring below this value are discarded (0.7 originally)
detection_threshold = 0.7

# Running totals for the FPS report: images processed so far and the sum of
# their per-image FPS values
frame_count = 0
total_fps = 0

In [None]:
# index = class id produced by the model (0 is background)
label_names = ["Background", "Smoke", "Fire"]

# Detection models only return predictions in evaluation mode; set it here
# rather than relying on an earlier cell having done so.
model.eval()

# NOTE: frame_count and total_fps are initialised in the previous cell;
# re-run that cell before re-running this one, or the totals keep accumulating.
for i,data in enumerate(data_loader_test):
    # get the image file name for predictions file name
    image_name = 'image no:' + str(int(data[1][0]['image_id']))
    model_image = data[0][0]
    # BGR copy scaled back to 0-255, used as the drawing canvas for OpenCV
    cv2_image = np.transpose(model_image.numpy()*255,(1, 2, 0)).astype(np.float32)
    cv2_image = cv2.cvtColor(cv2_image, cv2.COLOR_RGB2BGR).astype(np.float32)

    # add batch dimension
    model_image = torch.unsqueeze(model_image, 0)
    # perf_counter has a far finer resolution than time.time, so the measured
    # interval cannot collapse to zero and break the division below
    start_time = time.perf_counter()
    with torch.no_grad():
        outputs = model(model_image.to(device))
    # CUDA work is asynchronous; wait for it so the timing covers the whole
    # forward pass
    if device.type == 'cuda':
        torch.cuda.synchronize()
    end_time = time.perf_counter()
    # get the current fps
    fps = 1 / (end_time - start_time)
    # add `fps` to `total_fps`
    total_fps += fps
    # increment frame count
    frame_count += 1
    # load all detection to CPU for further operations
    outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]
    # carry further only if there are detected boxes
    if len(outputs[0]['boxes']) != 0:
        boxes = outputs[0]['boxes'].data.numpy()
        scores = outputs[0]['scores'].data.numpy()
        labels = outputs[0]['labels'].data.numpy()
        # Filter boxes, scores AND labels with one shared mask so index j
        # refers to the same detection in all three arrays.
        keep = scores >= detection_threshold
        boxes = boxes[keep].astype(np.int32)
        labels = labels[keep]
        scores = np.round(scores[keep],2)
        draw_boxes = boxes.copy()

        # draw the bounding boxes and write the predicted class name on top of it
        for j,box in enumerate(draw_boxes):
            cv2.rectangle(cv2_image,
                          (int(box[0]), int(box[1])),
                          (int(box[2]), int(box[3])),
                          color_inference, 2)
            # caption with the predicted class instead of a fixed "Smoke"
            cv2.putText(img=cv2_image, text=label_names[int(labels[j])],
                        org=(int(box[0]), int(box[1] - 5)),
                        fontFace=cv2.FONT_HERSHEY_SIMPLEX,fontScale= 0.3,color= color_inference,
                        thickness=1, lineType=cv2.LINE_AA)
            cv2.putText(img=cv2_image, text=str(scores[j]),
                        org=(int(box[0]), int(box[1] + 8)),
                        fontFace=cv2.FONT_HERSHEY_SIMPLEX,fontScale= 0.3,color= color_inference,
                        thickness=1, lineType=cv2.LINE_AA)

        # add boxes for labels
        for box in data[1][0]['boxes']:
            cv2.rectangle(cv2_image,
                          (int(box[0]), int(box[1])),
                          (int(box[2]), int(box[3])),
                          color_label, 2)
            cv2.putText(img=cv2_image, text="Label",
                        org=(int(box[0]), int(box[1] - 5)),
                        fontFace=cv2.FONT_HERSHEY_SIMPLEX,fontScale= 0.3,color= color_label,
                        thickness=1, lineType=cv2.LINE_AA)

        # set size
        plt.figure(figsize=(10,10))
        plt.axis("off")

        # convert color from CV2 BGR back to RGB
        plt_image = cv2.cvtColor(cv2_image/255.0, cv2.COLOR_BGR2RGB)
        plt.imshow(plt_image)
        plt.show()
        # cv2.imwrite returns False when it cannot write; report that instead
        # of ignoring it.
        # NOTE(review): the target is a hard-coded absolute folder and the file
        # name contains ':' (not allowed on Windows) - confirm both are intended.
        saved = cv2.imwrite(f"/content/Results/{image_name}.jpg", cv2_image)
        if not saved:
            print(f"Warning: could not write /content/Results/{image_name}.jpg")
    print(f"Image {i + 1} done...")
    print('-' * 50)
print('TEST PREDICTIONS COMPLETE')

# guard against an empty loader (nothing processed -> nothing to average)
if frame_count:
    avg_fps = total_fps / frame_count
    print(f"Average FPS: {avg_fps:.3f}")
else:
    print("Average FPS: n/a (no images processed)")
