# 이미지 다중 속성 분류 - Pytorch Ver


> 운송수단 이미지 분류 - 타입, 색상의 큰 카테고리로 나눌 수 있음 
>> 타입의 세부 속성(5): bike, car, helicopter, ship, truck 

>> 색상의 세부 속성(4): black, red, white, green 


In [None]:
!pip install torch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import os 
import cv2
import matplotlib.pyplot as plt
import pandas as pd 
import tensorflow as tf 
import torch 

In [None]:
from torch import nn 
import torch.nn.functional as F
import torch.optim as optim 
import numpy as np
from tqdm import tqdm, tqdm_notebook

In [None]:
# Mount Google Drive into the Colab runtime so the dataset folder is reachable.
from google.colab import drive 
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [None]:
!pip install einops
from coca_pytorch import *

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting einops
  Downloading einops-0.4.1-py3-none-any.whl (28 kB)
Installing collected packages: einops
Successfully installed einops-0.4.1


In [None]:
cd /content/gdrive/MyDrive/dataset/automobile_img

/content/gdrive/MyDrive/dataset/automobile_img


In [None]:
# Load every image below data_dir into memory as a list of dicts.
# Each sub-folder name is split on '_' into two labels: the first part becomes
# 'class_1' and the second 'class_2' (the recorded outputs of this notebook
# suggest colour and vehicle type respectively -- confirm against the dataset).
data_dir = '/content/gdrive/MyDrive/dataset/automobile_img'
list_of_data = []
img_size = 128
# sorted() makes the sample order reproducible; os.listdir order is arbitrary.
for fol in sorted(os.listdir(data_dir)):
    fol_path = os.path.join(data_dir, fol)
    name_parts = fol.split('_')
    # Skip stray files and folders that do not follow the "<a>_<b>" naming.
    if not os.path.isdir(fol_path) or len(name_parts) < 2:
        continue
    for file in sorted(os.listdir(fol_path)):
        img_arr = cv2.imread(os.path.join(fol_path, file))
        if img_arr is None:
            # cv2.imread signals an unreadable / non-image file by returning
            # None instead of raising, so skip such entries explicitly.
            continue
        img_arr = cv2.cvtColor(img_arr, cv2.COLOR_BGR2RGB)  # convert BGR to RGB format
        resized_arr = cv2.resize(img_arr, (img_size, img_size))  # resize to the preferred size
        list_of_data.append({
            'image': resized_arr,
            'class_1': name_parts[0],
            'class_2': name_parts[1],
        })


In [None]:
from posixpath import lexists
# Separate the loaded samples into an image array and a list of label pairs.
NUM_IMG = len(list_of_data)

x_train = [sample['image'] for sample in list_of_data]
y_train = [[sample['class_1'], sample['class_2']] for sample in list_of_data]

# Stack into one array and scale the pixel values by 1/255.
x_train = np.array(x_train)/255
print(x_train.shape) # (948, 128, 128, 3): 948 images of shape (128, 128, 3)

(948, 128, 128, 3)


In [None]:
# y_train is still a plain Python list here (it has no .shape attribute),
# so ask NumPy for the shape instead.
print(np.shape(y_train))

In [None]:
# Split into train / validation sets (75% / 25%).
# The samples were appended folder by folder, so a plain tail split would put
# whole label folders only into the validation set. Shuffle once with a fixed
# seed so both splits see a mix of folders and the split stays reproducible.
split_order = np.random.default_rng(42).permutation(len(x_train))
x_train = x_train[split_order]
y_train = [y_train[i] for i in split_order]

train_len = int(len(x_train)*0.75)
# Everything that is not training data is validation data; computing it this
# way keeps the reported size equal to the real slice length after rounding.
vali_len = len(x_train) - train_len

x_val = x_train[train_len:]
y_val = y_train[train_len:]

x_train = x_train[:train_len]
y_train = y_train[:train_len]

In [None]:
# Report how many samples went into each split.
print(f"train: {train_len} validation: {vali_len}")

In [None]:
# torchvision's image-transform module is `torchvision.transforms`
# (`torchvision.transformers` does not exist); later cells use it as `transforms`.
import torchvision.transforms as transforms

from torch.utils.data import DataLoader  # loads the data in batches of batch_size
from torch.utils.data import Dataset  # base class that CustomDataset subclasses
from torch.utils.data import dataset  # original import of the submodule, kept as-is

class CustomDataset(Dataset):
    """Map-style dataset over in-memory images with optional labels.

    Args:
        img_list: indexable collection of images.
        label_list: indexable collection of labels aligned with ``img_list``;
            only read when ``train_mode`` is true.
        train_mode: when true, items are ``(image, label)`` pairs; otherwise
            only the image is returned.
        transforms: optional callable applied to each image before it is
            returned.
    """

    def __init__(self, img_list, label_list, train_mode=True, transforms=None):
        self.transforms = transforms
        self.train_mode = train_mode
        self.img_list = img_list
        self.label_list = label_list

    def __getitem__(self, index):
        """Return the sample stored at position ``index``."""
        # Read from the instance, not from a module-level name: the original
        # looked up a global ``img_list`` that is never defined.
        image = self.img_list[index]
        if self.transforms is not None:
            image = self.transforms(image)

        if self.train_mode:
            return image, self.label_list[index]
        return image

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.img_list)

In [None]:
def _build_transform(side=128, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)):
    """Build the preprocessing pipeline shared by the train and test sets."""
    # NOTE(review): ToPILImage receives whatever array the dataset stores;
    # confirm that the installed torchvision accepts its dtype.
    steps = [
        transforms.ToPILImage(),                    # numpy array -> PIL image
        transforms.Resize([side, side]),            # resize to side x side
        transforms.ToTensor(),                      # PIL image -> tensor
        transforms.Normalize(mean=mean, std=std),   # per-channel normalisation
    ]
    return transforms.Compose(steps)


# Both splits currently go through the same steps.
train_transform = _build_transform()

test_transform = _build_transform()

In [None]:
# Wrap the training arrays in the custom dataset.
# NOTE(review): labels are passed through exactly as stored in y_train; the
# training loop calls label.to(device), so they presumably have to be
# integer-encoded tensors/arrays before this point -- confirm.
x_train = CustomDataset(x_train, y_train, train_mode = True, transforms=train_transform)
# Use a DataLoader to serve the data in batches.
train_loader = DataLoader(x_train, batch_size = 100, shuffle=True, num_workers=0)

x_val = CustomDataset(x_val, y_val, train_mode=True, transforms=test_transform)
# `shuffle` was passed twice (a SyntaxError); validation order does not need
# shuffling, so keep shuffle=False.
vali_loader = DataLoader(x_val, batch_size = 100, shuffle=False, num_workers=0)

In [None]:
# Summarise dataset sizes and the number of batches each loader yields.
print(f'total train imgs : {len(x_train)} / total train batches : {len(train_loader)}')
print(f'total valid imgs : {len(x_val)} / total valid batches : {len(vali_loader)}')

In [None]:
# Build label-name -> integer-index lookups for both label columns.
y_train = np.array(y_train)
class_1 = y_train[:, 0]
class_2 = y_train[:, 1]

# Sort the distinct names: iterating a set of strings yields a different
# order from run to run, which would silently change the meaning of every
# class index (and of any checkpoint trained with it).
labels_1 = sorted(set(class_1.tolist()))
labels_2 = sorted(set(class_2.tolist()))
NUM_C1 = len(labels_1)
NUM_C2 = len(labels_2)

map_1 = {name: idx for idx, name in enumerate(labels_1)}
map_2 = {name: idx for idx, name in enumerate(labels_2)}

print(map_1)
print(map_2)

{'black': 0, 'white': 1, 'green': 2, 'red': 3}
{'bike': 0, 'ship': 1, 'truck': 2, 'helicopter': 3, 'car': 4}


In [None]:
# Encode each label column as integer class indices using the lookup tables.
class_arr1 = np.array([map_1[name] for name in class_1])

class_arr2 = np.array([map_2[name] for name in class_2])

In [None]:
## modeling ##
!pip install coca-pytorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting coca-pytorch
  Downloading CoCa_pytorch-0.0.6-py3-none-any.whl (6.3 kB)
Installing collected packages: coca-pytorch
Successfully installed coca-pytorch-0.0.6


In [None]:
!pip install vit-pytorch>=0.35.8

In [None]:
from coca_pytorch.coca_pytorch import CoCa
import torch.optim as optim 
from vit_pytorch import ViT
from vit_pytorch.extractor import Extractor

# Image encoder for CoCa. `vit` was referenced below but never defined; build
# it with the vit-pytorch package installed above, following the CoCa-pytorch
# README. image_size matches the 128x128 images used in this notebook and
# dim matches image_dim passed to CoCa.
vit = ViT(
    image_size = 128,
    patch_size = 16,
    num_classes = 1000,
    dim = 1024,
    depth = 6,
    heads = 16,
    mlp_dim = 2048
)
# Make the ViT return its token embeddings instead of classification logits.
vit = Extractor(vit, return_embeddings_only = True, detach = False)

coca = CoCa(
    dim = 512,                     # model dimension
    img_encoder = vit,             # vision transformer - image encoder, returning image embeddings as (batch, seq, dim)
    image_dim = 1024,              # image embedding dimension, if not the same as model dimensions
    num_tokens = 20000,            # number of text tokens
    unimodal_depth = 6,            # depth of the unimodal transformer
    multimodal_depth = 6,          # depth of the multimodal transformer
    dim_head = 64,                 # dimension per attention head
    heads = 8,                     # number of attention heads
    caption_loss_weight = 1.,      # weight on the autoregressive caption loss
    contrastive_loss_weight = 1.,  # weight on the contrastive loss between image and text CLS embeddings
)
# Use the GPU when present instead of failing on CPU-only runtimes.
coca = coca.to("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
import torch.nn as nn 
import torch.nn.functional as F
class Model(nn.Module):
    """Image classifier with a shared trunk and two classification heads.

    Args:
        backbone: module exposing ``embed_image(images)`` that returns the
            pooled image embedding as the first element of a tuple. When
            ``None`` a CoCa model is built from the notebook-level ``vit``.
        embed_dim: size of the embedding produced by the backbone (also used
            as CoCa's ``dim`` when the backbone is built here).
        hidden_dim: width of the shared fully connected layer.
        num_c1: number of classes of the first head; defaults to the
            notebook-level ``NUM_C1``.
        num_c2: number of classes of the second head; defaults to the
            notebook-level ``NUM_C2``.
    """

    def __init__(self, backbone=None, embed_dim=512, hidden_dim=1024,
                 num_c1=None, num_c2=None):
        super().__init__()

        if backbone is None:
            # No .cuda() here: the training loop moves the whole model with
            # model.to(device), which also works on CPU-only machines.
            backbone = CoCa(
                dim = embed_dim,               # model dimension
                img_encoder = vit,             # vision transformer - image encoder, returning image embeddings as (batch, seq, dim)
                image_dim = 1024,              # image embedding dimension, if not the same as model dimensions
                num_tokens = 20000,            # number of text tokens
                unimodal_depth = 6,            # depth of the unimodal transformer
                multimodal_depth = 6,          # depth of the multimodal transformer
                dim_head = 64,                 # dimension per attention head
                heads = 8,                     # number of attention heads
                caption_loss_weight = 1.,      # weight on the autoregressive caption loss
                contrastive_loss_weight = 1.,  # weight on the contrastive loss between image and text CLS embeddings
            )
        self.coca = backbone

        # The original left the input sizes blank (a SyntaxError) and wrote
        # NUM_C! for NUM_C1. The trunk consumes the backbone embedding and
        # both heads consume the trunk output.
        self.fc = nn.Linear(embed_dim, hidden_dim)
        self.fc1 = nn.Linear(hidden_dim, NUM_C1 if num_c1 is None else num_c1)
        self.fc2 = nn.Linear(hidden_dim, NUM_C2 if num_c2 is None else num_c2)

    # forward must be a method of the class; it was nested inside __init__,
    # so nn.Module never saw it.
    def forward(self, x):
        """Return ``[log_probs_head1, log_probs_head2]`` for a batch of images."""
        # CoCa's own forward expects text tokens as its first argument, so
        # use the image-only path instead.
        # NOTE(review): assumes embed_image returns (cls_embedding, tokens)
        # with cls_embedding of shape (batch, embed_dim) -- confirm against
        # the installed coca-pytorch version.
        cls_embed, _ = self.coca.embed_image(x)
        x = F.relu(cls_embed)

        x = F.relu(self.fc(x))

        branch1 = F.log_softmax(self.fc1(x), dim=1)
        branch2 = F.log_softmax(self.fc2(x), dim=1)

        return [branch1, branch2]


In [None]:
# Instantiate the two-headed classifier defined above.
model = Model()

In [None]:
criterion = nn.CrossEntropyLoss() # classification
# Optimise the parameters of `model` (the network that is actually trained);
# the original passed the separate stand-alone `coca` instance, so the
# trained model's weights would never have been updated.
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
scheduler = None

In [None]:
#train
def _batch_loss(criterion, output, label):
    """Return the loss of one batch; per-head losses are summed for multi-head output."""
    if isinstance(output, (list, tuple)):
        # Column k of the label tensor is the target of head k.
        return sum(criterion(head, label[:, k]) for k, head in enumerate(output))
    return criterion(output, label)


def _count_correct(output, label):
    """Return how many samples of one batch are predicted correctly on every head."""
    if isinstance(output, (list, tuple)):
        pred = torch.stack([head.argmax(dim=1) for head in output], dim=1)
        return pred.eq(label.view_as(pred)).all(dim=1).sum().item()
    pred = output.argmax(dim=1, keepdim=True)
    # view_as reshapes the labels to the shape of the predictions.
    return pred.eq(label.view_as(pred)).sum().item()


def train(model, optimizer, train_loader, scheduler, device,
          val_loader=None, num_epochs=99, save_path='saved/best_model.pth',
          show_progress=True):
    """Train ``model`` and keep the weights with the best validation accuracy.

    Args:
        model: network to train. It may return a single logits tensor or a
            list/tuple with one logits tensor per head.
        optimizer: optimizer already bound to the model's parameters.
        train_loader: iterable of ``(img, label)`` training batches.
        scheduler: learning-rate scheduler stepped once per epoch, or None.
        device: device the model and the batches are moved to.
        val_loader: loader of ``(img, label)`` validation batches. When None,
            the notebook-level ``vali_loader`` is used if it exists;
            otherwise validation and checkpointing are skipped.
        num_epochs: number of epochs to run (the original loop ran 99).
        save_path: file the best ``state_dict`` is written to; missing
            parent directories are created.
        show_progress: wrap the batch iterators in ``tqdm`` when true.

    Returns:
        The best validation accuracy in percent (0 if never validated).
    """
    if val_loader is None:
        try:
            val_loader = vali_loader  # notebook-level validation loader
        except NameError:
            val_loader = None

    def _batches(loader):
        return tqdm(iter(loader)) if show_progress else iter(loader)

    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    best_acc = 0

    for epoch in range(1, num_epochs + 1):
        ## train ##
        model.train()
        running_loss = 0.0

        for img, label in _batches(train_loader):  # iterate over the data batch by batch
            img, label = img.to(device), label.to(device)
            optimizer.zero_grad()  # reset the gradients for every batch

            logit = model(img)  # forward pass
            loss = _batch_loss(criterion, logit, label)

            # backward propagation
            loss.backward()
            optimizer.step()  # update the weights
            running_loss += loss.item()

        print("[%d] train loss: % 10f"%(epoch, running_loss/len(train_loader)))

        if scheduler is not None:
            scheduler.step()

        if val_loader is None:
            continue

        ## evaluate on the validation set ##
        model.eval()
        val_loss = 0.0
        correct = 0
        with torch.no_grad():  # no parameter updates during evaluation
            for img, label in _batches(val_loader):
                img, label = img.to(device), label.to(device)

                logit = model(img)
                # .item() keeps val_loss a plain float instead of a tensor.
                val_loss += _batch_loss(criterion, logit, label).item()
                correct += _count_correct(logit, label)

        num_val = len(val_loader.dataset)
        val_acc = 100 * correct / num_val  # correct samples / all validation samples
        print("val set: loss {:.4f}, accuracy: {}/{} ({:.0f}%) \n".format(val_loss / len(val_loader), correct, num_val, val_acc))

        # save the best model so far
        if best_acc < val_acc:
            best_acc = val_acc
            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            torch.save(model.state_dict(), save_path)

    return best_acc
      
    
# `device` was never defined in this notebook; use the GPU when one is
# available and fall back to the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train(model, optimizer, train_loader, scheduler, device)


In [None]:
# predict
def predict(model, test_loader, device, show_progress=True):
    """Return the predicted class indices for every image in ``test_loader``.

    Args:
        model: trained network. It may return a single logits tensor or a
            list/tuple with one logits tensor per head.
        test_loader: iterable yielding image batches (no labels).
        device: device the batches are moved to before the forward pass.
        show_progress: wrap the batch iterator in ``tqdm`` when true.

    Returns:
        A list with one entry per image: an int for single-output models, or
        a list of ints (one per head) for multi-head models.
    """
    model.eval()
    model_pred = []
    batches = tqdm(iter(test_loader)) if show_progress else iter(test_loader)
    with torch.no_grad():
        for img in batches:
            img = img.to(device)

            output = model(img)
            if isinstance(output, (list, tuple)):
                # one column of predictions per head
                batch_pred = torch.stack([head.argmax(dim=1) for head in output], dim=1)
            else:
                # the original passed the misspelt keyword `leepdim`, which
                # argmax rejects; a plain argmax over dim 1 already gives
                # one class index per sample
                batch_pred = output.argmax(dim=1)

            model_pred.extend(batch_pred.tolist())
    return model_pred