# PyTorch Template
+ made by JSY
+ https://github.com/HideOnHouse/TorchBase

In [None]:
import sys
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm, notebook

## Data Import

In [None]:
# Names of the 10 label classes (presumably in label-index order -- confirm against the dataset)
label_tags = [
    'T-Shirt', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle Boot',
]

# Fill in the CSV locations before running this cell
train_path, test_path = "", ""

# Load both splits into DataFrames
train_data = pd.read_csv(filepath_or_buffer=train_path)
test_data = pd.read_csv(filepath_or_buffer=test_path)

In [None]:
# your Data Pre-Processing
# The first column is taken as the label, every remaining column as a feature.
train_x, train_y = train_data.iloc[:, 1:], train_data.iloc[:, :1]
test_x, test_y = test_data.iloc[:, 1:], test_data.iloc[:, :1]

# data split
# Hold out 10% of the training rows for validation; stratify on the label and
# fix the seed so the split is reproducible.
train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y, stratify=train_y, random_state=17, test_size=0.1)

# Check Train, Valid, Test Image's Shape
print("The Shape of Train Images: ", train_x.shape)
print("The Shape of Valid Images: ", valid_x.shape)
print("The Shape of Test Images: ", test_x.shape)

# Check Train, Valid, Test Label's Shape
print("The Shape of Train Labels: ", train_y.shape)
print("The Shape of Valid Labels: ", valid_y.shape)
# Fixed: this line reports test_y, so it must be tagged "Test", not "Valid".
print("The Shape of Test Labels: ", test_y.shape)

## Define Custom Dataset

In [None]:
class MyDataset(Dataset):
    """
    In-memory dataset of (feature, label) pairs.

    :param x: features; anything ``numpy.asarray`` accepts (DataFrame, ndarray, list)
              or a tensor. Stored as a float32 tensor.
    :param y: labels, same accepted types as ``x``. The dtype is kept as given.
              A 2-D single-column input (such as a one-column DataFrame slice)
              is flattened to 1-D so each item carries a scalar label.
    :raises ValueError: when ``x`` and ``y`` hold a different number of samples

    Both arguments default to ``None``, which yields an empty dataset, so the
    original zero-argument constructor call keeps working.
    """

    def __init__(self, x=None, y=None):
        super().__init__()
        self.x = self._as_tensor(x, dtype=torch.float32)
        self.y = self._as_tensor(y)

        # (N, 1) label columns -> (N,) so every sample gets a scalar label
        if self.y.dim() == 2 and self.y.size(-1) == 1:
            self.y = self.y.squeeze(-1)

        if len(self.x) != len(self.y):
            raise ValueError(
                "features and labels must have the same length, got {} and {}".format(
                    len(self.x), len(self.y)
                )
            )

    @staticmethod
    def _as_tensor(values, dtype=None):
        """Convert ``values`` to a tensor; ``None`` becomes an empty tensor."""
        if values is None:
            return torch.empty(0, dtype=dtype)
        if isinstance(values, torch.Tensor):
            return values if dtype is None else values.to(dtype)
        return torch.as_tensor(np.asarray(values), dtype=dtype)

    def __len__(self):
        """Return the number of samples."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the ``(feature, label)`` tensors stored at position ``idx``."""
        return self.x[idx], self.y[idx]

    def show_item(self, idx=0):
        """Print the shapes of the sample at ``idx`` and return ``(feature, label)``."""
        feature, label = self[idx]

        print("Feature's Shape : {}".format(feature.shape))
        print("Label's Shape : {}".format(label.shape))

        return feature, label

In [None]:
# Build each split's Dataset together with the DataLoader that batches it.

# Training split -- the only one that is shuffled
train_dataset = MyDataset(train_x, train_y)
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)

# Validation split
valid_dataset = MyDataset(valid_x, valid_y)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=32)

# Test split
test_dataset = MyDataset(test_x, test_y)
test_loader = DataLoader(dataset=test_dataset, batch_size=32)

## Define Learning Function

In [None]:
def calc_acc(output, label):
    """
    Count how many predictions in a batch match their labels.

    :param output: model scores with the classes on the last dimension
    :param label: either targets with the same shape as ``output`` (one-hot or
                  class probabilities, reduced with argmax) or class indices
                  with one entry per prediction (e.g. shape ``(batch,)`` or
                  ``(batch, 1)``)
    :return: number of correct predictions as a Python int
    """
    predicted = output.argmax(dim=-1)

    if label.shape == output.shape:
        # One-hot / probability targets: the true class is the largest entry
        expected = label.argmax(dim=-1)
    else:
        # Class-index targets: compare directly instead of reducing them,
        # which would collapse the batch dimension and miscount.
        expected = label.reshape(predicted.shape)

    return (predicted == expected).sum().item()


def train(model, device, optimizer, criterion, epochs, train_loader, valid_loader=None) -> dict:
    """
    Train ``model`` for ``epochs`` epochs, evaluating on ``valid_loader`` after
    each epoch when one is given.

    :param model: your model (moved to ``device`` in place)
    :param device: your device(cuda or cpu)
    :param optimizer: your optimizer
    :param criterion: loss function
    :param epochs: number of training epochs
    :param train_loader: loader yielding ``(data, target)`` training batches
    :param valid_loader: optional loader yielding ``(data, target)`` validation batches
    :return: history dictionary that contains train_loss, train_acc, valid_loss, valid_acc as list
             (the valid_* lists stay empty when ``valid_loader`` is None)
    """
    history = {
        'train_loss': [],
        'train_acc': [],
        'valid_loss': [],
        'valid_acc': []
    }
    model.to(device)
    for epoch in range(1, epochs + 1):
        model.train()
        loss_sum = 0.0
        # Count samples and batches as they arrive instead of deriving them from
        # ``batch_size`` / ``len(dataset)``, which breaks when ``batch_size`` is None
        # and is inexact when batches are dropped.
        correct = seen = batches = 0

        # in notebook
        # pbar = notebook.tqdm(train_loader, file=sys.stdout)

        # in interpreter
        # Wrapping the loader itself (not enumerate(...)) lets tqdm read its
        # length when available and show a real progress bar.
        pbar = tqdm(train_loader, file=sys.stdout)
        for data, target in pbar:
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            loss_sum += batch_loss
            correct += calc_acc(output, target)
            seen += len(data)
            batches += 1

            # Loss of the current batch and running accuracy of the epoch so far;
            # both are packed into one string to keep this display order.
            running_acc = correct / max(seen, 1)
            pbar.set_postfix(epoch=f'{epoch}/{epochs}', loss='{:.6f}, acc={:.3f}'.format(batch_loss, running_acc))
        pbar.close()

        # Mean loss per batch and fraction of correctly classified samples;
        # the max(..., 1) guards keep an empty loader from dividing by zero.
        history['train_loss'].append(loss_sum / max(batches, 1))
        history['train_acc'].append(correct / max(seen, 1))

        if valid_loader is not None:
            valid_loss, valid_acc = evaluate(model, device, criterion, valid_loader)

            history['valid_loss'].append(valid_loss)
            history['valid_acc'].append(valid_acc)

    return history


def evaluate(model, device, criterion, data_loader):
    """
    Measure loss and accuracy of ``model`` on ``data_loader`` with gradients disabled.

    :param model: your model (moved to ``device`` in place and put in eval mode)
    :param device: your device(cuda or cpu)
    :param criterion: loss function
    :param data_loader: valid or test loader yielding ``(data, target)`` batches
    :return: (valid or test) loss and acc -- mean loss per batch and the fraction of
             correctly classified samples; both are 0.0 for an empty loader
    """
    # Makes a standalone call safe; a no-op when train() already moved the model.
    model.to(device)
    model.eval()
    loss_sum = 0.0
    # Count samples and batches as they arrive rather than relying on
    # ``batch_size`` / ``len(dataset)``, which may be None or inexact.
    correct = seen = batches = 0

    with torch.no_grad():
        # in notebook
        # pbar = notebook.tqdm(data_loader, file=sys.stdout)

        # in interpreter
        # Wrapping the loader itself lets tqdm read its length when available.
        pbar = tqdm(data_loader, file=sys.stdout)

        for data, target in pbar:
            data, target = data.to(device), target.to(device)

            output = model(data)
            batch_loss = criterion(output, target).item()

            loss_sum += batch_loss
            correct += calc_acc(output, target)
            seen += len(data)
            batches += 1

            # Loss of the current batch and running accuracy so far
            running_acc = correct / max(seen, 1)
            pbar.set_postfix(loss='{:.6f}, acc={:.3f}'.format(batch_loss, running_acc))
        pbar.close()

    total_loss = loss_sum / max(batches, 1)
    total_acc = correct / max(seen, 1)

    return total_loss, total_acc

## Define Custom Model

In [None]:
class MyModel(nn.Module):
    """Placeholder network: defines no layers and passes its input straight through."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``x`` unchanged."""
        passthrough = x
        return passthrough

## Create Model

In [None]:
model = MyModel()

# Use the GPU when CUDA is usable, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# AdamW with its default hyper-parameters, and cross-entropy as the loss
optimizer = optim.AdamW(params=model.parameters())
criterion = nn.CrossEntropyLoss()

## Train

In [None]:
# Run the training loop for 16 epochs and keep the per-epoch metrics
print("============================= Train =============================")
history = train(
    model=model,
    device=device,
    optimizer=optimizer,
    criterion=criterion,
    epochs=16,
    train_loader=train_loader,
    valid_loader=valid_loader,
)

## Test

In [None]:
# Test
print("============================= Test =============================")
# evaluate() expects (model, device, criterion, data_loader); the loader and the
# loss function were previously passed in swapped positions.
test_loss, test_acc = evaluate(model, device, criterion, test_loader)
print("test loss : {:.6f}".format(test_loss))
print("test acc : {:.3f}".format(test_acc))

## Inference

In [None]:
# Inference
# print("=========================== Inference ===========================")
# inference(model, device, infer_dataloader)

## Save Model & Display History

In [None]:
def draw_history(history):
    """
    Show two stacked plots: loss curves on top, accuracy curves below, each
    with one "train" and one "valid" line.

    :param history: mapping with the keys train_loss, train_acc, valid_loss, valid_acc
    """
    # Look up all four series first so a missing key fails before anything is drawn
    series = {
        key: history[key]
        for key in ("train_loss", "train_acc", "valid_loss", "valid_acc")
    }

    for row, metric in enumerate(("loss", "acc"), start=1):
        plt.subplot(2, 1, row)
        for split in ("train", "valid"):
            plt.plot(series[f"{split}_{metric}"], label=split)
        plt.legend()

    plt.show()

In [None]:
import os

file_name = "model1"

# Neither torch.save nor open() creates missing directories, so make sure the
# output folder exists before writing into it.
os.makedirs("models", exist_ok=True)

# NOTE: this serializes the whole model object, so loading the file later
# requires the MyModel class definition to be available.
torch.save(model, f"models/{file_name}.pt")
with open(f"models/{file_name}_history.pickle", 'wb') as f:
    pickle.dump(history, f, pickle.HIGHEST_PROTOCOL)

# print(history)
draw_history(history)