# Loading 

In [2]:
import os, torch, torchvision, random
import pandas as pd
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt
from PIL import ImageFile, Image
ImageFile.LOAD_TRUNCATED_IMAGES = True

import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, random_split, DataLoader
from torchvision.datasets import ImageFolder
from torchvision import models
from torch import optim
from torch.optim import lr_scheduler
from torchsummary import summary
from torch.utils.tensorboard import SummaryWriter
import time

In [3]:
%load_ext tensorboard

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [5]:
from google.colab import drive
drive.mount('/content/drive',force_remount=False)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
! cp '/content/drive/MyDrive/py_data/dataset.zip' '../'

In [None]:
! unzip '../dataset.zip' -d 'data/'

Archive:  ../dataset.zip
   creating: data/dataset/PetImages/
   creating: data/dataset/PetImages/Cat/
  inflating: data/dataset/PetImages/Cat/0.jpg  
  inflating: data/dataset/PetImages/Cat/1.jpg  
  inflating: data/dataset/PetImages/Cat/10.jpg  
  inflating: data/dataset/PetImages/Cat/100.jpg  
  inflating: data/dataset/PetImages/Cat/1000.jpg  
  inflating: data/dataset/PetImages/Cat/10000.jpg  
  inflating: data/dataset/PetImages/Cat/10001.jpg  
  inflating: data/dataset/PetImages/Cat/10002.jpg  
  inflating: data/dataset/PetImages/Cat/10003.jpg  
  inflating: data/dataset/PetImages/Cat/10004.jpg  
  inflating: data/dataset/PetImages/Cat/10005.jpg  
  inflating: data/dataset/PetImages/Cat/10006.jpg  
  inflating: data/dataset/PetImages/Cat/10007.jpg  
  inflating: data/dataset/PetImages/Cat/10008.jpg  
  inflating: data/dataset/PetImages/Cat/10009.jpg  
  inflating: data/dataset/PetImages/Cat/1001.jpg  
  inflating: data/dataset/PetImages/Cat/10010.jpg  
  inflating: data/dataset/Pe

# TensorBoard

In [None]:
%tensorboard --logdir=runs

# helpers

In [None]:
class RandomErasing(object):
    def __init__(self, probability=1, sl=0.02, sh=0.4, r1=0.3, mean=None):
        self.probability = probability
        self.mean = mean
        self.sl = sl
        self.sh = sh
        self.r1 = r1

    def __call__(self, img):

        if random.uniform(0, 1) > self.probability:
            return img

        while True:
            channel, h, w = img.shape
            area = h * w

            gt_area = random.uniform(self.sl, self.sh) * area
            aspect_ratio = random.uniform(self.r1, 1 / self.r1)

            h = int(round(math.sqrt(gt_area * aspect_ratio)))
            w = int(round(math.sqrt(gt_area / aspect_ratio)))

            if w < w and h < h:
                x = random.randint(0, h - h)
                y = random.randint(0, w - w)
                if channel == 3:
                    img[:, x:x + h, y:y + w] = torch.empty((3, h, w), dtype=torch.float32).normal_()
                else:
                    img[0, x:x + h, y:y + w] = torch.empty((1, h, w), dtype=torch.float32).normal_()
                return img

# Dataset

In [None]:
class Dataset(Dataset):
    
    def __init__(self, filenames, labels, transform):
        
        self.filenames = filenames # 資料集的所有檔名
        self.labels = labels # 影像的標籤
        self.transform = transform # 影像的轉換方式
 
    def __len__(self):
        
        return len(self.filenames) # return DataSet 長度
 
    def __getitem__(self, idx):
        
        try:
            image = Image.open(self.filenames[idx]).convert('RGB')
            image = self.transform(image)
            label = np.array(self.labels[idx])
        except:
            return None
           
        return image, label # return 模型訓練所需的資訊


In [None]:
def collate_fn(batch):
    batch = list(filter(lambda x: x is not None, batch))
    return torch.utils.data.dataloader.default_collate(batch)

# DataLoader

In [None]:
normalize = transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])

# Transformer
train_transformer = transforms.Compose([
    transforms.Resize(256),
    # transforms.RandomResizedCrop(224),
    # transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    normalize
])
 
test_transformer = transforms.Compose([
    transforms.Resize(224),
    # transforms.CenterCrop(224),
    transforms.ToTensor(),
    normalize
])

# 1.2. 填入 ??? 的部份

def split_Train_Val_Data(data_dir, batch_size= 32):
    
    dataset = ImageFolder(data_dir) 
    
    # 建立 20 類的 list
    character = [[] for i in range(len(dataset.classes))]
    # print(character)
    
    # 將每一類的檔名依序存入相對應的 list
    for x, y in dataset.samples:
        character[y].append(x)
      
    train_inputs, test_inputs = [], []
    train_labels, test_labels = [], []
    
    for i, data in enumerate(character): # 讀取每個類別中所有的檔名 (i: label, data: filename)
        
        np.random.seed(42)
        np.random.shuffle(data)
            
        # -------------------------------------------
        # 將每一類都以 8:2 的比例分成訓練資料和測試資料
        # -------------------------------------------
        
        num_sample_train = 0.8
        num_sample_test = 0.2
        
        # print(str(i) + ': ' + str(len(data)) + ' | ' + str(num_sample_train) + ' | ' + str(num_sample_test))
        
        for x in data[:int(len(data)*num_sample_train)] : # 前 80% 資料存進 training list
            train_inputs.append(x)
            train_labels.append(i)
            
        for x in data[int(len(data)*num_sample_train):] : # 後 20% 資料存進 testing list
            test_inputs.append(x)
            test_labels.append(i)

    train_dataloader = DataLoader(Dataset(train_inputs, train_labels, train_transformer),
                                  collate_fn= collate_fn, batch_size = batch_size, shuffle = True, num_workers= 4)
    test_dataloader = DataLoader(Dataset(test_inputs, test_labels, test_transformer),
                                  collate_fn= collate_fn, batch_size = batch_size, shuffle = False, num_workers= 4)
 
    return train_dataloader, test_dataloader

# Model

In [None]:
class block(nn.Module):
	def __init__(self, in_channels, out_channels, downsample= None, stride= 1):
		super(block, self).__init__()
		self.expansion = 4
		self.conv1=  nn.Conv2d(in_channels= in_channels, out_channels= out_channels, kernel_size= 1, stride= 1, padding= 0)
		self.bn1= nn.BatchNorm2d(out_channels)
		self.conv2 = nn.Conv2d(in_channels= out_channels, out_channels= out_channels, kernel_size= 3, stride= stride, padding= 1)
		self.bn2= nn.BatchNorm2d(out_channels)
		self.conv3 = nn.Conv2d(in_channels= out_channels, out_channels= out_channels*self.expansion, kernel_size= 1, stride= 1, padding= 0)
		self.bn3= nn.BatchNorm2d(out_channels*self.expansion)
		self.relu= nn.ReLU()
		self.downsample = downsample

	def forward(self, x):

		identity= x

		x= self.conv1(x)
		x= self.bn1(x)
		x= self.relu(x)
		x= self.conv2(x)
		x= self.bn2(x)
		x= self.relu(x)
		x= self.conv3(x)
		x= self.bn3(x)

		if self.downsample is not None:
			identity = self.downsample(identity)

		x += identity
		x = self.relu(x)
		return x 


class ResNet(nn.Module): # [3, 4, 6, 3]
	def __init__(self, block, layers, img_channels, n_class):
		super(ResNet, self).__init__()
		self.expansion= 4
		self.in_channels= 64
		self.conv1= nn.Conv2d(in_channels= img_channels, out_channels= self.in_channels, kernel_size= 7, stride= 2, padding= 3)
		self.bn1 = nn.BatchNorm2d(64)
		self.relu = nn.ReLU()
		self.maxpool = nn.MaxPool2d(kernel_size= 3, stride= 2, padding= 1)
		
		self.layer1 = self.ResNet_layer(block= block, n_res_blocks= layers[0], out_channels= 64, stride=1)
		self.layer2 = self.ResNet_layer(block= block, n_res_blocks= layers[1], out_channels= 128, stride=2)
		self.layer3 = self.ResNet_layer(block= block, n_res_blocks= layers[2], out_channels= 256, stride=2)
		self.layer4 = self.ResNet_layer(block= block, n_res_blocks= layers[3], out_channels= 512, stride=2)

		self.avgpool= nn.AdaptiveAvgPool2d((1,1))
		self.fc = nn.Linear(512*4, n_class)

	def forward(self, x):

		x= self.conv1(x)
		x= self.bn1(x)
		x= self.relu(x)
		x= self.maxpool(x)

		x= self.layer1(x)
		x= self.layer2(x)
		x= self.layer3(x)
		x= self.layer4(x)

		x= self.avgpool(x)
		x = x.reshape(x.shape[0], -1)
		x= self.fc(x)

		return x



	def ResNet_layer(self, block, n_res_blocks, out_channels, stride):
		downsample = None
		layers = []

		if stride != 1 or self.in_channels != out_channels * self.expansion:
			downsample = nn.Sequential(nn.Conv2d(self.in_channels, out_channels*self.expansion, kernel_size= 1, stride= stride),
			nn.BatchNorm2d(out_channels*self.expansion))

		layers.append(block(self.in_channels, out_channels, downsample, stride))
		self.in_channels= out_channels*self.expansion

		for i in range(n_res_blocks- 1):
			layers.append(block(self.in_channels, out_channels))

		return nn.Sequential(*layers)

def ResNet50(img_channels= 3, n_class= 10):
	return ResNet(block, [3, 4, 6, 3], img_channels= img_channels, n_class= n_class)

# 參數設定

In [None]:
batch_size = 128
lr = 1e-3
weight_decay = 1e-5
momentum = 0.9 
n_classes = 2 
step_size=  10
epochs = 10

data_dir = '/content/data/dataset/PetImages/'

In [None]:
train_dataloader, test_dataloader = split_Train_Val_Data(data_dir, batch_size= batch_size)


# C = models.resnet50(pretrained=True)
# num_features = C.fc.in_features
# C.fc = nn.Linear(num_features, n_classes)

C = ResNet50(img_channels=3, n_class=2).to(device)

optimizer_C = optim.SGD(C.parameters(), lr = lr, momentum= momentum, weight_decay= weight_decay) # 選擇你想用的 optimizer
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_C, step_size=step_size, gamma=0.1)

print(summary(C, (3,224,224))) # 利用 torchsummary 的 summary package 印出模型資訊，input size: (3 * 224 * 224)

# Loss function
criterion = nn.CrossEntropyLoss()

# 訓練

In [None]:
loss_epoch_C = []
train_acc, test_acc = [], []
best_acc, best_auc = 0.0, 0.0

In [None]:
def train():
  if __name__ == '__main__':

    writer = SummaryWriter(f'runs/hw2/no random erasing lr {lr} batch_size {batch_size} weight_decay {weight_decay} num_epoch {epochs}')
  
    for epoch in range(epochs):
      start = time.time()
      iter = 0
      correct_train, total_train = 0, 0
      correct_test, total_test = 0, 0
      train_loss_C = 0.0

      C.train() # 設定 train 或 eval
    
      print('epoch: ' + str(epoch + 1) + ' / ' + str(epochs))  
      
      # ---------------------------
      # Training Stage
      # ---------------------------
      
      for i, (x, label) in enumerate(train_dataloader) :
          iter_start = time.time()   

          x, label = x.to(device), label.to(device)
                      
          optimizer_C.zero_grad()
          input_start = time.time()
          outputs = C(x) # 將訓練資料輸入至模型進行訓練
          loss = criterion(outputs, label) # 計算 loss
          
          loss.backward() # 將 loss 反向傳播
          optimizer_C.step() # 更新權重
          
          # 計算訓練資料的準確度 (correct_train / total_train)
          _, predicted = torch.max(outputs.data,1)
          total_train += label.size(0)
          correct_train += (predicted == label).sum().item()

          train_loss_C += loss.item()
          iter += 1

          writer.add_scalar('training loss', loss.item(), global_step= epoch*len(train_dataloader) + i+1)
          writer.add_scalar('training accuracy', (predicted == label).sum().item()/label.size(0), global_step=epoch*len(train_dataloader) + i+1)

      exp_lr_scheduler.step()  

      print('Training epoch: %d / loss_C: %.3f | acc: %.3f' % \
            (epoch + 1, train_loss_C / iter, correct_train / total_train))
      end = time.time()
      print('Training takes %.3f secs'%(end-start))

      
      
      
      # --------------------------
      # Testing Stage
      # --------------------------
      
      C.eval() # 設定 train 或 eval
        
      for i, (x, label) in enumerate(test_dataloader) :
        
          with torch.no_grad(): # 測試階段不需要求梯度
              x, label = x.to(device), label.to(device)
              
              outputs = C(x) # 將測試資料輸入至模型進行測試
              _, predicted = torch.max(outputs.data,1)
              loss = criterion(outputs, label)
              total_test += label.size(0)
              correct_test += (predicted == label).sum().item()

              writer.add_scalar('test loss', loss.item(), global_step= epoch*len(test_dataloader) + i+1)
              writer.add_scalar('test accuracy', (predicted == label).sum().item()/label.size(0), global_step=epoch*len(test_dataloader) + i+1)
      
      print('Testing acc: %.3f' % (correct_test / total_test))
                                    
      train_acc.append(100 *(correct_train / total_train)) # training accuracy
      test_acc.append(100 * (correct_test / total_test))  # testing accuracy

      

      loss_epoch_C.append(train_loss_C / iter) # loss 
  return 'finished'

In [None]:
train()

In [None]:
torch.save(C, 'ResNet_hw2.pt')

In [None]:
# !cp 'ResNet_hw2.pt' '/content/drive/My Drive/py_data'

# 結果

In [None]:
plt.figure()

plt.plot(list(range(1,epochs+1)), loss_epoch_C) # plot your loss

plt.title('Training Loss')
plt.ylabel('loss'), plt.xlabel('epoch')
plt.legend(['loss_C'], loc = 'upper left')
plt.savefig('loss.jpg')
plt.show()

plt.figure()

plt.plot(list(range(1,epochs+1)), train_acc) # plot your training accuracy
plt.plot(list(range(1,epochs+1)), test_acc) # plot your testing accuracy

plt.title('Training acc')
plt.ylabel('acc (%)'), plt.xlabel('epoch')
plt.legend(['training acc', 'testing acc'], loc = 'upper left')
plt.savefig('accu.jpg')
plt.show()