In [60]:
# !pip install thop
# !pip install torchsummary
# !pip install einops
# !pip install -q kaggle
# !pip install torch
# !pip install numpy
# !pip install opencv-python
# !pip install matplotlib
# !pip install natsort
# !pip install torchvision

In [61]:
import numpy as np
import os
import sys
import json
import cv2
import math
import time
from tqdm import tqdm
import matplotlib
import matplotlib.pyplot as plt
from collections import OrderedDict
from glob import glob
from natsort import natsorted

import torch
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torch.optim as optim
import torch.nn as nn
from torchvision import transforms
import torch.nn.functional as F

from thop import profile
from thop import clever_format
from torchsummary import summary

from einops import rearrange, repeat
from einops.layers.torch import Rearrange

%matplotlib inline
matplotlib.rcParams['figure.facecolor'] = '#ffffff'


In [62]:
result = torch.cuda.get_device_name(torch.cuda.current_device()) if torch.cuda.is_available() else "cpu"
print("Current device: " + result)
torch.cuda.empty_cache()


Current device: NVIDIA GeForce MX250


# Download Data

In [63]:
!unzip Set5.zip
# !rm ./adarlab-ai-training.zip

'unzip' ���O�����Υ~���R�O�B�i���檺�{���Χ妸�ɡC


# Checking Download

In [64]:
!pwd
img_data_dir = './Set5/Set5/LRbicx3'
images = natsorted(glob(os.path.join(img_data_dir, '*.png')))
print(len(images))

ans_data_dir = './Set5/Set5/original'
ans = natsorted(glob(os.path.join(ans_data_dir, '*.png')))
print(len(ans))

5
5


'pwd' ���O�����Υ~���R�O�B�i���檺�{���Χ妸�ɡC


# Dataloader

In [72]:
config = {
  "data_dir": './Set5',
  "data_num": 5,
  "device": torch.device("cuda" if torch.cuda.is_available() else "cpu"),
  # you can set your own training configurations
  "batch_size": 1,
  "learning_rate": 0.0005,
  "n_epochs": 3,
  "rot_num": 1,
}

In [73]:
transform = None
# transform = transforms.Compose{
    
# }

In [79]:
class TrainImg(Dataset):
  def __init__(self, config, set_type="train", transform=None):
    self.device = config["device"]
    self.transform = transform
    ###########################your code here#############################
    # get the image path list -> self.image_names
    self.image_names = images
    self.ans_names = ans

    ########################################################################
    if set_type == "train":
        n_start = 0
        n_end = 4
    elif set_type == "val":
        n_start = 4
        n_end = config['data_num']

    self.image_names = self.image_names[n_start:n_end]
    self.ans_names = self.ans_names[n_start:n_end]
  
    ########################################################################

  def __len__(self):
    return len(self.image_names)

  def __getitem__(self, idx):
    ########################################################################
    # get the input image step by step
    # 1. read the image using cv2
    # 2. transpose the dimension from [h, w, 3] to [3, h, w]
    # 3. from numpy array to tensor
    # 4. normalize the value from [0, 255] to [0, 1]
    image_name = self.image_names[idx]
    image = cv2.imread(image_name, cv2.IMREAD_COLOR)
    image = np.transpose(image, (2, 0, 1))  # transpose the dimension from [h, w, 3] to [3, h, w]
    image = torch.from_numpy(image).float() # from numpy array to tensor
    # image = image / 255 # normalize the value from [0, 255] to [0, 1]
    
    image_rot_arr = []
    image_rot_arr.append(image)
    length = config["rot_num"] - 1
    if self.transform:
        for i in range(length):
            image_rot = self.transform((image))
            image_rot_arr.append(image_rot)
    else:
        for i in range(length):
            image_rot_arr.append(image)
    
    image = image / 255 # normalize the value from [0, 255] to [0, 1]
    
    for i in range(length + 1):
        image_rot_arr[i] = image_rot_arr[i] / 255
        
    ans_name = self.ans__names[idx]
    ans = cv2.imread(ans_name, cv2.IMREAD_COLOR)
    ans = np.transpose(ans, (2, 0, 1))  # transpose the dimension from [h, w, 3] to [3, h, w]
    ans = torch.from_numpy(ans).float() # from numpy array to tensor
    # image = image / 255 # normalize the value from [0, 255] to [0, 1]
    
    ans_rot_arr = []
    ans_rot_arr.append(ans)
    length = config["rot_num"] - 1
    if self.transform:
        for i in range(length):
            ans_rot = self.transform((ans))
            ans_rot_arr.append(ans_rot)
    else:
        for i in range(length):
            ans_rot_arr.append(ans)
    
    ans = ans / 255 # normalize the value from [0, 255] to [0, 1]
    
    for i in range(length + 1):
        ans_rot_arr[i] = ans_rot_arr[i] / 255
    
    return {
        'image_name': image_name,
        'image': image_rot_arr,
        'ans': ans_rot_arr,
    }

In [80]:
train_ds = TrainImg(config, set_type='train', transform=transform)
val_ds = TrainImg(config, set_type='val')

train_dl = DataLoader(train_ds, config["batch_size"], shuffle=True, drop_last=True, num_workers=1)
val_dl = DataLoader(val_ds, config["batch_size"], shuffle=True, drop_last=True, num_workers=1)

print("Total dataset length: ", config['data_num'])
print("Train dataset length: ", len(train_ds))
print("Validation dataset length: ", len(val_ds))

Total dataset length:  5
Train dataset length:  4
Validation dataset length:  1


# Show images

In [81]:
def draw_img(image, vis=None, color_fixed=None, linewidth=1, img_order='rgb', draw_kp=True, kp_style=None):
  """ Inpaints a hand stick figure into a matplotlib figure.
  image:    original image input
  coords_hw:  predicted keypoint (non normalized) -> [0, 224)
  """
  if kp_style is None:
    # kp_style[0] for circle radius, kp_style[1] for circle point thickness
    kp_style = (1, 2)

  # if image have four dimension like [1. 224. 224. 3] then squeeze to [3. 224. 3]
  image = np.squeeze(image)

  if len(image.shape) == 2:
    image = np.expand_dims(image, 2)
  s = image.shape
  assert len(s) == 3, "This only works for single images."

  convert_to_uint8 = False

  if s[2] == 1:
    # grayscale case
    image = (image - np.min(image)) / (np.max(image) - np.min(image) + 1e-4)
    image = np.tile(image, [1, 1, 3])
    pass

  elif s[2] == 3:
    # RGB case
    if image.dtype == np.uint8:
        convert_to_uint8 = True
        image = image.astype('float64') / 255.0
    elif image.dtype == np.float32:
        # convert to gray image
        image = np.mean(image, axis=2)
        image = (image - np.min(image)) / (np.max(image) - np.min(image) + 1e-4)
        image = np.expand_dims(image, 2)
        image = np.tile(image, [1, 1, 3])
  else:
    assert 0, "Unknown image dimensions."

  colors = np.array(
    [[0.4, 0.4, 0.4],
    [0.4, 0.0, 0.0],
    [0.6, 0.0, 0.0],
    [0.8, 0.0, 0.0],
    [1.0, 0.0, 0.0],
    [0.4, 0.4, 0.0],
    [0.6, 0.6, 0.0],
    [0.8, 0.8, 0.0],
    [1.0, 1.0, 0.0],
    [0.0, 0.4, 0.2],
    [0.0, 0.6, 0.3],
    [0.0, 0.8, 0.4],
    [0.0, 1.0, 0.5],
    [0.0, 0.2, 0.4],
    [0.0, 0.3, 0.6],
    [0.0, 0.4, 0.8],
    [0.0, 0.5, 1.0],
    [0.4, 0.0, 0.4],
    [0.6, 0.0, 0.6],
    [0.7, 0.0, 0.8],
    [1.0, 0.0, 1.0]]
  )

  if img_order == 'rgb':
    # cv2 operation under BGR
    colors = colors[:, ::-1]

  color_map = {
    'k': np.array([0.0, 0.0, 0.0]),
    'w': np.array([1.0, 1.0, 1.0]),
    'b': np.array([0.0, 0.0, 1.0]),
    'g': np.array([0.0, 1.0, 0.0]),
    'r': np.array([1.0, 0.0, 0.0]),
    'm': np.array([1.0, 1.0, 0.0]),
    'c': np.array([0.0, 1.0, 1.0])
  }

  if convert_to_uint8:
    image = (image * 255).astype('uint8')

  return image

In [82]:
batch_iter = iter(train_dl)
batch = next(batch_iter)

imgs = batch['image'] #return[img1, img2, img3]
# print(batch['image'])
pic_num = 4
fig, axes = plt.subplots(len(imgs), pic_num, figsize=(15, 10))
for i in range(len(imgs)):
    for j in range(pic_num):
        # 將圖像和關鍵點轉換為 NumPy 格式
        img_np = imgs[i][j].permute(1, 2, 0).numpy()

        # 使用 draw_hand 函數繪製關鍵點
        trainimg = draw_img
        
        # 顯示結果
        axes[i, j].imshow(trainimg)
        # axes[i, j].axis('off')

plt.show()


RuntimeError: DataLoader worker (pid(s) 5108) exited unexpectedly

# Model

+ Model Specifications:
  + Input: **`[B, 3, 224, 224]`**
  + Output: **`[B, 21, 2]`** --> 21 for the num of the landmarks, 2 for the coordinates in (x, y) format
  + Layer: You can build up your own model architecture with no limitations.
  + Cost: The computational cost (FLOPs) may not over **`20 GFLOPs`**

In [83]:
from model import SuperResolution

Net = SuperResolution()

# Testing Model Computational Cost

In [90]:
# # pseudo image
# image = torch.rand(1, 3, 224, 224).cuda()

# # define your model
model = Net.to(config["device"])

# out = model(image)

# # torchsummary report
# summary(model, input_size=(3, 224, 224))
# print(f'From input shape: {image.shape} to output shape: {out.shape}')

# # thop report
# macs, parm = profile(model, inputs=(image, ))
# print(f'FLOPS: {macs * 2 / 1e9} G, Params: {parm / 1e6} M.')

# Criterion

In [91]:
# class CMSELoss(nn.Module):
#   """
#   Coordinate MSE loss
#   input :
#     y_pred = b, 21, 2   (coordinate of 21 keypoints)
#     y_true = b, 21, 2   (keypoints, (y, x))
#   """
#   def __init__(self):
#     super().__init__()
#     self.loss = nn.MSELoss()

#   def forward(self, y_pred, y_true):
#     y_true = torch.flip(y_true, [2]) # flip (y, x) to (x, y)
#     return self.loss(y_pred, y_true)

In [92]:
# criterion = CMSELoss()

# Optimizer and Scheduler (optional)

In [93]:
optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])
# optimizer = torch.optim.Adadelta(model.parameters(), lr=1)

In [94]:
# scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.002, steps_per_epoch=len(train_dl), epochs=config["n_epochs"])
# scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001, steps_per_epoch=len(train_dl), epochs=config["n_epochs"], div_factor=2, final_div_factor=5, pct_start=0.09)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = config["rot_num"] * (len(train_dl.dataset) / config["batch_size"]) * config["n_epochs"], eta_min = 0.00005)
print((len(train_dl.dataset) / config["batch_size"]) * config["n_epochs"])

12.0


# Training
  + Record the **`loss / epoch`** learning curve
  + If using learning rate scheduler, record the **`lr / epoch`** curve

In [95]:
# weight_path = '/home/va8800/ken_ai/model_weights.pth'
# weight_path = '/home/STuser19/MID/model_weights.pth'

# checkpoint = torch.load(weight_path)
# model.load_state_dict(checkpoint, strict=True)

In [96]:
# initialize tracker for minimum validation loss
# valid_loss_min = np.Inf # set initial "min" to infinity
valid_loss_min = np.inf # set initial "min" to infinity

# initialize history for recording what we want to know
history = []

for epoch in range(config["n_epochs"]):
    # monitor training loss, validation loss and learning rate
    train_loss = 0.0
    valid_loss = 0.0
    lrs    = []
    result = {'train_loss': [], 'val_loss': [], 'lrs': []}

    # prepare model for training
    model.train()

    #######################
    # train the model #
    #######################
    for batch in tqdm(train_dl):
        for i in range(len(batch['image'])):
            # clear the gradients of all optimized variables
            optimizer.zero_grad()
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(batch['image'][i].to(config['device']))
            # calculate the loss
            # loss = criterion(output, batch[''][i].to(config['device']))
            loss = F.cross_entropy(output, batch['image'][i].to(config['device'])).item()
            # backward pass: compute gradient of the loss with respect to model parameters
            loss.backward()
            # perform a single optimization step (parameter update)
            optimizer.step()
            scheduler.step()

            # record learning rate
            lrs.append(optimizer.param_groups[0]['lr'])

            # update running training loss
            train_loss += loss.item()*batch['image'][i].size(0)

    ######################
    # validate the model #
    ######################
    model.eval()
    for batch in val_dl:
        # compute predicted outputs by passing inputs to the model
        output = model(batch['image'][0].to(config['device']))
        # calculate the loss
        loss = criterion(output, batch[''][0].to(config['device']))
        # update running validation loss
        valid_loss += loss.item()*batch['image'][0].size(0)

    # print training/validation statistics
    # calculate average loss over an epoch
    train_loss = train_loss/(len(train_dl.dataset)*config["rot_num"])
    result['train_loss'] = train_loss
    valid_loss = valid_loss/len(val_dl.dataset)
    result['val_loss'] = valid_loss
    leaning_rate = lrs
    result['lrs'] = leaning_rate
    history.append(result)

    print('Epoch {:2d}: Learning Rate: {:.6f} Training Loss: {:.6f} Validation Loss:{:.6f}'.format(
        epoch+1,
        leaning_rate[-1],
        train_loss,
        valid_loss
        ))

    # save model if validation loss has decreased
    if valid_loss <= valid_loss_min:
        print("Validation loss decreased({:.6f}-->{:.6f}). Saving model ..".format(
        valid_loss_min,
        valid_loss
        ))
        torch.save(model.state_dict(),"model.pt")
        valid_loss_min = valid_loss

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/4 [00:15<?, ?it/s]


RuntimeError: DataLoader worker (pid(s) 1876) exited unexpectedly

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

# torch.save(model.state_dict(), '/home/va8800/ken_ai/v3_2/model_weights.pth')
torch.save(model.state_dict(), '/home/STuser19/MID/v3/model_weights.pth')
print("Save model weight successfully.")

In [None]:
def plot_losses(history):
  train_losses = [x.get('train_loss') for x in history]
  val_losses = [x['val_loss'] for x in history]
  plt.plot(train_losses, '-bx')
  plt.plot(val_losses, '-rx')
  plt.xlabel('epoch')
  plt.ylabel('loss')
  plt.legend(['Training', 'Validation'])
  plt.title('Loss vs. No. of epochs');

plot_losses(history)

In [None]:
def plot_lrs(history):
  lrs = np.concatenate([x.get('lrs', []) for x in history])
  plt.plot(lrs)
  plt.xlabel('Batch no.')
  plt.ylabel('Learning rate')
  plt.title('Learning Rate vs. Batch no.');

plot_lrs(history)

# Load your model

In [None]:
# weight_path = '/home/va8800/ken_ai/v3_2/model_weights.pth'
weight_path = '/home/STuser19/MID/v3/model_weights.pth'

checkpoint = torch.load(weight_path)
model.eval()
model.load_state_dict(checkpoint, strict=True)

In [None]:
print(model)

# Testing part (**do not modify!!**)

In [None]:
class Testing_dataset(Dataset):
  def __init__(self, config):
    self.image_names = natsorted(glob(os.path.join(config['data_dir'], 'testing/rgb', '*.jpg')))

  def __len__(self):
    return len(self.image_names)

  def __getitem__(self, idx):
    # input
    image_name = self.image_names[idx]
    image = cv2.imread(image_name)
    image = torch.from_numpy(image.transpose(2, 0 ,1))
    image = image / 255

    return {
      'image_name': image_name,
      'image': image,
    }

In [None]:
test_ds = Testing_dataset(config)
test_dl = DataLoader(test_ds, batch_size=1, shuffle=False, drop_last=False, num_workers=0)

In [None]:
with open('./FreiHAND_pub_v2/golden_out.json', 'r') as f:
  golden_out = json.load(f)

In [None]:
def evaluate(dataloader, model, golden_out):
  print(f'Total {len(dataloader)} iterations to be tested.')

  with torch.no_grad():
    result = {}
    loss = []
    pixeldiff = []
    for i, data in tqdm(enumerate(dataloader)):
      image_name = data['image_name'][0][-22:]
      # get golden ans and inputs
      golden_ans = torch.from_numpy(np.array(golden_out[image_name])).float().cuda()
      inputs = data['image'].cuda()

      # get predicted output
      outputs = model(inputs)

      # get loss
      l = criterion(outputs, golden_ans).cpu().numpy()
      loss.append(l)
      pixeldiff.append(np.sqrt(l) * 223)

      pred = outputs.cpu().detach().numpy().tolist()
      result.update({image_name: pred})

  print('\nTesting successfully!')
  return result, loss, pixeldiff

In [None]:
result, loss, pixeldiff = evaluate(test_dl, model, golden_out)

with open("result.json", "w") as outfile:
  json.dump(result, outfile)

In [None]:
with open('ID_result.csv', 'w') as f:
  f.write('Id,Loss\n')
  for i, data in enumerate(pixeldiff):
    f.write(str(i)+','+str(data)+'\n')

In [None]:
import shutil
shutil.move('ID_result.csv', '/home/STuser19/MID/v3/ID_result.csv')
# shutil.move('ID_result.csv', '/home/va8800/ken_ai/v3_2/ID_result.csv')

In [None]:
print(f'FLOPS: {macs * 2 / 1e9} G, Params: {parm / 1e6} M., avg pixeldiff: {np.mean(pixeldiff)}, avg loss: {np.mean(loss)}')
score = (macs * 2 / 1e9) * (parm / 1e6) * math.exp(np.mean(pixeldiff))
print("ranking score: ", "{:,}".format(score))