<a href="https://colab.research.google.com/github/Developer-Yee/2021-1-Data-Science/blob/main/Pokemon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!nvidia-smi

In [None]:
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision

from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.datasets.folder import pil_loader
from matplotlib.pyplot import imshow
from glob import glob
from PIL import Image

%matplotlib inline

## Load Zip File

In [None]:
import zipfile

# train.zip
output_unzip = zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/train.zip', 'r')
output_unzip.extractall('/content/')
output_unzip.close()

# test.zip
output_unzip = zipfile.ZipFile('/content/drive/MyDrive/Colab Notebooks/test.zip', 'r')
output_unzip.extractall('/content/')
output_unzip.close()

In [None]:
!unzip '/content/drive/MyDrive/Colab Notebooks/train.zip' -d '/content'

## Train data

In [None]:
pokemon = []

for dirname, _, filenames in os.walk('/content/train'):
    pokemon.append(dirname.replace('/content/train/', ''))

pokemon.pop(0)
pokemon.sort()

# poketmon list
print(pokemon)   
      
# poketmon length 
print(len(pokemon))     

## Test data

In [None]:
test_pokemon = []

for dirname, _, filenames in os.walk('/content/test/'):
    test_pokemon.append(filenames)

print(test_pokemon)
print(len(test_pokemon[0]))

## Custom Data

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

torch.manual_seed(777)
if device == 'cuda':
    torch.cuda.manual_seed_all(777)

In [None]:
class CustomDataset(Dataset):
    def __init__(self, path, transform, data_type = 'train'):
        super(CustomDataset, self).__init__()
        
        self.path = path
        self.transform = transform
        self.data_type = data_type
        self.image_list = self.get_img_list()

    def __len__(self):
        return len(self.image_list)

    def __getitem__(self, index):
        image = Image.open(self.image_list[index]).convert('RGB')
        image = self.transform(image)
        return image

    def get_img_list(self):
        if(self.data_type == 'train'):
            pass
        elif(self.data_type == 'val'):
            pass
        elif(self.data_type == 'test'):
            return glob(self.path + '/*')


In [None]:
trans = torchvision.transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

# Train data
train_data = torchvision.datasets.ImageFolder(root='/content/train', transform=trans)
print(len(train_data))
print(train_data)
data_loader = DataLoader(dataset=train_data, batch_size=32, shuffle=True, num_workers=2)

trans = torchvision.transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

# Test data
test_data = CustomDataset('/content/test', trans, data_type='test')
test_set = DataLoader(dataset=test_data, batch_size=1)
print(len(test_data))

## VGG_A

In [None]:
class VGG_A(nn.Module):
    def __init__(self, num_classes: int = 501, init_weights: bool = True):
        super(VGG_A, self).__init__()
        self.convnet = nn.Sequential(
            # Input Channel (RGB: 3)
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2), # 224 -> 112
            
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2), # 112 -> 56
            
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2), # 56 -> 28

            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2), # 28 -> 14

            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2), # 14 -> 7
        )

        self.fclayer = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x:torch.Tensor):
        out = self.convnet(x)
        out = torch.flatten(out, 1)
        out = self.fclayer(out)
        return out

In [None]:
vgg_a = VGG_A().to(device)
test_input = (torch.Tensor(8, 3, 256, 256)).to(device)
test_out = vgg_a(test_input)
print(test_out.shape)

## RESNET

In [None]:
import torch.nn as nn
import torch.utils.model_zoo as model_zoo


__all__ = ['ResNet', 'resnet18', 'resnet34', 'resnet50', 'resnet101',
           'resnet152']


model_urls = {
    'resnet18': 'https://download.pytorch.org/models/resnet18-5c106cde.pth',
    'resnet34': 'https://download.pytorch.org/models/resnet34-333f7ec4.pth',
    'resnet50': 'https://download.pytorch.org/models/resnet50-19c8e357.pth',
    'resnet101': 'https://download.pytorch.org/models/resnet101-5d3b4d8f.pth',
    'resnet152': 'https://download.pytorch.org/models/resnet152-b121ed2d.pth',
}

In [None]:
def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=False)


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

In [None]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):

        identity = x

        out = self.conv1(x) # 3x3 stride = 2
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out) # 3x3 stride = 1
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

In [None]:
class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(Bottleneck, self).__init__()
        self.conv1 = conv1x1(inplanes, planes) #conv1x1(64,64)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = conv3x3(planes, planes, stride)#conv3x3(64,64)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = conv1x1(planes, planes * self.expansion) #conv1x1(64,256)
        self.bn3 = nn.BatchNorm2d(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x) # 1x1 stride = 1
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out) # 3x3 stride = stride 
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out) # 1x1 stride = 1
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

In [None]:
class ResNet(nn.Module):
    # model = ResNet(Bottleneck, [3, 4, 6, 3], **kwargs) #resnet 50 
    def __init__(self, block, layers, num_classes=1000, zero_init_residual=False):
        super(ResNet, self).__init__()
        
        self.inplanes = 64

        # input [3x256x256]
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        self.layer1 = self._make_layer(block, 64, layers[0]'''3''')
        self.layer2 = self._make_layer(block, 128, layers[1]'''4''', stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2]'''6''', stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3]'''3''', stride=2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)
    
    def _make_layer(self, block, planes, blocks, stride=1):
        
        downsample = None
        
        if stride != 1 or self.inplanes != planes * block.expansion: 
            
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride), #conv1x1(256, 512, 2)
                nn.BatchNorm2d(planes * block.expansion), #batchnrom2d(512)
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        
        self.inplanes = planes * block.expansion #self.inplanes = 128 * 4
        
        for _ in range(1, blocks): 
            layers.append(block(self.inplanes, planes)) # * 3

        return nn.Sequential(*layers)
    

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

In [None]:
def resnet18(pretrained=False, **kwargs):
    model = ResNet(BasicBlock, [2, 2, 2, 2], **kwargs) #=> 2*(2+2+2+2) +1(conv1) +1(fc)  = 16 +2 =resnet 18
    return model

In [None]:
def resnet50(pretrained=False, **kwargs):
    model = ResNet(Bottleneck, [3, 4, 6, 3], **kwargs) #=> 3*(3+4+6+3) +(conv1) +1(fc) = 48 +2 = 50
    return model

In [None]:
def resnet152(pretrained=False, **kwargs):
    model = ResNet(Bottleneck, [3, 8, 36, 3], **kwargs) # 3*(3+8+36+3) +2 = 150+2 = resnet152    
    return mode

In [None]:
import torchvision.models.resnet as resnet

In [None]:
res = resnet.resnet50()

In [None]:
res

In [None]:
num_classes = 151
test_input = (torch.Tensor(8, 3, 256, 256)).to(device)
num_ftrs = res.fc.in_features
res.fc = nn.Linear(num_ftrs, num_classes)
res.to(device)
test_out = res(test_input)
print(test_out.shape)

## Optimizer && Cost Function

In [None]:
optimizer = optim.Adam(res.parameters(), lr=0.0001)
loss_func = nn.CrossEntropyLoss().to(device)

## Train 정확도 추가

In [None]:
pre_accuracy = 0.0
total_batch = len(data_loader)
epochs = 30

for epoch in range(epochs):
    avg_cost = 0.0
    for num, data in enumerate(data_loader):
        imgs, labels = data
        imgs = imgs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        out = res(imgs)
        loss = loss_func(out, labels)
        loss.backward()
        optimizer.step()

        avg_cost += loss / total_batch

        correct_prediction = torch.argmax(out, 1) == labels
    
    accuracy = correct_prediction.float().mean()

    if(pre_accuracy < accuracy):
        pre_accuracy = accuracy
        torch.save(res.state_dict(), '/content/drive/MyDrive/Colab Notebooks/train_data/res50.pth')

    print('[Epoch:{}] cost = {} accuracy = {}'.format(epoch+1, avg_cost, accuracy))

print('Learning Finished!')

## 저장한 모델 불러오기

In [None]:
new_net = VGG_A().to(device)
new_net.load_state_dict(torch.load('/content/drive/MyDrive/Colab Notebooks/train_data/vgg_a_p05.pth'))

## Testing

In [None]:
l = []

print(len(test_set))
with torch.no_grad():
    for num, data in enumerate(test_set):

        imgs = data
        imgs = imgs.to(device)
        # label = label.to(device)

        prediction = vgg_a(imgs)

        index = torch.argmax(prediction, 1).tolist()[0]
        l.append([pokemon[index], index])

        # correct_prediction = torch.argmax(prediction, 1) == label
        
        # accuracy = correct_prediction.float().mean()
        # print('Accuracy', accuracy.item())

## To CSV

In [None]:
df = pd.DataFrame(l)
print(df)
df.to_csv('/content/test.csv', index=False)

In [None]:
from torch.utils.data.sampler import SubsetRandomSampler