In [1]:
import os
import shutil
import math
import torch
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision import models
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import copy
import time

In [2]:
original_dataset_dir = './dataset'
classes_list = os.listdir(original_dataset_dir)

base_dir = './splitted'
os.mkdir(base_dir)

train_dir = os.path.join(base_dir, 'train')
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

for cls in classes_list:
    os.mkdir(os.path.join(train_dir, cls))
    os.mkdir(os.path.join(validation_dir, cls))
    os.mkdir(os.path.join(test_dir, cls))

In [3]:
for cls in classes_list:
    path = os.path.join(original_dataset_dir, cls)
    fnames = os.listdir(path)

    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)

    train_fnames = fnames[:train_size]
    print("Train size(",cls,"): ", len(train_fnames))
    for fname in train_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(train_dir, cls), fname)
        shutil.copyfile(src, dst)

    validation_fnames = fnames[train_size:(validation_size + train_size)]
    print("Validation size(",cls,"): ", len(validation_fnames))
    for fname in validation_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(validation_dir, cls), fname)
        shutil.copyfile(src, dst)

    test_fnames = fnames[(train_size+validation_size):(validation_size + train_size +test_size)]

    print("Test size(",cls,"): ", len(test_fnames))
    for fname in test_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(test_dir, cls), fname)
        shutil.copyfile(src, dst)

Train size( ADONIS ):  52
Validation size( ADONIS ):  17
Test size( ADONIS ):  17
Train size( AFRICAN GIANT SWALLOWTAIL ):  45
Validation size( AFRICAN GIANT SWALLOWTAIL ):  15
Test size( AFRICAN GIANT SWALLOWTAIL ):  15
Train size( AMERICAN SNOOT ):  44
Validation size( AMERICAN SNOOT ):  14
Test size( AMERICAN SNOOT ):  14
Train size( AN 88 ):  51
Validation size( AN 88 ):  17
Test size( AN 88 ):  17
Train size( APPOLLO ):  54
Validation size( APPOLLO ):  18
Test size( APPOLLO ):  18
Train size( ATALA ):  60
Validation size( ATALA ):  20
Test size( ATALA ):  20
Train size( BANDED ORANGE HELICONIAN ):  58
Validation size( BANDED ORANGE HELICONIAN ):  19
Test size( BANDED ORANGE HELICONIAN ):  19
Train size( BANDED PEACOCK ):  49
Validation size( BANDED PEACOCK ):  16
Test size( BANDED PEACOCK ):  16
Train size( BECKERS WHITE ):  48
Validation size( BECKERS WHITE ):  16
Test size( BECKERS WHITE ):  16
Train size( BLACK HAIRSTREAK ):  51
Validation size( BLACK HAIRSTREAK ):  17
Test siz

In [5]:
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
BATCH_SIZE = 256
EPOCH = 30

In [6]:
data_transforms = {
    'train': transforms.Compose([transforms.Resize([64,64]),
        transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(),
        transforms.RandomCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]),

    'val': transforms.Compose([transforms.Resize([64,64]),
        transforms.RandomCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])
}


data_dir = './splitted'
image_datasets = {x: ImageFolder(root=os.path.join(data_dir, x), transform=data_transforms[x]) for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=BATCH_SIZE, shuffle=True, num_workers=4) for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

In [7]:
from torchvision.models import ResNet50_Weights

resnet = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, 75)
resnet = resnet.to('cpu')

criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001)

exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [8]:
ct = 0
for child in resnet.children():
    ct += 1
    if ct < 6:
        for param in child.parameters():
            param.requires_grad = False

In [11]:
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('-------------- epoch {} ----------------'.format(epoch+1))
        since = time.time()
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0


            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(DEVICE)
                labels = labels.to(DEVICE)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss/dataset_sizes[phase]
            epoch_acc = running_corrects.double()/dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))


            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)

    return model

In [14]:
model_resnet50 = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=10)

torch.save(model_resnet50, 'resnet50.pt')

-------------- epoch 1 ----------------
train Loss: 0.1365 Acc: 0.9556
val Loss: 0.5822 Acc: 0.8532
Completed in 1m 47s
-------------- epoch 2 ----------------
train Loss: 0.1296 Acc: 0.9571
val Loss: 0.5827 Acc: 0.8501
Completed in 1m 44s
-------------- epoch 3 ----------------
train Loss: 0.1283 Acc: 0.9605
val Loss: 0.5864 Acc: 0.8446
Completed in 1m 45s
-------------- epoch 4 ----------------
train Loss: 0.1266 Acc: 0.9644
val Loss: 0.5832 Acc: 0.8493
Completed in 1m 48s
-------------- epoch 5 ----------------
train Loss: 0.1217 Acc: 0.9620
val Loss: 0.5976 Acc: 0.8493
Completed in 1m 46s
-------------- epoch 6 ----------------
train Loss: 0.1176 Acc: 0.9623
val Loss: 0.5747 Acc: 0.8493
Completed in 1m 49s
-------------- epoch 7 ----------------
train Loss: 0.1176 Acc: 0.9620
val Loss: 0.5744 Acc: 0.8579
Completed in 1m 43s
-------------- epoch 8 ----------------
train Loss: 0.1182 Acc: 0.9623
val Loss: 0.5688 Acc: 0.8532
Completed in 5m 13s
-------------- epoch 9 ----------------


In [15]:
def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)

            test_loss += F.cross_entropy(output,target, reduction='sum').item()


            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

In [16]:
transform_resNet = transforms.Compose([
        transforms.Resize([64,64]),
        transforms.RandomCrop(52),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

test_resNet = ImageFolder(root='./splitted/test', transform=transform_resNet)
test_loader_resNet = torch.utils.data.DataLoader(test_resNet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

In [17]:
resnet50=torch.load('resnet50.pt')
resnet50.eval()
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)

print('ResNet test acc:  ', test_accuracy)

ResNet test acc:   86.65620094191523
