In [48]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import cv2
from PIL import Image
import os
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Dataset, random_split
import torchvision.transforms as transforms
from torchinfo import summary
import torchvision.models as models

In [88]:
file = pd.read_csv('encoded_file.csv')
file.tail()

Unnamed: 0.1,Unnamed: 0,image_name,comment_num,comment,0,1,2,3,4,5,...,68,69,70,71,72,73,74,75,76,77
158910,158910,998845445.jpg,0,A man in shorts and a Hawaiian shirt leans ov...,0,21,0,715,0,0,...,1,1,1,1,1,1,1,1,1,1
158911,158911,998845445.jpg,1,A young man hanging over the side of a boat ...,0,3,21,9,0,0,...,1,1,1,1,1,1,1,1,1,1
158912,158912,998845445.jpg,2,A man is leaning off of the side of a blue an...,0,21,0,68,0,0,...,1,1,1,1,1,1,1,1,1,1
158913,158913,998845445.jpg,3,A man riding a small boat in a harbor with f...,0,21,477,0,254,1101,...,1,1,1,1,1,1,1,1,1,1
158914,158914,998845445.jpg,4,A man on a moored blue and white boat with hi...,0,21,0,0,4433,22,...,1,1,1,1,1,1,1,1,1,1


In [50]:
data = []
for k in range(file.shape[0]):
    sample = []
    img_dir = './data/flickr30k_images/'
    img_file = img_dir + file.iloc[k][1] 
    sample.append(img_file)

    capt = []
    for idx in range(78):
        capt.append(file.iloc[k][idx+4])
        # print(file.iloc[k][idx+4])

    sample.append(capt)
    data.append(sample)

In [75]:
data[0]

['./data/flickr30k_images/1000092795.jpg',
 [2,
  3,
  4,
  0,
  5,
  6,
  7,
  0,
  0,
  8,
  0,
  9,
  0,
  0,
  0,
  10,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1,
  1]]

In [76]:
# 사용자 정의 데이터셋 정의
class CustomDataset(Dataset):
    def __init__(self, data, transform):
        self.data = data
        self.transform = transform
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        # 이미지와 캡션을 로드하고 전처리하여 반환
        image_path, caption = self.data[idx]
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, caption

In [77]:
# 데이터셋 인스턴스 생성
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])
dataset = CustomDataset(data, transform=transform)

In [78]:
# dataset에서 train, valid, test를 나누기 
seed_gen = torch.Generator().manual_seed(42)
tr, val, ts = 0.7,0.1,0.2
trainDS, validDS, testDS = random_split(dataset, [tr, val, ts], generator=seed_gen)
print(len(trainDS), len(validDS), len(testDS))

111241 15891 31783


In [79]:
# dataloader 생성
batch_size = 256
train_dl = DataLoader(trainDS, batch_size=batch_size, shuffle=True, drop_last = True)
valid_dl = DataLoader(validDS, batch_size=batch_size, shuffle=True, drop_last = True)
test_dl = DataLoader(testDS, batch_size = batch_size, shuffle=True, drop_last = True)
print(len(train_dl), len(valid_dl), len(test_dl))

370 52 105


In [86]:
for img, capt in train_dl:
    print(capt)
    print()
    break

[tensor([    0,     0,     0,     0,   156,     0,     0,    30,     0,     0,
            0,     0,     2,     0,     2,     0,     0,     0,    88,     0,
            0,     0,     0,     0,     0,     0,    47,    15,     0,     0,
            0,     0,     0,  1615,     0,    30,     2,     0,     0,     0,
         4148,    47,    47,     2,   156,     0,     0,     0, 17730,     0,
            2,   854,     0,     0,     0,     2,     2,     0,     0,     0,
           47,     0,   133,    21,     0,     0,     0,     0,     0,    47,
           59,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     2,     0,     0,     0,     0,     0,
            0,     0,   228,     2,     0,     0,     0,     0,     0,     0,
            0,    47,     0,     0,     0,    47,     0,     0,    88,   337,
         9691,     0,     0,     0,    88,     0,     0,   234,     0,     0,
            0,     0,     0,     0,     0,     0,     0,  1372,

In [81]:
# 1개의 배치 안에 있는 이미지 확인
from torchvision.utils import make_grid

def show_batch(dl):
    """Plot images grid of single batch"""
    for (images, labels) in dl:
        fig,ax = plt.subplots(figsize = (16,12))
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(make_grid(images,nrow=16).permute(1,2,0))
        break
        
# show_batch(train_dl)

### RNN을 위한 인코딩된 파일 불러오기

In [82]:
file = pd.read_csv('encoded_file.csv')
file.head(2)

Unnamed: 0.1,Unnamed: 0,image_name,comment_num,comment,0,1,2,3,4,5,...,68,69,70,71,72,73,74,75,76,77
0,0,1000092795.jpg,0,Two young guys with shaggy hair look at their...,2,3,4,0,5,6,...,1,1,1,1,1,1,1,1,1,1
1,1,1000092795.jpg,1,Two young White males are outside near many ...,2,3,11,12,0,13,...,1,1,1,1,1,1,1,1,1,1


### 클래스 생성

### Image Captioning with Pytorch
- 필요한 모델 : CNN & RNN 
- 인코딩용 : CNN => Resnet
- 디코딩용 : RNN => LSTM
- CNN에서 나온 결과물을 LSTM에 연결 

In [83]:
# 인코딩용 CNN 모델 생성 : RESNET18 (가중치O)

resnet = models.resnet18()
# 전결합층 변경
resnet.fc = nn.Linear(in_features = 512, out_features = 1)

# 모델의 합성곱층 가중치 고정 (완전 연결층은 학습시켜야함)
for name, param in resnet.named_parameters():
    param.requires_grad = False
for name, param in resnet.fc.named_parameters():
    param.requires_grad = True 

In [84]:
# 디코딩용 RNN 모델 생성 : LSTM

from torch.nn.utils.rnn import pack_padded_sequence
# 패딩된 시퀀스를 실제 데이터 길이에 맞게 패킹하여 효율적인 연산을 수행할 수 있게 해줌

class decoder(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers = 1):
        super(decoder, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, features, captions, lengths):
        embedding = self.embed(captions)
        embedding = torch.cat((features.unsqueeze(1), embedding),1)
        packed = pack_padded_sequence(embedding, lengths, batch_first = True)
        hiddens, _ = self.lstm(packed)
        outputs = self.linear(hiddens[0])
        return outputs

- class로 만든거

In [62]:
# 인코딩용 CNN 모델 생성 : RESNET18 (가중치O)

# res_model = models.resnet18(weights = ( "ResNet18_Weights.DEFAULT"))
# 전결합층 변경
# res_model.fc = nn.Linear(in_features = 512, out_features = 1)


class encoderCNN(nn.Module):
    def __init__(self, embed_size):
        super(encoderCNN, self).__init_()
        self.resnet = models.resnet18(weights = ( "ResNet18_Weights.DEFAULT"))
        # 전이학습 모델의 전결합층 변경
        self.resent.fc = nn.Linear(self.resenet.fc.in_features, embed_size)

        self.dropout = nn.Dropout(0.5)  # 흠,, 필요할까?
        self.relu = nn.Relu()
    
    def forward(self, x):
        features = self.resnet(x)
        
        # 모델의 합성곱층 가중치 고정 (완전 연결층은 학습시켜야함)
        for name, param in self.resnet.named_parameters():
            param.requires_grad = False
        for name, param in self.resnet.fc.named_parameters():
            param.requires_grad = True 

        result = self.relu(features)

        return result

In [63]:
# 디코딩용 RNN 모델 생성 : LSTM

class decoderRNN(nn.Module):
    def __init__ (self, embed_size, vocab_size, hidden_size, num_layers):
        super(decoderRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.5) # 흠 필요함?
    
    def forward(self, features, caption):
        # embeddings = self.dropout(self.embedding(caption))   # 왜 dropout?
        embeddings = self.embedding(caption)
        embeddings = torch.cat((features.unsqueeze(0), embeddings), dim = 0)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.linear(hiddens)
        return outputs

In [64]:
# CNN 과 RNN을 연결시키자

class CNN2RNN(nn.Module):
    def __init__(self, embed_size, vocab_size, hidden_size, num_layers):
        super(CNN2RNN, self).__init__()
        self.encoderCNN = encoderCNN(embed_size)
        self.decoderRNN = decoderRNN(embed_size, vocab_size, hidden_size, num_layers)
    
    def captionImage(self, image, vocabulary, maxlength = 50):
        result_caption = []

        with torch.no_grad():
            x = self.encoderCNN(image).unsqueeze(0)
            states = None

            for _ in range(maxlength) :
                hiddens, states = self.decoderRNN.lstm(x, states)
                output = self.decoderRNN.linear(hiddens.squeeze(0))
                predicted = output.argmax(1)
                print(predicted.shape)

                result_caption.append(predicted.item())
                x = self.decoderRNN.embedding(output).unsqueeze(0)

                if vocabulary.itos[predicted.item()] == "<EOS>" :
                    break
        
        return [vocabulary.itos[i] for i in result_caption]


### 학습을 위한 하이퍼파라미터 정의

In [65]:
embed_size = 256
hidden_size = 512
vocab_size = 19855  # 단어사전 크기
num_layers = 1
model = decoder(embed_size, hidden_size, vocab_size, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

### 학습, 검증, 테스트 함수 정의

In [74]:
import torchmetrics.functional as metrics

def training(dataloader):
    model.train()
    loss_list = []
    acc_list = []
    precision_list = []
    recall_list = []
    f1score_list = []

    for images, captions in dataloader:
        images = torch.FloatTensor(images)
        
        print(captions)


    #     captions = torch.FloatTensor(captions)
    #     print(captions)
    #     lengths = 300     #근데 이게 배치사이즈가 맞나??
        
        
    #     targets = pack_padded_sequence(captions, lengths, batch_first = True)[0]

    #     # 이미지 특성 추출
    #     features = resnet(images)
    #     features = features.view(features.size(0), -1)

    #     # 예측
    #     outputs = model(features, captions, lengths)

    #     # 역전파
    #     loss = criterion(outputs, targets)
    #     optimizer.zero_grad()
    #     loss.backward()
    #     optimizer.step()

    #     # 정확도
    #     acc = metrics.accuracy(outputs, label, task = 'multiclass') 
    #     precision = metrics.precision(outputs, label, task = 'multiclass')
    #     recall = metrics.recall(outputs, label, task = 'multiclass')
    #     f1score = metrics.f1_score(outputs, label, task = 'multiclass')
        
    #     loss_list.append(loss)
    #     acc_list.append(acc)
    #     precision_list.append(precision)
    #     recall_list.append(recall)
    #     f1score_list.append(f1score)
        
    # total_loss = sum(loss_list) / len(loss_list)
    # total_acc = sum(acc_list) / len(acc_list)
    # total_precision = sum(precision_list) / len(precision_list)
    # total_recall = sum(recall_list)/len(recall_list)
    # total_f1score = sum(f1score_list) / len(f1score_list)
    # # print(f"[TOTAL Train Loss] ==> {total_loss}")
    # # print(f"ACC : {total_acc}  Precision : {total_precision}  Recall : {total_recall} F1score : {total_f1score}")
    # return total_loss, total_acc, total_precision, total_recall, total_f1score

training(train_dl)

tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

KeyboardInterrupt: 

In [None]:
def valid_testing(dataloader):
    res_model.eval()
    loss_list = []
    acc_list = []
    precision_list = []
    recall_list = []
    f1score_list = []
    for image, label in dataloader:
        # 학습
        pre_label = res_model(image)
        pre_label = F.sigmoid(pre_label)

        pre_label = pre_label.squeeze()
        # print(label.shape, pre_label.shape)
        # print(label, pre_label, sep = '\n\n')
        
        # 손실계산
        valid_loss = cost(pre_label, label.float())
        # train_loss = cost(pre_label, label)
        
        # 정확도
        acc = metrics.accuracy(pre_label, label, task = 'binary') 
        precision = metrics.precision(pre_label, label, task = 'binary')
        recall = metrics.recall(pre_label, label, task = 'binary')
        f1score = metrics.f1_score(pre_label, label, task = 'binary')
        loss_list.append(valid_loss)
        acc_list.append(acc)
        precision_list.append(precision)
        recall_list.append(recall)
        f1score_list.append(f1score)
    total_loss = sum(loss_list) / len(loss_list)
    total_acc = sum(acc_list) / len(acc_list)
    total_precision = sum(precision_list) / len(precision_list)
    total_recall = sum(recall_list)/len(recall_list)
    total_f1score = sum(f1score_list) / len(f1score_list)
    # print(f"[TOTAL Train Loss] ==> {total_loss}")
    # print(f"ACC : {total_acc}  Precision : {total_precision}  Recall : {total_recall} F1score : {total_f1score}")
    return total_loss, total_acc, total_precision, total_recall, total_f1score

# valid_testing(valid_dl)

### 학습 진행

In [None]:
## 학습 중 모델 저장 관련 변수
dir = './model/'
filename = dir + "best_model.pth"

import os
if not os.path.exists(dir) :
    os.mkdir(dir)       # 하위 폴더만 생성 증, data 폴더는 이미 존재해야 함
    # os.makedirs(dir)    # 존재하지 않는 상위 폴더부터 생성

In [None]:
EPOCH = 5
training_list = [[], [], [], [], []]
validing_list = [[], [], [], [], []]
# loss, acc, prec, rec, f1_score

scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones = [0, EPOCH], gamma = 0.5)
# milestones => 어떤 에포크 구간에서 학습률을 조정할지 나타내는거

# 모델 저장 관련 변수
save_score_point = 0

In [None]:
for epo in range(EPOCH):
    # 학습
    loss, acc, prec, rec, f1 = training(train_dl)
    # print(loss, acc, prec, rec, f1)
    training_list[0].append(loss.item())
    training_list[1].append(acc.item())
    training_list[2].append(prec.item())
    training_list[3].append(rec.item())
    training_list[4].append(f1.item())
    print(f"epo => {epo}  학습중", end = ' ')
    # 검증
    loss, acc, prec, rec, f1 = valid_testing(valid_dl)
    validing_list[0].append(loss.item())
    validing_list[1].append(acc.item())
    validing_list[2].append(prec.item())
    validing_list[3].append(rec.item())
    validing_list[4].append(f1.item())
    print("검증중")
    
    # 검증 데이터 기준 학습된 모델 저장
    if loss < save_score_point:
        torch.save(res_model, filename)
        print('  모 델 저 장 완 료\n')
    
    # 스케줄러
    scheduler.step()

In [None]:
epo_list = list(range(0, EPOCH))
# print(epo_list)
title_list = ['LOSS', 'ACC', 'PRECISION', 'RECALL', 'F1-SCORE']

In [None]:
for k in range(5):
    print(sum(training_list[k]) / len(training_list[k]))

In [None]:
for k in range(5):
    print(sum(validing_list[k]) / len(validing_list[k]))

In [None]:
from torchviz import make_dot

x=torch.randn(4,3,150,150)
make_dot(res_model(x),params=dict(res_model.named_parameters()), show_attrs = True, show_saved = True)

In [None]:
plt.figure(figsize = (10,5))
k=0
plt.title(title_list[k])
plt.plot(epo_list, training_list[k], label = 'train')
plt.plot(epo_list, validing_list[k], label = 'valid')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize = (15,7))
for k in range(1,5):
    plt.subplot(2,2,k)
    plt.title(title_list[k])
    plt.plot(epo_list, training_list[k], label = 'train', color = 'royalblue', alpha = 0.7)
    plt.plot(epo_list, validing_list[k], label = 'valid', color = 'tomato')
    plt.legend()
plt.tight_layout()
plt.show()

### 예측

In [None]:
def predicting(dataloader):
    res_model.eval()
    loss_list = []
    acc_list = []
    precision_list = []
    recall_list = []
    f1score_list = []
    for image, label in dataloader:
        # 학습
        pre_label = res_model(image)
        pre_label = F.sigmoid(pre_label)
        pre_label = pre_label.squeeze()
        
        # 손실계산
        pred_loss = cost(pre_label, label.float())
        
        # 정확도
        acc = metrics.accuracy(pre_label, label, task = 'binary') 
        precision = metrics.precision(pre_label, label, task = 'binary')
        recall = metrics.recall(pre_label, label, task = 'binary')
        f1score = metrics.f1_score(pre_label, label, task = 'binary')
        
        
        loss_list.append(pred_loss)
        acc_list.append(acc)
        precision_list.append(precision)
        recall_list.append(recall)
        f1score_list.append(f1score)
    total_loss = sum(loss_list) / len(loss_list)
    total_acc = sum(acc_list) / len(acc_list)
    total_precision = sum(precision_list) / len(precision_list)
    total_recall = sum(recall_list)/len(recall_list)
    total_f1score = sum(f1score_list) / len(f1score_list)
    print(f"[TOTAL Train Loss] ==> {total_loss}")
    print(f"ACC : {total_acc}  Precision : {total_precision}  Recall : {total_recall} F1score : {total_f1score}")
    return total_loss, total_acc, total_precision, total_recall, total_f1score

predicting(test_dl)