In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Data Preparation
데이터셋을 생성하는 코드입니다. 이 부분은 수정하지 않으셔도 됩니다.

In [None]:
import time
import json
import shutil
import cv2
import pandas as pd

In [None]:
img_dir = "/kaggle/input/tinyquickdraw/quickdraw_simplified"
class_files = os.listdir(img_dir)

In [None]:
def strokes2img(strokes):
    """Strokes 형식의 데이터를 28x28 크기의 grayscale 이미지로 변환"""
    img = np.zeros((28, 28, 1))
    for stroke in strokes:
        pts = np.round(np.array(stroke).transpose().reshape(-1, 1, 2) * (28 / 256)).astype(int)
        img = cv2.polylines(img, [pts], isClosed=False, color=(255, 255, 255))
    img = np.array(255 - img)
    img = cv2.resize(img, dsize=(28, 28), interpolation=cv2.INTER_LINEAR)
    return img

In [None]:
def get_lines(data_path):
    """데이터 파일의 line 수를 리턴"""
    with open(data_path, "r") as d:
        return sum(1 for line in d)

In [None]:
# 지정된 폴더와 하위 디렉토리 폴더, 파일을 모두 삭제

def init_dirs():
    """디렉토리 초기화"""
    if os.path.exists("train"):
        shutil.rmtree("train")
    if os.path.exists("val"):
        shutil.rmtree("val")
    if os.path.exists("test"):
        shutil.rmtree("test")

In [None]:
# ndjson 형식의 파일을 이미지 파일로 바꿔 저장합니다.
# 총 345개의 클래스가 있지만, 용량 문제로 인해 200개의 클래스만 사용하며,
# 원래 해당 클래스에 속한 이미지 수의 1/100을 임의로 표집하여 사용합니다.

num_classes = 200
sample_scale = 100

np.random.seed(0)

print("This process may take a while...")

start_time = time.time()

init_dirs()

for class_idx, class_file in enumerate(class_files):
    
    if class_idx == num_classes:
        break
    
    class_name = class_file.split(".")[0]

    if not os.path.exists(f"train/{class_name}"):
        os.makedirs(f"train/{class_name}")
    
    if not os.path.exists(f"val/{class_name}"):
        os.makedirs(f"val/{class_name}")

    if not os.path.exists(f"test/{class_name}"):
        os.makedirs(f"test/{class_name}")

    file_path = os.path.join(img_dir, class_file)
    num_images = get_lines(file_path)
    num_samples = num_images // sample_scale
    sample_indices = sorted(np.random.choice(num_images, num_samples, replace=False).tolist())
    
    print(f"Processing class {class_idx + 1:03d}/{num_classes}, picking up {num_samples:06d} images. Time elapsed: {(time.time() - start_time)/60:.2f} mins.", end="\r")
    
    with open(file_path, "r") as d:
        for idx, line in enumerate(d):
            if idx == sample_indices[0]:
                strokes = json.loads(line)["drawing"]
                img = strokes2img(strokes)
                
                r = np.random.random()
                if r < 0.8:
                    cv2.imwrite(f"train/{class_name}/{idx}.png", img)
                elif r < 0.9:
                    cv2.imwrite(f"val/{class_name}/{idx}.png", img)
                else:
                    cv2.imwrite(f"test/{class_name}/{idx}.png", img)
                
                sample_indices.pop(0)
                if len(sample_indices) == 0:
                    break
            else:
                pass

In [None]:
def make_csv_file(division):
    """저장된 이미지를 기반으로 데이터 로더를 위한 csv 파일을 작성합니다."""
    print(f"Writing csv data file for {division}...")
    i = 0
    with open(f"{division}.csv", "w") as csv_file:
        csv_file.write("path,cls\n")
        base_dir = division
        classes = os.listdir(base_dir)
        for cls in classes:
            imgs = os.listdir(os.path.join(base_dir, cls))
            for img in imgs:
                path = os.path.join(base_dir, cls, img)
                csv_file.write(f"{path},{cls}\n")
                i += 1
        print(f"Wrote {division}.csv file with {i} lines.")

In [None]:
make_csv_file("train")
make_csv_file("val")
make_csv_file("test")

In [None]:
!ls

# Class Ratio

In [None]:
dataframe = pd.read_csv('train.csv')

In [None]:
class_list = list(set(dataframe['cls']))
all_classes = list(dataframe['cls'])
class_count_dict = dict()

print('%s %10s'%('class_name'.ljust(20), 'count'))
print('-'*33)
for cls in class_list:
    counts = all_classes.count(cls)
    print('%s %10d'%(cls.ljust(20), counts))
    class_count_dict[cls] = counts

In [None]:
# There is a class imbalance problem

def f1(x):
    return class_count_dict[x]

key_max = max(class_count_dict.keys(), key=f1)
key_min = min(class_count_dict.keys(), key=f1)

print('Max : %s --> %d'%(key_max, class_count_dict[key_max]))
print('Min : %s --> %d'%(key_min, class_count_dict[key_min]))
print('Difference :', class_count_dict[key_max] - class_count_dict[key_min])

# Model Train
자유롭게 수정해주세요.

In [None]:
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
import torchvision
from torchvision import transforms

import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

In [None]:
# for Baseline Model
class CustomDataset(Dataset):
    
    def __init__(self, csv_path="train.csv", transform=None):
        data = pd.read_csv(csv_path)
        self.paths = data["path"].values
        self.classes = data["cls"].values
        self.cls_to_idx = {cls: idx for idx, cls in enumerate(set(self.classes))}
        self.transform = transforms.Compose([
            transforms.ToTensor()
        ])
    
    def __getitem__(self, idx):
        path = self.paths[idx]
        img = self.transform(Image.open(path))
        cls_idx = self.cls_to_idx[self.classes[idx]]
        return path, img, cls_idx
    
    def __len__(self):
        return len(self.paths)
    
    

# for VGG16_bn and ResNet50 Model
# class CustomDataset(Dataset):
    
#     def __init__(self, csv_path="train.csv", transform=None):
#         data = pd.read_csv(csv_path)
#         self.paths = data["path"].values
#         self.classes = data["cls"].values
#         self.cls_to_idx = {cls: idx for idx, cls in enumerate(set(self.classes))}
#         self.transform = transforms.Compose([
#             transforms.Grayscale(num_output_channels=3),
#             transforms.Resize(224),
#             transforms.ToTensor()
#         ])

    
#     def __getitem__(self, idx):
#         path = self.paths[idx]
#         img = self.transform(Image.open(path))
#         cls_idx = self.cls_to_idx[self.classes[idx]]
#         return path, img, cls_idx
    
#     def __len__(self):
#         return len(self.paths)



# for TS-CNN and AI_Inno_CNN Model
# class CustomDataset(Dataset):
    
#     def __init__(self, csv_path="train.csv", transform=None):
#         data = pd.read_csv(csv_path)
#         self.paths = data["path"].values
#         self.classes = data["cls"].values
#         self.cls_to_idx = {cls: idx for idx, cls in enumerate(set(self.classes))}
#         self.transform = transforms.Compose([
#             transforms.Resize(32),
#             transforms.ToTensor()
#         ])
    
#     def __getitem__(self, idx):
#         path = self.paths[idx]
#         img = self.transform(Image.open(path))
#         cls_idx = self.cls_to_idx[self.classes[idx]]
#         return path, img, cls_idx
    
#     def __len__(self):
#         return len(self.paths)

In [None]:
train_dataset = CustomDataset(csv_path="train.csv")
train_data_loader = DataLoader(train_dataset, batch_size=64, num_workers=0, shuffle=True)

val_dataset = CustomDataset(csv_path="val.csv")
val_data_loader = DataLoader(val_dataset, batch_size=128, num_workers=0, shuffle=False)

In [None]:
def train(model, criterion, optimizer, epochs, ES_patience=7):
    # cuda check
    model = model.cuda()
    model.train()
    losses = []
    acc_list = []
    ES_patience = ES_patience
    ES_counter = 0
    ES_best_val = float('inf')
    
    
    for epoch in range(epochs):
        print(f"Epoch {epoch + 1} start")
        for batch_idx, (path, img, cls_idx) in enumerate(train_data_loader):
            cls_scores = model(img.cuda())
            cost = criterion(cls_scores, cls_idx.cuda())
            optimizer.zero_grad()
            cost.backward()
            optimizer.step()
            if (batch_idx + 1) % 100 == 0 or (batch_idx + 1) == len(train_data_loader):
                print(f"Epoch: {epoch + 1:03d}/{epochs:03d} | Batch: {batch_idx + 1:04d}/{len(train_data_loader):04d} | Cost: {cost.item():.4f}")
                losses.append(cost.item())
        acc, val_cost = evaluate(model, val_data_loader, criterion)
        acc_list.append(acc)
        
        # Early Stopping
        if val_cost < ES_best_val:
            ES_counter = 0
            ES_best_val = val_cost
        else:
            ES_counter += 1
        if ES_counter == ES_patience:
            print('Early Stopping')
            break
        
    plt.figure(figsize=(12,6))

    plt.subplot(1,2,1)
    plt.plot(losses)
    plt.tick_params(
        axis='x',          # changes apply to the x-axis
        which='both',      # both major and minor ticks are affected
        bottom=False,      # ticks along the bottom Edge are off
        top=False,         # ticks along the top Edge are off
        labelbottom=False) # labels along the bottom Edge are off
    plt.title('loss curve')
    plt.grid()

    plt.subplot(1,2,2)
    plt.plot(acc_list, color='red')
    plt.ylim([0,1])
    plt.xlabel('epoch')
    plt.title('accuracy for validation dataset')
    plt.grid()
    print('Best val accuracy of this model is %.4f in %d epoch'%(max(acc_list), acc_list.index(max(acc_list))+1))
    return model

In [None]:
def evaluate(model, data_loader, criterion):
    """모델을 통한 prediction 및 accuracy 계산"""
    model.eval()
    with torch.no_grad():
        corrects_list = list()
        for path, img, cls_idx in val_data_loader:
            cls_scores = model(img.cuda())
            cost = criterion(cls_scores, cls_idx.cuda())
            cls_preds = torch.argmax(cls_scores, dim=1).cpu().numpy()
            corrects_list.extend(list(cls_preds == cls_idx.numpy()))
        accuracy = np.mean(corrects_list)
        print(f"Accuracy: {accuracy:.4f}")
    return accuracy, cost.item()

# Baseline Model

In [None]:
class Baseline(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.num_classes = 200
        self.conv_layer = nn.Sequential(
            nn.Conv2d(1, 6, 5, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(6, 16, 5, 1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        
        self.fc_layer = nn.Sequential(
            nn.Linear(16*4*4, 256),
            nn.ReLU(),
            nn.Linear(256, self.num_classes),
        )
        
    def forward(self, x):
        x = self.conv_layer(x)
        x = x.view(x.shape[0], -1)
        x = self.fc_layer(x)
        return x

In [None]:
model = Baseline()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
epochs = 100

# VGG16_bn(pretrained)

In [None]:
vgg16_bn = torchvision.models.vgg16_bn(pretrained=True).cuda()

In [None]:
for param in vgg16_bn.parameters():
    param.requires_grad = False
    
num_ftrs = vgg16_bn.classifier[6].in_features
vgg16_bn.classifier[6] = nn.Linear(num_ftrs, num_classes)

for param in vgg16_bn.classifier.parameters():
    param.requires_grad = True

In [None]:
optimizer = torch.optim.Adam(vgg16_bn.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
vgg16_bn = train(vgg16_bn, criterion, optimizer, epochs)

# Resnet50(pretrained)

In [None]:
resnet50 = torchvision.models.resnet50(pretrained=True).cuda()

In [None]:
for param in resnet50.parameters():
    param.requires_grad = False

num_ftrs = resnet50.fc.in_features
resnet50.fc = nn.Linear(num_ftrs, num_classes)

resnet50.fc.weight.requires_grad = True    # double check

In [None]:
optimizer = torch.optim.Adam(resnet50.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
resnet50 = train(resnet50, criterion, optimizer, epochs)

# Like_VGG

In [None]:
class Like_VGG(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.num_classes = 200
        self.conv_layer = nn.Sequential(
            # (B, 1, 28, 28)
            nn.Conv2d(1, 32, 3, 1),
            # (B, 32, 26, 26)
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, 3, 1),
            # (B, 32, 24, 24)
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            # (B, 32, 12, 12)
            nn.Conv2d(32, 64, 3, 1),
            # (B, 64, 10, 10)
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3, 1),
            # (B, 64, 8, 8)
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            # (B, 64, 4, 4)
        )
        
        self.fc_layer = nn.Sequential(
            nn.Linear(64*4*4, 64*4*4),
            nn.ReLU(),
            nn.Linear(64*4*4, self.num_classes),
        )
        
        # weights initializer
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
               nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
               nn.init.constant_(m.weight, 1)
               nn.init.constant_(m.bias, 0)
        
    def forward(self, x):
        x = self.conv_layer(x)
        x = x.view(x.shape[0], -1)
        x = self.fc_layer(x)
        return x

In [None]:
Like_VGG = Like_VGG()
optimizer = torch.optim.Adam(Like_VGG.parameters(), lr=0.001, )
criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
Like_VGG = train(Like_VGG, criterion, optimizer, epochs)

In [None]:
Like_VGG = Like_VGG()
optimizer = torch.optim.RMSprop(Like_VGG.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
Like_VGG = train(Like_VGG, criterion, optimizer, epochs)

In [None]:
Like_VGG = Like_VGG()
optimizer = torch.optim.AdamW(Like_VGG.parameters(), lr=0.001, weight_decay=0.01)
criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
Like_VGG = train(Like_VGG, criterion, optimizer, epochs)

# Like_VGG with Focal Loss

[Focal Loss for Dense Object Detection](https://arxiv.org/abs/1708.02002)<br>
[code reference link](https://www.kaggle.com/c/tgs-salt-identification-challenge/discussion/65938)

In [None]:
class FocalLoss(nn.Module):
    def __init__(self, alpha=0.25, gamma=2, logits=True, reduce=True):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.logits = logits
        self.reduce = reduce

    def forward(self, inputs, targets):
        y = torch.zeros(targets.shape[0], num_classes)
        y[range(y.shape[0]), targets]=1
        targets = y.cuda()
        if self.logits:
            BCE_loss = F.binary_cross_entropy_with_logits(inputs, targets, reduce=False)
        else:
            BCE_loss = F.binary_cross_entropy(inputs, targets, reduce=False)
        pt = torch.exp(-BCE_loss)
        F_loss = self.alpha * (1-pt)**self.gamma * BCE_loss

        if self.reduce:
            return torch.mean(F_loss)    # to check for changes: mean --> sum
        else:
            return F_loss

In [None]:
Like_VGG_FL = Like_VGG()
optimizer = torch.optim.Adam(Like_VGG_FL.parameters(), lr=0.001)
criterion = FocalLoss()
# criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
Like_VGG_FL = train(Like_VGG_FL, criterion, optimizer, epochs)

# AI Inno CNN

In [None]:
class AI_Inno_CNN(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.num_classes = 200
        self.conv_layer = nn.Sequential(
            # (B, 1, 32, 32)
            nn.Conv2d(1, 32, 3, 1),
            # (B, 32, 30, 30)
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, 1),
            # (B, 64, 28, 28)
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2),
            # (B, 64, 14, 14)
            
            nn.Conv2d(64, 64, 3, 1),
            # (B, 64, 12, 12)
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, 1),
            # (B, 128, 10, 10)
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2),
            # (B, 128, 5, 5)
            
            nn.Conv2d(128, 256, 3, 1),
            # (B, 256, 3, 3)
            nn.ReLU(),
            nn.BatchNorm2d(256)
        )
        
        self.fc_layer = nn.Sequential(
            # (B, 256)
            nn.Linear(256, self.num_classes),
            # (B, 200)
            nn.ReLU(),
            nn.Linear(self.num_classes, self.num_classes),
            # (B, 200)
        )
        
        # weights initializer
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
               nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
               nn.init.constant_(m.weight, 1)
               nn.init.constant_(m.bias, 0)
        
    def forward(self, x):
        x = self.conv_layer(x)
        x = F.adaptive_avg_pool2d(x, (1,1))
        x = x.view(x.shape[0], -1)
        x = self.fc_layer(x)
        return x

In [None]:
AI_Inno_CNN = AI_Inno_CNN()
optimizer = torch.optim.Adam(AI_Inno_CNN.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
AI_Inno_CNN = train(AI_Inno_CNN, criterion, optimizer, epochs)

# TS-CNN

HAND DRAWN SKETCH CLASSIFICATION USING CONVOLUTIONAL NEURAL NETWORKS [link](https://www.semanticscholar.org/paper/HAND-DRAWN-SKETCH-CLASSIFICATION-USING-NEURAL-Atabay/e688a6535dbdd6ce6928bc4eb2978f39628e5302)

In [None]:
class TS_CNN(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.num_classes = 200
            # (B, 1, 32, 32)
        self.L1 = nn.Sequential(
            nn.Conv2d(1, 96, 5, 1),
            # (B, 96, 28, 28)
            nn.ReLU(),
            nn.MaxPool2d(3, stride=2),
            # (B, 96, 14, 14)
        )
        self.L2 = nn.Sequential(
            nn.Conv2d(96, 192, 5, 1),
            # (B, 192, 10, 10)
            nn.ReLU(),
            nn.MaxPool2d(3, stride=2),
            # (B, 192, 4, 4)
        )
        self.L3 = nn.Sequential(
            nn.Conv2d(192, 192, 3, 1),
            # (B, 192, 2, 2)
            nn.ReLU()
        )
        self.L4 = nn.Sequential(
            nn.Conv2d(192, self.num_classes, 1, 1),
            # (B, 200, 2, 2)
            nn.ReLU(),
            nn.Conv2d(self.num_classes, self.num_classes, 1, 1)
            
        )
        # weights initializer
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
               nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
               nn.init.constant_(m.weight, 1)
               nn.init.constant_(m.bias, 0)
        
        
    def forward(self, x):
        x = self.L1(x)
        x = self.L2(x)
        x = self.L3(x)
        x = self.L4(x)
        x = x.view(x.shape[0], -1)
        return x

In [None]:
TS_CNN = TS_CNN()
optimizer = torch.optim.SGD(TS_CNN.parameters(), lr=0.001, momentum=0.9,
                           weight_decay=0.0005)
criterion = nn.CrossEntropyLoss()
epochs = 100

In [None]:
TS_CNN = train(TS_CNN, criterion, optimizer, epochs)

# Evaluation
최종 스코어입니다. 이 정확도를 최대한 올려주세요!

In [None]:
test_dataset = CustomDataset(csv_path="test.csv")
test_data_loader = DataLoader(test_dataset, batch_size=128, num_workers=0, shuffle=False)

In [None]:
evaluate(model, test_data_loader)