In [1]:
#Importing libraries
from __future__ import print_function, division
import glob
import pandas as pd
import os
import numpy as np
import torch
from skimage import io, transform
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.nn as nn
from torch.nn import Sequential
from torch.nn import functional as F
from torch import nn, optim
from sklearn.utils import resample
import cv2
import random
from sklearn.model_selection import train_test_split

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

#1. Загрузка данных

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%cd '/content/drive/MyDrive/cats_dogs_dataset/'

/content/drive/MyDrive/cats_dogs_dataset


In [None]:
#Walk trough all files in the dataset folder and take their names
txt_files = glob.glob('*.txt')
#List for annotations
annotations = []
#For each image get its annotations (labels)
for file in txt_files:
    with open(file, 'rt') as fd:
        first_line = fd.readline().split()
    annotations.append([file[:-4]] + first_line)
#Create a dataframe with image name and its labels   
annotations = pd.DataFrame(annotations, columns = ['image_name', 'class', 'x_min', 'y_min', 'x_max', 'y_max'])
#Change classes values to 0 (cat) and 1 (dog) for binary classification
annotations['class'] = annotations['class'].astype(float) - 1
#Save the dataframe to the dataset folder
annotations.to_csv(r'/content/drive/MyDrive/cats_dogs_dataset/annotations.csv', index=False)

#2. Класс датасета

In [3]:
#Define a custom dataset class
"""   
       Arguments: 
                    df: pd.DataFrame file with names of images and their labels
                    root_dir: name of files directory
                    transform: images transformes
"""
class CatDogDataset(Dataset):

    def __init__(self, df, root_dir, transform=None):
        self.df = df
        self.root_dir = root_dir
        self.transform = transform  
    def __len__(self):
        return len(self.df)
    def __getitem__(self, idx):
        #Get image
        img_name = self.df.iloc[idx]['image_name'] + '.jpg'
        image = io.imread(img_name).astype('double')
        #For images not in RGB change to 3-channel format
        if image.ndim == 2:
            image = np.stack((image,)*3, axis=-1)
        #Get class label
        dog_score = np.array([self.df.iloc[idx, 1]]).astype('double')
        #Get bounding box coordinates
        coords = np.array([self.df.iloc[idx, 2:]]).astype('double')
        #Return an image and its class label and bbox coordinates
        sample = {'image': image, 'dog_score': dog_score, 'coords': coords}
        #Transform image and bbox
        if self.transform:
            sample = self.transform(sample)
        return sample

#3. Аугментации
В качестве аугментаций используются: \\
 1. Поворот на рандомный угол.
 2. Горизонтальное отражение (случайное).
 3. Скейлинг (в данной реализации в формате 240х240).
 4. Перевод данных в тензорное представление.

Также пиксели картинки приведены к диапазону [0, 1], bbox'ы -- к относительному положению на картинке. Нормализация не использовалась.

In [4]:
def rotate_im(image, angle):
    (h, w) = image.shape[:2]
    (cX, cY) = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D((cX, cY), angle, 1.0)
    cos = np.abs(M[0, 0])
    sin = np.abs(M[0, 1])
    nW = int((h * sin) + (w * cos))
    nH = int((h * cos) + (w * sin))
    M[0, 2] += (nW / 2) - cX
    M[1, 2] += (nH / 2) - cY
    image = cv2.warpAffine(image, M, (nW, nH))
    return image

def get_corners(bboxes):

    width = (bboxes[:,2] - bboxes[:,0]).reshape(-1,1)
    height = (bboxes[:,3] - bboxes[:,1]).reshape(-1,1)
        
    x1 = bboxes[:,0].reshape(-1,1)
    y1 = bboxes[:,1].reshape(-1,1)
        
    x2 = x1 + width
    y2 = y1 
        
    x3 = x1
    y3 = y1 + height
        
    x4 = bboxes[:,2].reshape(-1,1)
    y4 = bboxes[:,3].reshape(-1,1)
        
    corners = np.hstack((x1,y1,x2,y2,x3,y3,x4,y4))
        
    return corners
        
def rotate_bbox(corners, angle, cx, cy, h, w):

    corners = corners.reshape(-1,2)
    corners = np.hstack((corners, np.ones((corners.shape[0],1), dtype = type(corners[0][0]))))
        
    M = cv2.getRotationMatrix2D((cx, cy), angle, 1.0)
        
    
    cos = np.abs(M[0, 0])
    sin = np.abs(M[0, 1])
        
    nW = int((h * sin) + (w * cos))
    nH = int((h * cos) + (w * sin))
    M[0, 2] += (nW / 2) - cx
    M[1, 2] += (nH / 2) - cy
    calculated = np.dot(M,corners.T).T
        
    calculated = calculated.reshape(-1,8)
    
    return calculated

def get_enclosing_box(corners):

    x_ = corners[:,[0,2,4,6]]
    y_ = corners[:,[1,3,5,7]]
    
    xmin = np.min(x_,1).reshape(-1,1)
    ymin = np.min(y_,1).reshape(-1,1)
    xmax = np.max(x_,1).reshape(-1,1)
    ymax = np.max(y_,1).reshape(-1,1)
        
    final = np.hstack((xmin, ymin, xmax, ymax,corners[:,8:]))
        
    return final

def bbox_area(bbox):
    return (bbox[:,2] - bbox[:,0])*(bbox[:,3] - bbox[:,1])

def clip_box(bbox, clip_box, alpha):

    ar_ = (bbox_area(bbox))
    x_min = np.maximum(bbox[:,0], clip_box[0]).reshape(-1,1)
    y_min = np.maximum(bbox[:,1], clip_box[1]).reshape(-1,1)
    x_max = np.minimum(bbox[:,2], clip_box[2]).reshape(-1,1)
    y_max = np.minimum(bbox[:,3], clip_box[3]).reshape(-1,1)
    
    bbox = np.hstack((x_min, y_min, x_max, y_max, bbox[:,4:]))
    
    delta_area = ((ar_ - bbox_area(bbox))/ar_)
    
    mask = (delta_area < (1 - alpha)).astype(int)
    
    bbox = bbox[mask == 1,:]


    return bbox

In [5]:
class Rescale(object):
    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def __call__(self, sample):
        image, dog_score, coords = sample['image'], sample['dog_score'], sample['coords']
        h, w = image.shape[:2]
        if isinstance(self.output_size, int):
            new_h, new_w = self.output_size * h / w, self.output_size * w / h
        else:
            new_h, new_w = self.output_size
        new_h, new_w = int(new_h), int(new_w)
        img = transform.resize(image, (new_h, new_w))
        coords = coords * [new_w / w, new_h / h, new_w / w, new_h / h]
        return {'image': img, 'dog_score': dog_score, 'coords': coords}

class RandomHorizontalFlip(object):
    def __init__(self, p=0.5):
        self.p = p
    def __call__(self, sample):
        img, dog_score, bboxes = sample['image'], sample['dog_score'], sample['coords']
        img_center = np.array(img.shape[:2])[::-1]/2
        img_center = np.hstack((img_center, img_center))
        if random.random() < self.p:
            img =  img[:,::-1,:]
            bboxes[:,[0,2]] += 2*(img_center[[0,2]] - bboxes[:,[0,2]])
            box_w = abs(bboxes[:,0] - bboxes[:,2]) 
            bboxes[:,0] -= box_w
            bboxes[:,2] += box_w
        return {'image': img, 'dog_score': dog_score, 'coords': bboxes}

class RandomRotation(object):

    def __call__(self, sample):
        img, dog_score, bboxes = sample['image'], sample['dog_score'], sample['coords']
        angle = random.randint(-45, 45)
        w, h = img.shape[1], img.shape[0]
        cx, cy = w//2, h//2
        img = rotate_im(img, angle)
        corners = get_corners(bboxes)
        corners = np.hstack((corners, bboxes[:,4:]))
        corners[:,:8] = rotate_bbox(corners[:,:8], angle, cx, cy, h, w)
        new_bbox = get_enclosing_box(corners)
        scale_factor_x = img.shape[1] / w
        scale_factor_y = img.shape[0] / h
        img = cv2.resize(img, (w,h))
        new_bbox[:,:4] /= [scale_factor_x, scale_factor_y, scale_factor_x, scale_factor_y] 
        bboxes  = new_bbox
        bboxes = clip_box(bboxes, [0,0,w, h], 0.25)

        return {'image': img, 'dog_score': dog_score, 'coords': bboxes}

class ToTensor(object):
    def __call__(self, sample):
        image, dog_score, coords = sample['image'], sample['dog_score'], sample['coords']
        image = image.transpose((2, 0, 1))
        return {'image': torch.from_numpy(image).float(),
                'dog_score': torch.from_numpy(dog_score).float(),
                'coords': torch.from_numpy(coords).float()}

#4. Ребалансировка данных
Датасет содержит 1037 изображений кошек и 2348 изображений собак => problem of class imbalance.
Для решения этой проблемы нужно сделать upsample (oversample) картинок с кошками так, чтобы на выходе имелось одинаковое количкство данных обоих классов. Для того, чтобы сеть не переобучилась на одинаковых картинках кошек (одна и та же картинка может попасть в трейн и тест), а также для увеличения размера обучающей выборки, добавим аугментации.

In [None]:
df = pd.read_csv('/content/drive/MyDrive/cats_dogs_dataset/annotations.csv')
df_cats = df[df['class'] == 0]
df_dogs = df[df['class'] == 1]
#Upsample the small class to the size of the big class
df_cats_upsampled = resample(df_cats, replace=True, 
                                n_samples = len(df_dogs), 
                                random_state=42)
#Concatenate both datasets
df_upsampled = pd.concat([df_dogs, df_cats_upsampled])
#Save the dataset to the data folder
df_upsampled.to_csv(r'/content/drive/MyDrive/cats_dogs_dataset/df_upsampled.csv',
                    index=False)

#5. Подготовка датасетов для обучения и тестирования сети

In [6]:
df = pd.read_csv('/content/drive/MyDrive/cats_dogs_dataset/df_upsampled.csv')
#Split the data to train and test sets
train, test = train_test_split(df, test_size=0.2, random_state=42)
#Get the train set with train transforms
train_set = CatDogDataset(df=train,
                            root_dir='/content/drive/MyDrive/cats_dogs_dataset',
                            transform=transforms.Compose([
                                               Rescale((240, 240)),
                                               RandomHorizontalFlip(),
                                               RandomRotation(),
                                               ToTensor()]))
#Get the test set with test transforms
test_set = CatDogDataset(df=test,
                            root_dir='/content/drive/MyDrive/cats_dogs_dataset',
                            transform=transforms.Compose([
                                               Rescale((240, 240)),
                                               ToTensor()]))
#Get the train and test loader
train_loader = DataLoader(train_set, batch_size=64, shuffle=True, num_workers=2)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False, num_workers=2)

#6. Реализация архитектуры сети
Я использовала transfer learning, модель -- Mobilenetv2, предобученная на ImageNet картинках, т. к. эта сеть достаточно простая в плане обучения, при этом дает неплохой результат + хорошо себя показала в Image Detection.

In [7]:
#Define a net class
class Mobilenetv2(nn.Module):
    def __init__(self):
        super(Mobilenetv2, self).__init__()
        #Choose MobileNetv2 pretrained on ImageNet
        self.model = torch.hub.load('pytorch/vision', 'mobilenet_v2', pretrained=True)
        #Rewrite the last (fully-connected) layers
        self.model.classifier = nn.Sequential(
            nn.Linear(in_features=self.model.classifier[1].in_features, out_features=128),
            nn.ReLU(inplace = True),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.ReLU(inplace = True),
            nn.Dropout(0.3),
            #We need 5 outputs
            nn.Linear(64, 5),
        )   
        
    def forward(self, x):
        bs, _, _, _ = x.shape
        x = self.model.features(x)
        x = F.adaptive_avg_pool2d(x, 1).reshape(bs, -1)
        x = self.model.classifier(x)
        #Нужна вероятность классов для бинарной классификации
        dog_prob = torch.sigmoid(x[:, 0])
        coords = x[:, 1:]
        return {'dog_prob': dog_prob, 'coords': coords}

net = Mobilenetv2()

Using cache found in /root/.cache/torch/hub/pytorch_vision_master


In [None]:
#Model's architecture
net

Mobilenetv2(
  (model): MobileNetV2(
    (features): Sequential(
      (0): ConvBNActivation(
        (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU6(inplace=True)
      )
      (1): InvertedResidual(
        (conv): Sequential(
          (0): ConvBNActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
            (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): ReLU6(inplace=True)
          )
          (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (2): InvertedResidual(
        (conv): Sequential(
          (0): ConvBNActivation(
            (0): Conv2d(16, 96, kernel_size=(1, 1), stride=(

#7. Обучение и тестирование сети.

In [8]:
#Use GPU if possible
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = net.to(device)
#Criterion for classification
criterion_binary = nn.BCELoss()
#Criterion for regression
criterion_multioutput = nn.MSELoss()
#Setting an optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)
# A path to save the model
PATH = "model.pt"

In [9]:
#Функция для подсчета метрики IoU
def bb_intersection_over_union(boxA, boxB):
    boxA = boxA[0]
    boxB = boxB[0]
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
    boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou

In [10]:
stats = {'epoch': [], 'train_loss': [], 'val_loss': [], 'acc': [], 'iou': []}
n_epochs = 10
val_loss_min = np.Inf
# Train the model
total_step = len(train_loader)
for epoch in range(n_epochs):
    batch_train_loss = []
    stats['epoch'].append(epoch)
    model.train()
    for i, sample in enumerate(train_loader):
        images = sample['image'].to(device)
        dog_prob = sample['dog_score'].to(device)
        coords = sample['coords'].to(device)
        images /= 255
        coords /= images.shape[2]
        optimizer.zero_grad()
        outputs = model(images)

        dog_prob_hat = outputs['dog_prob']
        coords_hat = outputs['coords']
        loss1 = criterion_binary(dog_prob_hat, dog_prob.squeeze().type(torch.float))
        loss2 = criterion_multioutput(coords_hat, coords.squeeze().type(torch.float))
        loss=loss1+loss2
        loss.backward()
        optimizer.step()
        batch_train_loss.append(loss.item())

        if (i+1) % 10 == 0:
            print ("Epoch {}, Step [{}/{}] Loss: {:.4f}"
                   .format(epoch, i+1, total_step, loss.item()))
    stats['train_loss'].append(batch_train_loss)

# Test the model
    model.eval()
    with torch.no_grad():
      correct = 0
      total = 0
      batch_val_loss = []
      batch_acc = []
      batch_iou = []
      for i, sample in enumerate(test_loader):
        images = sample['image'].to(device)
        dog_prob = sample['dog_score'].to(device)
        coords = sample['coords'].to(device)
        images /= 255
        coords /= images.shape[2]
        outputs = model(images)
        dog_prob_hat = outputs['dog_prob']
        coords_hat = outputs['coords']
        val_loss1 = criterion_binary(dog_prob_hat, dog_prob.squeeze().type(torch.float))
        val_loss2 = criterion_multioutput(coords_hat, coords.squeeze().type(torch.float))
        val_loss=val_loss1+val_loss2
        total += dog_prob.size(0)
        correct += (torch.round(dog_prob_hat) == dog_prob.squeeze()).sum().item()
        iou = bb_intersection_over_union(coords_hat, coords.squeeze())
        batch_acc.append(100 * correct / total)
        batch_iou.append(iou)
        if (i+1) % 10 == 0:
            print ("Epoch {}, Loss: {:.4f}"
                   .format(epoch, val_loss.item()))
        batch_val_loss.append(val_loss.item())
      stats['acc'].append(batch_acc)
      stats['iou'].append(batch_iou)
      stats['val_loss'].append(batch_val_loss)
      print('Mean accuracy of epoch on the test images: {} %'.\
            format(sum(stats['acc'][epoch])/len(stats['acc'][epoch])))
      print('Mean IoU of epoch on the test images: {} '.\
            format(sum(stats['iou'][epoch])/len(stats['iou'][epoch])))
      
      # Saving the model if validation loss decreased to avoid overfitting
      if val_loss < val_loss_min:
          torch.save(net.state_dict(), PATH)
          val_loss_min = val_loss
          print('Saving model...')

Epoch 0, Step [10/59] Loss: 0.4419
Epoch 0, Step [20/59] Loss: 0.4512
Epoch 0, Step [30/59] Loss: 0.3302
Epoch 0, Step [40/59] Loss: 0.2454
Epoch 0, Step [50/59] Loss: 0.3774
Epoch 0, Loss: 0.1044
Mean accuracy of epoch on the test images: 95.11320165625352 %
Mean IoU of epoch on the test images: 0.7191346883773804 
Saving model...
Epoch 1, Step [10/59] Loss: 0.2092
Epoch 1, Step [20/59] Loss: 0.1961
Epoch 1, Step [30/59] Loss: 0.2412
Epoch 1, Step [40/59] Loss: 0.2103
Epoch 1, Step [50/59] Loss: 0.1593
Epoch 1, Loss: 0.1219
Mean accuracy of epoch on the test images: 96.00341635930067 %
Mean IoU of epoch on the test images: 0.7010229825973511 
Epoch 2, Step [10/59] Loss: 0.2276
Epoch 2, Step [20/59] Loss: 0.1114
Epoch 2, Step [30/59] Loss: 0.1460
Epoch 2, Step [40/59] Loss: 0.1107
Epoch 2, Step [50/59] Loss: 0.0672
Epoch 2, Loss: 0.1297
Mean accuracy of epoch on the test images: 95.03343151381715 %
Mean IoU of epoch on the test images: 0.7096962332725525 
Saving model...
Epoch 3, Step 

#8. Инференс и оценка времени предсказания

In [45]:
import time
# Loading the model for inference
model = net.to(device)
model.load_state_dict(torch.load(PATH))
start_time = time.time()
model.eval()
with torch.no_grad():
  for i, sample in enumerate(test_loader):
    images = sample['image'].to(device)
    images /= 255
    outputs = model(images)
    dog_prob_hat = outputs['dog_prob']
    coords_hat = outputs['coords']
inference_time = time.time() - start_time
print("--- %s seconds ---" % (time.time() - start_time))

--- 29.862287282943726 seconds ---


In [48]:
sum_iou = sum(stats['iou'][8])
print('mIoU: {}%, Accuracy: {}%, {} seconds, {} train, {} test'.format( 
    round(sum_iou.item()/len(stats['iou'][8]) * 100, 2), 
    round(sum(stats['acc'][8])/len(stats['acc'][8]), 2), 
    round(inference_time, 2),
    len(train_set), 
    len(test_set)))

mIoU: 72.37%, Accuracy: 97.42%, 29.86 seconds, 3756 train, 940 test
