# **1) Import module**

In [None]:
import numpy as np
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset folder and the
# checkpoint paths used later in this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

# **2) Dataset build**

In [None]:
project_folder = '/content/drive/MyDrive/Project3'

import os


def collect_image_paths(root):
    """Walk *root* and return ``(paths, labels)`` for every image file found.

    A file is picked up when its name ends with ``png``, ``jpg`` or ``jpeg``;
    its label is the name of the directory that directly contains it.

    Directories and files are visited in sorted order so the two lists are
    identical on every run. ``os.walk`` on its own yields entries in arbitrary,
    filesystem-dependent order, and the split code later in this notebook
    indexes into these lists by position.

    Returns:
        A pair of lists ``(paths, labels)`` aligned index-for-index. Both are
        empty when *root* does not exist or contains no matching file.
    """
    paths = []
    labels = []
    for subdir, dirs, files in os.walk(root):
        # Sorting in place also fixes the order in which os.walk descends.
        dirs.sort()
        for file in sorted(files):
            if file.endswith(('png', 'jpg', 'jpeg')):
                paths.append(os.path.join(subdir, file))
                labels.append(os.path.basename(subdir))
    return paths, labels


image, label = collect_image_paths(project_folder)

In [None]:
# Do not change: the train/test loops read `_label1[0]` and compare each batch
# with the previous one, i.e. they handle one sample per batch.
BATCH_SIZE = 1

# Positions into `image` / `label`. For every offset i and every j in 0..19 the
# two neighbouring entries 200*j + i and 200*j + i + 1 are emitted back to back.
# Presumably each class occupies a run of 200 consecutive entries -- TODO confirm.
# NOTE(review): offset 80 is reached by both the training range (i=79, k=1) and
# the test range (i=80, k=0), so that entry of every group is in both splits.
train_positions = [200 * j + i + k
                   for i in range(0, 80)
                   for j in range(0, 20)
                   for k in range(0, 2)]
test_positions = [200 * j + i + k
                  for i in range(80, 100)
                  for j in range(0, 20)
                  for k in range(0, 2)]

TRAINING_image = [image[pos] for pos in train_positions]
TRAINING_label = [label[pos] for pos in train_positions]
TEST_image = [image[pos] for pos in test_positions]
TEST_label = [label[pos] for pos in test_positions]


import random

def shuffle_images_labels(image_list, label_list, swap_prob=0.3):
    """Randomly swap neighbouring (image, label) samples, keeping pairs intact.

    The samples at positions (1, 2), (3, 4), ... are each swapped with
    probability ``swap_prob``; position 0 (and the last position of an
    even-length list) is never moved.

    Args:
        image_list: sequence of images (or image paths).
        label_list: sequence of labels aligned with ``image_list``.
        swap_prob: probability of swapping each candidate neighbour pair.

    Returns:
        A pair ``(images, labels)`` of tuples holding the same samples in
        their new order; ``((), ())`` for empty input.
    """
    combined_list = list(zip(image_list, label_list))  # keep image and label together
    for i in range(1, len(combined_list) - 1, 2):
        if random.random() < swap_prob:
            # A true swap: both sides must use index i, otherwise a sample is
            # duplicated and another one is lost.
            combined_list[i], combined_list[i + 1] = combined_list[i + 1], combined_list[i]
    if not combined_list:
        # zip(*[]) yields nothing, which would break two-target unpacking.
        return (), ()
    return tuple(zip(*combined_list))


# Randomly perturb the order of the training samples; images and labels are
# passed together so they stay aligned, and both names are rebound to the result.
TRAINING_image, TRAINING_label = shuffle_images_labels(TRAINING_image, TRAINING_label)

In [None]:
class CustomDataset(Dataset):
    """Dataset that loads an image from disk and pairs it with its label.

    Args:
        image_paths: sequence of image file paths.
        labels: sequence of labels aligned index-for-index with ``image_paths``.
        transform: optional callable applied to the RGB ``PIL.Image``. When
            omitted, the image is resized to 224x224 and converted to a tensor.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        if transform is None:
            # Default pipeline, built once instead of on every __getitem__ call.
            transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.ToTensor(),
            ])
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for sample ``idx``."""
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        # convert() returns a new image, so the file can be closed right away.
        with Image.open(image_path) as img:
            image = img.convert('RGB')
        image = self.transform(image)

        return image, label

    # Renoir 1.0 used RandomCrop((224, 224)); Renoir 2.0 uses Resize((224, 224))
    # so that no image information is lost before the image reaches the model.

# Wrap both splits in datasets, then in loaders. No shuffling is requested, so
# each loader yields samples in the order of the underlying lists.
train_dataset = CustomDataset(image_paths=TRAINING_image, labels=TRAINING_label)
test_dataset = CustomDataset(image_paths=TEST_image, labels=TEST_label)

train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE)

데이터를 분할한다.

학습 데이터 이미지와 라벨을 랜덤하게 섞어준다.

이미지 경로와 라벨 리스트를 받아 이미지를 읽고, 224×224 크기로 맞춘 뒤 텐서로 변환한다.

# **3) Model Define**

In [None]:
import torch
import torch.nn as nn
from torch import Tensor

def conv_1(in_dim, out_dim):
    """Return a 1x1 convolution (stride 1) followed by ReLU."""
    return nn.Sequential(
        nn.Conv2d(in_channels=in_dim, out_channels=out_dim, kernel_size=1, stride=1),
        nn.ReLU(),
    )

def conv_1_3(in_dim, mid_dim, out_dim):
    """Return a 1x1 reduction conv followed by a 3x3 conv, each with ReLU.

    The 3x3 convolution uses stride 1 and padding 1, so the spatial size of
    the input is preserved.
    """
    reduce = nn.Conv2d(in_dim, mid_dim, kernel_size=1, stride=1)
    spatial = nn.Conv2d(mid_dim, out_dim, kernel_size=3, stride=1, padding=1)
    layers = [reduce, nn.ReLU(), spatial, nn.ReLU()]
    return nn.Sequential(*layers)

def conv_1_5(in_dim, mid_dim, out_dim):
    """Return a 1x1 reduction conv followed by a 5x5 conv, each with ReLU.

    The 5x5 convolution uses stride 1 and padding 2, so the spatial size of
    the input is preserved.
    """
    return nn.Sequential(
        nn.Conv2d(in_channels=in_dim, out_channels=mid_dim, kernel_size=1, stride=1),
        nn.ReLU(),
        nn.Conv2d(in_channels=mid_dim, out_channels=out_dim, kernel_size=5, stride=1, padding=2),
        nn.ReLU(),
    )

def max_3_1(in_dim, out_dim):
    """Return a 3x3 max-pool (stride 1, padding 1), then a 1x1 conv with ReLU."""
    pool = nn.MaxPool2d(3, 1, 1)
    project = nn.Conv2d(in_dim, out_dim, kernel_size=1, stride=1)
    return nn.Sequential(pool, project, nn.ReLU())

class inception_module(nn.Module):
    """Inception block: four parallel branches joined along the channel axis.

    Branches, in concatenation order: 1x1 conv, 1x1 -> 3x3 conv, 1x1 -> 5x5
    conv, and 3x3 max-pool -> 1x1 conv. The output therefore has
    ``out_dim_1 + out_dim_3 + out_dim_5 + pool_dim`` channels.
    """

    def __init__(self, in_dim, out_dim_1, mid_dim_3, out_dim_3, mid_dim_5, out_dim_5, pool_dim):
        super().__init__()

        # Attribute names and creation order are kept as they are: they
        # determine the state_dict keys and the order of weight initialisation.
        self.conv_1 = conv_1(in_dim, out_dim_1)
        self.conv_1_3 = conv_1_3(in_dim, mid_dim_3, out_dim_3)
        self.conv_1_5 = conv_1_5(in_dim, mid_dim_5, out_dim_5)
        self.max_3_1 = max_3_1(in_dim, pool_dim)

    def forward(self, x):
        """Run every branch on ``x`` and concatenate the results on dim 1."""
        branches = (self.conv_1, self.conv_1_3, self.conv_1_5, self.max_3_1)
        return torch.cat([branch(x) for branch in branches], 1)


In [None]:
# nn.Conv2d(input_channel, output_channel, kernel_size, stride, padding)
# nn.MaxPool2d(kernel_size, stride, padding)

class GoogLeNet(nn.Module):
    """GoogLeNet-style feature extractor built from ``inception_module`` blocks.

    There is no classification head: ``forward`` returns the flattened output
    of the final average pool.

    Args:
        base_dim: output channels of the first stem convolution; the stem as a
            whole produces ``base_dim * 3`` channels.
    """

    def __init__(self, base_dim):
        super(GoogLeNet, self).__init__()
        # Stem: two convolutions, each followed by a stride-2 max-pool.
        # NOTE(review): there is no activation between these layers.
        self.layer_1 = nn.Sequential(
            nn.Conv2d(3,base_dim,7,2,3),
            nn.MaxPool2d(3,2,1),
            nn.Conv2d(base_dim,base_dim*3,3,1,1),
            nn.MaxPool2d(3,2,1),
        )
        self.layer_2 = nn.Sequential(
            inception_module(base_dim*3,64,96,128,16,32,32),
            # Input = channels concatenated by the module above:
            # 64 + 128 + 32 + 32 = 256. That width is fixed, so it must not be
            # derived from base_dim (base_dim*4 only matched for base_dim=64).
            inception_module(256,128,128,192,32,96,64),
            nn.MaxPool2d(3,2,1),
        )
        self.layer_3 = nn.Sequential(
            inception_module(480,192,96,208,16,48,64),
            inception_module(512,160,112,224,24,64,64),
            inception_module(512,128,128,256,24,64,64),
            inception_module(512,112,144,288,32,64,64),
            inception_module(528,256,160,320,32,128,128),
            nn.MaxPool2d(3,2,1),
        )
        self.layer_4 = nn.Sequential(
            inception_module(832,256,160,320,32,128,128),
            inception_module(832,384,192,384,48,128,128),
            # With a 224x224 input the feature map here is 7x7, so this pool
            # yields 1x1 spatially with 384 + 384 + 128 + 128 = 1024 channels.
            nn.AvgPool2d(7,1),
        )

    def forward(self, x):
        """Pass ``x`` through all four stages and flatten per sample."""
        out = self.layer_1(x)
        out = self.layer_2(out)
        out = self.layer_3(out)
        out = self.layer_4(out)
        out = out.view(out.size(0), -1)
        return out  # flattened tensor produced by self.layer_4

컨볼루션과 풀링 연산을 수행한다.

- 컨볼루션: 이미지의 특징을 추출하는 역할

- 풀링: 풀링은 데이터의 세로 및 가로 방향의 공간을 줄이는 연산이다. 최대풀링과 평균 풀링이 있다.

인셉션 모듈을 정의한다. 서로 다른 커널 크기를 사용해 입력에서 다양한 스케일의 특징을 추출한다.

GoogLeNet 전체 네트워크 구조를 정의한다. 여러 계층으로 나뉘어 있다.

forward함수는 입력 이미지가 구글넷 각 계층을 통과하는 과정을 정의한다.

- 포워드 함수: 입력 텐서로부터 출력 텐서를 계산한다

# **4) Model initialization**

In [None]:
import torch.nn.init as init
import torch.nn as nn

# Run on the GPU when one is available, otherwise on the CPU.
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
DEVICE = torch.device(device_name)
model = GoogLeNet(base_dim=64).to(DEVICE)

# Binary cross-entropy on a probability-like similarity score.
loss = nn.BCELoss()
optimizer = torch.optim.SGD(
    params=model.parameters(),
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)
# NOTE(review): no scheduler.step() call appears in the code visible in this
# notebook, so this schedule would not take effect -- confirm that is intended.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=8, gamma=0.04)

# **5) TEST function define**

In [None]:
def test(test_loader, DEVICE, threshold=0.8):
    """Count correctly judged consecutive-sample pairs in ``test_loader``.

    Every sample is embedded with the module-level ``model`` and compared, by
    cosine similarity, with the embedding of the sample that came just before
    it (the very first sample is compared with itself). A pair counts as
    correct when the labels are equal and the similarity is above
    ``threshold``, or the labels differ and the similarity is below it.

    The threshold is applied to the raw cosine similarity. A sigmoid of a
    cosine similarity cannot exceed sigmoid(1) ~= 0.731, so comparing the
    sigmoid-scaled value with 0.8 could never accept a same-label pair.

    Args:
        test_loader: iterable of ``(image_batch, label_batch)``; only the
            first label of each batch is used.
        DEVICE: device the image batch is moved to before the forward pass.
        threshold: decision boundary on the cosine similarity.

    Returns:
        Number of correctly judged pairs (0 for an empty loader).
    """
    # Imported here because the notebook's module-level import of F sits in a
    # later cell than this definition.
    import torch.nn.functional as F

    correct_pred = 0
    label2 = None
    vector2_tensor = None

    was_training = model.training
    model.eval()
    try:
        # Evaluation needs no gradients; skipping them saves memory and time.
        with torch.no_grad():
            for _image1, _label1 in test_loader:
                image1 = _image1.to(DEVICE)
                label1 = _label1[0]
                vector1_tensor = model(image1)

                if vector2_tensor is None:  # first sample: compare with itself
                    label2 = label1
                    vector2_tensor = vector1_tensor

                similarity = F.cosine_similarity(
                    vector1_tensor, vector2_tensor, dim=-1
                ).item()

                same_label = label1 == label2
                if same_label and similarity > threshold:
                    correct_pred += 1
                elif not same_label and similarity < threshold:
                    correct_pred += 1

                # Reuse this embedding as the next sample's comparison partner
                # instead of recomputing it.
                label2 = label1
                vector2_tensor = vector1_tensor.detach()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    return correct_pred

# **6) Training**

In [None]:
import torch.nn.functional as F
import time

# Training driver. For every epoch: walk the training loader one sample at a
# time, compare each embedding with the previous sample's embedding, optimise
# BCE between sigmoid(cosine similarity) and the "same label" target, then
# evaluate on the test loader and save a checkpoint.

EPOCH = 50
# Decision boundary for the accuracy count. It is applied to the raw cosine
# similarity: sigmoid(cosine) is at most sigmoid(1) ~= 0.731, so comparing the
# sigmoid-scaled value with 0.8 could never accept a same-label pair.
ACC_THRESHOLD = 0.8

start_time = time.time()
train_acc_lst, test_acc_lst = [],[]

for epoch in range(EPOCH):
  model.train()
  correct_pred, num_examples, test_acc = 0, len(train_loader), 0

  for i, (_image1, _label1) in enumerate(train_loader):
    optimizer.zero_grad()

    image1 = _image1.to(DEVICE)
    label1 = _label1[0]
    vector1_tensor = model(image1)

    if (i == 0):  # first sample of the epoch: compare it with itself
      label2 = label1
      vector2_tensor = vector1_tensor

    same_label = label1 == label2
    target_tensor = torch.tensor([same_label]).float()
    target_tensor = target_tensor.to(DEVICE)

    similarity =  F.cosine_similarity(vector1_tensor, vector2_tensor, dim= -1)
    # BCELoss needs inputs in [0, 1]; the sigmoid provides that range.
    scaled_similarity = torch.sigmoid(similarity)

    raw_similarity = similarity.item()
    if same_label and raw_similarity > ACC_THRESHOLD:
        correct_pred += 1
    elif not same_label and raw_similarity < ACC_THRESHOLD:
        correct_pred += 1

    cost= loss(scaled_similarity, target_tensor)
    cost.backward()
    optimizer.step()

    # Reuse this embedding (detached, so no gradient flows into the previous
    # step) as the next sample's comparison partner to save computation.
    label2 = label1
    vector2_tensor = vector1_tensor.detach()

    if not i % 40:
      # NOTE(review): the printed epoch is offset by 65 while the checkpoint
      # file name below uses the 0-based epoch -- confirm this is intended.
      print (f'Epoch: {epoch+65:03d} | '
            f'Batch {i:03d}/{len(train_loader):03d} |'
             f' Cost: {cost:.4f}')

  test_acc = test(test_loader, DEVICE)

  PATH = f"/content/drive/MyDrive/model_weights_epoch_{epoch}.pth"  # one checkpoint file per epoch
  torch.save(model.state_dict(), PATH)

  train_acc_pct = (correct_pred/len(train_loader))*100
  test_acc_pct = (test_acc/len(test_loader))*100
  # Keep the per-epoch history; these lists were declared but never filled.
  train_acc_lst.append(train_acc_pct)
  test_acc_lst.append(test_acc_pct)
  print('training accuracy : ', train_acc_pct,'% | test accuracy : ', test_acc_pct, '%')

  elapsed = (time.time() - start_time)/60
  print(f'Time elapsed: {elapsed:.2f} min')

elapsed = (time.time() - start_time)/60
print(f'Total Training Time: {elapsed:.2f} min')

50에포크를 진행한다.

에포크마다 모델 학습하고 성능을 평가한다. 모델의 가중치를 저장한다.

- 유사도 계산<br>
두 레이블이 같고 유사도가 0.8보다 크거나, 두 레이블이 다르고 유사도가 0.8 미만이면 맞춘 것으로 센다.

계산된 유사도와 목표값으로 손실을 계산한다.

