# Fish Detection
This notebook is taken from [pytorch.org](https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html).property

License: BSD
Author: Sasank Chilamkurthy

## Dependencies
torch torchvision numpy matplotlib ipykernel opencv-python


In [48]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import time
import os
from PIL import Image
from tempfile import TemporaryDirectory
import xml.etree.ElementTree as ET
import cv2


cudnn.benchmark = True
plt.ion()

<contextlib.ExitStack at 0x7687e8dd7310>

In [49]:
# For reference:
# Each directory in "$DATASET_DIR" has an "annotations.xml" and an "images"
# directory. "annotations.xml" contains all annotations, while "images" contains
# every frame in the video.
#
# To get all images in a direcotry: os.listdir(). The length of this list is
# the total frames in the video.

DATASET_DIR = "/home/khai/mounted_drives/documents/computer_vision/fish_classification_data/annotated_videos"

# Test on Single Sample
Let's create a test on a single image. We create a bounding box around the fish
per annotation and label it "fish".

In [50]:
# image_name = "frame_000070.PNG"
# xml_name = "annotations.xml"

# # Put together image
# img = cv2.imread(os.path.join(DATASET_DIR, image_name))
# cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # Convert BGR to RGB

# # Put together annotations
# tree = ET.parse(os.path.join(ANNOTATIONS_DIR, xml_name))
# root = tree.getroot()

# annotation_color = (0, 255, 0)

# # Extract bounding boxes from the annoations (if any)
# # <box frame="70" outside="0" occluded="0" keyframe="0" xtl="327.46" ytl="224.60" xbr="623.15" ybr="377.17" z_order="0">
# box_frames_collection = root.find("track")
# assert box_frames_collection is not None, "Cannot find the 'track' tag in the annotations"

# box_frames = list()

# # Extract all annoation datas for each frame
# for element in box_frames_collection:
#     xtl = int(float(element.attrib.get('xtl')))
#     ytl = int(float(element.attrib.get('ytl')))
#     xbr = int(float(element.attrib.get('xbr')))
#     ybr = int(float(element.attrib.get('ybr')))
#     box_frames.append({
#         "frame": element.attrib.get('frame'),
#         "x": xtl,
#         "width": xbr - xtl,
#         "y": ytl,
#         "height": ybr - ytl,
#     })

# fbox = dict()
# for e in box_frames:
#     if e.get("frame") == "70":
#         fbox = e
#         break


# # BGR colour space
# cv2.rectangle(img,
#             (fbox.get("x"), fbox.get("y")),
#             (fbox.get("x") + fbox.get("width"), fbox.get("y") + fbox.get("height")),
#             annotation_color,
#             2)

# # Annotate the box
# text_position = (fbox.get("x"), fbox.get("y") + 20)
# text_size = 1.5
# text_thickness = 3
# cv2.putText(img, "fish", text_position, cv2.FONT_HERSHEY_SIMPLEX, text_size, annotation_color, thickness=text_thickness)

# plt.imshow(img)
# plt.show()

# Label Video Clip
This time, we will use a video clip to create a bounding box around the fish and
also label it. We'll convert the above cell to a function.

Credit to [YingqiangGao](https://stackoverflow.com/users/7759152/yingqiang-gao) at [StackOverflow](https://stackoverflow.com/q/43048725) for the code with writing to video.

## Video Specifications
FPS: 25 (Based eye judgement; 24 is too slow and 30 is too quick)
Dimensions: 1920x1080


In [51]:
import shutil


def label_frames(images_dir: str, annotations: str, output_dir: str) -> None:
    try:
        # Create output directory
        output_images_dir = os.path.join(os.getcwd(), output_dir)
        if not os.path.exists(output_images_dir):
            os.mkdir(output_images_dir)
        
        # Create bounding boxes
        tree = ET.parse(annotations)
        box_frames_collection = tree.getroot().find("track")

        if box_frames_collection is None:
            print(f"Video {images_dir} does not contain annotations")
            return
        
        # Set bounding box and label colour
        annotation_color = (0, 255, 0)

        bframes = dict() # List of all boxes

        # Extract all annoation datas for each frame
        # TODO Support multple bounding boxes
        for element in box_frames_collection:
            xtl = int(float(element.attrib.get('xtl')))
            ytl = int(float(element.attrib.get('ytl')))
            xbr = int(float(element.attrib.get('xbr')))
            ybr = int(float(element.attrib.get('ybr')))
            bframes[element.attrib.get('frame')] = {
                "x": xtl,
                "width": xbr - xtl,
                "y": ytl,
                "height": ybr - ytl,
            }

        # Labeling each frame in the images directory
        all_frames = os.listdir(images_dir)
        all_frames.sort()
        for i, image_name in enumerate(all_frames):
            frame_num = str(i)

            # Print loading bar
            print(f"\tFrame {i + 1} of {len(all_frames)} ", end='|')
            # Number of signs to print for loading bar
            loading_bar = ((i + 1) / len(all_frames)) * 100
            for j in range(int(loading_bar)):
                print("=", end='')
            # Print empty signs for remaining frames to render
            for j in range(100 - int(loading_bar)):
                print(" ", end='')
            print(f"| {100 * (i + 1) / len(all_frames):.1f}%", end='\r')

            # Get image
            img = cv2.imread(os.path.join(images_dir, image_name))
            cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # Convert BGR to RGB

            if bframes.get(frame_num) is not None:  # There is a bounding box!
                # Create bounding box
                cv2.rectangle(img,
                            (bframes.get(frame_num).get("x"), bframes.get(frame_num).get("y")),
                            (bframes.get(frame_num).get("x") + bframes.get(frame_num).get("width"), bframes.get(frame_num).get("y") + bframes.get(frame_num).get("height")),
                            annotation_color,
                            2)

                # Annotate the box
                text_position = (bframes.get(frame_num).get("x"), bframes.get(frame_num).get("y") + 20)
                text_size = 1.5
                text_thickness = 3
                cv2.putText(img, "fish", text_position, cv2.FONT_HERSHEY_SIMPLEX, text_size, annotation_color, thickness=text_thickness)
                cv2.imwrite(os.path.join(output_images_dir, image_name), img)
            else:   # No bounding box
                # We simply copy over the file. This speeds up the processing time
                shutil.copy(os.path.join(images_dir, image_name), os.path.join(output_images_dir, image_name))
        
    except KeyboardInterrupt:
        raise KeyboardInterrupt("You have cancelled the process")
    print("")

In [52]:
if 1==0:
    all_videos_list = os.listdir(DATASET_DIR)
    all_videos_list.sort()

    # Get width and height
    img_size = (1920, 1080)

    print("This will take some time. Press CTRL+C to cancel gracefully")

    if not os.path.exists("/home/khai/mounted_drives/documents/computer_vision/fish_classification_data/output"):
        os.mkdir("/home/khai/mounted_drives/documents/computer_vision/fish_classification_data/output")

    for i, img in enumerate(all_videos_list):
        print(f"Labelling video {i + 1} of {len(all_videos_list)}")

        label_frames(
            images_dir=os.path.join(DATASET_DIR, img, "images"),
            annotations=os.path.join(DATASET_DIR, img, "annotations.xml"),
            output_dir=str(f"/home/khai/mounted_drives/documents/computer_vision/fish_classification_data/output/{i}.{img}")
        )


# Create CNN Model to Classify the Fish
- Specifications: 80% training, 20% testing

The first 80% of the dataset is for training, and the last 20% for testing. The
order is random.

In [124]:
# For reference: all our dataset is in "$DATASET_DIR"
import random


all_dataset = os.listdir(DATASET_DIR)

# Randomize the list
random.seed(42)
random.shuffle(all_dataset)

trainset = all_dataset[0: int(len(all_dataset) * 0.8)]
testset = all_dataset[int(len(all_dataset) * 0.8):]
classes = ['not_fish', 'fish']
class_labels = [0, 1]

# Define the Convolutional Neural Network
In this example, we will implement the following:

- Convolution: A 2d convolutional layer with `x` input channels, `y` output channels and kernel size of `z * z`
- Max pooling: A layer with `x * x` kernel size and stride of `y`
- Linear layer: A fully connected layer with `x` input features and `y` output features. `nn.Linear(16*5*5, 120)` means 16 features, 5x5 spatial dimensions and output size of 120.

## Max Pooling
A technique to downsample data,, commonly used in convolutional neural networks. It divides areas in the data into non-overlapping blocks and picks the maximum value in each block.

If you want to know more, use your favourite search engine to look them up.

In [120]:
# Define convolutional neural network
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        # Convolutional layer: applies a filter to the input image to extract features.
        self.conv1 = nn.Conv2d(3,6,5)
        # Max pooling layer
        self.pool = nn.MaxPool2d(2,2)
        self.conv2 = nn.Conv2d(6,16,5)
        # Fully connected layer
        self.fc1 = nn.Linear(400, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    # ReLU: https://en.wikipedia.org/wiki/Rectifier_(neural_networks)
    # Propagate one step through the network.
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

net = Net()

In [56]:
print(trainset)
os.listdir(os.path.join(DATASET_DIR, trainset[0]))


['task_fish_detection-2024_01_28_18_22_01-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_41_54-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_18_34-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_29_12-cvat_for_video_1.1', 'task_fish_detection-2024_02_02_06_10_27-cvat_for_video_1.1', 'task_fish_detection-2024_02_01_18_51_52-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_33_21-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_28_06-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_12_47-cvat_for-video-1.1', 'task_fish_detection-2024_01_28_18_55_01-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_19_00_18-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_51_06-cvat_for_video_1.1', 'task_fish_detection-2024_02_01_18_41_23-cvat_for_video_1.1', 'task_fish_detection-2024_02_01_18_27_02-cvat_for_video_1.1', 'task_fish_detection-2024_01_28_18_30_27-cvat_for_video_1.1', 'task_fish_detection-2024_02_01_18_34_18-cvat_for_video_1.1', 'task_f

['images', 'annotations.xml']

In [57]:
# Loss function and optimizer
import torch.optim as optim

# Cross entropy loss: Measure the difference between predicted probability and
# the truth. There are many loss functions, but cross entropy is commonly used
# in classification problems.
# In short, loss is a quantitative measure of how our model's prediction
# performs.
criterion = nn.CrossEntropyLoss()

# Stochastic gradient descent: Minimize the loss function.
# Learning rate: Step size in learning process. Too high value: overshoots the optimal value. Too low: too slow process.
# Momentum: Adds a fraction from the vector in the previous time step to the current.
#
# For info: an optimiser aims to improve the model's predictions (performance)
# by adjusting weights. Weights impacts how much a node's output contributes to
# the propagated node's input.
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [138]:
# Add transformers to our model
transform = transforms.Compose ([
    transforms.ToTensor(),
    transforms.Resize((32,32)),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Train the network
# Please note this takes time, depending on your hardware. At the moment, we use
# CPU. This is slow!
# If you have saved the model in a file, this step is only necessary to
# execute when you want to improve or make changes to the model.

# Let's try to train only for one of the videos

video = trainset[10]

video_path = os.path.join(DATASET_DIR, video, "images")


# Get all annotations
annotations = os.path.join(DATASET_DIR, video, "annotations.xml")
labels = ["no-fish"] * int(len(video_path))  # List of frames with visible fish
if os.path.exists(annotations):
    tree = ET.parse(annotations)
    box_frames_collection = tree.getroot().find("track")

    if box_frames_collection is None:
        raise TypeError(f"Video {video} is not annotated")

    for element in box_frames_collection:
        labels[int(element.attrib.get('frame'))] = "fish"


frames = os.listdir(video_path)
frames.sort()

for epoch in range(2): # Loop over the dataset multiple times
    running_loss = 0.0

    print(f"Epoch {epoch+1} of {2}")

    for i, data in enumerate(frames, 0):
        print(f"Frame {i+1} of {len(frames)}", end='\r')
        img = Image.open(os.path.join(video_path, data))
        img.thumbnail((30, 30), Image.Resampling.LANCZOS)
        # get the inputs from dataset
        # inputs = image
        # labels = class
        # data = data from the trainloader
        inputs = transform(img)
        inputs = inputs.unsqueeze(0)
        inputs = inputs.repeat(4,1,1,1)
        # print(inputs.shape)
        # break

        # If there is an annotation for the current frame, then label is "fish".
        # Otherwise it's "no_fish"


        # zero the parameter gradients. Gradients means the change in the
        # weights
        optimizer.zero_grad()

        # forward + backward propagation + optimize
        # This is how we traverse the neural network. In CNN we perform forward
        # and backward propagation before we optimize the weights.
        outputs = net(inputs)

        c_labels = torch.LongTensor(class_labels).repeat(2)
        
        loss = criterion(outputs, c_labels)
        # loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 10 == 9: # print every n mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

    print("")

print('Finished Training')

Epoch 1 of 2
[1,    10] loss: 0.012
[1,    20] loss: 0.012
[1,    30] loss: 0.012
[1,    40] loss: 0.012
[1,    50] loss: 0.012
[1,    60] loss: 0.012
[1,    70] loss: 0.012
[1,    80] loss: 0.012
[1,    90] loss: 0.012
[1,   100] loss: 0.012
[1,   110] loss: 0.012
[1,   120] loss: 0.012
[1,   130] loss: 0.012
[1,   140] loss: 0.012
[1,   150] loss: 0.012
[1,   160] loss: 0.012
Frame 164 of 164
Epoch 2 of 2
[2,    10] loss: 0.012
[2,    20] loss: 0.012
[2,    30] loss: 0.012
[2,    40] loss: 0.012
[2,    50] loss: 0.012
[2,    60] loss: 0.012
[2,    70] loss: 0.012
[2,    80] loss: 0.012
[2,    90] loss: 0.012
[2,   100] loss: 0.012
[2,   110] loss: 0.012
[2,   120] loss: 0.012
[2,   130] loss: 0.012
[2,   140] loss: 0.012
[2,   150] loss: 0.012
[2,   160] loss: 0.012
Frame 164 of 164
Finished Training
