In [1]:
print("nice")

nice


### global

In [2]:
import os
import tensorflow as tf
import torch
print(tf.__version__)
print(torch.__version__)

2.15.0
2.1.0+cu121


In [3]:
import numpy as np
print(np.__version__)
# setting random_state
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)
tf.random.set_seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)

1.25.2


<torch._C.Generator at 0x7ceb7bf4df50>

### some libraries and functions

In [4]:
# libraries
import sys, math
from collections import defaultdict
import pandas as pd
import matplotlib.pyplot as plt
import sklearn

In [5]:
# fix random_state
def fixRandomState(fixed_state: int=RANDOM_STATE):
  np.random.seed(fixed_state)
  tf.random.set_seed(fixed_state)
  torch.manual_seed(fixed_state)

# exception
def exception(requirement: bool, content):
  if(requirement == False): raise ValueError(content)
def catchException(ex: Exception):
  print(type(ex), ex.args)
  exception(False, ex)

# message
def mesVerbose(flag: bool, verbose, func_dir: str=""):
  if(flag == False): return
  print("__verbose__:", func_dir, verbose)
def mesWarning(note, func_dir: str=""):
  print("__warning__:", func_dir, str(note) + "###")

In [6]:
def over(val, name="") -> tuple:
  try: mesVerbose(True, (type(val), val.shape, str(sys.getsizeof(val)) + "Bytes"), name)
  except: mesVerbose(True, (type(val), "no_shape", str(sys.getsizeof(val)) + "Bytes"), name)

### model architecture

In [7]:
from torch import nn, optim
from torch.utils import data

BATCH_SIZE = 4
IN_SHAPE = (BATCH_SIZE, 3, 224, 224)

YOLO_BACKBONE_ARCHITECTURE = [(64, 7, 2, 'same'), 'M',
                                (192, 3, 1, 'same'), 'M',
                                (128, 1, 1, 'valid'),
                                [(128, 256), 1],
                                [(256, 512), 1], 'M',
                                [(256, 512), 4],
                                [(512, 1024), 1], 'M',
                                [(512, 1024), 2]]

GRID_SIZE = 7
NUM_BOXES = 2
NUM_CLASSES = 3
OUT_SHAPE = (16, 7, 7, 13)

In [8]:
DEVICE = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
DEVICE

device(type='cpu')

##### blcoks

In [9]:
class ConvWithBatchNorm(nn.Module):
  """Conv layer with batch norm and leaky relu"""

  def __init__(self, in_c: int, out_c: int, k_size: int, stride=1, negative_slope=0.1):
    super(ConvWithBatchNorm, self).__init__()
    self.in_shape = ()
    self.out_shape = ()

    padding = k_size // 2
    layers = nn.ModuleList()
    layers += [nn.Conv2d(in_c, out_c, k_size, stride=stride, padding=padding, bias=False)]
    layers += [nn.BatchNorm2d(num_features=out_c)]
    layers += [nn.LeakyReLU(negative_slope=negative_slope)]
    self.layers = layers

  def forward(self, x):
    self.in_shape = x.shape
    for layer in self.layers:
      x = layer(x)
    self.out_shape = x.shape
    return x

  def getInShape(self): return self.in_shape
  def getOutShape(self): return self.out_shape

In [10]:
class BottleNeckBlock(nn.Module):
  """Block of 1x1 reduction layers followed by 3x3 conv. layer"""

  def __init__(self, in_c: int, out_ces: tuple, num_repeat: int):
    super(BottleNeckBlock, self).__init__()
    self. out_shape = ()

    out_1x1 = out_ces[0]
    out_3x3 = out_ces[1]
    layers = nn.ModuleList()
    for i in range(num_repeat):
      layers += [nn.Conv2d(in_c, out_1x1, 1, stride=1, padding=0, bias=False)]
      layers += [nn.Conv2d(out_1x1, out_3x3, 3, stride=1, padding=1, bias=False)]
    self.layers = layers

  def forward(self, x):
    for layer in self.layers:
      x = layer(x)
    self.out_shape = x.shape
    return x

  def getOutShape(self): return self.out_shape

##### nnModule

In [11]:
class nnModule(nn.Module):
  def __init__(self) -> None:
    super(nnModule, self).__init__()
    self.in_shape = ()
    self.out_shape = ()
    self.model = nn.ModuleList()

  def getInShape(self): return self.in_shape
  def getOutShape(self): return self.out_shape
  def getModel(self): return self.model
  def setInShape(self, in_shape): self.in_shape = in_shape
  def setOutShape(self, out_shape): self.out_shape = out_shape
  def setModel(self, model): self.model = model

  def summary(self):
    in_shape = self.getInShape()
    model = self.getModel()
    x = torch.rand(in_shape[0], in_shape[1], in_shape[2], in_shape[3])
    for layer in model:
      print("\tin_shape:", type(x), x.shape)
      print(type(layer), sys.getsizeof(layer))
      x = layer(x)
    print("out_shape:", type(x), x.shape)

##### YoloBackbone

In [12]:
class YoloBackbone(nnModule):
  """YOLO backbone extract feature from the input"""

  def __init__(self, in_shpae: tuple, backbone_config=YOLO_BACKBONE_ARCHITECTURE):
    super(YoloBackbone, self).__init__()
    self.setInShape(in_shpae)
    model = nn.ModuleList()
    x = torch.rand(in_shpae[0], in_shpae[1], in_shpae[2], in_shpae[3])
    for i, config in enumerate(backbone_config):
      if type(config) == tuple:
        out_c, k_size, stride, _ = config
        model += [ConvWithBatchNorm(in_c=x.shape[1], out_c=out_c, k_size=k_size, stride=stride, negative_slope=0.1)]
        x = model[-1](x)

      elif type(config) == str:
        model += [nn.MaxPool2d(kernel_size=2, stride=2, padding=0)]
        x = model[-1](x)

      elif type(config) == list:
        out_ces, num_repeat = config
        model += [BottleNeckBlock(x.shape[1], out_ces, num_repeat)]
        x = model[-1](x)
    self.setOutShape(x.shape)
    self.setModel(model=model)

  def forward(self, x):
    for layer in self.getModel():
      x = layer(x)
    return x

In [13]:
model = YoloBackbone((1, 3, 224, 224)).summary()

	in_shape: <class 'torch.Tensor'> torch.Size([1, 3, 224, 224])
<class '__main__.ConvWithBatchNorm'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 64, 112, 112])
<class 'torch.nn.modules.pooling.MaxPool2d'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 64, 56, 56])
<class '__main__.ConvWithBatchNorm'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 192, 56, 56])
<class 'torch.nn.modules.pooling.MaxPool2d'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 192, 28, 28])
<class '__main__.ConvWithBatchNorm'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 128, 28, 28])
<class '__main__.BottleNeckBlock'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 256, 28, 28])
<class '__main__.BottleNeckBlock'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 512, 28, 28])
<class 'torch.nn.modules.pooling.MaxPool2d'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 512, 14, 14])
<class '__main__.BottleNeckBlock'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([1, 512, 1

##### YoloOutput

In [14]:
YOLO_OUT_ARCHITECTURE = [(4096, 0.1), 0.5, (2040, 0.1), 0.5, (1024, 0.1), 0.5, (GRID_SIZE * GRID_SIZE * (NUM_BOXES * 5 + NUM_CLASSES), 0.1)]

In [15]:
class YoloOutput(nnModule):
  """YOLO last convolution and FC layers to produce prediction"""

  def __init__(self, in_shape: tuple):
    super(YoloOutput, self).__init__()
    self.setInShape(in_shape=in_shape)
    x = torch.rand(in_shape[0], in_shape[1], in_shape[2], in_shape[3])
    model = nn.ModuleList()
    model += [ConvWithBatchNorm(in_shape[1], out_c=1024, k_size=3),
              ConvWithBatchNorm(1024, out_c=1024, k_size=3),
              ConvWithBatchNorm(1024, out_c=1024, k_size=3),
              ConvWithBatchNorm(1024, out_c=1024, k_size=3),
              nn.Flatten()]
    for layer in model: x = layer(x)

    for i, config in enumerate(YOLO_OUT_ARCHITECTURE):
      if type(config) == tuple:
        out_f, slop = config
        model += [nn.Linear(in_features=x.shape[1], out_features=out_f), nn.LeakyReLU(negative_slope=slop)]
        x = model[-1](model[-2](x))

      else:
        p = config
        model += [nn.Dropout(p=0.5)]
        x = model[-1](x)
    self.setOutShape(x.shape)
    self.setModel(model)

  def forward(self, x):
    for layer in self.getModel():
      x = layer(x)
    return x

In [16]:
YoloOutput((16, 1024, 7, 7)).summary()

	in_shape: <class 'torch.Tensor'> torch.Size([16, 1024, 7, 7])
<class '__main__.ConvWithBatchNorm'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 1024, 7, 7])
<class '__main__.ConvWithBatchNorm'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 1024, 7, 7])
<class '__main__.ConvWithBatchNorm'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 1024, 7, 7])
<class '__main__.ConvWithBatchNorm'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 1024, 7, 7])
<class 'torch.nn.modules.flatten.Flatten'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 50176])
<class 'torch.nn.modules.linear.Linear'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 4096])
<class 'torch.nn.modules.activation.LeakyReLU'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 4096])
<class 'torch.nn.modules.dropout.Dropout'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 4096])
<class 'torch.nn.modules.linear.Linear'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 2040])
<class '

##### YoloV1

In [17]:
class YoloV1(nnModule):
  """End-to-end YOLO network"""

  def __init__(self, in_shape: tuple):
    super(YoloV1, self).__init__()
    self.setInShape(in_shape)

    x = torch.rand(in_shape[0], in_shape[1], in_shape[2], in_shape[3])
    yolo_backbone = YoloBackbone(in_shape)
    x = yolo_backbone(x)
    yolo_output = YoloOutput(in_shape=x.shape)
    x = yolo_output(x)

    self.setOutShape(x.shape)
    model = nn.ModuleList()
    model += [yolo_backbone, yolo_output]
    self.setModel(model)

  def forward(self, x):
    for layer in self.getModel():
      x = layer(x)
    return x

In [18]:
YoloV1((16, 3, 224, 224)).summary()

	in_shape: <class 'torch.Tensor'> torch.Size([16, 3, 224, 224])
<class '__main__.YoloBackbone'> 48
	in_shape: <class 'torch.Tensor'> torch.Size([16, 1024, 7, 7])
<class '__main__.YoloOutput'> 48
out_shape: <class 'torch.Tensor'> torch.Size([16, 637])


### YoloLoss

In [19]:
def intersection_over_union(boxes_preds, boxes_labels, box_format='midpoint'):
  """
  Calculates intersection over union

  Parameters:
      boxes_preds (tensor): Predictions of Bounding Boxes (BATCH_SIZE, 4)
      boxes_labels (tensor): Correct labels of Bounding Boxes (BATCH_SIZE, 4)
      box_format (str): midpoint/corners, if boxes are (x,y,w,h) or (x1,y1,x2,y2) respectively.

  Returns:
      tensor: Intersection over union for all examples
  """
  # boxes_preds shape is (N, 4)
  # boxes_labels shape is (N, 4)

  if box_format == 'midpoint':
      box1_x1 = boxes_preds[..., 0:1] - boxes_preds[..., 2:3] / 2
      box1_y1 = boxes_preds[..., 1:2] - boxes_preds[..., 3:4] / 2
      box1_x2 = boxes_preds[..., 0:1] + boxes_preds[..., 2:3] / 2
      box1_y2 = boxes_preds[..., 1:2] + boxes_preds[..., 3:4] / 2

      box2_x1 = boxes_labels[..., 0:1] - boxes_labels[..., 2:3] / 2
      box2_y1 = boxes_labels[..., 1:2] - boxes_labels[..., 3:4] / 2
      box2_x2 = boxes_labels[..., 0:1] + boxes_labels[..., 2:3] / 2
      box2_y2 = boxes_labels[..., 1:2] + boxes_labels[..., 3:4] / 2

  if box_format == 'corners':
      box1_x1 = boxes_preds[..., 0:1]
      box1_y1 = boxes_preds[..., 1:2]
      box1_x2 = boxes_preds[..., 2:3]
      box1_y2 = boxes_preds[..., 3:4]

      box2_x1 = boxes_labels[..., 0:1]
      box2_y1 = boxes_labels[..., 1:2]
      box2_x2 = boxes_labels[..., 2:3]
      box2_y2 = boxes_labels[..., 3:4]

  x1 = torch.max(box1_x1, box2_x1)
  y1 = torch.max(box1_y1, box2_y1)
  x2 = torch.min(box1_x2, box2_x2)
  y2 = torch.min(box1_y2, box2_y2)

  #$$$.clamp(0) is for the case when they don't intersect. Since when they don't intersect, one of these will be negative so that should become 0
  intersection = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)
  box1_area = abs((box1_x2 - box1_x1) * (box1_y2 - box1_y1))
  box2_area = abs((box2_x2 - box2_x1) * (box2_y2 - box2_y1))
  return intersection / (box1_area + box2_area - intersection + 1e-6)


In [20]:
EPS = 1e-6
def sign_sqrt(pred):
  return torch.sign(pred) * torch.sqrt(torch.abs(pred + EPS))
### !!! tai 0 khong co dao ham cua abs

In [22]:
class YoloLoss(nn.Module):
  def __init__(self, coord_c=5, noobj_c=0.5):
    super(YoloLoss, self).__init__()
    self.COORD = coord_c
    self.NOOBJ = noobj_c
    self.mse = nn.MSELoss(reduction="sum")

  def forward(self, predictions: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
    predictions = predictions.reshape((-1, GRID_SIZE, GRID_SIZE, NUM_BOXES * 5 + NUM_CLASSES))
    exists_box = target[..., [4]]
    iou_b1 = intersection_over_union(
        predictions[...,0:4], target[..., 0:4])
    iou_b2 = intersection_over_union(
        predictions[..., 5:9], target[..., 0:4])
    bestbox = torch.where(iou_b1 >= iou_b2, 0, 1)

    # class loss
    class_loss = self.mse(
      exists_box * predictions[..., 10:],
      exists_box * target[..., 5:])
    print(class_loss)

    # obj loss
    pred_box = (
        (1-bestbox) * predictions[..., [4]] + (bestbox) * predictions[..., [9]]
    )
    object_loss = self.mse(
      exists_box * pred_box,
      exists_box * target[..., [4]]
    )
    print(object_loss)

    # coor loss
    pred_box = (
        (1-bestbox) * predictions[..., 0:4] + (bestbox) * predictions[..., 5:9]
    )
    true_box = target[..., 0:4]
    pred_box[..., 2:4] = sign_sqrt(pred_box[..., 2:4])
    true_box[..., 2:4] = sign_sqrt(true_box[..., 2:4])
    coor_loss = self.mse(
      #exists_box * pred_box, end_dim=-2),
      #exists_box * true_box, end_dim=-2),
      exists_box * pred_box, exists_box * true_box
    )
    print(coor_loss)

    # no obj loss
    no_obj_loss = self.mse(
      (1 - exists_box) * predictions[..., [4]], (1 - exists_box) * target[..., [4]]
    )
    no_obj_loss += self.mse(
      (1 - exists_box) * predictions[..., [9]], (1 - exists_box) * target[..., [4]]
    )
    print(no_obj_loss)
    return class_loss + object_loss + self.COORD * coor_loss + self.NOOBJ * no_obj_loss


In [24]:
imgs = torch.rand(16, 3, 224, 224)
over(imgs, "imgs=")
model = YoloV1((16, 3, 224, 224))
out = model(imgs)
over(out, "pred=")

__verbose__: imgs= (<class 'torch.Tensor'>, torch.Size([16, 3, 224, 224]), '80Bytes')
__verbose__: pred= (<class 'torch.Tensor'>, torch.Size([16, 637]), '80Bytes')


In [25]:
out_true = torch.rand(16, 7, 7, 8)
over(out_true, "true=")
loss = YoloLoss()
loss.forward(out, out_true)

__verbose__: true= (<class 'torch.Tensor'>, torch.Size([16, 7, 7, 8]), '80Bytes')
tensor(256.4862, grad_fn=<MseLossBackward0>)
tensor(154.1761, grad_fn=<MseLossBackward0>)
tensor(398.1599, grad_fn=<MseLossBackward0>)
tensor(44.9569, grad_fn=<AddBackward0>)


tensor(2423.9402, grad_fn=<AddBackward0>)

### DataLoad

In [26]:
import os
from xml.etree import ElementTree
import tensorflow as tf
from tqdm import tqdm
from functools import partial
from keras.preprocessing.image import load_img, img_to_array

class_names = ['apple', 'banana', 'orange']

class DataLoad(data.Dataset):
  def __init__(self, file_dir, repeat, aug=False) -> None:
    super().__init__()
    dataframe = self.get_dataframe(file_dir=file_dir)
    self.imgs, self.labels = self.load_dataset(dataframe, input_shape=(224, 224, 3), #!!!
                                                grid_size=GRID_SIZE) # np.ndarray
    # repeat
    for i in range(repeat):
      self.imgs = np.concatenate((self.imgs, self.imgs), axis=0)
      self.labels = np.concatenate((self.labels, self.labels), axis=0)
    # aug
    if(aug == True):
      for i, img in enumerate(self.imgs):
        label = self.labels[i]
        self.imgs[i], self.labels[i] = self._apply_augmentation(img, label, seed=RANDOM_STATE)
    over(self.imgs, "DataLoad > __init__ > imgs=")
    over(self.labels, "DataLoad > __init__ > labels=")

  def __len__(self):
    return len(self.imgs)

  def __getitem__(self, idx):
      x, y = self.imgs[idx], self.labels[idx] # np.ndarray
      x, y = tf.convert_to_tensor(x), tf.convert_to_tensor(y) # tf.tensor
      # if(self.aug == True):
      #   x, y = self._apply_augmentation(x, y, seed=RANDOM_STATE) # tf.tensor
      # cast type
      x = torch.tensor(x.numpy(), dtype=torch.float32)  # torch.tensor
      y = torch.tensor(y.numpy(), dtype=torch.float32)
      return x, y


  def get_dataframe(self, file_dir):
    """
    Get the train/val/test dataframe which contains image
    file names and annotations files. If `phase = train',
    return train and val set
    :param file_dir: File directory to create dataframe
    :return file_df: Train or test dataframe
    """

    img_files = [os.path.join(file_dir, img_file) for img_file
                 in sorted(os.listdir(file_dir)) if img_file[-4:] == '.jpg']
    annot_files = [img_file[:-4] + '.xml' for img_file in img_files]

    img_file_series = pd.Series(img_files, name='Image_file')
    annot_file_series = pd.Series(annot_files, name='Annotation_file')
    file_df = pd.DataFrame(pd.concat([img_file_series, annot_file_series], axis=1))
    return file_df

  def prepare_image(self, filename, input_shape):
    """
    Resize image to expected dimension, and opt. apply some random transformation.
    :param filename: File name
    :param input_shape: Shape expected by the model (image will be resize accordingly)
    :return : 3D image array, pixel values from [0., 1.]
    """

    img = img_to_array(load_img(filename, target_size=input_shape)) / 255.
    img = np.einsum('ijk->kij', img)
    # print(img.shape)
    return img

  def convert_to_xywh(self, bboxes):
    """
    Convert list of (xmin, ymin, xmax, ymax) to
    (x_center, y_center, box_width, box_height)
    :param bboxes: List of bounding boxes, each has 4
    values (xmin, ymin, xmax, ymax)
    :return boxes: List of bounding boxes, each has 4
    values (x_center, y_center, box_width, box_height)
    """

    boxes = list()
    for box in bboxes:
        xmin, ymin, xmax, ymax = box

        # Compute width and height of box
        box_width = xmax - xmin
        box_height = ymax - ymin

        # Compute x, y center
        x_center = int(xmin + (box_width / 2))
        y_center = int(ymin + (box_height / 2))

        boxes.append((x_center, y_center, box_width, box_height))

    return boxes

  def extract_annotation_file(self, filename):
    """
    Extract bounding boxes from an annotation file
    :param filename: Annotation file name
    :return boxes: List of bounding boxes in image, each box has
    4 values (x_center, y_center, box_width, box_height)
    :return classes: List of classes in image
    :return width: Width of image
    :return height: Height of image
    """

    # Load and parse the file
    tree = ElementTree.parse(filename)
    # Get the root of the document
    root = tree.getroot()
    boxes = list()
    classes = list()

    # Extract each bounding box
    for box in root.findall('.//object'):
        cls = class_names.index(box.find('name').text)
        xmin = int(box.find('bndbox/xmin').text)
        ymin = int(box.find('bndbox/ymin').text)
        xmax = int(box.find('bndbox/xmax').text)
        ymax = int(box.find('bndbox/ymax').text)
        coors = (xmin, ymin, xmax, ymax)
        boxes.append(coors)
        classes.append(cls)

    boxes = self.convert_to_xywh(boxes)

    # Get width and height of an image
    width = int(root.find('.//size/width').text)
    height = int(root.find('.//size/height').text)

    # Some annotation files have set width and height by 0,
    # so we need to load image and get it width and height
    if (width == 0) or (height == 0):
        img = load_img(filename[:-4] + '.jpg')
        width, height = img.width, img.height

    return boxes, classes, width, height

  def convert_bboxes_to_tensor(self, bboxes, classes, img_width, img_height, grid_size=7):
    """
    Convert list of bounding boxes to tensor target
    :param bboxes: List of bounding boxes in image, each box has
    4 values (x_center, y_center, box_width, box_height)
    :param classes: List of class in image
    :param img_width: Image's width
    :param img_height: Image's height
    :param grid_size: Grid size
    :return target: Target tensor (grid_size x grid_size x (5 + num_classes))
    """

    num_classes = len(class_names)
    target = np.zeros(shape=(grid_size, grid_size, 5 + num_classes), dtype=np.float32)

    for idx, bbox in enumerate(bboxes):
        x_center, y_center, width, height = bbox

        # Compute size of each cell in grid
        cell_w, cell_h = img_width / grid_size, img_height / grid_size

        # Determine cell i, j of bounding box
        i, j = int(y_center / cell_h), int(x_center / cell_w)

        # Compute value of x_center and y_center in cell
        x, y = (x_center / cell_w) - j, (y_center / cell_h) - i

        # Normalize width and height of bounding box
        w_norm, h_norm = width / img_width, height / img_height

        # Add bounding box to tensor
        # Set x, y, w, h
        target[i, j, :4] += (x, y, w_norm, h_norm)
        # Set obj score
        target[i, j, 4] = 1.
        # Set class dist.
        target[i, j, 5 + classes[idx]] = 1.
    return target

  def load_dataset(self, dataframe, input_shape, grid_size=7):
    """
    Load img and target tensor
    :param dataframe: Dataframe contains img files and annotation files
    :param input_shape: Shape expected by the model (image will be resize accordingly)
    :param grid_size: Grid size
    :return dataset: Iterable dataset
    """

    imgs, targets = list(), list()

    for _, row in tqdm(dataframe.iterrows()):
        img = self.prepare_image(row.Image_file, input_shape)
        target = self.extract_annotation_file(row.Annotation_file)
        target = self.convert_bboxes_to_tensor(*target, grid_size)
        imgs.append(img)
        targets.append(target)

    imgs = np.array(imgs)
    targets = np.array(targets)
    return imgs, targets
    # dataset = tf.data.Dataset.from_tensor_slices((imgs, targets))
    # return dataset

  def _apply_augmentation(self, image, target, seed=None):
    """
    Apply random brightness and saturation on image
    :param image: Image to augment
    :param target: Target tensor
    :param seed: Seed for random operation
    :return : Processed data
    """

    # Random bright & saturation change
    image = tf.image.random_brightness(image, max_delta=0.1, seed=seed)
    image = tf.image.random_saturation(image, lower=0.5, upper=1.5, seed=seed)

    # Keeping pixel values in check
    image = tf.clip_by_value(image, clip_value_min=0.0, clip_value_max=1.0)

    return image, target

  def load_dataset_from_df(self, dataframe, batch_size=32, num_repeat=None, shuffle=False,
                         input_shape=(448, 448, 3), grid_size=7, augment=False,
                         seed=None):
    """
    Instantiate dataset
    :param dataframe: Dataframe contains img files and annotation files
    :param batch_size: Batch size
    :param num_epochs: Number of epochs (to repeat the iteration - infinite if None)
    :param shuffle: Flag to shuffle the dataset (if True)
    :param input_shape: Shape of the processed image
    :param grid_size: Grid size
    :param augment: Flag to apply some random augmentations to the image
    :param seed: Random seed for operation
    :return : Iterable dataset
    """

    apply_augmentation = partial(self._apply_augmentation, seed=seed)
    dataset = self.load_dataset(dataframe, input_shape, grid_size)
    ### !!!
    dataset = dataset.repeat(num_repeat)
    if shuffle:
        dataset = dataset.shuffle(1000, seed)
    if augment:
        dataset = dataset.map(apply_augmentation, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

In [27]:
IN_SHAPE

(4, 3, 224, 224)

In [28]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [29]:
train_dir = '/content/drive/MyDrive/Colab Notebooks/My_Laptop_Data/fruits_dataset/train'
dataload = DataLoad(train_dir, aug=False, repeat=4)
train_df = dataload.get_dataframe(train_dir)

240it [00:05, 40.72it/s]


__verbose__: DataLoad > __init__ > imgs= (<class 'numpy.ndarray'>, (3840, 3, 224, 224), '2312110240Bytes')
__verbose__: DataLoad > __init__ > labels= (<class 'numpy.ndarray'>, (3840, 7, 7, 8), '6021280Bytes')


In [30]:
over(dataload)
over(dataload[0][0])
over(dataload[0:16][1])

__verbose__:  (<class '__main__.DataLoad'>, 'no_shape', '48Bytes')
__verbose__:  (<class 'torch.Tensor'>, torch.Size([3, 224, 224]), '80Bytes')
__verbose__:  (<class 'torch.Tensor'>, torch.Size([16, 7, 7, 8]), '80Bytes')


In [31]:
# Assuming train_dataset is your training dataset
# train_loader = DataLoader(dataset=dataload, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True, drop_last=True, prefetch_factor=2)


In [32]:
# Assuming train_dataset is your training dataset
train_loader = data.DataLoader(dataset=dataload, batch_size=BATCH_SIZE, shuffle=True, drop_last=False)

In [33]:
over(train_loader, "train_loader=")

__verbose__: train_loader= (<class 'torch.utils.data.dataloader.DataLoader'>, 'no_shape', '48Bytes')


### training and testing

In [34]:
def train_fn(train_loader, model, optimizer, loss_fn):
  loop = tqdm(train_loader, leave=True)
  mean_loss = []

  for batch_idx, (x, y) in enumerate(train_loader):
    x, y = x.to(DEVICE), y.to(DEVICE)
    out = model(x).to(DEVICE)
    over(out, "train_fn > out=")
    over(y, "train_fn > y=")
    loss = loss_fn(out, y)
    print("loss in batch", loss.detach().numpy())

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  print(f"Mean loss was {sum(mean_loss) / len(mean_loss)}")

In [35]:
train_loader = data.DataLoader(dataset=dataload, batch_size=BATCH_SIZE, shuffle=True, drop_last=False)
model = YoloV1(in_shape=IN_SHAPE).to(DEVICE)
optimizer = optim.Adam(list(model.parameters()), lr=2e-5, weight_decay=0)
loss_fn = YoloLoss().to(DEVICE)

In [36]:
train_fn(train_loader, model, optimizer, loss_fn)

  0%|          | 0/960 [00:00<?, ?it/s]

__verbose__: train_fn > out= (<class 'torch.Tensor'>, torch.Size([4, 637]), '80Bytes')
__verbose__: train_fn > y= (<class 'torch.Tensor'>, torch.Size([4, 7, 7, 8]), '80Bytes')
tensor(10.4106, grad_fn=<MseLossBackward0>)
tensor(10.3511, grad_fn=<MseLossBackward0>)
tensor(16.4696, grad_fn=<MseLossBackward0>)
tensor(1.1594, grad_fn=<AddBackward0>)
loss in batch 103.68942
__verbose__: train_fn > out= (<class 'torch.Tensor'>, torch.Size([4, 637]), '80Bytes')
__verbose__: train_fn > y= (<class 'torch.Tensor'>, torch.Size([4, 7, 7, 8]), '80Bytes')
tensor(8.6974, grad_fn=<MseLossBackward0>)
tensor(8.8443, grad_fn=<MseLossBackward0>)
tensor(14.0548, grad_fn=<MseLossBackward0>)
tensor(1.5393, grad_fn=<AddBackward0>)
loss in batch 88.58521
__verbose__: train_fn > out= (<class 'torch.Tensor'>, torch.Size([4, 637]), '80Bytes')
__verbose__: train_fn > y= (<class 'torch.Tensor'>, torch.Size([4, 7, 7, 8]), '80Bytes')
tensor(4.9224, grad_fn=<MseLossBackward0>)
tensor(4.4851, grad_fn=<MseLossBackward0>)

KeyboardInterrupt: 

In [None]:
a = torch.rand(1, 7, 7, 2)
print(a)
sign_sqrt(a)

### End