In [1]:
import torch
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# 設定使用gpu訓練
CUDA = True
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 輸出cuda代表有使用gpu
print(device)

# 設定batch大小
batch_size = 64

# 設定data前處理以及augmentation
# 加入transforms.ToTensor()將資料轉換為tensor
train_transform = transforms.Compose([
                  transforms.RandomResizedCrop(size = (256,256),scale=(0.7, 1.0), ratio=(1.0, 1.0)),
                  transforms.RandomHorizontalFlip(p = 0.5),
                  transforms.Resize((256, 256)),
                  transforms.ToTensor(),
                  transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# validation data也需要做前處理
val_transform = transforms.Compose([
                  transforms.Resize((256, 256)),
                  transforms.ToTensor(),
                  transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# 使用torchvision.datasets.ImageFolder讀取training data
image_folder = ImageFolder('D:/project/dataset/AnimalFaces/afhq/train', transform = train_transform, target_transform = None)
# 建立DataLoader，shuffle = True表示會將data順序打亂
train_loader = DataLoader(dataset = image_folder, batch_size = batch_size, shuffle = True, num_workers = 2)

# 使用torchvision.datasets.ImageFolder讀取validation data
val_image_folder = ImageFolder('D:/project/dataset/AnimalFaces/afhq/val', transform = val_transform, target_transform = None)
# 建立DataLoader，shuffle = True表示不會將data順序打亂
val_loader = DataLoader(dataset = val_image_folder, batch_size = batch_size, shuffle = False, num_workers = 2)


  from . import _distributor_init


cuda


In [2]:
# 建立普通Block
class BasicBlock(nn.Module):
    def __init__(self, in_ch, out_ch, down_sample = False):
        super(BasicBlock, self).__init__()
        self.down_sample = down_sample
        if self.down_sample:
            self.conv1 = nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, padding=1, stride=2)
            self.conv_shortcut = nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=1, stride=2)
        else:
            self.conv1 = nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = nn.Conv2d(in_channels=out_ch, out_channels=out_ch, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_ch)

    def forward(self, x):
        x_residual = x
        if self.down_sample:
            x_residual = self.conv_shortcut(x_residual)
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = x_residual + x
        x = F.relu(x)
        return x
    
# 建立BottleNeck
class BottleNeckBlock(nn.Module):
    def __init__(self, in_ch, out_ch, down_sample = False):
        super(BottleNeckBlock, self).__init__()
        self.down_sample = down_sample
        if self.down_sample:
            self.conv1 = nn.Conv2d(in_channels=in_ch, out_channels=out_ch//4, kernel_size=1, stride=2)
            self.conv_shortcut = nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=1, stride=2)
        else:
            self.conv1 = nn.Conv2d(in_channels=in_ch, out_channels=out_ch//4, kernel_size=1)
        self.bn1 = nn.BatchNorm2d(out_ch//4)
        self.conv2 = nn.Conv2d(in_channels=out_ch//4, out_channels=out_ch//4, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_ch//4)
        self.conv3 = nn.Conv2d(in_channels=out_ch//4, out_channels=out_ch, kernel_size=1)
        self.bn3 = nn.BatchNorm2d(out_ch)

    def forward(self, x):
        x_residual = x
        if self.down_sample:
            x_residual = self.conv_shortcut(x_residual)
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        x = x_residual + x
        x = F.relu(x)
        return x
    
class Blocks(nn.Module):
    def __init__(self, Block, in_ch, out_ch, number):
        super(Blocks, self).__init__()
        self.block_list = nn.ModuleList()
        self.block_list.append(Block(in_ch, out_ch, down_sample = True))
        for i in range(1, number):
            self.block_list.append(Block(out_ch, out_ch, down_sample = False))
            
    def forward(self, x):
        for i in range(len(self.block_list)):
            x = self.block_list[i](x)
        return x
        

In [3]:
# 建立model
class Model(nn.Module):
    def __init__(self, BlockType, out_feature = 3, block_num_list = [2, 2, 2, 2], in_channel_list = [64, 64, 128, 256], out_channel_list = [64, 128, 256, 512]):
        super(Model, self).__init__()
        # 在此加入用到的各項操作
        # convolution需指定輸入channel數量以及輸出channel數量
        # stride為步長, 在此用來進行down sample操作
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, padding=3, stride=2)
        # batch norm需要指定輸入channel數量
        self.bn1 = nn.BatchNorm2d(64)
        
        assert len(block_num_list) == len(in_channel_list), '數量必須相等'
        self.blocks_list = nn.ModuleList()
        for i in range(len(block_num_list)):
            self.blocks_list.append(Blocks(BlockType, in_channel_list[i], out_channel_list[i], block_num_list[i]))

        # dropout操作, 注意這裡指定的是drop掉的比例
        self.drop = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(in_features=out_channel_list[-1], out_features=out_feature)

    def forward(self, x):
        # relu activation function處理
        x = F.relu(self.conv1(x))
        x = self.bn1(x)
        # max pooling指定大小, padding和stride
        x = F.max_pool2d(x, kernel_size=3, padding=1, stride=2)
        # block結構
        for i in range(len(self.blocks_list)):
            x = self.blocks_list[i](x)
        # global max pooling
        x = F.max_pool2d(x, kernel_size=x.size()[2:])
        x = torch.flatten(x, 1)
        x = self.drop(x)
        x = self.fc1(x)
        return x

model = Model(BasicBlock, out_feature = 3, block_num_list = [3, 4, 6, 3], in_channel_list = [64, 64, 128, 256], out_channel_list = [64, 128, 256, 512])
# 若報錯 Input type (torch.cuda.FloatTensor) and weight type (torch.FloatTensor) should be the same
# 需要將model轉換為使用GPU
model = model.cuda()

In [4]:
# 指定loss function以及optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)


In [None]:
# 開始訓練
for epoch in range(10):
    train_loss = 0.0
    train_acc = 0.0
    train_step_count = 0.0
    # 將model切換為training模式, 影響dropout和BN
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if CUDA:
            # 資料轉換為使用GPU
            data, target = data.cuda(), target.cuda()
            
        optimizer.zero_grad()
        
        # Forward propagation
        output = model(data)
        loss = criterion(output, target)

        # Calculate gradients
        loss.backward()

        # Update parameters
        optimizer.step()

        # 獲取判斷結果
        predicted = torch.max(output.data, 1)[1]
        
        train_loss += loss.item()
        train_acc += torch.sum(predicted == target, dtype = torch.float32).item()/batch_size
        train_step_count += 1.0
        
    print('train epoch:', epoch, 'loss', train_loss/train_step_count, 'acc', train_acc/train_step_count)
    
    # 每個epoch進行validation
    val_loss = 0.0
    val_acc = 0.0
    val_step_count = 0.0
    # 將model切換模式, 影響dropout和BN
    model.eval()
    # validation時不計算gradients
    with torch.no_grad():
        for val_batch_idx, (val_data, val_target) in enumerate(val_loader):
            if CUDA:
                val_data, val_target = val_data.cuda(), val_target.cuda()
            val_output = model(val_data)
            val_predicted = torch.max(val_output.data, 1)[1]
            val_loss += criterion(val_output, val_target).item()
            val_acc += torch.sum(val_predicted == val_target, dtype = torch.float32).item()/batch_size
            val_step_count += 1.0
        
    print('validation:', 'loss', val_loss/val_step_count, 'acc', val_acc/val_step_count)
        

train epoch: 0 loss 1.33937311927304 acc 0.506004366812227
validation: loss 0.7298508249223232 acc 0.6399739583333334
train epoch: 1 loss 0.5650005721889729 acc 0.7941457423580786
validation: loss 0.3483299103875955 acc 0.8470052083333334


In [6]:
# 儲存權重
torch.save(model.state_dict(), 'ResNet_weights.pth')


In [7]:
# 讀取權重
model.load_state_dict(torch.load('ResNet_weights.pth'))

# 可以進行測試集的效果評估, 再此以validation結果代替
val_acc = 0.0
val_step_count = 0.0
model.eval()
with torch.no_grad():
    for val_batch_idx, (val_data, val_target) in enumerate(val_loader):
        if CUDA:
            val_data, val_target = val_data.cuda(), val_target.cuda()
        val_output = model(val_data)
        val_predicted = torch.max(val_output.data, 1)[1]
        val_acc += torch.sum(val_predicted == val_target, dtype = torch.float32).item()/batch_size
        val_step_count += 1.0

print('validation:', 'acc', val_acc/val_step_count)

validation: acc 0.9427083333333334
