In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data_
import os
import numpy as np
import torchvision.transforms as transforms
import torch.optim as optim
from PIL import Image
from torch.autograd import Variable

In [2]:
class BottleNet(nn.Module):
    """Bottleneck residual block: 1x1 conv -> 3x3 conv -> 1x1 conv plus a shortcut.

    Each convolution is followed by batch normalisation; ReLU is applied after
    the first two and once more after the shortcut has been added.

    Args:
        inplane: Number of channels of the incoming tensor.
        plane: Number of channels used by the first two convolutions. The
            block emits ``4 * plane`` channels.
        stride: Stride of the 3x3 convolution.
        downsample: Optional module applied to the block input before it is
            added to the convolution branch. The caller must make sure its
            output shape matches the branch output.
    """

    def __init__(self, inplane, plane, stride=1, downsample=None):
        super().__init__()
        widened = 4 * plane  # the last 1x1 convolution widens the channels fourfold

        # Sub-modules keep their original names and creation order so that
        # state_dict keys and parameter initialisation stay the same.
        self.conv1 = nn.Conv2d(inplane, plane, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(plane)
        self.conv2 = nn.Conv2d(plane, plane, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(plane)
        self.conv3 = nn.Conv2d(plane, widened, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)
        self.relu = nn.ReLU(inplace=True)

        self.downsample = downsample

    def forward(self, x):
        """Run the convolution branch, add the shortcut and apply the final ReLU."""
        shortcut = x

        branch = self.relu(self.bn1(self.conv1(x)))
        branch = self.relu(self.bn2(self.conv2(branch)))
        branch = self.bn3(self.conv3(branch))

        # Only transform the shortcut when a downsample module was supplied.
        if self.downsample:
            shortcut = self.downsample(shortcut)

        return self.relu(branch + shortcut)

class ResNet(nn.Module):
    """ResNet-style image classifier assembled from ``BottleNet`` blocks.

    Pipeline: 7x7 stride-2 convolution -> 3x3 stride-2 max-pool -> four
    bottleneck stages -> global average pool -> two linear layers.

    Args:
        res: Base channel width of each of the four stages. Every stage emits
            four times its base width.
        blocks: For each stage, how many stride-1 blocks are appended after
            the stage's first block (so a stage holds ``blocks[i] + 1``
            bottlenecks in total).
        n_classes: Size of the output logit vector.
    """

    # Channel expansion applied by every BottleNet block.
    _EXPANSION = 4

    def __init__(self, res=(64,128,256,512), blocks=(3,4,6,3), n_classes=1000):
        super(ResNet, self).__init__()
        self.n_classes = n_classes
        self.inplane = res[0]
        self.pre_layer1 = nn.Conv2d(3, res[0], kernel_size=7, stride=2, padding=3)
        self.pre_layer2 = nn.MaxPool2d(kernel_size=3, stride=2)
        self.layer1 = self._make_layer(res[0], blocks[0])
        self.layer2 = self._make_layer(res[1], blocks[1], 2)
        self.layer3 = self._make_layer(res[2], blocks[2], 2)
        self.layer4 = self._make_layer(res[3], blocks[3], 2)
        # self.inplane now holds the channel count produced by layer4
        # (2048 with the default widths), so the head follows custom ``res``.
        self.linear1 = nn.Linear(self.inplane, 1024)
        self.linear2 = nn.Linear(1024, self.n_classes)

    def forward(self, x):
        """Map an image batch laid out as (N, 3, H, W) to (N, n_classes) logits."""
        output = self.pre_layer1(x)
        output = self.pre_layer2(output)
        output = self.layer1(output)
        output = self.layer2(output)
        output = self.layer3(output)
        output = self.layer4(output)
        # Global average pool: the kernel must span the spatial axes (2 and 3),
        # not the channel axis.
        output = F.avg_pool2d(output, kernel_size=(output.shape[2], output.shape[3]))
        output = torch.flatten(output, start_dim=1, end_dim=-1)
        output = self.linear1(output)
        output = F.relu(output)
        output = self.linear2(output)
        return output

    def _make_layer(self, plane, blocks, stride=1):
        """Build one stage and advance ``self.inplane`` to the stage's output width.

        Args:
            plane: Base channel width of the stage.
            blocks: Number of stride-1 blocks appended after the first block.
            stride: Stride of the first block of the stage.

        Returns:
            An ``nn.Sequential`` holding ``blocks + 1`` ``BottleNet`` modules.
        """
        out_channels = plane * self._EXPANSION
        # Stays None when the input already has the output shape, so the
        # first block then uses an identity shortcut.
        downsample = None
        if stride != 1 or self.inplane != out_channels:
            # NOTE(review): the shortcut ends in a ReLU, which the reference
            # ResNet design does not have. Kept so existing checkpoints
            # produce the same activations.
            downsample = nn.Sequential(
                nn.Conv2d(self.inplane, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True))
        layers = [BottleNet(self.inplane, plane, stride, downsample)]
        self.inplane = out_channels
        # NOTE(review): this appends ``blocks`` further blocks on top of the
        # first one. Kept as-is because changing the count would alter the
        # state_dict layout of saved checkpoints.
        for _ in range(blocks):
            layers.append(BottleNet(self.inplane, plane))
        return nn.Sequential(*layers)

In [3]:
def create_data(data_path):
    """List the files directly inside ``data_path`` with their labels.

    The label of a file is the first character of its name. Entries are
    visited in sorted name order because ``os.listdir`` returns them in
    arbitrary order, which would make sample indices differ between runs.
    Sub-directories are skipped since they cannot be opened as images.

    Args:
        data_path: Directory containing the image files.

    Returns:
        A list of ``(img_path, label)`` tuples, one per file.
    """
    data_list = []
    for name in sorted(os.listdir(data_path)):
        img_path = os.path.join(data_path, name)
        if not os.path.isfile(img_path):
            continue
        data_list.append((img_path, str(name)[0]))
    return data_list

def create_transform(split):
    """Return the preprocessing pipeline for a dataset split.

    Every split receives the same pipeline: ``Resize(224)``, ``ToTensor()``
    and a per-channel ``Normalize`` with fixed mean and std values.

    Args:
        split: Name of the split. Accepted for interface symmetry; it does
            not change the returned pipeline.

    Returns:
        A ``transforms.Compose`` with the three steps above.
    """
    channel_mean = [0.5467658 ,0.56146331 , 0.2991529 ]
    channel_std = [0.32788563 ,0.2752062 ,0.32276475]
    steps = [
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)
    

class Flo_Dataset(data_.Dataset):
    """Image dataset whose labels are encoded in the first character of each file name.

    Args:
        data_path: Directory scanned by ``create_data`` for samples.
        split: ``'train'`` or ``'val'`` yields ``(image, label)`` pairs; any
            other value yields the image tensor only.
    """

    def __init__(self, data_path, split='train'):
        self.classes = ['L','W','Y']
        # Derived from the class list so the two cannot drift apart.
        self.n_classes = len(self.classes)
        self.data_path = data_path
        self.data_list = create_data(data_path)
        self.split = split
        self.trans = create_transform(split)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(data, label)`` for the 'train' and 'val' splits, where
            ``label`` is a 0-dim long tensor holding the index of the label
            character in ``self.classes``; otherwise just ``data``.

        Raises:
            ValueError: If the label character is not in ``self.classes``.
        """
        img_path, label_char = self.data_list[idx]
        # Force three channels: a grayscale, palette or RGBA file would not
        # match the three-value mean/std of the Normalize step. The context
        # manager closes the file handle as soon as the pixels are read.
        with Image.open(img_path) as img:
            data = self.trans(img.convert('RGB'))
        if self.split == 'train' or self.split == 'val':
            label = torch.tensor(self.classes.index(label_char), dtype=torch.long)
            return (data, label)
        return data

    def __len__(self):
        """Number of samples found in ``data_path``."""
        return len(self.data_list)

In [4]:
def eval(model, data_iter, device=None):
    """Evaluate ``model`` on ``data_iter``.

    Note that this function shadows the built-in ``eval`` in the notebook
    namespace. The model is switched to evaluation mode and left there.

    Args:
        model: Module mapping a data batch to class logits.
        data_iter: Sized iterable of ``(data, label)`` batches.
        device: Optional device the batches are moved to before the forward
            pass. Batches are used as-is when it is None.

    Returns:
        ``(loss, acc)`` as 0-dim tensors: the cross-entropy averaged over
        batches, and the fraction of correctly classified samples.

    Raises:
        ZeroDivisionError: If ``data_iter`` yields no batches.
    """
    model.eval()
    num = 0
    acc_sum = 0
    test_loss = 0
    # Evaluation never needs gradients. Guarding here keeps callers that
    # forget their own no_grad block from building autograd graphs.
    with torch.no_grad():
        for data, label in data_iter:
            if device:
                data, label = data.to(device), label.to(device)
            pred = model(data)
            test_loss += F.cross_entropy(pred, label)
            num += len(label)
            # Tensor-level reduction instead of a Python loop over elements.
            acc_sum += (pred.argmax(1) == label).float().sum()
    test_total_loss = test_loss / len(data_iter)
    acc = acc_sum / num
    return test_total_loss, acc

In [5]:
# Fall back to the CPU when CUDA is unavailable: torch.device(None) is not a
# valid device and would raise before anything else runs.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- configuration ---
batch_size = 4
num_worker = 0
epochs = 20
load_path = None  # set to a checkpoint path to resume from saved weights
train_path = r'E:\flower_recognition\img_files'
test_path = r'E:\flower_recognition\test'

# --- data ---
train_data = Flo_Dataset(train_path, 'train')
val_data = Flo_Dataset(test_path, 'val')

train_iter = data_.DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_worker)
val_iter = data_.DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=num_worker)

# --- model and optimiser ---
model = ResNet(n_classes=train_data.n_classes)
print(model)
torch.cuda.empty_cache()
model.to(device)
if load_path:
    print('loading weight...')
    # map_location lets a checkpoint saved on a GPU be restored on the
    # device selected above.
    model.load_state_dict(torch.load(load_path, map_location=device))
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
best_acc = 0

# --- training loop ---
for epoch in range(epochs):
    model.train()  # eval() below leaves the model in evaluation mode
    acc_sum = 0
    num = 0
    train_loss = 0
    leng = len(train_iter)
    for i, (data, label) in enumerate(train_iter):
        # A single .to(device) is enough; an extra .cuda() would fail on
        # machines without CUDA.
        data, label = data.to(device), label.to(device)
        optimizer.zero_grad()
        pred = model(data)
        loss = F.cross_entropy(pred, label)
        loss.backward()
        optimizer.step()
        if i % 30 == 0:
            print('completed batch:%d/%d'%(i, leng))
        num += len(label)
        # Accumulate plain Python numbers so no tensors are kept alive
        # between iterations.
        acc_sum += (pred.argmax(1) == label).sum().item()
        train_loss += loss.item()
    print('epoch:%d, train_loss:%f, acc:%f'%(epoch, train_loss/leng, acc_sum/num))
    with torch.no_grad():
        test_total_loss, test_acc = eval(model, val_iter, device=device)
    print('test_loss: %f, test_acc:%f'%(test_total_loss, test_acc))
    # Keep a checkpoint only when validation accuracy improves.
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model.state_dict(), r'E:\flower_recognition\resnet_{}.pth'.format('epoch%d'%epoch))

ResNet(
  (pre_layer1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (pre_layer2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BottleNet(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(

)
completed batch:0/93
completed batch:30/93
completed batch:60/93
completed batch:90/93
epoch:0, train_loss:0.946055, acc:0.547170
test_loss: 0.425746, test_acc:0.898990
completed batch:0/93
completed batch:30/93
completed batch:60/93
completed batch:90/93
epoch:1, train_loss:0.670935, acc:0.743935
test_loss: 0.272424, test_acc:0.898990
completed batch:0/93
completed batch:30/93
completed batch:60/93
completed batch:90/93
epoch:2, train_loss:0.560814, acc:0.805930
test_loss: 0.185994, test_acc:0.949495
completed batch:0/93
completed batch:30/93
completed batch:60/93
completed batch:90/93
epoch:3, train_loss:0.508431, acc:0.800539
test_loss: 0.283871, test_acc:0.909091
completed batch:0/93
completed batch:30/93
completed batch:60/93
completed batch:90/93
epoch:4, train_loss:0.434217, acc:0.849057
test_loss: 0.159502, test_acc:0.939394
completed batch:0/93
completed batch:30/93
completed batch:60/93
completed batch:90/93
epoch:5, train_loss:0.338366, acc:0.881402
test_loss: 0.179710, te

In [7]:
from sklearn.metrics import classification_report

# Collect ground-truth and predicted class indices over the validation set.
prediction = []
labels = []
model.eval()
with torch.no_grad():
    for data, label in val_iter:
        data = data.to(device)
        label = label.to(device)
        pred = model(data)
        labels.extend(label.cpu().numpy().tolist())
        prediction.extend(pred.argmax(1).cpu().numpy().tolist())

# Per-class precision / recall / F1, using the dataset's class names as row labels.
target_names = val_iter.dataset.classes
print(classification_report(labels, prediction, target_names=target_names))

              precision    recall  f1-score   support

           L       1.00      0.97      0.98        33
           W       0.96      1.00      0.98        27
           Y       1.00      1.00      1.00        39

    accuracy                           0.99        99
   macro avg       0.99      0.99      0.99        99
weighted avg       0.99      0.99      0.99        99



In [8]:
from sklearn.metrics import confusion_matrix

# One row/column per class index, in the order of the dataset's class list.
class_ids = list(range(len(val_iter.dataset.classes)))
confusion_matrix(labels, prediction, labels=class_ids)

array([[32,  1,  0],
       [ 0, 27,  0],
       [ 0,  0, 39]], dtype=int64)