# Blue tit nestling diet pipeline
#### Emma Poliakova 
LICENSE: https://github.com/EmmaPoliakova/BlueTitDiet/blob/main/LICENSE 

<br> 
<br>
This notebook is for processing blue tit nest box recordings. 

### To get started: 

* In the menu at the top of the page, click File -> Save a copy in Drive.

* Download the zip file with the pretrained models for blue tit detection, landmark localization, and food classification from here: https://www.mediafire.com/file/l8p2mum2jv3ofva/BlueTitDiet.zip/file

* Upload it to the top-level folder ("My Drive") of your Google Drive.

* open the runtime section on the top of the page, select change runtime type and select GPU

* run the imports and setup code sections

* If no errors are shown, you are ready to process the videos.

## Imports

In [None]:
# Mount Google Drive under /content/gdrive; later cells read the model archive
# and the video folder from ./gdrive/MyDrive.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
from IPython.display import Image
from keras.preprocessing.image import ImageDataGenerator
from skimage import io
from PIL import Image

import os, json, cv2, numpy as np, matplotlib.pyplot as plt
import pickle
import seaborn as sns
from datetime import timedelta

import torch
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F

import torch.nn as nn
from torch import Tensor
import torch.nn.functional as nnf
import torch.optim as optim
from os import path


# SetUp

You only need to run this section when you are running this notebook for the first time.

In [None]:
# Unpack the BlueTitDiet archive (uploaded to Google Drive) into the working directory.
!unzip ./gdrive/MyDrive/BlueTitDiet.zip

In [None]:
# Fetch YOLOv5 and install the packages listed in its requirements file.
!git clone https://github.com/ultralytics/yolov5 
!pip install -U -r yolov5/requirements.txt 

In [None]:
!rm ./yolov5/detect.py
!cp ./BlueTitDiet/code/detect.py ./yolov5/
if path.exists('./yolov5/runs/detect/') == False:
  os.mkdir('./yolov5/runs')
  os.mkdir('./yolov5/runs/detect')
  os.mkdir('./yolov5/runs/detect/pictures')
  os.mkdir('./yolov5/runs/detect/crops')


In [None]:
%%shell

# Download TorchVision repo to use some files from
# references/detection
git clone https://github.com/pytorch/vision.git
cd vision
# Use the reference scripts as they are at the v0.8.2 tag.
git checkout v0.8.2

# Copy the detection helper modules one level up (the notebook's working
# directory) so that a later cell can import them.
cp references/detection/utils.py ../
cp references/detection/transforms.py ../
cp references/detection/coco_eval.py ../
cp references/detection/engine.py ../
cp references/detection/coco_utils.py ../

In [None]:
# Replace the TorchVision coco_eval.py with the copy shipped in the BlueTitDiet archive.
!rm ./coco_eval.py
!cp ./BlueTitDiet/code/coco_eval.py ./

# These modules are the TorchVision reference files copied into the working
# directory by the previous cell, so this import only works after that cell has run.
from engine import train_one_epoch, evaluate
import transforms, utils, engine
from utils import collate_fn

# Functions

In [None]:
def get_model(num_keypoints, weights_path=None):
    """Build a Keypoint R-CNN (ResNet-50 FPN) and optionally restore saved weights.

    Args:
        num_keypoints: number of keypoints the model predicts per detected object.
        weights_path: optional path to a ``state_dict`` saved with ``torch.save``.
            When falsy, the model is returned with only the backbone pretrained.

    Returns:
        The constructed model. It is left on the CPU; the caller is responsible
        for moving it to the target device and choosing train/eval mode.
    """
    anchor_generator = AnchorGenerator(
        sizes=(32, 64, 128, 256, 512),
        aspect_ratios=(0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0),
    )
    model = torchvision.models.detection.keypointrcnn_resnet50_fpn(
        pretrained=False,
        pretrained_backbone=True,
        num_keypoints=num_keypoints,
        num_classes=2,  # Background is the first class, object is the second class
        rpn_anchor_generator=anchor_generator,
    )

    if weights_path:
        # map_location='cpu' lets a checkpoint that was saved from a GPU load on a
        # CPU-only runtime as well; without it torch.load tries to restore the
        # tensors onto the device they were saved from.
        state_dict = torch.load(weights_path, map_location='cpu')
        model.load_state_dict(state_dict)

    return model

In [None]:
# training and prediction keypoint rcnn code modified from [2]
def crop_images(in_folder, out_folder, width, height):
  """Crop a window below the detected beak from every PNG in ``in_folder``.

  Each image is run through the keypoint model held in the notebook-level
  ``model`` variable on the notebook-level ``device``. Detections scoring above
  0.7 are kept and filtered with NMS (IoU threshold 0.3); the last keypoint of
  the first remaining detection is used as the beak position. The crop is
  ``width`` pixels wide, centred horizontally on the beak, and extends
  ``height`` pixels downwards from it. It is written to ``out_folder`` under the
  same file name.

  Images that cannot be read, images without a detection, and crops that fall
  completely outside the image are skipped.

  NOTE: when the window is only partly inside the image the saved crop is
  smaller than ``width`` x ``height``.

  Args:
    in_folder: folder containing the input ``.png`` frames.
    out_folder: existing folder that receives the crops.
    width: crop width in pixels.
    height: crop height in pixels.
  """
  width_half = width // 2

  # Moving the model and switching to eval mode is loop-invariant, do it once.
  model.to(device)
  model.eval()

  for img in os.listdir(in_folder):
    if not img.endswith(".png"):
      continue

    img_original = cv2.imread(os.path.join(in_folder, img))
    if img_original is None:
      # cv2.imread signals an unreadable file by returning None.
      continue

    image_tensor = F.to_tensor(img_original).to(device)
    with torch.no_grad():
      output = model([image_tensor])[0]

    scores = output['scores'].detach().cpu().numpy()
    high_scores_idxs = np.where(scores > 0.7)[0].tolist() # Indexes of boxes with scores > 0.7
    post_nms_idxs = torchvision.ops.nms(
        output['boxes'][high_scores_idxs],
        output['scores'][high_scores_idxs],
        0.3,
    ).cpu().numpy()

    keypoints = []
    for kps in output['keypoints'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
      keypoints.append([list(map(int, kp[:2])) for kp in kps])

    if not keypoints:
      continue

    beak = keypoints[0][-1]
    # Clamp the window start at 0: a negative start index would be interpreted
    # by numpy as "count from the end" and select the wrong (usually empty) region.
    top = max(beak[1], 0)
    left = max(beak[0] - width_half, 0)
    cropped_image = img_original[top : beak[1] + height, left : beak[0] + width_half]

    if cropped_image.size == 0:
      # Nothing of the window lies inside the image; cv2.imwrite rejects empty images.
      continue

    cv2.imwrite(os.path.join(out_folder, img), cropped_image)

In [None]:
def most_common(lst):
    """Return the value that occurs most often in ``lst``.

    Raises ``ValueError`` when ``lst`` is empty (there is nothing to pick from).
    """
    distinct_values = set(lst)
    winner = max(distinct_values, key=lambda value: lst.count(value))
    return winner

In [None]:
#implementation by Balawejder, M. [3]
class ClassDataset(Dataset):
    """Dataset yielding ``(image, file_name)`` pairs for a list of image files.

    Args:
        image_paths: file names, each appended to ``location`` to form the path.
        dataset: label container, stored as ``self.labels``; ``get_labels``
            returns its ``'labels'`` entry.
        location: path prefix prepended to every file name (plain string
            concatenation, so a folder must end with a separator).
        transform: stored on the instance; ``__getitem__`` does not apply it.
    """

    def __init__(self, image_paths, dataset, location, transform=False):
        self.image_paths = image_paths
        self.labels = dataset
        self.transform = transform
        self.location = location

    def __len__(self):
        """Number of image files in the dataset."""
        return len(self.image_paths)

    def get_labels(self):
        """Return the ``'labels'`` entry of the label container."""
        return self.labels['labels']

    def __getitem__(self, idx):
        """Load image ``idx`` with OpenCV and return it with its file name.

        Raises:
            FileNotFoundError: if OpenCV cannot read the file.
        """
        image_filepath = self.location + self.image_paths[idx]
        image = cv2.imread(image_filepath)
        if image is None:
            # cv2.imread returns None instead of raising; fail here with the
            # offending path rather than later inside the DataLoader collate step.
            raise FileNotFoundError(f"Could not read image: {image_filepath}")
        label = self.image_paths[idx]

        return image, label
    

In [None]:
#implementation by Balawejder, M. [3]
class ConvBlock(nn.Module):
    """Conv2d followed by optional BatchNorm2d and an activation module.

    ``act`` is the activation applied last; ``bn=False`` replaces the batch
    normalisation with an identity.
    """
    def __init__(
        self,
        in_channels : int,
        out_channels : int,
        kernel_size : int,
        stride : int,
        act = nn.ReLU(),
        groups = 1,
        bn = True,
        bias = False     
        ):
        super().__init__()

        # Half the kernel size as padding: k = 1 -> 0, k = 3 -> 1, k = 5 -> 2.
        self.c = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=kernel_size // 2,
            groups=groups,
            bias=bias,
        )
        if bn:
            self.bn = nn.BatchNorm2d(out_channels)
        else:
            self.bn = nn.Identity()
        self.act = act

    def forward(self, x: Tensor) -> Tensor:
        features = self.c(x)
        normalised = self.bn(features)
        return self.act(normalised)


class SeBlock(nn.Module):
    """Squeeze-and-Excitation block: rescales each channel by a learned gate."""
    def __init__(
        self, 
        in_channels : int
        ):
        super().__init__()

        # The bottleneck has a quarter of the input channels.
        reduced = in_channels // 4
        self.globpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc1 = nn.Linear(in_channels, reduced, bias=False)
        self.fc2 = nn.Linear(reduced, in_channels, bias=False)
        self.relu = nn.ReLU() 
        self.hsigmoid = nn.Hardsigmoid()

    def forward(self, x: Tensor) -> Tensor:
        # x: [N, C, H, W] -> one pooled value per channel: [N, C]
        squeezed = torch.flatten(self.globpool(x), 1)
        hidden = self.relu(self.fc1(squeezed))
        gates = self.hsigmoid(self.fc2(hidden))
        # Broadcast the per-channel gates ([N, C, 1, 1]) over the spatial axes.
        return x * gates[:, :, None, None]

# BNeck
class BNeck(nn.Module):
    """MobileNetV3 bottleneck: 1x1 expansion, grouped conv, optional SE, 1x1 projection."""
    def __init__(
        self,
        in_channels : int,
        out_channels : int,
        kernel_size : int, 
        exp_size : int,
        se : bool, 
        act : torch.nn.modules.activation,
        stride : int
        ):
        super().__init__()

        # The skip connection is only used when input and output shapes match.
        self.add = (in_channels == out_channels) and (stride == 1)

        expand = ConvBlock(in_channels, exp_size, 1, 1, act)
        # groups == channels: one filter per channel.
        depthwise = ConvBlock(exp_size, exp_size, kernel_size, stride, act, exp_size)
        if se == True:
            attention = SeBlock(exp_size)
        else:
            attention = nn.Identity()
        # The projection has no non-linearity.
        project = ConvBlock(exp_size, out_channels, 1, 1, act=nn.Identity())

        self.block = nn.Sequential(expand, depthwise, attention, project)

    def forward(self, x: Tensor) -> Tensor:
        out = self.block(x)
        return out + x if self.add else out

""" MobileNetV3 """
class MobileNetV3(nn.Module):
    """MobileNetV3 image classifier assembled from ``ConvBlock`` and ``BNeck``.

    Args:
        config_name: ``"large"`` or ``"small"``; selects the block table
            returned by :meth:`config`.
        in_channels: number of channels of the input images.
        classes: number of output classes.

    Raises:
        ValueError: if ``config_name`` is not a known configuration.
    """
    def __init__(
        self,
        config_name : str,
        in_channels = 3,
        classes = 2
        ):
        super().__init__()
        config = self.config(config_name)

        # First convolution(conv2d) layer. 
        self.conv = ConvBlock(in_channels, 16, 3, 2, nn.Hardswish())
        # Bneck blocks in a list. 
        self.blocks = nn.ModuleList([])
        for kernel_size, exp_size, block_in, block_out, se, nl, s in config:
            self.blocks.append(BNeck(block_in, block_out, kernel_size, exp_size, se, nl, s))
        
        # Classifier 
        last_outchannel = config[-1][3]
        last_exp = config[-1][1]
        out = 1280 if config_name == "large" else 1024
        self.classifier = nn.Sequential(
            ConvBlock(last_outchannel, last_exp, 1, 1, nn.Hardswish()),
            nn.AdaptiveAvgPool2d((1,1)),
            ConvBlock(last_exp, out, 1, 1, nn.Hardswish(), bn=False, bias=True),
            # Dropout is only active in training mode; call .eval() before inference.
            nn.Dropout(0.8),
            nn.Conv2d(out, classes, 1, 1)
        )
    
    def forward(self, x: Tensor) -> Tensor:
        """Return the class scores with shape ``[N, classes]``."""
        x = self.conv(x)
        for block in self.blocks:
            x = block(x)

        x = self.classifier(x)
        # The classifier output is [N, classes, 1, 1]; drop the spatial axes.
        return torch.flatten(x, 1)


    def config(self, name):
        """Return the block table for configuration ``name``.

        Each row is ``[kernel, exp size, in_channels, out_channels, SE, activation, stride]``.

        Raises:
            ValueError: if ``name`` is neither ``"large"`` nor ``"small"``.
        """
        HE, RE = nn.Hardswish(), nn.ReLU()
        # [kernel, exp size, in_channels, out_channels, SEBlock(SE), activation function(NL), stride(s)] 
        large = [
                [3, 16, 16, 16, False, RE, 1],
                [3, 64, 16, 24, False, RE, 2],
                [3, 72, 24, 24, False, RE, 1],
                [5, 72, 24, 40, True, RE, 2],
                [5, 120, 40, 40, True, RE, 1],
                [5, 120, 40, 40, True, RE, 1],
                [3, 240, 40, 80, False, HE, 2],
                [3, 200, 80, 80, False, HE, 1],
                [3, 184, 80, 80, False, HE, 1],
                [3, 184, 80, 80, False, HE, 1],
                [3, 480, 80, 112, True, HE, 1],
                [3, 672, 112, 112, True, HE, 1],
                [5, 672, 112, 160, True, HE, 2],
                [5, 960, 160, 160, True, HE, 1],
                [5, 960, 160, 160, True, HE, 1]
        ]

        small = [
                [3, 16, 16, 16, True, RE, 2],
                [3, 72, 16, 24, False, RE, 2],
                [3, 88, 24, 24, False, RE, 1],
                [5, 96, 24, 40, True, HE, 2],
                [5, 240, 40, 40, True, HE, 1],
                [5, 240, 40, 40, True, HE, 1],
                [5, 120, 40, 48, True, HE, 1],
                [5, 144, 48, 48, True, HE, 1],
                [5, 288, 48, 96, True, HE, 2],
                [5, 576, 96, 96, True, HE, 1],
                [5, 576, 96, 96, True, HE, 1]
        ]

        if name == "large": return large
        if name == "small": return small
        # Fail here with a clear message instead of returning None and crashing
        # later while iterating over the configuration.
        raise ValueError(f"Unknown MobileNetV3 config {name!r}; expected 'large' or 'small'")


# Pipeline

This section runs the actual predictions. First the YOLO model, then landmark predictions and image crops, and finally the food classification. 

## Face Detection

To view the selected images, navigate to ./yolov5/runs/detect/pictures in the Colab file browser (the folder is created in the notebook's working directory). If any of the images look wrong, you can delete them to exclude them from further predictions. 

You can adjust the confidence value to save more or fewer images. With lower confidence the quality will decrease but more frames will be saved. 

Note: running the section of code will delete the previous predictions in pictures and crops folder. If you don't want this to happen comment out the section marked below.

In [None]:
#training and prediction code by Solawetz, J. [1]

%cd ./yolov5

#comment out these 4 lines of code if you don't want the images to be removed.
!rm -rf ./runs/detect/pictures
!rm -rf ./runs/detect/crops

os.mkdir('./runs/detect/pictures')
os.mkdir('./runs/detect/crops')

#                                                                      prediction confidence levels     path to video folder
!python detect.py --weights ../BlueTitDiet/models/aug_exp_v2.pt --img 416 --conf 0.4 --source ../gdrive/MyDrive/videos/

By running this code cell you can see the images selected and the number of visits detected.

In [None]:
# Read dictionary pkl file
# The previous cell changed into ./yolov5; go back up so the relative paths
# used from here on resolve from the notebook's working directory again.
%cd ..
# NOTE: pickle.load can execute arbitrary code, so only load a visits.pkl that
# was produced by this pipeline.
with open('./yolov5/runs/detect/pictures/visits.pkl', 'rb') as fp:
    # In the output shown below, the dictionary maps an image file name to
    # {'visit': <visit number>, 'frame': <frame index as a string>}.
    visits = pickle.load(fp)
    print('Visit dictionary')
    print(visits)

/content
Visit dictionary
{'81_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '81'}, '82_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '82'}, '83_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '83'}, '84_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '84'}, '85_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '85'}, '86_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '86'}, '93_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '93'}, '94_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '94'}, '96_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '96'}, '98_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '98'}, '110_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '110'}, '111_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '111'}, '112_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '112'}, '113_2016-06-03 06-03-13-Q-M.AVI.png': {'visit': 1, 'frame': '113'}, '114_2016-06-03 06-03-13-Q-

## Landmark Detection

To view the cropped images go to ./yolov5/runs/detect/crops . Again, the results can be removed if any of the crops are wrong. 

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Keypoint model with three keypoints, restored from the weights in the BlueTitDiet archive.
model = get_model(3, './BlueTitDiet/models/keypointsrcnn_weights.pth')

In [None]:
# Crop a 48 px wide, 42 px high window at the beak of every selected frame.
crop_images(
    in_folder='./yolov5/runs/detect/pictures/',
    out_folder='./yolov5/runs/detect/crops/',
    width=48,
    height=42,
)

## Insect Classifier

This section loads the crops and makes a prediction for each image. The visit dictionary is updated with the predictions, and then the most common value per visit is selected. 

In [None]:
# Folder holding the beak crops produced by the landmark step.
test_path = './yolov5/runs/detect/crops/'

# Keep regular files only. The loop variable is deliberately not called `path`:
# at notebook level that name is bound by `from os import path` and the setup
# cell still needs it, so rebinding it to a file name would break a re-run.
test_img = [
    file_name
    for file_name in os.listdir(test_path)
    if os.path.isfile(os.path.join(test_path, file_name))
]
print(test_img)

# The classifier is only used for prediction here, so the label container is a
# placeholder array of zeros; each item yields (image, file name).
test_dataset = ClassDataset(test_img, np.zeros(len(test_img)), test_path)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=64, shuffle=False
)

['1479_2016-06-03 06-03-13-Q-M.AVI.png', '122_2016-06-03 06-03-13-Q-M.AVI.png', '131_2016-06-03 06-03-13-Q-M.AVI.png', '1476_2016-06-03 06-03-13-Q-M.AVI.png', '970_2016-06-03 06-03-13-Q-M.AVI.png', '1421_2016-06-03 06-03-13-Q-M.AVI.png', '94_2016-06-03 06-03-13-Q-M.AVI.png', '1464_2016-06-03 06-03-13-Q-M.AVI.png', '1428_2016-06-03 06-03-13-Q-M.AVI.png', '130_2016-06-03 06-03-13-Q-M.AVI.png', '82_2016-06-03 06-03-13-Q-M.AVI.png', '1465_2016-06-03 06-03-13-Q-M.AVI.png', '1477_2016-06-03 06-03-13-Q-M.AVI.png', '83_2016-06-03 06-03-13-Q-M.AVI.png', '81_2016-06-03 06-03-13-Q-M.AVI.png', '1481_2016-06-03 06-03-13-Q-M.AVI.png', '96_2016-06-03 06-03-13-Q-M.AVI.png', '85_2016-06-03 06-03-13-Q-M.AVI.png', '1483_2016-06-03 06-03-13-Q-M.AVI.png', '128_2016-06-03 06-03-13-Q-M.AVI.png', '1480_2016-06-03 06-03-13-Q-M.AVI.png', '112_2016-06-03 06-03-13-Q-M.AVI.png', '1463_2016-06-03 06-03-13-Q-M.AVI.png', '1425_2016-06-03 06-03-13-Q-M.AVI.png', '1461_2016-06-03 06-03-13-Q-M.AVI.png', '1431_2016-06-03 

In [None]:
name = "large"
rho = 1
res = int(rho * 45)

PATH = './BlueTitDiet/models/net_2class_valid_test_balanced.pth'
net = MobileNetV3(name)
# map_location='cpu': the classifier is run on the CPU in this notebook, and the
# checkpoint must load even if it was saved from a GPU.
net.load_state_dict(torch.load(PATH, map_location='cpu'))
# Inference mode: disables dropout and makes BatchNorm use its stored statistics.
net.eval()

# Sanity check of the output shape. Run without gradients and in eval mode so the
# random input cannot alter the BatchNorm running statistics of the loaded model.
with torch.no_grad():
    print(net(torch.rand(1, 3, res, res)).shape)


# Not used by the prediction cells below; kept so the names remain defined.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

torch.Size([1, 2])


In [None]:
# Inference mode: without this the dropout layer stays active and BatchNorm
# normalises with the statistics of the current batch, making predictions unstable.
net.eval()

# Classify every batch of crops, not only the first one.
labels = []          # file names, in the same order as the rows of `outputs`
batch_outputs = []
with torch.no_grad():
    for batch_images, batch_labels in test_loader:
        # Convert to float and move the last axis (image channels) to position 1,
        # the layout expected by the convolution layers.
        batch_images = batch_images.to(torch.float).permute(0, 3, 1, 2)
        batch_outputs.append(net(batch_images))
        labels.extend(batch_labels)

if batch_outputs:
    outputs = torch.cat(batch_outputs)
    prob = nnf.softmax(outputs, dim=1)
    top_p, top_class = prob.topk(1, dim = 1)
    _, predicted = torch.max(outputs, 1)
else:
    # No crops were found: leave an empty prediction so the voting cell simply
    # reports nothing instead of failing.
    predicted = torch.empty(0, dtype=torch.long)


This cell will run the voting system and print out visit numbers, what food was brought and the time in the video the visit occurs. 

In [None]:
# Store the predicted class on the visit record of every classified crop.
for idx, class_idx in enumerate(predicted):
  visits[labels[idx]]['food'] = int(class_idx)

# Collect the predictions of each visit; the frame of the first classified
# image of a visit is kept for the time stamp.
vote = {}
for record in visits.values():
  visit = record['visit']
  if 'food' not in record:
    continue
  if visit not in vote:
    vote[visit] = {'food': [], 'frame': record['frame']}
  vote[visit]['food'].append(record['food'])

# Class index -> printed name; any other index is reported as 'other'.
food_names = {0: 'caterpillar', 1: 'insect'}
for key, ballot in vote.items():
    winner = most_common(ballot['food'])
    food_item = food_names.get(winner, 'other')

    # Frame index divided by 12 gives seconds -- presumably the frame rate of
    # the recordings; confirm before using videos with a different frame rate.
    td = timedelta(seconds=(int(ballot['frame']) / 12))
    print('visit: ',  str(key) , 'food: ' ,food_item, 'time: ', td )

visit:  1 food:  insect time:  0:00:06.750000
visit:  2 food:  caterpillar time:  0:01:20.500000
visit:  3 food:  caterpillar time:  0:01:58.416667
visit:  4 food:  caterpillar time:  0:02:01.750000


 ## References

 1. Solawetz, J. and Nelson, J. (2020) How to train yolov5 on a custom dataset, Roboflow Blog. Roboflow Blog. Available at: https://blog.roboflow.com/how-to-train-yolov5-on-a-custom-dataset/ (Accessed: April 12, 2023). 

 2. P, A. (2022) How to train a custom keypoint detection model with pytorch, Medium. Medium. Available at: https://medium.com/@alexppppp/how-to-train-a-custom-keypoint-detection-model-with-pytorch-d9af90e111da (Accessed: April 12, 2023). 

 3. Balawejder, M. (2022) Mobilenetv3 , GitHub. Available at: https://github.com/maciejbalawejder/Deep-Learning-Collection/tree/main/ConvNets/MobileNetV3 (Accessed: April 12, 2023). 

