In [0]:
import urllib.request
import matplotlib.pyplot as plt
from PIL import Image
import cv2
import torch
import pandas
import numpy as np
import json
import torch.nn as nn
from typing import Tuple
from torch.nn import Conv2d, Sequential, ModuleList, BatchNorm2d
import math
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim


In [0]:
class TrainDataset(Dataset):
    def __init__(self, json_file):
        train = json.load(open(json_file))
        self.links = []
        self.label = []
        for item in train:
            self.links.append(item['image'])
            self.label.append(item['class'])
        
    def __len__(self):
        return len(self.links)

    
    def __getitem__(self, idx):
        img = Image.open(urllib.request.urlopen(self.links[idx]))
        img = img.resize((128,128))
        transform = transforms.Compose([
            transforms.Resize(128),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5,), std=(0.5,))
        ])
        img_t = transform(img)
        return {'image': img_t, 
                'label': torch.tensor(self.label[idx]).float()}

class TestDataset(Dataset):
    def __init__(self, csv_file):
        test = pandas.read_csv('dataset_test_ids.csv')
        self.links = []
        self.label = []
        for i in range(len(test)):
            self.links.append(test.iloc[i, 0])
            self.label.append(test.iloc[i, 1])
        
    def __len__(self):
        return len(self.links)

    
    def __getitem__(self, idx):
        img = Image.open(urllib.request.urlopen(self.links[idx]))
        img = img.resize((128,128))
        transform = transforms.Compose([
            transforms.Resize(128),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5,), std=(0.5,))
        ])
        img_t = transform(img)
        return {'image': img_t, 
                'label': torch.tensor(self.label[idx]).float(),
                'url': self.links[idx]}



In [0]:
train_loader = DataLoader(TrainDataset('dataset_train.json'), num_workers=2, batch_size=20, shuffle=True)
test_loader = DataLoader(TestDataset('dataset_test_ids.csv'), num_workers=2, batch_size=1, shuffle=False)

In [0]:
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0):
    return Sequential(
        Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
               groups=in_channels, stride=stride, padding=padding),
        BatchNorm2d(in_channels),
        nn.ReLU6(),
        Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
    )

def conv_bn(inp, oup, stride, use_batch_norm=True):
    if use_batch_norm:
        return nn.Sequential(
            nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
            nn.BatchNorm2d(oup),
            nn.ReLU6(inplace=True)
        )
    else:
        return nn.Sequential(
            nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
            nn.ReLU6(inplace=True)
        )


def conv_1x1_bn(inp, oup, use_batch_norm=True):
    if use_batch_norm:
        return nn.Sequential(
            nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup),
            nn.ReLU6(inplace=True)
        )
    else:
        return nn.Sequential(
            nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
            nn.ReLU6(inplace=True)
        )


class InvertedResidual(nn.Module):
    def __init__(self, inp, oup, stride, expand_ratio, use_batch_norm=True):
        super(InvertedResidual, self).__init__()

        self.stride = stride
        assert stride in [1, 2]

        hidden_dim = round(inp * expand_ratio)
        self.use_res_connect = self.stride == 1 and inp == oup

        if expand_ratio == 1:
            if use_batch_norm:
                self.conv = nn.Sequential(
                    # dw
                    nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
                    nn.BatchNorm2d(hidden_dim),
                    nn.ReLU6(inplace=True),
                    # pw-linear
                    nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                    nn.BatchNorm2d(oup),
                )
            else:
                self.conv = nn.Sequential(
                    # dw
                    nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
                    nn.ReLU6(inplace=True),
                    # pw-linear
                    nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                )
        else:
            if use_batch_norm:
                self.conv = nn.Sequential(
                    # pw
                    nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
                    nn.BatchNorm2d(hidden_dim),
                    nn.ReLU6(inplace=True),
                    # dw
                    nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
                    nn.BatchNorm2d(hidden_dim),
                    nn.ReLU6(inplace=True),
                    # pw-linear
                    nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                    nn.BatchNorm2d(oup),
                )
            else:
                self.conv = nn.Sequential(
                    # pw
                    nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
                    nn.ReLU6(inplace=True),
                    # dw
                    nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
                    nn.ReLU6(inplace=True),
                    # pw-linear
                    nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                )

    def forward(self, x):
        if self.use_res_connect:
            return x + self.conv(x)
        else:
            return self.conv(x)


In [0]:
class MyNet(nn.Module):
    def __init__(self):
        super(MyNet, self).__init__()
        self.conv = nn.Sequential(InvertedResidual(1, 4, 1, 1),
                                  InvertedResidual(4, 16, 2, 2),
                                  InvertedResidual(16, 32, 2, 2),
                                  InvertedResidual(32, 32, 1, 2),
                                  InvertedResidual(32, 48, 2, 2),
                                  InvertedResidual(48, 48, 1, 2),
                                  InvertedResidual(48, 72, 2, 2),
                                  InvertedResidual(72, 72, 1, 2),
                                  InvertedResidual(72, 64, 2, 2),
                                  InvertedResidual(64, 64, 1, 2),
                                  InvertedResidual(64, 64, 2, 2),
                                  conv_bn(64, 1, 2, use_batch_norm=False),
                                  nn.Sigmoid())
    
    def forward(self, x):
        return self.conv(x)
    
    def predict(self, x):
        pred = self.forward(x)
        ans = []
        for t in pred:
            if t > torch.tensor(0.5).float():
                ans.append(1)
            else:
                ans.append(0)
        return torch.tensor(ans).float().to(torch.device("cuda:0" if torch.cuda.is_available() else "cpu"))

In [0]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
criterion = nn.BCELoss(reduction='mean')
net = MyNet()
optimizer = optim.Adam(net.parameters())

In [8]:
net.to(device)
net.train()
for epoch in range(10):
    running_loss = 0.0
    epoch_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs = data['image'].to(device)
        labels = data['label'].to(device)

        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        epoch_loss += loss.item()
        if i % 40 == 39:
            print(f'[{epoch + 1}, {i + 1}] loss: {running_loss/40}')
            running_loss = 0.0
    print(epoch_loss/len(train_loader))

print('Finished Training')

  return F.binary_cross_entropy(input, target, weight=self.weight, reduction=self.reduction)


[1, 40] loss: 0.5768156841397285
[1, 80] loss: 0.5853825502097607
0.5798657042905688
[2, 40] loss: 0.5679561994969845
[2, 80] loss: 0.5271564804017543
0.5487073042119542
[3, 40] loss: 0.5225200951099396
[3, 80] loss: 0.5177174039185047
0.5145597690716386
[4, 40] loss: 0.5102825343608857
[4, 80] loss: 0.5020036026835442
0.5118848324442903
[5, 40] loss: 0.4949504464864731
[5, 80] loss: 0.5030071698129177
0.49874237397064763
[6, 40] loss: 0.47536383792757986
[6, 80] loss: 0.5121039278805256
0.5032587358728051
[7, 40] loss: 0.49782843217253686
[7, 80] loss: 0.489799927175045
0.4938807946940263
[8, 40] loss: 0.4849592611193657
[8, 80] loss: 0.4776630409061909
0.4825567655886213
[9, 40] loss: 0.46373102962970736
[9, 80] loss: 0.4978439949452877
0.4808033263931672
[10, 40] loss: 0.4721595004200935
[10, 80] loss: 0.47924825735390186
0.48588932569449145
Finished Training


In [0]:
torch.save(net.state_dict(), './net.pth')

In [12]:
net.load_state_dict(torch.load('./net.pth'))

<All keys matched successfully>

In [25]:
test = TestDataset('dataset_test_ids.csv')
pos = 0
prediction = open('prediction.csv', 'w')
prediction.write('Id,Predicted\n')
for i, data in enumerate(test_loader, 0):
    inputs = data['image'].to(device)
    labels = data['label'].to(device)
    url = data['url']
    outputs = net.predict(inputs)
    prediction.write(f'{url},{np.int(outputs.item())}\n')
    if abs(outputs - labels) < 0.5:
        pos += 1
print(f'Accuracy: {pos/len(test)}')


Accuracy: 0.8214285714285714


In [14]:
import time
test = TestDataset('dataset_test_ids.csv')
pos = 0
for i, data in enumerate(test_loader, 0):
    input = data['image'].to(device)
    break
start = time.time()
for i in range(10000):
    net.predict(input)
end = time.time()
print(f'FPS: {10000/(end-start)}')
    

FPS: 130.33482014779935


In [21]:
print(net.predict(input).item())

0.0
