In [1]:
# 구글 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# 프로젝트 폴더로 경로 이동
cd ./drive/MyDrive/INISW_2기/KU_proj/

/content/drive/MyDrive/INISW_2기/KU_proj


# 로컬 개발 코드
- 로컬에서 주피터 노트북(Jupyter Notebook), 주피터 랩(JupyterLab) 또는 파이썬(Python)을 이용한다.
- 파이토치(pytorch)를 사용하여 딥러닝 프로그램을 개발한다.
- 파일명: 0_local_image_captioning.ipynb

### 로컬 개발 워크플로우(workflow)  
- 로컬 개발 워크플로우를 다음의 4단계로 분리한다.

1. 데이터셋 준비(Data Setup)
- 로컬 저장소에서 전처리 및 학습에 필요한 학습 데이터셋을 준비한다.

2. 데이터 전처리(Data Preprocessing)
- 데이터셋의 분석 및 정규화(Normalization)등의 전처리를 수행한다.
- 데이터를 모델 학습에 사용할 수 있도록 가공한다.
- 추론과정에서 필요한 경우, 데이터 전처리에 사용된 객체를 meta_data 폴더 아래에 저장한다.

3. 학습 모델 훈련(Train Model)
- 데이터를 훈련에 사용할 수 있도록 가공한 뒤에 학습 모델을 구성한다.
- 학습 모델을 준비된 데이터셋으로 훈련시킨다.
- 정확도(Accuracy)나 손실(Loss)등 학습 모델의 성능을 검증한다.
- 학습 모델의 성능 검증 후, 학습 모델을 배포한다.
- 배포할 학습 모델을 meta_data 폴더 아래에 저장한다.

4. 추론(Inference)
- 저장된 전처리 객체나 학습 모델 객체를 준비한다.
- 추론에 필요한 테스트 데이터셋을 준비한다.
- 배포된 학습 모델을 통해 테스트 데이터에 대한 추론을 진행한다.

# 이미지 캡셔닝 (Image Captioning)
- 지금부터 이미지 데이터를 이용하여 캡셔닝(captioning)을 진행해보고자 한다.

## 사용할 데이터

- AIhub에서 제공하는 Open Dataset인 [유동인구 분석을 위한 cctv 영상 데이터](https://aihub.or.kr/aihubdata/data/view.do?currMenu=115&topMenu=100&aihubDataSe=realm&dataSetSn=489) 이라는 cctv 영상 데이터셋으로, 총 330 시간의 영상이 존재한다. 우리는 이 중에서 영상 속 등장한 사람 객체만을 크롭(cropp)한 이미지를 기반으로  증강(augmented)한 2만 5천 개의 이미지 데이터셋을 사용하고자 한다.

In [None]:
# 설치(코랩 환경)
!pip install transformers==4.27 torch
!pip install pycocoevalcap

In [7]:
# Imports
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch import nn
import torch
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR

import json
import os
from PIL import Image
import numpy as np
import zipfile

from transformers import BlipProcessor, BlipForConditionalGeneration, Swin2SRImageProcessor, Swin2SRForImageSuperResolution
from pycocotools.coco import COCO
from pycocoevalcap.eval import COCOEvalCap

## **1. 데이터셋 준비(Data Setup)**

In [None]:
# image_augmented.zip 파일 압축 풀기
!unzip -qq './image_augmented.zip' -d './meta_data'


# 정답 캡션 json 파일 불러오기
labelpath="./meta_data/annotaions/shuffled_captions.json"
with open(labelpath, 'r',encoding = 'utf-8' or 'cp949' ) as f: # json 파일 접근
    captions = json.load(f)

## 2. 데이터 전처리 (Data Preprocessing)

### 데이터 준비 (Preparing Data)

앞서 meta_data 폴더에 풀어둔 이미지 데이터와 captions객체에 불러온 정답 캡션을 훈련에 사용할 수 있는 형태로 바꾸고자 한다.

- 이미지 데이터 고해상도화(Super Resolution)
  - 지나치게 적은 픽셀 수(가로 50, 세로 100)의 이미지의 경우, 고해상도화를 통해 보다 명확한 시각 정보를 담을 수 있게끔 해준다.

- 이미지 데이터 정규화 (Normalization)
  - preprocessor를 임포트(import)하여 이미지들을 전체 이미지 데이터셋 RGB 값의 평균, 표준편차값을 통해 0 ~ 1 값으로 정규화를 한다.

- 데이터 합치기 & 레이블 생성
  - 이미지 데이터와 정답 캡션을 매칭하여 최종적으로 모델에 학습할 데이터 집합을 생성한다.

- 훈련 (train) & 평가 (val) 데이터셋 생성
  - 전체 데이터 중 일부는 훈련 (train)에 사용하고, 나머지 일부는 훈련된 모델의 성능을 평가 (val)하기 위해 사용하고자 한다.

##### 이미지 데이터 고해상도화(Super Resolution)

In [None]:
# super resolution을 진행할 모델 불러오기
from transformers import Swin2SRImageProcessor, Swin2SRForImageSuperResolution
pro_sr = Swin2SRImageProcessor.from_pretrained("./meta_data/super_resol/preprocessor")
model_sr = Swin2SRForImageSuperResolution.from_pretrained("./meta_data/super_resol/srwinsr.pt")

In [None]:
# SR 진행된 이미지 리스트 반환 함수
def image_list_with_sr(captions,dir,n,m):
    imagelist=[]
    for i in range(n,n+m):
        path = dir+'/'+captions[i]['image']
        image = Image.open(path)
        image = super_reso(image) if image.size[0]<50 or image.size[1]<100 else image
        imagelist.append(image)
    return imagelist

In [None]:
# 개별 이미지 SR 진행 함수
def super_reso(image):
    inputs = pro_sr(image, return_tensors="pt").to(device)

    # forward pass
    with torch.no_grad():
        outputs = model_sr(**inputs)

    output = outputs.reconstruction.data.squeeze().cpu().float().clamp_(0, 1).numpy()
    output = np.moveaxis(output, source=0, destination=-1)
    output = (output * 255.0).round().astype(np.uint8)
    return Image.fromarray(output)

In [None]:
# 조건에 따른 SR 적용된 이미지 리스트
device = "cuda" if torch.cuda.is_available() else "cpu"
model_sr.to(device)
ImageList = image_list_with_sr(captions,'./meta_data/image_augmented',option['start'],option['num'])

# SR 모델 CPU로 이동
model_sr.to('cpu')

# 25000장에서 약 3.5분 소요

##### 이미지 데이터 정규화

In [None]:
# 정규화 진행하는 preprocessor 폴더 임포트(import) 및 인스턴스화
from transformers import BlipProcessor
processor = BlipProcessor.from_pretrained('.meta_data/preprocessor')

In [None]:
# processor로 정규화된 데이터셋 클래스 작성
class ImageCaptioningDataset(Dataset):
    def __init__(self, dataset, processor):
        self.dataset = dataset
        self.processor = processor

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        encoding = self.processor(images=item["image"], text=item["text"], padding="max_length", return_tensors="pt")

        encoding = {k:v.squeeze() for k,v in encoding.items()}
        return encoding

##### 데이터 합치기 & 레이블 생성

In [None]:
# 옵션
option = {
    'start': 0,
    'num': 25000,
    'batch_size':16,
}

In [None]:
# 이미지와 캡션 매칭
data = [{'text':captions[i]['label'],'image':ImageList[i]} for i in range(option['start'],option['start']+option['num'])]

##### 훈련 & 평가 데이터셋 생성

전체 데이터셋 중 **8:2**의 비율로 훈련:평가 데이터셋을 생성한다. 이때 배치 사이즈는 option 객체의 정보를 이용한다. (해당 프로젝트에서 진행한 배치 사이즈: 16개)

In [None]:
train_dataset = ImageCaptioningDataset(data[:int(0.8*option['num'])], processor)
val_dataset = ImageCaptioningDataset(data[int(0.8*option['num']):], processor)
train_dataloader = DataLoader(train_dataset,shuffle=False,batch_size = option['batch_size'])
val_dataloader = DataLoader(val_dataset,shuffle=False,batch_size = option['batch_size'])

In [None]:
len(train_dataloader), len(val_dataloader)

## **3. 학습 모델 훈련 (Train Model)**

이미지 캡션을 위해 transformers에서 제공하는 pre-trained된 blip 모델을 불러온다.

In [None]:
#모델 구축
from transformers import BlipForConditionalGeneration
model = BlipForConditionalGeneration.from_pretrained(mode)

### 학습(train) 시 성능지표 확인 위한 코드

In [None]:
# 모델이 생성한 캡션을 json 파일로 저장해주는 함수
def gen_captions(captions,filename):
    gen = []
    for i in range(len(captions)):
        gen.append({'image_id': i+1, 'caption': captions[i]})

    with open(filename,'w') as f:
      json.dump(gen,f)

In [None]:
# 생성된 캡션을 기반으로, ground_truth 파일과 함께 성능지표를 계산해주는 함수
def coco_caption_eval(annotation_file, results_file):

    coco = COCO(annotation_file)
    coco_result = coco.loadRes(results_file)

    coco_eval = COCOEvalCap(coco, coco_result)
    coco_eval.evaluate()

    # print output evaluation scores
    for metric, score in coco_eval.eval.items():
        print(f'{metric}: {score:.3f}')

    return coco_eval

In [None]:
val_cpath = './meta_data/output/val' # validation 위해 모델이 생성한 캡션 파일 저장하는 경로
val_rpath = './meta_data/annotations/gt_captions.json' # 성능지표 계산 시 참조하는 ground-truth 캡션 파일 경로
train_hist=[] # loss 값 확인 위한 리스트
val_hist=[] # loss 값 확인 위한 리스트

### fine-tuning 대상 layer 정하기

- 모델 구조 살펴보기

- 모델 layer 중 동결(frozen)할 layer와 학습가능(trainable)한 layer 구분하기
  - 해당 프로젝트 실험 결과, 전층(all layer)를 작은 학습률(lr: 1e-6)로 fine-tuning한 버전이 가장 높은 성능을 보였음



In [None]:
# 모델 구조 살펴보기
print(model)

In [None]:
# 필요에 따라, 동결할 레이어 동결시키기
'''n = 0
for l in model.text_decoder.bert.encoder.layer:
  n += 1
  if n >= 12:
    l.crossattention.requires_grad_()
  else:
    continue'''

In [None]:
# 동결, 학습가능 layer 확인
for name, param in model.named_parameters():
    if param.requires_grad:
        print(name, "is trainable")
    else:
        print(name, "is frozen")

### 모델 학습
- 전이학습을 통한 fine-tuning
  - 앞서 준비된 사전학습 모델을 준비해준 입력 데이터와 레이블 데이터로 학습시킨다.

In [None]:
lr = 1e-6
E = 30

optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=0.05)
scheduler = CosineAnnealingLR(optimizer, T_max=E)

device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

for epoch in range(E):
    model.train()
    Loss = 0

    #train_caption = [] -> train 시 성능지표를 보기 위한 리스트 (선택사항)
    for idx, batch in enumerate(train_dataloader):
        model.train()
        input_ids = batch.pop("input_ids").to(device)
        pixel_values = batch.pop("pixel_values").to(device)
        outputs = model(input_ids=input_ids,pixel_values=pixel_values, labels=input_ids)
        loss = outputs.loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # train 시 성능지표를 보기 위한 작업 (선택사항)
        #with torch.no_grad():
        #  model.eval()
        #  train_caption+=processor.batch_decode(model.generate(pixel_values=pixel_values,max_length = 100),skip_special_tokens=True)
        #train_hist2.append(loss.tolist())

        Loss+=loss.tolist()

    train_hist.append(Loss/len(train_dataloader)) # train loss 확인 위한 코드


    # 에폭마다 validation 진행
    val = 0
    val_caption =[]
    with torch.no_grad():
        model.eval()
        for idx, batch in enumerate(val_dataloader):
            input_ids = batch.pop("input_ids").to(device)
            pixel_values = batch.pop("pixel_values").to(device)
            outputs = model(input_ids=input_ids,pixel_values=pixel_values, labels=input_ids)

            # 성능을 보기위한 작업: 1. 캡션 생성해서 리스트에 담아두기 2. Val loss 확인하기
            val_caption+=processor.batch_decode(model.generate(pixel_values=pixel_values,max_length = 300),skip_special_tokens=True)
            val+=outputs.loss.tolist()

    val_hist.append(val/len(val_dataloader))

    # checkpoint model 저장
    if val_hist[-1]==min(val_hist):
        torch.save(model,'./meta_data/checkpoint_best.pt')

    #Epoch 출력
    print("Epoch {}회차 - val_Loss:{}, ".format(epoch+1,val/313))

    # epoch의 caption들 저장 및 성능 출력을 위한 코드
    # meata_data 하위 폴더에 에폭별 Val 데이터셋에 대한 생성 캡션을 json 파일로 저장하기 -> 이후 성능지표 계산을 위해
    gen_captions(val_caption,val_cpath+'/'+str(epoch+1)+'.json')

    # train 시 성능지표 확인 위한 코드 (선택사항)
    #gen_captions(train_caption,train_cpath+'/'+str(epoch+1)+'.json')

### 모델 평가 (Evaluate Model)

- 기계번역, 캡션 task 관련 성능지표 계산
- BLEU, METEOR, ROUGE-L, CIDEr, SPICE

In [None]:
val = []
for i in range(30):
    val.append(coco_caption_eval("./meta_data/annotaions/gt_captions.json",f'.meta_data/output/val/{i+1}.json').eval.items())

In [None]:
# 결과 json 파일로 저장
with open('./meta_data/ouput/score.json','w') as f:
  json.dump(val,f)

## **4. 추론 (Inference)**

훈련시킨 모델을 직접 사용해보고자 한다. 잘 훈련된 모델이라면 처음 보는 cctv 영상 속 사람 객체 이미지를 인풋으로 줬을 때, 외양을 묘사하는 정확한 캡션을 생성할 것이다.

- 이미지 불러오기
- 조건에 따른 SR 적용
- 데이터 변환 및 결과 확인

##### 이미지 불러오기
미리 준비해둔 test 이미지 파일을 받아 추론을 진행해보고자 한다.

In [None]:
# dataset.zip 파일을 dataset 폴더에 압축을 풀어준다.
zip_source_path = './inference_image_dataset.zip'
zip_target_path = './meta_data'

extract_zip_file = zipfile.ZipFile(zip_source_path)
extract_zip_file.extractall(zip_target_path)

extract_zip_file.close()

##### 조건에 따른 SR 적용
- 이미지 데이터를 읽어온다.
- SR를 실시한다.

In [None]:
# 이미지 데이터 SR한 리스트 반환 함수
def image_list_with_sr_inf(dir,n,m):
    imagelist=[]
    for i in range(n,n+m):
        path = dir+'/'+os.listdir(dir)[i]
        image = Image.open(path)
        image = super_reso(image) if image.size[0]<50 or image.size[1]<100 else image
        imagelist.append(image)
    return imagelist

In [None]:
model.to('cpu')

model_sr.to(device)
ImageList = image_list_with_sr_inf('.meta_data/inference_image_dataset',0,10)

model_sr.to('cpu') # 5000장에서 약 2분 소요

##### 데이터 변환 및 결과 확인
- 데이터 변환
- 각 사진마다 생성된 caption 확인

In [None]:
pixel_values =  processor.image_processor(images=ImageList, return_tensors="pt").pixel_values # 데이터 변환
model.to(device)
outputs=processor.batch_decode(model.generate(pixel_values=pixel_values.to(device),max_length = 70),skip_special_tokens=True)

In [None]:
for i in range(10):
    print(outputs[i])
    ImageList[i].show()
    print()