In [1]:
%pip list|grep torch
%pip uninstall torchaudio torchvision -y
%pip install torchvision==0.13.1

Note: you may need to restart the kernel to use updated packages.


'grep' ���O�����Υ~���R�O�B�i���檺�{���Χ妸�ɡC


Note: you may need to restart the kernel to use updated packages.




Note: you may need to restart the kernel to use updated packages.


ERROR: Ignored the following yanked versions: 0.1.6, 0.1.7, 0.1.8, 0.1.9, 0.2.0, 0.2.1, 0.2.2, 0.2.2.post2, 0.2.2.post3
ERROR: Could not find a version that satisfies the requirement torchvision==0.13.1 (from versions: 0.17.0, 0.17.1, 0.17.2, 0.18.0, 0.18.1, 0.19.0, 0.19.1, 0.20.0, 0.20.1)

[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip
ERROR: No matching distribution found for torchvision==0.13.1


In [22]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, random_split

import torchvision.transforms as T
import torch.optim as optim

import logging
logging.basicConfig(level=logging.DEBUG) # 印出 debug log info


In [23]:
# 沒有權重(可調參數) 的放在 forward(), 有權重的放在 __init__()
# 例如 max pooling 沒有權重，所以放在 init
class DoubleConv(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()

    # 一個簡易的模板 (由上到下)
    self.conv = nn.Sequential(
      # 3,1,1 指的是 kernel=3, stride=1, padding=1
      # 因為 batch norm 所以不需要 bias
      nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias = False),

      # nn.BatchNorm2d: 可以讓 training 更快，也更不容易 overfitting
      # 通常以 channel 數當作 feature
      # 會多一些參數
      nn.BatchNorm2d(out_channels),

      # inplace: 不會製造新物件
      nn.ReLU(inplace = True),

      # double conv
      # 維持維度不變，out -> out
      nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias = False),
      nn.BatchNorm2d(out_channels),
      nn.ReLU(inplace=True)
    )

  # 在建立 DoubleConv 物件時會自動執行 forward
  def forward(self, x):
    # 只要繼承 nn.Module，就可以在呼叫之後把資料作前向傳播
    # return 剛剛的 conv 物件
    return self.conv(x)


In [24]:
# Build UNet from scrach
class UNet(nn.Module):
  # in_channels: rgb 影像, channel = 3
  # out_channels: 0/1 binary class(做 mask)
  # features: 上面的圖片寫的維度
  ############################################################
  #! fill the features in the list to determin how many channel your model going to use
  def __init__(self, in_channels=3, out_channels=1, features=[64, 128, 256, 512]):
  ############################################################
    super().__init__()
    self.downs = nn.ModuleList() # 前半
    self.ups = nn.ModuleList() # 後半

    # 依序 append 前半的四段 layer
    for feature in features:
      self.downs.append(DoubleConv(in_channels, feature))
      in_channels = feature

    # bottlenect: 512 -> 1024
    self.bottleneck = DoubleConv(features[-1], features[-1]*2)

    # 後半的四段 layer
    for feature in reversed(features):
      # 功能類似上取樣的插值法，但是參數可訓練
      # 大小減半 (第一層: 1024 -> 512)
      self.ups.append(nn.ConvTranspose2d(feature * 2, feature, 2, 2))
      # 因為中間的 skip connection concat 過來後，size 會疊加，所以 in_channel 還是2倍的 feature
      self.ups.append(DoubleConv(feature * 2, feature))

    # 最後 out_channel 是 64，但是我們需要一維的輸出，所以用 1*1 的 kernel 去卷積，把輸出拉直
    # features[0] = 64
    # out_channels = 1
    # kernel: 1*1, stride=1, padding=0
    self.final_conv = nn.Conv2d(features[0], out_channels, 1, 1, 0)

  def forward(self, x):
    # 真正架構的部分會在 forward 這邊實作
    # 會先暫存當下 layer 的內容 (因為要做 skip connection)，再繼續往下捲
    skip_connections = []
    for down in self.downs:
      logging.debug(f'shape of x: {x.shape}')
      x = down(x)
      skip_connections.append(x)
      x = F.max_pool2d(x, (2, 2))

    logging.debug(f'shape of x: {x.shape}')
    x = self.bottleneck(x)

    # 因為 skip connection 的安裝(?)順序是反著的，所以做一個 in-place 的 reverse
    skip_connections.reverse()
    # self.ups 裡面，每一階段有 2 個 layer 組成 (上採樣 convtranspose2d + 兩次卷積 doubleconv)
    for i in range(0, len(self.ups), 2):
      logging.debug(f'shape of x: {x.shape}')
      # 因為 ups 裡面的東西都有繼承 nn.Module，所以 call 他都會進行前向傳播
      # 先上採樣
      x = self.ups[i](x) # e.g. 藍箭頭的 512
      # 抓 skip_connections 清單裡面對應的 layer 出來
      skip_connection = skip_connections[i//2] # e.g. 灰箭頭
      # concat 起來(沿著 channel 維度)
      concat = torch.cat((skip_connection, x), dim=1) # N * C * H * W, e.g. 最右邊沒標大小的箭頭(應該是1024)
      # 做 double conv
      x = self.ups[i+1](concat) # 往上捲，往上一層

    # 把 channel 拉直
    return self.final_conv(x)



In [25]:
class MyDataset(Dataset):
  def __init__(self, image_dir, mask_dir, transform):
    super().__init__()
    self.image_dir = image_dir
    self.mask_dir = mask_dir
    self.transform = transform
    self.images = os.listdir(image_dir)

  def __len__(self):
    return len(self.images)

  def __getitem__(self, index):
    img_path = os.path.join(self.image_dir, self.images[index])
    mask_path = os.path.join(self.mask_dir, self.images[index].replace('.jpg', '_mask.gif'))
    image = np.array(Image.open(img_path))
    mask = np.array(Image.open(mask_path).convert('L'))
    return self.transform(image), self.transform(mask)




In [32]:
def train(model, num_epochs, train_loader, optimizer):
  for epoch in range(num_epochs):
    for _, (x, y) in enumerate(train_loader):
      model.train()
      x = x.to(device)
      y = y.to(device)
      out = model(x)
      out = torch.sigmoid(out)
      loss = loss_function(out, y)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
    eval(model, val_loader, epoch)



In [33]:
def eval(model, val_loader, epoch):
  model.eval()
  num_correct = 0
  num_pixels = 0
  with torch.no_grad():
    for batch_idx, (x, y) in enumerate(val_loader):
      x = x.to(device)
      y = y.to(device)
      out_img = model(x)
      probability = torch.sigmoid(out_img)
      predictions = probability>0.5
      ############################################################
      #! save one sample picture to verify result
      if batch_idx == 0:  # 只保存第一個 batch 的樣本
          # 處理原始圖像
          original_image = x[0].permute(1, 2, 0).cpu().numpy()  # (H, W, C)
          original_image = (original_image * 255).astype(np.uint8)

          # 處理 Ground Truth Mask
          gt_mask = y[0].squeeze(0).cpu().numpy()  # (H, W)
          gt_mask = (gt_mask * 255).astype(np.uint8)

          # 處理 Predicted Mask
          pred_mask = predictions[0].squeeze(0).cpu().numpy()  # (H, W)
          pred_mask = (pred_mask * 255).astype(np.uint8)

          # 保存圖片
          original_img_pil = Image.fromarray(original_image)
          gt_mask_pil = Image.fromarray(gt_mask)
          pred_mask_pil = Image.fromarray(pred_mask)

          original_img_pil.save(f'original_image.png')
          gt_mask_pil.save(f'ground_truth_mask.png')
          pred_mask_pil.save(f'predicted_mask.png')
      ############################################################
      num_correct += (predictions==y).sum()
      num_pixels += BATCH_SIZE*IMG_WIDTH*IMG_HEIGHT
      break
  print(f'Epoch[{epoch+1}] Acc: {num_correct/num_pixels}')



In [None]:
# zipfile example
def zip_dir(path):
  zf = zipfile.ZipFile('{}.zip'.format(path), 'w', zipfile.ZIP_DEFLATED)

  for root, dirs, files in os.walk(path):
    for file_name in files:
      zf.write(os.path.join(root, file_name))

#解壓縮訓練資料
import zipfile
local_zip = './content/Cars.zip'
zip_ref = zipfile.ZipFile(local_zip, 'r')
zip_ref.extractall('image/')
zip_ref.close()

In [34]:
#! setting torch.cuda
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device: {device}')

#! Create an unet model object
model = UNet(in_channels=3, out_channels=1, features=[64, 128, 256, 512]).to(device)

# hyper params
BATCH_SIZE = 16
NUM_EPOCHS = 3
IMG_WIDTH = 240
IMG_HEIGHT = 160

#! Load data
transform = T.Compose([T.ToTensor(), T.Resize((IMG_HEIGHT, IMG_WIDTH))])  # ToTensor 會將值除255，映射到0~1之間
image_dir = 'image/Cars/small_train'
mask_dir = 'image/Cars/small_train_masks'
all_data = MyDataset(image_dir, mask_dir, transform)

#! Split Data to train_data(70 %) and validate_data(30 %)
train_size = int(0.7 * len(all_data))
val_size = len(all_data) - train_size
train_data, val_data = random_split(all_data, [train_size, val_size])


from torch.utils.data import DataLoader
#! create loader for mini-batch gradient descent
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True) # shuffle is required
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False) # shuffle or not is depends

#! The loss function for bianry classification
loss_function = nn.BCEWithLogitsLoss()

#! Choosing Adam as our optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

#! train
train(model, NUM_EPOCHS, train_loader, optimizer)

device: cuda


DEBUG:root:shape of x: torch.Size([16, 3, 160, 240])
DEBUG:root:shape of x: torch.Size([16, 64, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 128, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 256, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 512, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 1024, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 512, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 256, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 128, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 3, 160, 240])
DEBUG:root:shape of x: torch.Size([16, 64, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 128, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 256, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 512, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 1024, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 512, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 256, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 128, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 3, 

Epoch[1] Acc: 0.8748828172683716


DEBUG:root:shape of x: torch.Size([16, 3, 160, 240])
DEBUG:root:shape of x: torch.Size([16, 64, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 128, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 256, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 512, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 1024, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 512, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 256, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 128, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 3, 160, 240])
DEBUG:root:shape of x: torch.Size([16, 64, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 128, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 256, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 512, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 1024, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 512, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 256, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 128, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 3, 

Epoch[2] Acc: 0.8779850602149963


DEBUG:root:shape of x: torch.Size([16, 3, 160, 240])
DEBUG:root:shape of x: torch.Size([16, 64, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 128, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 256, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 512, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 1024, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 512, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 256, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 128, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 3, 160, 240])
DEBUG:root:shape of x: torch.Size([16, 64, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 128, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 256, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 512, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 1024, 10, 15])
DEBUG:root:shape of x: torch.Size([16, 512, 20, 30])
DEBUG:root:shape of x: torch.Size([16, 256, 40, 60])
DEBUG:root:shape of x: torch.Size([16, 128, 80, 120])
DEBUG:root:shape of x: torch.Size([16, 3, 

Epoch[3] Acc: 0.88227379322052
