# **Import Dependencies**

In [7]:
import torch
import torch.nn as nn

import os
import cv2
import glob
import numpy as np
from torch.utils.data import Dataset, DataLoader

In [1]:
pip install roboflow



In [4]:
from roboflow import Roboflow

In [5]:
# rf = Roboflow(api_key="gMu18twOPMgwp22tEQfY")
# project = rf.workspace("sayed-gamall").project("fire-smoke-detection-yolov11")
# version = project.version(2)
# dataset = version.download("darknet")

rf = Roboflow(api_key="gMu18twOPMgwp22tEQfY")
project = rf.workspace("sayed-gamall").project("fire-smoke-detection-yolov11")
version = project.version(2)
dataset = version.download("yolov11")


# rf = Roboflow(api_key="gMu18twOPMgwp22tEQfY")
# project = rf.workspace("sayed-gamall").project("fire-smoke-detection-yolov11")
# version = project.version(2)
# dataset = version.download("yolov4pytorch")


loading Roboflow workspace...
loading Roboflow project...


Downloading Dataset Version Zip in Fire-Smoke-Detection-Yolov11-2 to yolov11:: 100%|██████████| 581927/581927 [00:33<00:00, 17590.02it/s]





Extracting Dataset Version Zip to Fire-Smoke-Detection-Yolov11-2 in yolov11:: 100%|██████████| 20938/20938 [00:03<00:00, 6957.80it/s]


# **DarkNet-53**

In [38]:
# Define the characteristics of a convolutional block in our CNN
class ConvUnit(nn.Module):
  def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
    super(ConvUnit, self).__init__()
    # create convolutional layer, bias = False becasue we're using batch normalisation and that has its own bias
    self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False)
    # batch normalisation - stabalise training
    self.bn = nn.BatchNorm2d(out_channels)
    # leaky relu - this should avoid vanishing gradients (alongside our skip connections) and also address the dying neuron problem
    # 0.1 defines the negative slope in the ReLU function that will allow some negative weights not to evaluate to zero
    self.leaky_relu = nn.LeakyReLU(0.1)


  def forward(self, x):
    '''
    pass x (our input) through conv -> batch_norm - > ReLU
    '''
    return self.leaky_relu(self.bn(self.conv(x)))

In [39]:
# Define the characteristics of a residual unit in our CNN
class ResidualUnit(nn.Module):
  def __init__(self, in_channels):
    super(ResidualUnit, self).__init__()
    # first convolution with n=output_channels 1*1 kernels yielding img*img*out_channels feature map
    # (downsample while retaining features)
    self.conv1 = ConvUnit(in_channels, in_channels//2, kernel_size=1, stride=1, padding=0)
    # second convolution (extract features)
    self.conv2 = ConvUnit(in_channels//2, in_channels, kernel_size=3, stride=1, padding=1)

  def forward(self, x):
    '''
    pass x through these convolutional layers and return original input + processed output
    '''
    # skip connection
    return x + self.conv2(self.conv1(x))

In [63]:
# Outline the CNN itself

class DarkNet53(nn.Module):
  def __init__(self):
    super(DarkNet53, self).__init__()
    # First convolutional layers (1) + batch norm and ReLU for each conv unit
    self.conv1 = ConvUnit(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1)

    # Downsample (reduce dimensionality) (1 layer)
    self.conv2 = ConvUnit(in_channels=32, out_channels=64, kernel_size=3, stride=2, padding=1)

    # First residual unit + skip connection (2 convolutional layers)
    self.res1 = self.make_layer(ResidualUnit, 64, 1)

    # process repeated to capture more complex features
    self.conv3 = ConvUnit(in_channels=64, out_channels=128, kernel_size=3, stride=2, padding=1) # 1 layer
    self.res2 = self.make_layer(ResidualUnit, 128, 2) # 4 layers
    self.conv4 = ConvUnit(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1) # 1 layer
    self.res3 = self.make_layer(ResidualUnit, 256, 8) # 16 layers
    self.conv5 = ConvUnit(in_channels=256, out_channels=512, kernel_size=3, stride=2, padding=1) # 1 layer
    self.res4 = self.make_layer(ResidualUnit, 512, 8) # 16 layers
    self.conv6 = ConvUnit(in_channels=512, out_channels=1024, kernel_size=3, stride=2, padding=1) # 1 layer
    self.res5 = self.make_layer(ResidualUnit, 1024, 4) # 8 layers

    # apply gloval average pooling to final feature map as per DarkNet-53 architecture (1 layer)
    # final downsample before we feed the output feature map to the YOLO Layers (1*1 convolutional layers to predict the bounding boxes)
    # yolo v1 used fully connected layers but in this v3 implementation 1*1 convolutional layers are used to directly predict boxes i.e we don't flatten the feature map
    self.global_avg_pool = nn.AdaptiveAvgPool2d((1,1))

    # for a total of 53 layers ... what a surprise

  def make_layer(self, unit, out_channels, num_units):
    layers=[]
    for _ in range(num_units):
      layers.append(unit(out_channels))
    return nn.Sequential(*layers)

  def forward(self, x):
    '''
    feed input through the layers
    '''
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.res1(x)
    x = self.conv3(x)
    x = self.res2(x)
    x = self.conv4(x)
    x = self.res3(x)
    c1=x
    x = self.conv5(x)
    x = self.res4(x)
    c2=x
    x = self.conv6(x)
    x = self.res5(x)
    c3=x

    # x = self.global_avg_pool(x) -> Part of DarkNet-53 but not used in YOLO V3
    # we take the raw convolutional outputs as below for multi scale detection
    return c1, c2, c3 # our feature map

#**"YOLO Layer"**

After our Darknet53 CNN extracts the features, we use the 'yolo,' layer to complete object detection and classification in a single forward pass.

In [41]:
class YOLOLayer(nn.Module):
  def __init__(self, in_channels, num_classes, anchors, input_dimension):
    super(YOLOLayer, self).__init__()
    self.num_classes = num_classes
    self.num_anchors = len(anchors)
    self.input_dim = input_dimension

    self.anchors = torch.tensor(anchors, dtype=torch.float32)
    # Output layer
    # convolutional layer convolving the DarkNet output witht a 1*1 kernel yielding c channels where c=(self.num_anchors * (5 + num_classes)
    # our grid is defined as each cell in the final downsampled feature map (after global pooling in darknet)
    # Each cell in the grid corresponds to a region of the original image and each cell is responsible for detecting objects within this region

    # Each cell has 3 anchors (predefined sizes objects should be approximately at different aspect ratios) YOLO
    # adjusts these anchors to define bounding boxes rather than drawing boxes from scratch so for each anchor
    # we want (x, y, w, h) describing how much to offset each aspect of the anchor to surround the image, we want our confidence score that
    # an object exists and we want class probabilities [x,y,w,h,conf,classes] per anchor, hence c=(self.num_anchors * (5 + num_classes)


    self.conv = nn.Conv2d(in_channels, self.num_anchors * (5 + num_classes), kernel_size=1, stride=1, padding=0)

  def forward(self, x):
    batch_size, _, grid_size, _ = x.shape

    anchors = self.anchors.to(x.device)

    # This outputs a 4D tensor of shape (batch_size, (anchors*(5+classes), grid_size, grid_size)) - a hypercube of data (not relevant, just like the word 'hypercube')
    # we use .view() to transform to a 5D tensor of shape (batch_size, anchors, (x,y,w,h,conf,2 class probs =7) grid, grid)
    # This way each anchor corresponds to the ReLU activations we are attributing to that anchor and that will be learned by the model
    prediction = self.conv(x).view(batch_size, self.num_anchors, 5 + self.num_classes, grid_size, grid_size)

    # this changes the order now to shape = (batch, anchors, grid, grid, (x,y,w,h,conf,2 class probs =7))
    # this allows us to cleanly access a given anchor at a given cell and output its downsampled ReLU activations from the CNN
    # these activations initially have nothing to do with x,y,w,h,conf,class probs, they are just a downsampling from the output of the CNN
    # but the model will learn to optimise them essentially outputting what we want where we want through the loss function
    prediction = prediction.permute(0, 1, 3, 4, 2).contiguous()

    # Now we have to take these predictions and convert the raw ReLU activations into our bounded boxes
    # I've written the formulae here :
    #      x = (sigmoid(predicted x offset) + grid cell x) / grid size)
    #      y = (sigmoid(predicted y offset) + grid cell y) / grid size)
    #      w = anchor w * exp(predicted w offset) / input_width
    #      h = anchor h * exp(predicted h offset) / input_height
    #      conf = sigmoid(predicted)
    #      class probs = softmax/sigmoid(predicted) depending on how many

    # the grid is sort of imaginary its just the size of the downsampled tensor (minus the channels obviously) and we can map it to the original image

    x = prediction[..., 0] # get x for every anchor for every cell for every image in batch
    y = prediction[..., 1] # get y for every anchor for every cell for every image in batch
    w = prediction[..., 2] # get w for every anchor for every cell for every image in batch
    h = prediction[..., 3] # get h for every anchor for every cell for every image in batch

    conf = torch.sigmoid(prediction[..., 4]) # sigmoid of every conf prediction in every anchor in every cell in batch
    class_probs = torch.sigmoid(prediction[..., 5:]) # we only have two classes so I went for sigmoid

    # grid_x and grid_y are tensors that store every x and every y coordinate respectively of the grid cells
    # meshgrid() creates a grid of grid_size * grid_size and assigns coordinates to grid_x and grid_y
    grid_x, grid_y = torch.meshgrid(torch.arange(grid_size), torch.arange(grid_size), indexing='ij')
    # these should be tensor.int64 by default but they will be involved in floating point calculations in a matter of 3 lines so
    # best to convert them to floats so python doesn't start nagging us
    grid_x, grid_y = grid_x.float().to(x.device), grid_y.float().to(x.device)

    # now the actual bounded box calculations according to fomulae outlined above
    bx = (torch.sigmoid(x) + grid_x) / grid_size # grid_X and Y ensure bounding box mapped to right position and
    by = (torch.sigmoid(y) + grid_y) / grid_size # sigmoid helps the bounding box centre stay in the cell

    # I feel like this looks more complicated than it is - its just the formula as above but
    # we use [:, 0/1] to index anchors from the anchors outlined in the YOLOv3 class and .view(1,-1,1,1)
    # to match the shape of w which has batch, grid and anchors so in order to match we transform it to shape
    # (1, num_anchors, 1, 1) - [[[[num_anchors]]]] * exp(w)
    # exp() is used becasue YOLO doesnt predict width it predicts a logarithmic offset of the predefined anchor so exp makes it stay positive
    # and scale by a meaningful amount
    # lastly dividing it by the input_dimension of the image gives us a decimal that we can use to adjust the boxes in a scale
    # invariant way
    bw = anchors[:, 0].view(1, -1, 1, 1) * torch.exp(w) / self.input_dim
    bh = anchors[:, 1].view(1, -1, 1, 1) * torch.exp(h) / self.input_dim

    # stack all predictions into a single tensor
    boxes = torch.stack((bx, by, bw, bh, conf), dim=-1)

    return boxes, class_probs

# **YOLOv3**

In [54]:
class YOLOv3(nn.Module):
  def __init__(self, num_classes):
    super(YOLOv3, self).__init__()
    self.base = DarkNet53()

    self.yolo1 = YOLOLayer(1024, num_classes, anchors=[(116, 90), (156, 198), (373, 326)], input_dimension=604)
    self.yolo2 = YOLOLayer(512, num_classes, anchors=[(30, 61), (62, 45), (59, 119)], input_dimension=604)
    self.yolo3 = YOLOLayer(256, num_classes, anchors=[(10, 13), (16, 30), (33, 23)], input_dimension=604)

  def forward(self, x):
    '''
    We pass x - through darknet extracting feature maps at multiple scales

    Three YOLO Layers are implemented to detect objects of variable size
    - Layer 1 -> deep map -> large objects
    - Layer 2 -> mid map -> medium objects
    - Layer 3 -> shallow map -> small objects

    We later apply NMS to remove duplicates ( boxes with >= 40% overlap )
    '''
    c1, c2, c3 = self.base(x) # Three depths from three stages of CNN
    out1 = self.yolo1(c3)
    out2 = self.yolo2(c2)
    out3 = self.yolo3(c1)
    return out1, out2, out3 # (Three detection heads)

# **Loss Function**

Three phase, Squared Error Approach

1) Bounding box error - heavy penalty

2) Incorrect object detected in cell

3) Error between prediction and target prediction

In [43]:
class YOLOLoss(nn.Module):
  def __init__(self, num_classes):
    super(YOLOLoss, self).__init__()
    self.num_classes = num_classes
    self.mse = nn.MSELoss()
    self.bce = nn.BCEWithLogitsLoss()
    self.ce = nn.CrossEntropyLoss()

  def forward(self, predictions, targets):

    pred_boxes = predictions[..., 1:5] # (bx, by, bw, bh)
    target_boxes = targets[..., 1:5] # (bx, by, bw, bh)(target)
    pred_conf = predictions[..., 0]
    target_conf = targets[..., 0]
    # pred_classes = predictions[..., 5:] - not present in annotations
    # target_classes = targets[..., 5:]

    # 1 : loss for bounding box (x,y coords) i.e the position of the box - iou loss used for w and h
    xy_loss = self.mse(pred_boxes[..., :2], target_boxes[..., :2])

    # 2 : w and h loss (iou loss)
    wh_loss = self.iou_loss(pred_boxes, target_boxes)

    # 3 : object confidence
    conf_loss = self.bce(pred_conf, target_conf)

    # 4 : class predictions
    # class_loss = self.ce(pred_classes, target_classes.argmax(-1))

    total_loss = xy_loss + wh_loss + conf_loss #+ class_loss
    return total_loss

    # intersection under curve
    def iou_loss(self, pred_boxes, target_boxes):

      pred_x, pred_y, pred_w, pred_h = pred_boxes[..., 0], pred_boxes[..., 1], pred_boxes[..., 2], pred_boxes[..., 3]
      target_x, target_y, target_w, target_h = target_boxes[..., 0],target_boxes[..., 1],target_boxes[..., 2],target_boxes[..., 3]

      # convert to w/h to min/max coords
      pred_x1 = pred_x - pred_w / 2
      pred_y1 = pred_y - pred_h / 2
      pred_x2 = pred_x + pred_w / 2
      pred_y2 = pred_y + pred_h / 2

      target_x1 = target_x - target_w / 2
      target_y1 = target_y - target_h / 2
      target_x2 = target_x + target_w / 2
      target_y2 = target_y + target_h / 2

      # get intersection area
      inter_x1 = torch.max(pred_x1, target_x1)
      inter_y1 = torch.max(pred_y1, target_y1)
      inter_x2 = torch.max(pred_x2, target_x2)
      inter_y2 = torch.max(pred_y2, target_y2)

      inter_w = torch.clamp(inter_x2 - inter_x1, min=0)
      inter_h = torch.clamp(inter_y2 - inter_y1, min=0)
      inter_area = inter_w * inter_h

      # get union area
      pred_area = pred_w * pred_h
      target_area = target_w * target_h
      union_area = pred_area + target_area - inter_area

      iou = inter_area / (union_area + 1e-6)
      return iou.mean()


In [15]:
# model = YOLOv3(num_classes=2)

Non Maximum Suppression

We're allowing 3 anchors per cell as per YOLOv3 so there's lots of duplicates to remove

In [14]:
def nonMaxSuppression(predictions, conf_threshold=0.5, iou_threshold=0.4):
    '''
    '''

    pass


In [44]:
class YOLODataset(Dataset):
  def __init__(self, image_dir, label_dir, img_size, num_classes, transform=None):
    self.image_paths = glob.glob(os.path.join(image_dir, '*.jpg'))
    self.label_paths = glob.glob(os.path.join(label_dir, '*.txt'))
    self.img_size = img_size
    self.transform = transform
    self.num_classes = num_classes

  def __len__(self):
    return len(self.image_paths)

  def parse_yolo_line(self, line):
    fields = line.strip().split()

    if len(fields) != 5:
      return None

    try:
      class_id, x,y,w,h = map(float, fields)
    except ValueError:
      return None

    return class_id, x,y,w,h

  def __getitem__(self, index):
    img_path = self.image_paths[index]
    img = cv2.imread(img_path)
    orig_h, orig_w, _ = img.shape

    img = img / 255.0
    img = torch.tensor(img, dtype=torch.float32).permute(2, 0, 1)

    label_path = self.label_paths[index]
    labels = []
    with open(label_path, 'r') as f:
      for line in f:
        parsed = self.parse_yolo_line(line)
        if parsed is None:
          continue


        class_id, x,y,w,h = parsed

        row = [class_id, x,y,w,h]
        labels.append([class_id,x,y,w,h])

    labels = torch.tensor(labels, dtype=torch.float32) if labels else torch.zeros((0,5))

    return img, labels

In [45]:
def collate_fn(batch):
  '''
  Collate the data

  Why is this necessary ?
  The YOLODatset().__getitem__() method returns a list of tuples (img, labels)
  e.g [(img1, lbl1), (img2, lbl2),...]
  When we pass this to the torch DataLoader

  '''
  # # unpack the list of tuples into two tuples images and labels e.g (img1, img2, img3) and (lbl1, lbl2, lbl3)
  # images, labels = zip(*batch)

  # # each image and label is a tensor of shape = (Channels, Height, Width)
  # # stack each of these images into a tensor of shape Batch(number of images passed in inserted at dim=0), Channels, Height, Width
  # images = torch.stack(images, dim=0)

  # # repeat the process with labels inserting the batch at dim 0
  # labels = torch.cat(labels, dim=0)
  imgs = []
  lbls_list = []
  for (img, lbls) in batch:
        imgs.append(img)
        lbls_list.append(lbls)
  imgs = torch.stack(imgs, dim=0)
  return imgs, lbls_list


In [46]:

train_img_dir = "/content/Fire-Smoke-Detection-Yolov11-2/train/images"
train_label_dir = "/content/Fire-Smoke-Detection-Yolov11-2/train/labels"
valid_img_dir = "/content/Fire-Smoke-Detection-Yolov11-2/valid/images"
valid_label_dir = "/content/Fire-Smoke-Detection-Yolov11-2/valid/labels"
train_dataset = YOLODataset(image_dir=train_img_dir, label_dir=train_label_dir, img_size=416, num_classes=2)
valid_dataset = YOLODataset(image_dir=valid_img_dir, label_dir=valid_label_dir, img_size=416, num_classes=2)
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

for batch_index, (images, labels) in enumerate(train_loader):
  print(f"Batch {batch_index + 1 }")
  print(f"Labels : {labels}")
  print(f"Images : {images.shape}")
  break


Batch 1
Labels : [tensor([[0.0000, 0.4872, 0.5183, 0.0284, 0.0593]]), tensor([[0.0000, 0.2989, 0.7197, 0.3693, 0.3877]])]
Images : torch.Size([2, 3, 640, 640])


# **Train**

In [47]:
epochs = 10

In [48]:
torch.cuda.empty_cache()

In [49]:
torch.cuda.memory_summary(device=None, abbreviated=False)

# clear cuda memory
torch.cuda.empty_cache()

In [64]:
model = YOLOv3(num_classes=2).to('cuda')

optimiser = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = YOLOLoss(num_classes=2)


for epoch in range(epochs):
  total_loss = 0.0
  for batch_index, (image, labels) in enumerate(train_loader):

    images = images.to('cuda')
    for i in range(len(labels)):
        labels[i] = labels[i].to('cuda')

    out1, _, _ = model(images)

    boxes, classes = out1

    predictions = torch.cat([boxes, classes], dim=-1)

    batch_pred = predictions.view(-1, predictions.shape[-1])

    targets = []
    for lbl in labels:
      targets.append(lbl)

    batch_target = torch.cat(targets, dim=0)

    loss = loss_fn(batch_pred, batch_target).to('cuda')

    optimiser.zero_grad()
    loss.backward()
    optimiser.step()

    total_loss += loss.item()

    if (batch_index + 1) % 10 == 0:
      print(f"Epoch {epoch + 1}/{epochs}, Batch {batch_index + 1}/{len(train_loader)}, Loss: {loss.item()}")
  avg_loss = total_loss / len(train_loader)
  print(f"Epoch {epoch + 1}/{epochs}, Average Loss: {avg_loss}")

Using a target size (torch.Size([2, 2])) that is different to the input size (torch.Size([2400, 2])). This will likely lead to incorrect results due to broadcasting. Please ensure they have the same size.


RuntimeError: The size of tensor a (2400) must match the size of tensor b (2) at non-singleton dimension 0