In [None]:
# AlexNet

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
from PIL import Image
from glob import glob
import random
from sklearn.model_selection import train_test_split
from torchsummary import summary
from tqdm import tqdm
import datetime
from collections import namedtuple
%matplotlib inline

In [None]:
# Run on the first CUDA GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

In [None]:
# One folder of JPEG files per class.
cat_directory = r'./data/cat/'
dog_directory = r'./data/dog/'

# glob() returns matches in arbitrary, filesystem-dependent order. Sort them so
# that the "first 3000" files per class -- and therefore every split below --
# are the same on every machine, given the fixed seeds.
cat_images_filepaths = sorted(glob(cat_directory + '*.jpg'))
dog_images_filepaths = sorted(glob(dog_directory + '*.jpg'))

# Keep at most 3000 images per class, then shuffle reproducibly.
images_filepaths = [*cat_images_filepaths[:3000], *dog_images_filepaths[:3000]]
random.seed(42)
random.shuffle(images_filepaths)

# 80/20 train/test split, then 75/25 train/validation split of the remainder,
# i.e. 60% / 20% / 20% of the selected images overall.
train_path, test_path = train_test_split(images_filepaths, test_size=0.2, random_state=42)
train_path, val_path = train_test_split(train_path, test_size=0.25, random_state=42)
print(len(train_path), len(val_path), len(test_path))

In [None]:
class Custom_Dataset(Dataset):
    """Cat/dog image dataset backed by a list of image file paths.

    The class label is derived from the file name, which must start with
    ``cat.`` or ``dog.`` (for example ``cat.0.jpg``).

    Args:
        path: Sequence of image file paths.
        transforms: Optional callable applied to the resized PIL image.
    """

    # File-name prefix -> integer class index.
    _LABELS = {'cat': 0, 'dog': 1}

    def __init__(self, path, transforms=None):
        self.path = path
        self.transform = transforms

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.path)

    @classmethod
    def _label_from_path(cls, img_path):
        """Map an image path to its integer label (0 = cat, 1 = dog).

        Both ``/`` and ``\\`` are accepted as directory separators, so paths
        produced by ``glob`` on Windows and on POSIX systems are handled alike.

        Raises:
            ValueError: If the file name does not start with a known class name.
        """
        filename = os.path.basename(img_path.replace('\\', '/'))
        prefix = filename.split('.')[0]
        if prefix not in cls._LABELS:
            raise ValueError(
                f"Cannot infer label from {img_path!r}: "
                f"expected a file name starting with one of {sorted(cls._LABELS)}"
            )
        return cls._LABELS[prefix]

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``."""
        img_path = self.path[idx]
        with Image.open(img_path) as src:
            # Force three channels: grayscale / palette / RGBA files would
            # otherwise break a 3-channel Normalize and batch collation.
            img = src.convert('RGB')
        img = img.resize((224, 224))

        if self.transform:
            img = self.transform(img)

        return img, self._label_from_path(img_path)

In [None]:
# Preprocessing applied to every image: convert to a tensor, then normalize each
# channel as (x - 0.5) / 0.5. imshow() reverses this with img / 2 + 0.5.
# Alternative (ImageNet statistics): mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

In [None]:
batch_size = 8

# One dataset per split; all of them share the same preprocessing.
train_set = Custom_Dataset(train_path, transforms=transform)
val_set = Custom_Dataset(val_path, transforms=transform)
test_set = Custom_Dataset(test_path, transforms=transform)

# Only the training data needs to be reshuffled every epoch. Evaluation metrics
# do not depend on sample order, so the validation and test loaders keep a fixed
# order, which also makes individual batches reproducible.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

# Index -> class name; matches the integer labels produced by Custom_Dataset.
classes = ('cat', 'dog')

In [None]:
# Helper for displaying an image tensor
def imshow(img, label):
    """Display a normalized image tensor with ``label`` as the figure title.

    Args:
        img: CPU tensor laid out channels-first (the axes are reordered with
            ``(1, 2, 0)`` before plotting), normalized with mean 0.5 / std 0.5.
        label: Anything accepted by ``plt.title``; used as the title.
    """
    restored = img / 2 + 0.5  # unnormalize
    channels_last = np.transpose(restored.numpy(), (1, 2, 0))
    plt.figure(figsize=(10, 2))
    plt.imshow(channels_last)
    plt.title(label)
    plt.show()
# Fetch one (shuffled) batch of training images
dataiter = next(iter(train_loader))
images, labels = dataiter
# Show the images as a grid
imshow(torchvision.utils.make_grid(images), labels)
# Print the ground-truth class names. Iterate over the labels actually returned
# rather than range(batch_size): a batch can hold fewer than batch_size samples.
print(' '.join(f'{classes[label]:5s}' for label in labels.tolist()))

class AlexNet(nn.Module):
    """AlexNet-style convolutional classifier.

    Five convolutional layers (``features``) are followed by an adaptive
    average pool to 6x6 and a three-layer fully connected ``classifier``.

    Args:
        num_classes: Number of output logits. Defaults to 2 (cat / dog), which
            reproduces the previously hard-coded output size.
    """

    def __init__(self, num_classes: int = 2) -> None:
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        # Pool to a fixed 6x6 map so the classifier input is always 256 * 6 * 6,
        # whatever spatial size the convolutional stack produces.
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits of shape ``(batch, num_classes)`` for a batch of images."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = self.classifier(x)

        return x

# Build the network, move it to the selected device and print a per-layer summary
# for a single 3 x 224 x 224 input.
model = AlexNet()
model = model.to(device)
summary(model, input_size=(3, 224, 224))

In [None]:
# Training configuration
epoch = 10                         # number of passes over the training set
criterion = nn.CrossEntropyLoss()  # default reduction: mean over the batch
optimizer = optim.SGD(params=model.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Dataset sizes, used to turn summed losses / hit counts into per-sample averages.
t_data_len, v_data_len = len(train_set), len(val_set)
# Per-epoch history: training (t_t_*) and validation (t_v_*) loss and accuracy.
t_t_loss, t_t_acc = [], []
t_v_loss, t_v_acc = [], []
# Lowest validation loss seen so far; decides when a checkpoint is written.
best_loss = np.inf

for e in range(epoch):
    # Running sums for the current epoch.
    r_t_loss, r_t_acc = 0.0, 0.0
    r_v_loss, r_v_acc = 0.0, 0.0

    # ---- training pass ----
    model.train()
    for inputs, labels in tqdm(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # The criterion yields a batch mean; weight it by the batch size so the
        # epoch average stays exact even when the last batch is smaller.
        r_t_loss += loss.item() * inputs.size(0)
        pred = outputs.argmax(dim=1)
        r_t_acc += (pred == labels).sum().item()

    t_t_loss.append(r_t_loss / t_data_len)
    t_t_acc.append(r_t_acc / t_data_len)

    # ---- validation pass (no gradients, evaluation mode) ----
    model.eval()
    with torch.no_grad():
        for inputs, labels in tqdm(val_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            r_v_loss += loss.item() * inputs.size(0)
            pred = outputs.argmax(dim=1)
            r_v_acc += (pred == labels).sum().item()

    loss = r_v_loss / v_data_len
    t_v_loss.append(loss)
    t_v_acc.append(r_v_acc / v_data_len)

    # Checkpoint whenever the validation loss improves.
    if loss < best_loss:
        best_loss = loss
        torch.save(model.state_dict(), './save_net.pth')

    print(
        f'epoch : {e + 1}\n'
        f'train_loss : {t_t_loss[e]:.4f}\ttrain_acc : {t_t_acc[e] * 100:.2f}%\n'
        f'val_loss : {t_v_loss[e]:.4f}\tval_acc : {t_v_acc[e] * 100:.2f}%')

In [None]:
# Learning curves: loss on the left, accuracy on the right.
# Each x-axis is derived from the length of the recorded history instead of
# `epoch`, so the plot still works after an interrupted run or when `epoch`
# was changed after training.
plt.figure(figsize=(10, 4))
panels = [('loss', t_t_loss, t_v_loss), ('acc', t_t_acc, t_v_acc)]
for position, (metric, train_hist, val_hist) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(range(1, len(train_hist) + 1), train_hist, 'b', label=f'train_{metric}')
    plt.plot(range(1, len(val_hist) + 1), val_hist, 'r', label=f'val_{metric}')
    plt.xlabel('epoch')
    plt.ylabel(metric)
    plt.legend()
plt.tight_layout()

In [None]:
# Restore the checkpoint with the lowest validation loss. map_location remaps the
# stored tensors onto the current device, so a checkpoint written on a GPU
# machine can also be loaded on a CPU-only one.
# NOTE: torch.load unpickles the file -- only load checkpoints you trust.
model.load_state_dict(torch.load('./save_net.pth', map_location=device))

# Running sums over the test set (these names are re-initialised by the training cell).
r_t_loss, r_t_acc = 0.0, 0.0
t_data_len = len(test_set)

with torch.no_grad():
    model.eval()
    for inputs, labels in tqdm(test_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Weight the batch-mean loss by the batch size to accumulate a per-sample sum.
        r_t_loss += loss.item() * inputs.size(0)

        _, pred = torch.max(outputs, 1)
        r_t_acc += torch.sum(pred == labels).item()
loss = r_t_loss / t_data_len
acc = r_t_acc / t_data_len
print(f'Loss : {loss:.4f}, Accuracy : {acc * 100:.2f}%')

In [None]:
# Fetch the first batch of test images (test_loader does not shuffle)
dataiter = next(iter(test_loader))
images, labels = dataiter
images = images.to(device)
# Inference only: switch off dropout and gradient tracking so this cell gives
# the same result regardless of the mode the model was left in.
model.eval()
with torch.no_grad():
    output = model(images)
_, pred = torch.max(output, 1)
# Show the images with the predicted class indices as the title
imshow(torchvision.utils.make_grid(images.detach().cpu()), pred)
# Print predictions and ground truth. Iterate over the values actually returned
# rather than range(batch_size): a batch can hold fewer than batch_size samples.
print('예측 : ', end="")
print(' '.join(f'{classes[p]:5s}' for p in pred.tolist()))
print('정답 : ', end="")
print(' '.join(f'{classes[t]:5s}' for t in labels.tolist()))

In [None]:
# VGGNet

In [None]:
class VGG(nn.Module):
    """VGG-style classifier: a convolutional feature extractor plus an MLP head.

    Args:
        features: Module mapping an image batch to a feature map with 512
            channels (the classifier's first layer expects 512 inputs).
        output_dim: Number of output logits.
    """

    def __init__(self, features, output_dim):
        super().__init__()
        self.features = features
        # Global max pool to 1x1. On the 7x7 map produced from 224x224 inputs
        # this equals the former MaxPool2d(7); other input resolutions now
        # also work instead of failing with a shape mismatch in the classifier.
        self.maxpool = nn.AdaptiveMaxPool2d(1)
        self.classifier = nn.Sequential(
            nn.Linear(512, 4096),
            nn.ReLU(inplace = True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace = True),
            nn.Dropout(0.5),
            nn.Linear(4096, output_dim),
        )

    def forward(self, x):
        """Return logits of shape ``(batch, output_dim)``."""
        x = self.features(x)
        x = self.maxpool(x)
        x = x.view(x.size(0), -1)  # (batch, 512, 1, 1) -> (batch, 512)
        x = self.classifier(x)
        return x

In [None]:
# Layer layouts for the VGG variants: an int is the output-channel count of a
# 3x3 convolution, 'M' is a 2x2 max-pool (see get_vgg_layers). One stage per line.
vgg11_config = [
    64, 'M',
    128, 'M',
    256, 256, 'M',
    512, 512, 'M',
    512, 512, 'M',
]
vgg13_config = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 'M',
    512, 512, 'M',
    512, 512, 'M',
]
vgg16_config = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 256, 'M',
    512, 512, 512, 'M',
    512, 512, 512, 'M',
]
vgg19_config = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 256, 256, 'M',
    512, 512, 512, 512, 'M',
    512, 512, 512, 512, 'M',
]

In [None]:
def get_vgg_layers(config, batch_norm, in_channels=3):
    """Build the convolutional feature extractor described by a VGG config.

    Args:
        config: Iterable whose items are either an int (output channels of a
            3x3, padding-1 convolution) or the string ``'M'`` (2x2 max-pool).
        batch_norm: If true, insert ``BatchNorm2d`` between each convolution
            and its ReLU.
        in_channels: Number of channels of the network input. Defaults to 3,
            the previously hard-coded value.

    Returns:
        ``nn.Sequential`` containing the layers in config order.
    """
    layers = []

    for c in config:
        assert c == 'M' or isinstance(c, int), (
            f"config entries must be 'M' or an int, got {c!r}")
        if c == 'M':
            layers.append(nn.MaxPool2d(kernel_size=2))
            continue

        layers.append(nn.Conv2d(in_channels, c, kernel_size=3, padding=1))
        if batch_norm:
            layers.append(nn.BatchNorm2d(c))
        layers.append(nn.ReLU(inplace=True))
        # The next convolution consumes what this one produced.
        in_channels = c

    return nn.Sequential(*layers)

In [None]:
# Assemble a VGG-11 (with batch normalization) for two output classes.
OUTPUT_DIM = 2
vgg11_layers = get_vgg_layers(vgg11_config, batch_norm=True)
model = VGG(features=vgg11_layers, output_dim=OUTPUT_DIM).to(device)
print(model)

In [None]:
# ResNet

In [None]:
class BasicBlock(nn.Module):
    """Two-convolution residual block (3x3 -> 3x3) with a skip connection.

    Args:
        in_channels: Channels of the block input.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution (and of the shortcut projection).
        downsample: If true, the shortcut is a 1x1 convolution + BatchNorm that
            maps the input to the shape of the main branch; otherwise the input
            is added unchanged.
    """

    # Output channels = expansion * out_channels.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        # Optional projection so the shortcut matches the main branch's shape.
        self.downsample = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1,
                      stride=stride, bias=False),
            nn.BatchNorm2d(out_channels),
        ) if downsample else None

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        shortcut = x

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        if self.downsample is not None:
            shortcut = self.downsample(shortcut)

        out += shortcut
        return self.relu(out)

In [None]:
class Bottleneck(nn.Module):
    """Three-convolution residual block (1x1 -> 3x3 -> 1x1) with a skip connection.

    The final 1x1 convolution widens the features to
    ``expansion * out_channels`` channels.

    Args:
        in_channels: Channels of the block input.
        out_channels: Channels of the two inner convolutions.
        stride: Stride of the 3x3 convolution (and of the shortcut projection).
        downsample: If true, the shortcut is a 1x1 convolution + BatchNorm that
            maps the input to the shape of the main branch; otherwise the input
            is added unchanged.
    """

    # Output channels = expansion * out_channels.
    expansion = 4

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
        expanded = self.expansion * out_channels

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1,
                               stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, expanded, kernel_size=1,
                               stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded)
        self.relu = nn.ReLU(inplace=True)

        # Optional projection so the shortcut matches the main branch's shape.
        self.downsample = nn.Sequential(
            nn.Conv2d(in_channels, expanded, kernel_size=1,
                      stride=stride, bias=False),
            nn.BatchNorm2d(expanded),
        ) if downsample else None

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        shortcut = x

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        if self.downsample is not None:
            shortcut = self.downsample(shortcut)

        out += shortcut
        return self.relu(out)

In [None]:
class ResNet(nn.Module):
    """Configurable ResNet: stem, four residual stages, global average pool, linear head.

    Args:
        config: ``(block, n_blocks, channels)`` triple, e.g. a ``ResNetConfig``.
            ``block`` is the residual block class (it must expose ``expansion``
            and accept ``(in_channels, out_channels, stride, downsample)``);
            ``n_blocks`` and ``channels`` give the block count and base width
            of each of the four stages.
        output_dim: Number of output logits.
        zero_init_residual: If true, set the weight of the last BatchNorm of
            every ``BasicBlock`` / ``Bottleneck`` to zero.
    """

    def __init__(self, config, output_dim, zero_init_residual=False):
        super().__init__()

        block, n_blocks, channels = config
        # Running channel count of the tensor flowing through the network;
        # updated by get_resnet_layer after each stage.
        self.in_channels = channels[0]
        assert len(n_blocks) == len(channels) == 4

        # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max-pool.
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)

        # Stages 2-4 halve the spatial resolution through their first block.
        self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
        self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride = 2)
        self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride = 2)
        self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride = 2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def get_resnet_layer(self, block, n_blocks, channels, stride = 1):
        """Build one stage of ``n_blocks`` residual blocks and advance ``self.in_channels``.

        Only the first block receives ``stride`` and (when needed) a shortcut
        projection; the remaining blocks keep the shape unchanged.
        """
        out_channels = block.expansion * channels
        # The shortcut needs a projection whenever the main branch changes the
        # tensor shape: either the channel count differs or the block is
        # strided. Checking only the channels would leave a strided stage with
        # equal widths without a projection, and the residual add would fail.
        downsample = stride != 1 or self.in_channels != out_channels

        layers = [block(self.in_channels, channels, stride, downsample)]
        for _ in range(1, n_blocks):
            layers.append(block(out_channels, channels))

        self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return logits of shape ``(batch, output_dim)``."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)  # (batch, C, 1, 1) -> (batch, C)
        x = self.fc(x)
        return x

In [None]:
from collections import namedtuple

# Describes one ResNet variant: the residual block class, the number of blocks
# in each stage, and the base channel width of each stage.
ResNetConfig = namedtuple('ResNetConfig', 'block n_blocks channels')

In [None]:
# Variant definitions as (block type, blocks per stage, base width per stage).
# ResNet-18/34 use BasicBlock; ResNet-50/101/152 use Bottleneck.
resnet18_config = ResNetConfig(BasicBlock, [2, 2, 2, 2], [64, 128, 256, 512])
resnet34_config = ResNetConfig(BasicBlock, [3, 4, 6, 3], [64, 128, 256, 512])
resnet50_config = ResNetConfig(Bottleneck, [3, 4, 6, 3], [64, 128, 256, 512])
resnet101_config = ResNetConfig(Bottleneck, [3, 4, 23, 3], [64, 128, 256, 512])
resnet152_config = ResNetConfig(Bottleneck, [3, 8, 36, 3], [64, 128, 256, 512])

In [None]:
# Instantiate a ResNet-50 with two output classes on the selected device.
OUTPUT_DIM = 2
model = ResNet(config=resnet50_config, output_dim=OUTPUT_DIM)
model = model.to(device)
print(model)