In [2]:
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.utils.data as data
import os

In [3]:
TEST_DATA_PATH =  "./test"
TRAIN_DATA_PATH = "./train"

In [11]:
# data transform, you can add different transform methods
img_size = 224
train_transform_orig = transforms.Compose([
                                    transforms.Resize((img_size,img_size)),
                                    transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
                                    ])
train_transform_aug1 = transforms.Compose([
                                    transforms.RandomResizedCrop(224),
                                    transforms.RandomHorizontalFlip(),
                                    transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
                                    ])
train_transform_aug2 = transforms.Compose([
                                    transforms.RandomResizedCrop(224),
                                    transforms.RandomHorizontalFlip(),
                                    transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
                                    ])

test_transform = transforms.Compose([transforms.Resize((img_size,img_size)),
                                    transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                   ])

orig_dataset = datasets.ImageFolder(root=TRAIN_DATA_PATH,transform=train_transform_orig)
aug1_dataset = datasets.ImageFolder(root=TRAIN_DATA_PATH,transform=train_transform_aug1)
aug2_dataset = datasets.ImageFolder(root=TRAIN_DATA_PATH,transform=train_transform_aug2)

dataset = data.ConcatDataset([orig_dataset, aug1_dataset, aug2_dataset])
test_data = datasets.ImageFolder(root=TEST_DATA_PATH,transform=test_transform)

# spilt data into training and validation
TOTAL_SIZE = len(dataset)
ratio = 0.9
train_len = round(TOTAL_SIZE * ratio)
val_len = round(TOTAL_SIZE * (1-ratio))

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_len, val_len])

# data loader, you can choose the input arguments by yourself

train_data_loader = data.DataLoader(train_dataset, batch_size=32, shuffle=True,  num_workers=4)
val_data_loader = data.DataLoader(val_dataset, batch_size=32, shuffle=True,  num_workers=4)
test_data_loader  = data.DataLoader(test_data, batch_size=8, shuffle=False, num_workers=4) 

print(dataset)
# print(dataset.class_to_idx)

<torch.utils.data.dataset.ConcatDataset object at 0x7f3be7c2a5c0>


In [None]:
def train(model, data_loader, optimizer, epoch, verbose=True):
    model.train()
    loss_avg = 0.0
    for batch_idx, (data, target) in enumerate(data_loader):
        data, target = data.to(device), target.to(device)
        
        output = model(data)
        # loss function
        loss   = loss_func(output,target)
        optimizer.zero_grad()   
        loss_avg = loss.item()
        
        # do back propagation
        loss.backward()
        optimizer.step()
        print(len(data_loader))
        #print
        verbose_step = len(data_loader) // 10
#         if batch_idx % verbose_step == 0 and verbose:
#             print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
#                 epoch, batch_idx * len(data), len(data_loader.dataset),
#                 100. * batch_idx / len(data_loader), loss.item()))

    print(loss_avg/len(data_loader))
    return loss_avg / len(data_loader)

def valid(model, data_loader):
    with torch.no_grad():
        model.eval()
        valid_loss = 0
        correct = 0
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            valid_loss += F.cross_entropy(output, target, reduction='sum').item() # sum up batch loss
            pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(target.data.view_as(pred)).cpu().sum().item() 

        valid_loss /= len(data_loader.dataset)
        print('\nValid set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            valid_loss, correct, len(data_loader.dataset),
            100. * correct / len(data_loader.dataset)))
    return float(correct) / len(data_loader.dataset)

In [None]:
class ResNet(nn.Module):
    def __init__(self,basicblock,num_classes=5):
    
        super(ResNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1=nn.Sequential(BasicBlock(inplanes=64, planes=64, stride=1),BasicBlock(inplanes=64, planes=64, stride=1))
        self.layer2=nn.Sequential(BasicBlock(inplanes=64, planes=128, stride=2),BasicBlock(inplanes=128, planes=128, stride=1))
        self.layer3=nn.Sequential(BasicBlock(inplanes=128, planes=256, stride=2),BasicBlock(inplanes=256, planes=256, stride=1))
        self.layer4=nn.Sequential(BasicBlock(inplanes=256, planes=512, stride=2),BasicBlock(inplanes=512, planes=512, stride=1))
        self.avgpool=nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512,5)
    def forward(self, x):
        # 输入
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # 中间卷积
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # 输出
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

# 生成一个res18网络
def resnet18(pretrained=False, **kwargs):
    model = ResNet(BasicBlock, [2, 2, 2, 2], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet18']))
    return model

In [None]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes,kernel_size=3,stride=stride,padding=1)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(planes, planes,kernel_size=3,stride=1,padding=1)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = None
        if(stride==2):
            self.downsample = nn.Sequential(nn.Conv2d(inplanes, planes,kernel_size=1,stride=2),nn.BatchNorm2d(planes))
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

In [None]:
# using gpu if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
####################  implement your optimizer ###################################
## yo can use any training methods if you want (ex:lr decay, weight decay.....)

model = ResNet(BasicBlock)
model.to(device=device)
lr=0.001
optimizer = torch.optim.Adam(model.parameters(),lr=lr)
loss_func = nn.CrossEntropyLoss()
# start training
epochs = 40
acc = 0.0
for epoch in range(epochs):
    model.train()
    train(model, train_data_loader, optimizer, epoch)
    accuracy = valid(model, val_data_loader)
    if accuracy > acc:
        acc = accuracy
        print("-------------saving model--------------")
        # save the model
        torch.save(model, "model.pth")

In [None]:
test_transform = transforms.Compose([transforms.Resize((img_size,img_size)),
                                    transforms.ToTensor()
                                    ])

test_data = datasets.ImageFolder(root=TEST_DATA_PATH,transform=test_transform)
test_data_loader  = data.DataLoader(test_data, batch_size=8, shuffle=False, num_workers=4) 

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# load the model so that you don't need to train the model again
test_model = torch.load("model.pth").to(device)

In [None]:
def test(model,data_loader):
    with torch.no_grad():
        model.eval()
        valid_loss = 0
        correct = 0
        bs = test_data_loader.batch_size
        result = []
        for i, (data, target) in enumerate(test_data_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
            arr = pred.data.cpu().numpy()
            for j in range(pred.size()[0]):
                file_name = test_data.samples[i*bs+j][0].split('/')[-1]
                result.append((file_name,pred[j].cpu().numpy()[0]))
    return result

In [None]:
result = test(test_model,test_data_loader)

# Write results to csv

In [None]:
with open ('ID_result.csv','w') as f:
    f.write('ID,label\n')
    for data in result:
        f.write(data[0]+','+str(data[1])+'\n')