<a href="https://colab.research.google.com/github/haonguyenuet/person_reid/blob/master/osnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install torchreid
# Clone the deep-person-reid repository, which provides the `torchreid` package.
!git clone https://github.com/KaiyangZhou/deep-person-reid.git

# Move the notebook's working directory into the clone so the commands below
# can find requirements.txt and setup.py.
%cd deep-person-reid/
# NOTE(review): each `!` line presumably runs in its own subshell, so the
# environment created/activated by the next two lines likely does not carry
# over to the later `!pip` / `!python` commands or to the kernel — confirm
# whether these two lines are needed. `conda create` may also wait for an
# interactive confirmation — TODO verify.
!conda create --name torchreid python=3.7
!conda activate torchreid

# Install the Python dependencies listed by the repository.
!pip install -r requirements.txt

# NOTE(review): pins cudatoolkit=9.0; confirm this matches the runtime's CUDA.
!conda install pytorch torchvision cudatoolkit=9.0 -c pytorch

# Install the cloned package in development mode.
!python setup.py develop

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.nn import functional as F

from torchvision import transforms
import torchvision

import torchreid

import numpy as np

import matplotlib.pyplot as plt

from collections import namedtuple
from sklearn.metrics import classification_report
import unittest

# Basic Layer, Residual Block, OSNet

## Conv1x1 

In [None]:
class Conv1x1(nn.Module):
    """Pointwise block: 1x1 convolution (no bias), batch norm, then ReLU.

    Args:
        c_in: number of input channels.
        c_out: number of output channels.
        stride: stride of the 1x1 convolution (default 1).
    """

    def __init__(self, c_in, c_out, stride=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels=c_in,
            out_channels=c_out,
            kernel_size=1,
            stride=stride,
            padding=0,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(num_features=c_out)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return relu(bn(conv(x)))."""
        return self.relu(self.bn(self.conv(x)))


class Conv1x1Linear(nn.Module):
    """Pointwise block without activation: 1x1 convolution (no bias), then batch norm.

    Args:
        c_in: number of input channels.
        c_out: number of output channels.
        stride: stride of the 1x1 convolution (default 1).
    """

    def __init__(self, c_in, c_out, stride=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels=c_in,
            out_channels=c_out,
            kernel_size=1,
            stride=stride,
            padding=0,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(num_features=c_out)

    def forward(self, x):
        """Return bn(conv(x)); no non-linearity is applied."""
        return self.bn(self.conv(x))

## LightConv3x3

In [None]:
class LightConv3x3(nn.Module):
    """Lightweight 3x3 block: 1x1 conv, depthwise 3x3 conv, batch norm, ReLU.

    The depthwise convolution uses ``groups=c_out`` with stride 1 and padding 1,
    so the spatial size of the input is preserved.

    Args:
        c_in: number of input channels.
        c_out: number of output channels.
    """

    def __init__(self, c_in, c_out):
        super().__init__()
        # Pointwise projection from c_in to c_out channels.
        self.conv1 = nn.Conv2d(
            in_channels=c_in,
            out_channels=c_out,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False,
        )
        # One 3x3 filter per channel.
        self.depthwise = nn.Conv2d(
            in_channels=c_out,
            out_channels=c_out,
            kernel_size=3,
            stride=1,
            padding=1,
            bias=False,
            groups=c_out,
        )
        self.bn = nn.BatchNorm2d(num_features=c_out)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return relu(bn(depthwise(conv1(x))))."""
        projected = self.conv1(x)
        filtered = self.depthwise(projected)
        return self.relu(self.bn(filtered))

## ConvLayer

In [None]:
class ConvLayer(nn.Module):
    """General convolution block: conv (no bias), batch norm, then ReLU.

    Args:
        c_in: number of input channels.
        c_out: number of output channels.
        kernel_size: convolution kernel size.
        stride: convolution stride (default 1).
        padding: zero padding added on each side (default 0).
        groups: number of convolution groups (default 1).
    """

    def __init__(self, c_in, c_out, kernel_size, stride=1, padding=0, groups=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels=c_in,
            out_channels=c_out,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
            groups=groups,
        )
        self.bn = nn.BatchNorm2d(num_features=c_out)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return relu(bn(conv(x)))."""
        return self.relu(self.bn(self.conv(x)))

## AggregationGate

In [None]:
class AggregationGate(nn.Module):
    """Channel-wise gate that rescales a feature map by learned per-channel weights.

    The weights are produced by: global average pool -> 1x1 conv down to a
    bottleneck -> ReLU -> 1x1 conv back to ``c_in`` channels -> sigmoid. The
    resulting (N, c_in, 1, 1) tensor is multiplied with the input.

    Args:
        c_in: number of channels of the input feature map.
        reduction: bottleneck reduction ratio; the hidden width is
            ``c_in // reduction`` but never less than 1 (default 16).

    Raises:
        ValueError: if ``reduction`` is smaller than 1.
    """

    def __init__(self, c_in, reduction=16):
        super().__init__()
        if reduction < 1:
            raise ValueError('reduction must be >= 1, got {}'.format(reduction))
        # Without the clamp, c_in < reduction would give a zero-channel bottleneck.
        c_hidden = max(c_in // reduction, 1)
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Conv2d(c_in, c_hidden, kernel_size=1, bias=True, padding=0)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Conv2d(c_hidden, c_in, kernel_size=1, bias=True, padding=0)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the gate computed from ``x`` itself."""
        output = self.global_avgpool(x)
        output = self.fc1(output)
        output = self.relu(output)
        output = self.fc2(output)
        output = self.activation(output)
        # (N, C, 1, 1) weights broadcast over the spatial dimensions of x.
        return output * x

## OSResBlock

In [None]:
class OSResBlock(nn.Module):
    """Residual block with four parallel LightConv3x3 streams of depth 1 to 4.

    The input is reduced to ``c_out // bottleneck_reduction`` channels by a 1x1
    conv, passed through the four streams, each stream output is passed through
    the same shared gate, the gated outputs are summed, and a linear 1x1 conv
    expands the sum to ``c_out`` channels. The result is added to the shortcut
    (projected by a linear 1x1 conv when ``c_in != c_out``) and passed through ReLU.

    Args:
        c_in: number of input channels.
        c_out: number of output channels.
        bottleneck_reduction: divisor applied to ``c_out`` to get the stream width.
        **kwargs: accepted and ignored.
    """

    def __init__(self, c_in, c_out, bottleneck_reduction=4, **kwargs):
        super().__init__()
        c_mid = c_out // bottleneck_reduction

        def stack(depth):
            # `depth` LightConv3x3 layers chained at constant width.
            return nn.Sequential(
                *[LightConv3x3(c_mid, c_mid) for _ in range(depth)]
            )

        self.conv1 = Conv1x1(c_in, c_mid)
        # The depth-1 stream is a bare layer rather than a Sequential.
        self.conv2a = LightConv3x3(c_mid, c_mid)
        self.conv2b = stack(2)
        self.conv2c = stack(3)
        self.conv2d = stack(4)
        self.gate = AggregationGate(c_mid)
        self.conv3 = Conv1x1Linear(c_mid, c_out)
        # Only project the shortcut when the channel count changes.
        self.downsample = Conv1x1Linear(c_in, c_out) if c_in != c_out else None

    def forward(self, x):
        """Return relu(shortcut(x) + expand(sum of gated stream outputs))."""
        reduced = self.conv1(x)

        fused = None
        for stream in (self.conv2a, self.conv2b, self.conv2c, self.conv2d):
            gated = self.gate(stream(reduced))
            fused = gated if fused is None else fused + gated

        expanded = self.conv3(fused)
        shortcut = x if self.downsample is None else self.downsample(x)
        return F.relu(shortcut + expanded)

## OSNet

In [None]:
class OSNet(nn.Module):
    """Convolutional backbone built from residual stages, followed by a linear classifier.

    Pipeline: 7x7 stride-2 conv -> 3x3 stride-2 max pool -> stage 1 ->
    (1x1 conv + 2x2 average pool) -> stage 2 -> (1x1 conv + 2x2 average pool)
    -> stage 3 -> 1x1 conv -> global average pool -> linear layer.

    Args:
        num_classes: output width of the final linear layer.
        blocks: block class for each of the three stages; each is called as
            ``block(c_in, c_out)``.
        layers: number of blocks in each of the three stages.
        channels: four channel widths: the stem output followed by the output
            width of each stage.
        feature_dim: input width of the final linear layer. The pooled feature
            has ``channels[3]`` channels, so the two must be equal.
        loss: stored as ``self.loss``; not otherwise used by this class.

    Raises:
        ValueError: if ``feature_dim`` differs from ``channels[3]``.
    """

    def __init__(self,
                 num_classes=100,
                 blocks=(OSResBlock, OSResBlock, OSResBlock),
                 layers=(2, 2, 2),
                 channels=(64, 256, 384, 512),
                 feature_dim=512,
                 loss='softmax'):
        super().__init__()
        # Fail early: otherwise the mismatch only surfaces as a shape error
        # inside self.fc during forward().
        if feature_dim != channels[3]:
            raise ValueError(
                'feature_dim ({}) must equal channels[3] ({}) because the '
                'linear layer is applied directly to the pooled conv5 output'
                .format(feature_dim, channels[3])
            )
        self.loss = loss

        # convolutional backbone
        self.conv1 = ConvLayer(3, channels[0], 7, stride=2, padding=3)
        self.maxpool = nn.MaxPool2d(3, stride=2, padding=1)
        self.conv2 = self._make_layer(blocks[0], layers[0], channels[0], channels[1])
        self.tran1 = nn.Sequential(
            Conv1x1(channels[1], channels[1]),
            nn.AvgPool2d(2, stride=2)
        )
        self.conv3 = self._make_layer(blocks[1], layers[1], channels[1], channels[2])
        self.tran2 = nn.Sequential(
            Conv1x1(channels[2], channels[2]),
            nn.AvgPool2d(2, stride=2)
        )
        self.conv4 = self._make_layer(blocks[2], layers[2], channels[2], channels[3])
        self.conv5 = Conv1x1(channels[3], channels[3])
        self.global_avgpool = nn.AdaptiveAvgPool2d(1)
        # fully connected layer
        self.fc = nn.Linear(feature_dim, num_classes)

    def _make_layer(self, block, num_layers, c_in, c_out):
        """Build one stage: a ``c_in -> c_out`` block followed by
        ``num_layers - 1`` blocks of ``c_out -> c_out``.

        The first block is always created, even when ``num_layers`` is below 1.
        """
        layers = [block(c_in, c_out)]
        layers.extend(block(c_out, c_out) for _ in range(1, num_layers))
        return nn.Sequential(*layers)

    def featuremaps(self, x):
        """Run the convolutional backbone and return the conv5 feature map."""
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.conv2(x)
        x = self.tran1(x)
        x = self.conv3(x)
        x = self.tran2(x)
        x = self.conv4(x)
        x = self.conv5(x)
        return x

    def forward(self, x):
        """Return the linear layer's output for the globally pooled feature map."""
        x = self.featuremaps(x)
        v = self.global_avgpool(x)
        # Flatten (N, C, 1, 1) to (N, C) for the linear layer.
        v = v.view(v.size(0), -1)
        y = self.fc(v)
        return y

# Unit Tests

## TestConv1x1

In [None]:
class TestConv1x1(unittest.TestCase):
    """Output-shape checks for Conv1x1 and Conv1x1Linear."""

    def test_ouput(self):
        """Both blocks map 3 channels to 1 and keep the 256x128 spatial size."""
        with_relu = Conv1x1(c_in=3, c_out=1)
        without_relu = Conv1x1Linear(c_in=3, c_out=1)
        images = torch.randn(1, 3, 256, 128)

        relu_out = with_relu(images)
        linear_out = without_relu(images)

        expected_shape = (1, 1, 256, 128)
        self.assertEqual(relu_out.shape, expected_shape)
        self.assertEqual(linear_out.shape, expected_shape)


## TestLightConv3x3

In [None]:
class TestLightConv3x3(unittest.TestCase):
    """Output-shape check for LightConv3x3."""

    def test_ouput(self):
        """A 10 -> 5 channel block keeps the 256x128 spatial size."""
        block = LightConv3x3(c_in=10, c_out=5)
        features = torch.randn(1, 10, 256, 128)

        self.assertEqual(block(features).shape, (1, 5, 256, 128))

## TestConvLayer

In [None]:
class TestConvLayer(unittest.TestCase):
    """Output-shape checks for ConvLayer with a 5x5 kernel and padding 0, 1 and 2."""

    def test_ouput(self):
        """Each extra pixel of padding grows both spatial dimensions by 2."""
        no_pad = ConvLayer(c_in=10, c_out=5, kernel_size=5, padding=0)
        pad_one = ConvLayer(c_in=10, c_out=8, kernel_size=5, padding=1)
        pad_two = ConvLayer(c_in=10, c_out=10, kernel_size=5, padding=2)

        features = torch.randn(1, 10, 256, 128)
        outputs = [layer(features) for layer in (no_pad, pad_one, pad_two)]

        expected_shapes = [
            (1, 5, 252, 124),
            (1, 8, 254, 126),
            (1, 10, 256, 128),
        ]
        for produced, expected in zip(outputs, expected_shapes):
            self.assertEqual(produced.shape, expected)

## TestOSResBlock

In [None]:
class TestOSResBlock(unittest.TestCase):
    """Output-shape checks for two chained OSResBlock instances."""

    def test_ouput(self):
        """Channels go 64 -> 128 -> 256 while the 64x32 spatial size is kept."""
        first = OSResBlock(c_in=64, c_out=128, bottleneck_reduction=4)
        second = OSResBlock(c_in=128, c_out=256, bottleneck_reduction=2)

        features = torch.randn(1, 64, 64, 32)
        after_first = first(features)
        after_second = second(after_first)

        self.assertEqual(after_first.shape, (1, 128, 64, 32))
        self.assertEqual(after_second.shape, (1, 256, 64, 32))

## TestOSNet

In [None]:
class TestOSNet(unittest.TestCase):
    """Stage-by-stage shape checks for OSNet on a 256x128 RGB input."""

    def test_ouput(self):
        """Walk the backbone manually and compare against featuremaps() and forward()."""
        net = OSNet(
            num_classes=10,
            blocks=[OSResBlock, OSResBlock, OSResBlock],
            layers=[2, 2, 2],
            channels=[64, 256, 384, 512],
            feature_dim=512,
            loss='softmax',
        )

        image = torch.randn(1, 3, 256, 128)

        # Stem: conv + max pool.
        stem_out = net.maxpool(net.conv1(image))
        # Stage 1 followed by its transition.
        stage2_out = net.tran1(net.conv2(stem_out))
        # Stage 2 followed by its transition.
        stage3_out = net.tran2(net.conv3(stage2_out))
        # Stage 3 has no transition; the final 1x1 conv follows it.
        stage4_out = net.conv4(stage3_out)
        stage5_out = net.conv5(stage4_out)

        feature_map = net.featuremaps(image)
        logits = net(image)

        self.assertEqual(net.loss, 'softmax')
        self.assertEqual(stem_out.shape, (1, 64, 64, 32))
        self.assertEqual(stage2_out.shape, (1, 256, 32, 16))
        self.assertEqual(stage3_out.shape, (1, 384, 16, 8))
        self.assertEqual(stage4_out.shape, (1, 512, 16, 8))
        self.assertEqual(stage5_out.shape, (1, 512, 16, 8))
        self.assertEqual(feature_map.shape, stage5_out.shape)
        self.assertEqual(logits.shape, (1, 10))

# Run test

In [None]:
# Run the TestCase classes defined in this notebook from inside the kernel.
# argv=[''] supplies only a placeholder program name, presumably so unittest
# does not try to parse the kernel's own command-line arguments; exit=False
# keeps unittest from calling sys.exit() once the tests have finished.
unittest.main(argv=[''], verbosity=2, exit=False)

test_ouput (__main__.TestConv1x1) ... ok
test_ouput (__main__.TestConvLayer) ... ok
test_ouput (__main__.TestLightConv3x3) ... ok
  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
ok
test_ouput (__main__.TestOSResBlock) ... ok

----------------------------------------------------------------------
Ran 5 tests in 1.115s

OK


<unittest.main.TestProgram at 0x7fee41906250>

# Prepare Data

In [None]:
# Pair of test-side loaders: one for query images, one for gallery images.
QueryGallery = namedtuple('QueryGallery', ['query', 'gallery'])
# Bundle returned by prepare_loader(): the training loader plus a QueryGallery pair.
TrainTest = namedtuple('TrainTest', ['train', 'test'])

def prepare_data(root='./data', height=256, width=128, batch_size=32,
                 transforms=('random_flip', 'random_crop')):
    """Build the torchreid image data manager for Market1501.

    Called without arguments this builds exactly the configuration the notebook
    trains with; the parameters only exist so it can be tuned without editing
    the function body.

    Args:
        root: directory passed to torchreid as the dataset root.
        height: image height passed to the data manager.
        width: image width passed to the data manager.
        batch_size: used for both ``batch_size_train`` and ``batch_size_test``.
        transforms: a transform name, or an iterable of transform names,
            forwarded as ``transforms``.

    Returns:
        The constructed ``torchreid.data.ImageDataManager``.
    """
    # A bare string is forwarded untouched; any other iterable is copied into a list.
    if not isinstance(transforms, str):
        transforms = list(transforms)

    # The dataset name stays fixed: prepare_loader() looks the test loaders up
    # under this same name.
    datamanager = torchreid.data.ImageDataManager(
        root=root,
        sources='market1501',
        targets='market1501',
        height=height,
        width=width,
        batch_size_train=batch_size,
        batch_size_test=batch_size,
        transforms=transforms,
    )

    return datamanager

def prepare_loader(datamanager, target='market1501'):
    """Collect the train, query and gallery loaders from a data manager.

    Args:
        datamanager: object exposing ``train_loader`` and a ``test_loader``
            mapping of dataset name -> {'query': loader, 'gallery': loader}.
        target: dataset name to look up in ``datamanager.test_loader``.

    Returns:
        TrainTest(train=<train loader>, test=QueryGallery(query=..., gallery=...)).

    Raises:
        KeyError: if ``target`` is missing from ``test_loader`` or its entry
            lacks a 'query' or 'gallery' loader.
    """
    trainloader = datamanager.train_loader

    test_loaders = datamanager.test_loader
    if target not in test_loaders:
        raise KeyError(
            "no test loaders for target {!r}; available: {}".format(
                target, sorted(test_loaders)
            )
        )
    testloader = test_loaders[target]

    # Index instead of .get(): a missing split should fail here rather than
    # hand a None loader to the caller.
    missing = [split for split in ('query', 'gallery') if split not in testloader]
    if missing:
        raise KeyError(
            "test loaders for {!r} are missing: {}".format(target, missing)
        )

    return TrainTest(
        train=trainloader,
        test=QueryGallery(query=testloader['query'], gallery=testloader['gallery']),
    )

def parse_data_for_train(data):
    """Extract the fields needed for a training step from one batch.

    Args:
        data: mapping with at least the keys 'img' and 'pid'.

    Returns:
        Tuple ``(data['img'], data['pid'])``.
    """
    return data['img'], data['pid']

def parse_data_for_eval(data):
    """Extract the fields needed for evaluation from one batch.

    Args:
        data: mapping with at least the keys 'img', 'pid' and 'camid'.

    Returns:
        Tuple ``(data['img'], data['pid'], data['camid'])``.
    """
    return data['img'], data['pid'], data['camid']

# Calc Accuracy

In [None]:
def accuracy(output, target, topk=(1, )):
    """Compute top-k accuracy, in percent, for every k in ``topk``.

    Args:
        output: tensor of per-class scores with one row per sample (dim 1 is
            the class dimension), or a tuple/list whose first element is such
            a tensor.
        target: 1-D tensor of ground-truth class indices, one per sample.
        topk: iterable of k values to report.

    Returns:
        List of one-element tensors, one per entry in ``topk``, each holding
        the percentage of samples whose target is among the k highest scores.
    """
    maxk = max(topk)
    batch_size = target.size(0)

    if isinstance(output, (tuple, list)):
        output = output[0]

    # Take the maxk best classes once; each smaller k reuses the first k rows.
    _, pred = output.topk(maxk, dim=1, largest=True, sorted=True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        # reshape (not view): a multi-row slice of the transposed result may
        # not be contiguous.
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
        acc = correct_k.mul_(100.0 / batch_size)
        res.append(acc)

    return res

# Training

In [None]:
def forward_backward(model, data, criterion, optimizer, use_gpu = True):
    """Run a single optimisation step on one batch.

    Args:
        model: network called on the image batch.
        data: batch passed to parse_data_for_train().
        criterion: loss callable invoked as ``criterion(outputs, pids)``.
        optimizer: optimizer whose gradients are zeroed and then stepped.
        use_gpu: when true, move images and labels to CUDA before the forward pass.

    Returns:
        Tuple ``(loss, accuracy)`` of Python floats for this batch; accuracy
        is the first entry returned by accuracy().
    """
    images, labels = parse_data_for_train(data)

    if use_gpu:
        images = images.cuda()
        labels = labels.cuda()

    logits = model(images)
    batch_loss = criterion(logits, labels)

    # Clear stale gradients, backpropagate, then update the weights.
    optimizer.zero_grad()
    batch_loss.backward()
    optimizer.step()

    batch_acc = accuracy(logits, labels)[0]
    return batch_loss.item(), batch_acc.item()


def train_epoch(epoch, model, trainloader, criterion, optimizer, use_gpu = True):
    """Train ``model`` for one pass over ``trainloader``, printing running metrics.

    Every ``reporting_steps`` batches the mean loss and accuracy over those
    batches are printed and the running totals are reset. Batches left over
    after the last full window are trained on but not reported.

    Args:
        epoch: zero-based epoch index; printed as ``epoch + 1``.
        model: network to train; switched to training mode first.
        trainloader: iterable of batches accepted by forward_backward().
        criterion: loss callable forwarded to forward_backward().
        optimizer: optimizer forwarded to forward_backward().
        use_gpu: forwarded to forward_backward() to control the CUDA transfer.
    """
    model.train()
    running_loss = 0.0
    running_acc = 0.0
    reporting_steps = 10

    for i, data in enumerate(trainloader):
        # Forward use_gpu explicitly; otherwise the callee's default (True)
        # would silently override the caller's choice.
        loss, acc = forward_backward(model, data, criterion, optimizer, use_gpu=use_gpu)

        running_loss += loss
        running_acc += acc
        if (i + 1) % reporting_steps == 0:
            print('[%d, %5d]\t loss: %.3f\t accuracy: %.3f'
                  % (epoch + 1, i + 1, running_loss / reporting_steps, running_acc / reporting_steps))
            running_loss = 0.0
            running_acc = 0.0

def main(PATH='./model.pth', num_epoch=20):
    """Train an OSNet classifier on the prepared data and checkpoint it every epoch.

    Args:
        PATH: file the model's ``state_dict`` is written to after each epoch;
            every epoch overwrites the previous checkpoint.
        num_epoch: number of passes over the training loader (default 20).

    Returns:
        The trained model.
    """
    datamanager = prepare_data()
    loaders = prepare_loader(datamanager)

    # NOTE: training requires a CUDA device; the model is moved to "cuda" and
    # train_epoch() is called with its default use_gpu=True.
    device = torch.device("cuda")
    model = OSNet(num_classes = datamanager.num_train_pids).to(device)

    criterion = torchreid.losses.CrossEntropyLoss(num_classes = datamanager.num_train_pids)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

    for epoch in range(num_epoch):
        train_epoch(epoch, model, loaders.train, criterion, optimizer)
        # Checkpoint after every epoch so an interrupted run keeps its progress.
        torch.save(model.state_dict(), PATH)

    return model

# Start training with the default checkpoint path. Because this call is the
# cell's last expression, the returned model's repr is displayed as cell output.
main()

Building train transforms ...
+ resize to 256x128
+ random flip
+ random crop (enlarge to 288x144 and crop 256x128)
+ to torch tensor of range [0, 1]
+ normalization (mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
Building test transforms ...
+ resize to 256x128
+ to torch tensor of range [0, 1]
+ normalization (mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
=> Loading train (source) dataset
=> Loaded Market1501
  ----------------------------------------
  subset   | # ids | # images | # cameras
  ----------------------------------------
  train    |   751 |    12936 |         6
  query    |   750 |     3368 |         6
  gallery  |   751 |    15913 |         6
  ----------------------------------------
=> Loading test (target) dataset


  cpuset_checked))


=> Loaded Market1501
  ----------------------------------------
  subset   | # ids | # images | # cameras
  ----------------------------------------
  train    |   751 |    12936 |         6
  query    |   750 |     3368 |         6
  gallery  |   751 |    15913 |         6
  ----------------------------------------


  **************** Summary ****************
  source            : ['market1501']
  # source datasets : 1
  # source ids      : 751
  # source images   : 12936
  # source cameras  : 6
  target            : ['market1501']
  *****************************************


[1,    10]	 loss: 6.651	 accuracy: 0.000
[1,    20]	 loss: 6.616	 accuracy: 0.625
[1,    30]	 loss: 6.571	 accuracy: 0.938
[1,    40]	 loss: 6.561	 accuracy: 0.312
[1,    50]	 loss: 6.538	 accuracy: 0.625
[1,    60]	 loss: 6.547	 accuracy: 0.312
[1,    70]	 loss: 6.472	 accuracy: 0.625
[1,    80]	 loss: 6.550	 accuracy: 1.562
[1,    90]	 loss: 6.499	 accuracy: 0.938
[1,   100]	 loss: 6.398	 accuracy: 1.562
[1, 

OSNet(
  (conv1): ConvLayer(
    (conv): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (conv2): Sequential(
    (0): OSResBlock(
      (conv1): Conv1x1(
        (conv): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
      )
      (conv2a): LightConv3x3(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (depthwise): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=64, bias=False)
        (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
      )
      (conv2b): Sequential(
  