In [264]:
import torch
import torchvision
from torchvision.models.detection.transform import GeneralizedRCNNTransform
from torchvision.models.detection.backbone_utils import mobilenet_backbone

from torchvision.models.detection.anchor_utils import AnchorGenerator
nn = torch.nn
import os
import numpy as np
from src.utils.audio_utils import compute_spectrogram, load_audio_file
from src.utils import path_utils


In [265]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cpu


# Dataset and Dataloader

In [266]:
class OneShotDataset(torch.utils.data.Dataset):
    def __init__(self, detection_dir,transform_audio, device):
        self.detection_dir = detection_dir
        self.device = device
        self.samplepath = os.path.join(self.detection_dir, 'samples')
        self.targetpath = os.path.join(self.detection_dir, 'target')
        self.indexes = list(map(lambda x: x.split('.')[0],os.listdir(self.samplepath)))
        self.transform_audio = transform_audio
        self.transform_image = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])
        self.spec_hight=120

    def __len__(self):
        return len(os.listdir((self.samplepath)))
        
    def __getitem__(self,idx):
        index=self.indexes[idx]
        data,fs = load_audio_file(os.path.join(self.samplepath,index +'.wav'))
        targets = np.load(os.path.join(self.targetpath,index +'.npy'))
        data = self.transform_audio(data)
        boxes=torch.as_tensor([[start,0,end,data.shape[0]] for [_,start,end] in targets ], dtype=torch.float64)
        labels=torch.as_tensor([target[0] for target in targets], dtype=torch.int64)
        target_dict={'boxes':boxes,'labels':labels}
        #target_dict=dict(filter(lambda x: x['boxes']!=[], target_dict.items()))
        # targets=[{'boxes':torch.as_tensor([start,0,end,data.shape[0]], dtype=torch.float32),'labels':lbl} for [lbl,start,end]in targets]
        x = self.transform_image(data)
        return x,target_dict

In [267]:
def transform_audio(data):
    _, _, specto = compute_spectrogram(data, 24000, nperseg=256, noverlap=256/2, scale="dB")
    # freq clip
    specto = specto[:120, :]
    return specto

def custom_collate(batch):
    return tuple(zip(*batch))

detection_dataloader = torch.utils.data.DataLoader(OneShotDataset(detection_dir=path_utils.get_detection_data_path(),
                                                    transform_audio=transform_audio, device=device),collate_fn=custom_collate,batch_size=2)

# Model

In [268]:

def rcnn_pretrained_backbone(num_classes : int, anchor_sizes : tuple, aspect_ratios : tuple, parameters : dict = {}):
    """
    Return a faster rcnn with a mobilenetv3 backbone.

    Args:
        num_classes (int): Number of classes expected to return (background should be taken into account)
        parameters (dict, optional): Dictionnary for the different following parameters. Defaults to {}.

    Returns:
        (torchvision.models.detection.faster_rcnn.FasterRCNN): Model implementation of pytorch

    """

    pretrained_backbone=True
    trainable_backbone_layers=6 # All backbone is trainable

    backbone = mobilenet_backbone("mobilenet_v3_large", pretrained_backbone, False, trainable_layers=trainable_backbone_layers)

    model = torchvision.models.detection.faster_rcnn.FasterRCNN(backbone,
                                                                num_classes,
                                                                rpn_anchor_generator=AnchorGenerator(anchor_sizes, aspect_ratios),
                                                                **parameters)

    # Custom transform ie no transform (only postprocessing)
    model.transform = GeneralizedRCNNTransform(min_size=489, max_size=2000, image_mean=[0, 0, 0], image_std=[1, 1, 1])
    return model

# Training

In [269]:
def rcnn_pretrained_backbone_train(num_classes : int, anchor_sizes : tuple, aspect_ratios : tuple, parameters : dict = {}):
    """
    Return a jit compiled faster rcnn with a mobilenetv3 backbone.
    """
    model = rcnn_pretrained_backbone(num_classes, anchor_sizes, aspect_ratios, parameters)
    model._has_warned = True # Remove warning about "RCNN always returns a (Losses, Detections) tuple in scripting"
    return torch.jit.script(model)

In [270]:
def loss_faster_rcnn(dict_losses : dict, training_rpn : bool, training_head : bool):
    """ Reduce the dictionnary of losses

    Args:
        dict_losses (dict): Dictionnary of losses
        training_rpn (bool): Bool to train the rpn
        training_head (bool): Bool to train the head

    Returns:
        (torch.Tensor): Loss
    """
    # Dict("loss_classifier", "loss_box_reg", "loss_objectness", "loss_rpn_box_reg")
    loss = torch.zeros((), dtype=torch.float32, device=device)
    if training_rpn:
        loss += dict_losses["loss_objectness"]
        loss += dict_losses["loss_rpn_box_reg"]
    if training_head:
        loss += dict_losses["loss_classifier"]
        loss += dict_losses["loss_box_reg"]
    return loss

In [271]:
def average_losses(dataloader, mean_loss):
    mean_loss["loss_objectness"] /= len(dataloader)
    mean_loss["loss_rpn_box_reg"] /= len(dataloader)
    mean_loss["loss_classifier"] /= len(dataloader)
    mean_loss["loss_box_reg"] /= len(dataloader)
    return mean_loss

def accumulate_losses(mean_loss, losses):
    mean_loss["loss_objectness"] += losses["loss_objectness"].item()
    mean_loss["loss_rpn_box_reg"] += losses["loss_rpn_box_reg"].item()
    mean_loss["loss_classifier"] += losses["loss_classifier"].item()
    mean_loss["loss_box_reg"] += losses["loss_box_reg"].item()

In [272]:
model = rcnn_pretrained_backbone_train(num_classes=2,anchor_sizes=((32, 64),),
                                    aspect_ratios=((0.5, 1.0, 2.0),))

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
sgd_optimizer = torch.optim.SGD(params, lr=0.005,
                            momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler which decreases the learning rate by
# 10x every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(sgd_optimizer,
                                               step_size=3,
                                               gamma=0.1)
loss_fn_frrcnn=lambda x:loss_faster_rcnn(x,True,True)
                                       


In [273]:
def train_loop(dataloader, model, loss_fn, optimizer, scheduler, macro_batch=1):
    # Initialize training
    model.train()
    optimizer.zero_grad()

    mean_loss = {"loss_objectness": 0., "loss_rpn_box_reg": 0., "loss_classifier": 0., "loss_box_reg": 0.}
    # Iterate over the dataset
    for batch, (X, targets) in enumerate(dataloader):
        # Work with the GPU if available
        X = list(x.to(device) for x in X)
        targets = list({k: v.to(device) for k, v in t.items()} for t in targets)
        # Compute prediction error
        losses = model(X, targets)[0]
        accumulate_losses(mean_loss, losses)
        loss = loss_fn(losses)
        print(loss)
        # Backpropagation
        loss.backward()
      
        if (batch+1) % macro_batch == 0 or batch == len(dataloader) - 1:
            optimizer.step()
            optimizer.zero_grad()
            scheduler.step()
        # Print metrics
        if batch % 30 == 0 or batch == len(dataloader) - 1:
            loss_value, current = loss.item(), (batch+1) * len(X)
            print(loss_value)
    return average_losses(dataloader, mean_loss)

In [274]:
train_loop(dataloader=detection_dataloader, model=model, loss_fn=loss_fn_frrcnn, optimizer=sgd_optimizer, scheduler=lr_scheduler, macro_batch=1)

dict_items([('boxes', tensor([[12547.,     0., 33913.,   120.],
        [ 2934.,     0., 34988.,   120.],
        [10122.,     0., 34808.,   120.],
        [ 9361.,     0., 39144.,   120.],
        [17833.,     0., 44625.,   120.],
        [12822.,     0., 42448.,   120.],
        [  226.,     0., 47950.,   120.],
        [ 1628.,     0., 43487.,   120.],
        [  279.,     0., 44493.,   120.],
        [ 3850.,     0., 43313.,   120.]], dtype=torch.float64)), ('labels', tensor([1, 1, 1, 1, 1, 1, 0, 0, 0, 0]))])
dict_items([('boxes', tensor([[ 1287.,     0., 33466.,   120.],
        [14715.,     0., 38766.,   120.],
        [19001.,     0., 39948.,   120.],
        [ 2932.,     0., 43095.,   120.],
        [  297.,     0., 42730.,   120.]], dtype=torch.float64)), ('labels', tensor([1, 1, 1, 0, 0]))])
tensor(2027.9116, grad_fn=<AddBackward0>)
2027.91162109375
dict_items([('boxes', tensor([[19043.,     0., 43425.,   120.],
        [18684.,     0., 39894.,   120.],
        [ 4423.,     0

KeyboardInterrupt: 