# Import requirements & Custom Dataset

In [None]:
import torch
import os
import cv2
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, ConcatDataset
import numpy as np
import torchvision.transforms as transforms
import torch.nn.functional as F

USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
print(DEVICE)

EPOCH = 25

transforms = transforms.Compose([
    transforms.ToTensor(), 
    transforms.Normalize(mean=[0.485, 0.456, 0.486], std=[0.229, 0.224, 0.225])
])

# Class for load data from directory
class MyDataset(Dataset):
    def __init__(self, image_dir, label, transforms=None, test=False):
        self.image_dir = image_dir
        self.label = label
        self.image_list = os.listdir(self.image_dir)
        self.transforms = transforms
        self.test_mode = test
    
    def __len__(self):
        return len(self.image_list)
    
    def __getitem__(self,idx):
        image_name = os.path.join(self.image_dir, self.image_list[idx])
        image = cv2.imread(image_name)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (224,224), cv2.INTER_AREA)
        ###transform
        image = transforms(image)
       
        if self.test_mode:
            return (image, self.image_list[idx])
        else:
            return (image, self.label)



# Augmentation

In [None]:
from torchvision.transforms import Compose, ToTensor, ToPILImage

# Class for load data from directory with augmentation by fliping images
class MyDataset_aug(Dataset):
    def __init__(self, image_dir, label, transforms=None, test=False):
        self.image_dir = image_dir
        self.label = label
        self.image_list = os.listdir(self.image_dir)
        self.transforms = transforms
        self.test_mode = test
    
    def __len__(self):
        return len(self.image_list)
    
    def __getitem__(self,idx):
        image_name = os.path.join(self.image_dir, self.image_list[idx])
        image = cv2.imread(image_name)
        flipLR_img = np.fliplr(image) # Flip original image
        flipLR_img = cv2.cvtColor(flipLR_img, cv2.COLOR_BGR2RGB)
        flipLR_img = cv2.resize(flipLR_img, (224,224), cv2.INTER_AREA)
        ###transform
        flipLR_img = transforms(flipLR_img)
        
        if self.test_mode:
            return (flipLR_img, self.image_list[idx])
        else:
            return (flipLR_img, self.label)

# ConcatDataset & DataLoader

In [None]:
balancing_train = MyDataset("../input/yoga6classes/balancing_train", 0, transforms)
balancing_train_aug = MyDataset_aug("../input/yoga6classes/balancing_train", 0, transforms)

inverted_train = MyDataset("../input/yoga6classes/inverted_train", 1, transforms)
inverted_train_aug = MyDataset_aug("../input/yoga6classes/inverted_train", 1, transforms)

reclining_train = MyDataset("../input/yoga6classes/reclining_train", 2, transforms)
reclining_train_aug = MyDataset_aug("../input/yoga6classes/reclining_train", 2, transforms)

sitting_train = MyDataset("../input/yoga6classes/sitting_train", 3, transforms)
sitting_train_aug = MyDataset_aug("../input/yoga6classes/sitting_train", 3, transforms)

standing_train = MyDataset("../input/yoga6classes/standing_train", 4, transforms)
standing_train_aug = MyDataset_aug("../input/yoga6classes/standing_train", 4, transforms)

wheel_train = MyDataset("../input/yoga6classes/wheel_train", 5, transforms)
wheel_train_aug = MyDataset_aug("../input/yoga6classes/wheel_train", 5, transforms)

# Concat original images and augmented images
train_set = ConcatDataset([balancing_train, inverted_train, reclining_train, sitting_train, standing_train, wheel_train,
                          balancing_train_aug, inverted_train_aug, reclining_train_aug, sitting_train_aug, standing_train_aug, wheel_train_aug])
print("Number of Training set images : ", len(train_set))

balancing_valid = MyDataset("../input/yoga6classes/balancing_valid", 0, transforms)
inverted_valid = MyDataset("../input/yoga6classes/inverted_valid", 1, transforms)
reclining_valid = MyDataset("../input/yoga6classes/reclining_valid", 2, transforms)
sitting_valid = MyDataset("../input/yoga6classes/sitting_valid", 3, transforms)
standing_valid = MyDataset("../input/yoga6classes/standing_valid", 4, transforms)
wheel_valid = MyDataset("../input/yoga6classes/wheel_valid", 5, transforms)
val_set = ConcatDataset([balancing_valid, inverted_valid, reclining_valid, sitting_valid, standing_valid, wheel_valid])
print("Number of Validation set images : ", len(val_set))


# Build model

In [None]:
import torchvision.models as models
from torch.optim import lr_scheduler
import torch.optim as optim
import torch.nn as nn

model = models.resnet18(pretrained=True) # Using pretrained ResNet18
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 6) # Define fully connected layer
model = model.to(DEVICE)

print("create model and optimizer")

# Train code & Loss function & test code & evaluation code

In [None]:
# Function for train model
def train(model, train_loader, optimizer, epoch):
    model.train()
    for i, (image, target) in enumerate(train_loader):
        image, target = image.to(DEVICE), target.to(DEVICE)
        output = model(image)
 
        optimizer.zero_grad()
        loss = F.cross_entropy(output, target).to(DEVICE)
        loss.backward()
        optimizer.step()     
        if i % 10 == 0:
            print('Train Epoch : {} [{}/{} ({:.0f})%]\tLoss: {:.6f}'
                 .format(epoch, i*len(image), len(train_loader.dataset), 100.*i / len(train_loader), loss.item()))

# Function for validate model
def evaluate(model, val_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for(image, target) in val_loader:
            image,target = image.to(DEVICE), target.to(DEVICE)
            output = model(image)
            
            test_loss += F.cross_entropy(output, target, reduction = 'sum').item()
            pred = output.max(1, keepdim = True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
            
    test_loss /= len(val_loader.dataset)
    test_accuracy = 100. * correct / len(val_loader.dataset)
    return test_loss, test_accuracy


# Tuner

In [None]:
import random
import copy
from itertools import product

# Configuration set for tuner
config = {
    "lr": [0.001, 0.005, 0.0005],
    "batch_size": [64, 32, 16, 8],
    "optimizer": ['SGD','Adagrad', 'Adam']
}

# Epoch used for training in tuner
tuning_epoch = 10 

'''
Tuner to find out the best combination of hyper parameters and optimizer
Strategy: Find the right combination with less epoch and train it back to that combination with more epoch
'''
def tuner(model, train_data, test_data, config):
    # Make all of combinations from configuration
    combinations = list(product(config["lr"], config["batch_size"], config["optimizer"]))
   
    best_combination = {"combination": [], "acc": 0.0}
    best_loss = 1
    best_epoch = 0
    best_model = copy.deepcopy(model)
    
    for combi in combinations:
        # Deep Copy original model to training to initialize weights at every combinations
        compare_model = copy.deepcopy(model)
        compare_model = compare_model.to(DEVICE)
        
        # Make Optimizer
        if(combi[2] == 'SGD'):
            optimizer = optim.SGD(compare_model.parameters(), lr=combi[0], momentum = 0.9, nesterov=True)
        elif(combi[2] == 'Adagrad'):
            optimizer = optim.Adagrad(compare_model.parameters(), lr=combi[0])
        elif(combi[2] == 'Adam'):
            optimizer = optim.Adam(compare_model.parameters(), lr=combi[0])
        
        # Make Loader
        train_loader = DataLoader(train_data, batch_size=combi[1], shuffle=True)
        val_loader = DataLoader(test_data, batch_size=combi[1], shuffle=False)
        
        # Training model with a combination
        print('Training combination: ', combi, ' ...')
        best = 0
        for epoch in range(tuning_epoch):
            train(compare_model, train_loader, optimizer, epoch)
            test_loss, test_accuracy = evaluate(compare_model, val_loader)
            if test_accuracy > best:
                best = test_accuracy
                best_loss = test_loss
                best_epoch = epoch
            print('[{}] Test Loss : {:.4f}, Accuracy : {:.4f}%'.format(epoch, test_loss, test_accuracy))
    
            # Stop epoch if it doesn't renew more than 5 epochs based on lowest loss
            if epoch > best_epoch+5 and test_loss > best_loss:
                break
        
        # Deciding best model
        if best > best_combination["acc"]:
            best_combination["acc"] = best
            best_combination["combination"] = combi
            best_model = compare_model
            
        print('---------------------------------------------------------')
        print("finish train a combination")
        print("current best acc: ", best_combination["acc"], "with combination: ", "lr: {}, batch_size: {}, optimizer: {}".format(best_combination["combination"][0], best_combination["combination"][1], best_combination["combination"][2]))
    
                    
    model = best_model
    torch.save(model.state_dict(), "./best_model.pth")
    return best_combination["combination"]
    
# Get the best combination of hyper paramers and optimizer
combi = tuner(model, train_set, val_set, config)


# Main code & Save the best model

In [None]:
import time
start = time.time()
best = 0

# Decide optimizer to use
if(combi[2] == 'SGD'):
    optimizer = optim.SGD(model.parameters(), lr=combi[0], momentum = 0.9, nesterov=True)

if(combi[2] == 'Adagrad'):
    optimizer = optim.Adagrad(model.parameters(), lr=combi[0])
    
if(combi[2] == 'Adam'):
    optimizer = optim.Adam(model.parameters(), lr=combi[0])

BATCH_SIZE = combi[1]

# Make loaders
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False)

# Train
for epoch in range(EPOCH):
    train(model, train_loader, optimizer, epoch)
    test_loss, test_accuracy = evaluate(model, val_loader)
    if test_accuracy > best:
        best = test_accuracy
        torch.save(model.state_dict(), "./best_model.pth")
    print('[{}] Test Loss : {:.4f}, Accuracy : {:.4f}%'.format(epoch, test_loss, test_accuracy))

end = time.time()
time = end - start

# Print results
print("finish train")
print("best acc: ", best)
print("time: {}h, {}m,{}s".format(int(time/3600), int(time/60), time%60))


# Save the predicted value as a CSV file

In [None]:
import csv

load = torch.load('./best_model.pth')
model.load_state_dict(load) # load weights from the best model
model.eval()
print("load model for test set")

f = open("./prediction.csv", "w", newline="") # open file to save prediction results
w = csv.writer(f)
w.writerow(['id', 'target'])

# Make test data loader
test_set = MyDataset("../input/yogatestdataset", 0,transforms, test = True)
test_loader = DataLoader(test_set, batch_size=combi[1], shuffle=False)

preds = []
img_ids = []
correct = 0

# Predict class with test data
with torch.no_grad():
    for (image, image_name) in test_loader:
        image = image.to(DEVICE)
        output = model(image)
        
        pred = output.max(1, keepdim = True)[1]
        preds.extend(pred)
        img_ids.extend(image_name)
        
# Write results to .csv
for i in range(600):
    w.writerow([img_ids[i][:-4], str(preds[i].item())])
    
f.close()
print("save prediction csv")