## 매 시도마다 돌릴 것들

In [98]:
# Mount Google Drive so the dataset paths under /content/drive are reachable.
from google.colab import drive
drive.mount('/content/drive')
# Install the `datasets` package into the runtime (quiet output).
!pip install datasets --quiet

import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

data_path = '/content/drive/MyDrive/Colab Notebooks/COSE474/MVSA/data/'  # 이미지 및 텍스트 파일 폴더

from transformers import AutoTokenizer, AutoModelForSequenceClassification
import pandas as pd
from collections import Counter
from tqdm import tqdm
from collections import Counter
import torch.nn.functional as F
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoConfig
from scipy.special import softmax
import shutil
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 전처리

In [31]:
# Load the text table and the tab-separated annotation table.
texts_df = pd.read_csv(
    '/content/drive/MyDrive/Colab Notebooks/COSE474/MVSA/texts.csv'
)
label_txt = pd.read_csv(
    '/content/drive/MyDrive/Colab Notebooks/COSE474/MVSA/label.txt',
    sep='\t',
    header=0,
).set_axis(['ID', 'Annotator1', 'Annotator2', 'Annotator3'], axis=1)
# Independent copy: label_txt and label_jpg are reduced separately in later cells.
label_jpg = label_txt.copy()

def pre_text(text):
    """Mask user mentions and links in a space-separated text.

    Tokens are obtained by splitting on single spaces. A token that starts
    with '@' and is longer than one character becomes '@user'; a token that
    starts with 'http' becomes 'http'. All other tokens (including the empty
    tokens produced by consecutive spaces) are kept unchanged.

    Args:
        text: The raw text string.

    Returns:
        The text with masked tokens, re-joined with single spaces.
    """
    def _mask(token):
        if token.startswith('@') and len(token) > 1:
            token = '@user'
        if token.startswith('http'):
            token = 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))

# Normalise every text element-wise (mentions -> '@user', links -> 'http').
texts_df['Text'] = texts_df['Text'].map(pre_text)

In [32]:
# Sentiment word -> signed score.
dic = {'positive': 1, 'neutral':0, 'negative': -1}

def pre_labeltext(text):
    """Return the score of the first comma-separated field of an annotation.

    Presumably the first field is the text-modality label of a
    "text,image" pair -- confirm against the label file.
    """
    first_field = text.split(",")[0]
    return dic[first_field]

# Convert each annotator column to a score, then average the three votes.
annotator_cols = ['Annotator1', 'Annotator2', 'Annotator3']
for col in annotator_cols:
    label_txt[col] = label_txt[col].apply(pre_labeltext)
label_txt['label'] = label_txt[annotator_cols].sum(axis=1) / 3
label_txt = label_txt.drop(annotator_cols, axis=1)

In [33]:
def pre_labeltext(text):
    """Return the score of the second comma-separated field of an annotation.

    This redefines the helper from the previous cell, which read field 0.
    Presumably the second field is the image-modality label -- confirm
    against the label file.
    """
    fields = text.split(",")
    return dic[fields[1]]

# Score each annotator's vote, average the three, and keep only ID + label.
image_votes = pd.DataFrame({
    col: label_jpg[col].apply(pre_labeltext)
    for col in ('Annotator1', 'Annotator2', 'Annotator3')
})
label_jpg = (
    label_jpg
    .drop(['Annotator1', 'Annotator2', 'Annotator3'], axis=1)
    .assign(label=image_votes.sum(axis=1) / 3)
)

In [35]:
# Join the averaged text labels with the preprocessed texts on their shared ID.
merged_text = label_txt.merge(texts_df, on='ID')

## 데이터 라벨링-모델 일치율 확인(만족스럽다)

In [90]:
def checksentiment(listtexts, listlabels, batch_size=16, max_length=512):
    """Score texts with a pretrained sentiment model and report the error vs. labels.

    Each text is scored as ``P(class 2) - P(class 0)`` of the
    ``cardiffnlp/twitter-roberta-base-sentiment-latest`` classifier, where
    class 2 is treated as positive and class 0 as negative. The mean squared
    difference between the scores and ``listlabels`` is printed as "L2 Loss".

    Args:
        listtexts: List of input strings.
        listlabels: Reference values, one per text, on the same scale as the score.
        batch_size: Number of texts per forward pass.
        max_length: Maximum token length per text. The tokenizer of this
            checkpoint has no predefined maximum, so ``truncation=True`` alone
            does not truncate; an explicit limit keeps long inputs within the
            model's position range.

    Returns:
        np.ndarray of sentiment scores, one per input text.
    """
    MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = AutoModelForSequenceClassification.from_pretrained(MODEL)
    model.to(device)
    model.eval()
    tokenizer = AutoTokenizer.from_pretrained(MODEL)
    sentiment_scores = []

    for i in tqdm(range(0, len(listtexts), batch_size)):
        batch_texts = listtexts[i:i+batch_size]
        encoded_input = tokenizer(
            batch_texts,
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors='pt',
        ).to(device)

        with torch.no_grad():
            logits = model(**encoded_input).logits

        scores = torch.nn.functional.softmax(logits, dim=-1).cpu().numpy()

        pos_probs = scores[:, 2]  # positive-class probability
        neg_probs = scores[:, 0]  # negative-class probability
        sentiment_scores.extend(pos_probs - neg_probs)

    listlabels = np.array(listlabels)
    sentiment_scores = np.array(sentiment_scores)

    # Mean squared error between model scores and the reference labels.
    l2_loss = np.mean((sentiment_scores - listlabels) ** 2)
    print(f"L2 Loss: {l2_loss}")

    return sentiment_scores

In [91]:
# Keep the returned scores: the original call discarded them, so `sentiment_scores`
# only existed if an earlier kernel state happened to define it.
sentiment_scores = checksentiment(merged_text['Text'].tolist(), merged_text['label'].tolist())

# Count rows where the model score and the averaged label have opposite signs.
# A product of exactly 0 (e.g. a label of 0) is not counted as a disagreement.
text_labels = merged_text['label'].to_numpy()
count = int(np.sum(sentiment_scores * text_labels < 0))
count

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
  0%|          | 0/1225 [00:00<?, ?it/s]Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
100%|██████████| 1225/1225 [00:19<00:00, 61.65it/s]


L2 Loss: 0.22832325209626356


1704

## 이미지 설명-감성 분석 점수 확인(학습 전 저성능 확인)

In [89]:
# Leave this running overnight: the recorded run needed about two hours for 19600 files.
source_dir = '/content/drive/MyDrive/Colab Notebooks/COSE474/MVSA/data'

target_dir = '/content/data'

# Create the target folder if it does not exist yet.
os.makedirs(target_dir, exist_ok=True)

loader = label_txt['ID'].tolist()

for filename in tqdm(loader):
    filename_with_ext = f"{filename}.jpg"
    src_file = os.path.join(source_dir, filename_with_ext)
    dst_file = os.path.join(target_dir, filename_with_ext)

    try:
        # Skip files already copied in full, so a rerun or a resumed run does
        # not repeat the whole transfer. The size check guards against a
        # partial file left behind by an interrupted copy.
        if os.path.exists(dst_file) and os.path.getsize(dst_file) == os.path.getsize(src_file):
            continue
        shutil.copy2(src_file, dst_file)
    except OSError as e:
        # Best effort: report the failing file and keep going.
        print(f"파일 복사 중 오류 발생: {filename_with_ext}, 오류: {e}")

100%|██████████| 19600/19600 [1:57:19<00:00,  2.78it/s]


In [76]:
# Attach the text-model scores to the image-label table.
# NOTE(review): the assignment is positional; it assumes `sentiment_scores` has
# one entry per row of `label_jpg`, in the same order -- confirm.
scores = np.array(sentiment_scores)
label_jpg['pred'] = scores

# Keep only rows whose image label and text label share the same non-zero sign.
sim = (label_jpg['label'] * label_txt['label']).gt(0)
input_df = label_jpg.loc[sim]

In [None]:
image_dir = '/content/data'

def jpg(num):
    """Return the image file name ("<num>.jpg") for an ID."""
    return f"{num}.jpg"

# `device` is otherwise only a local variable inside checksentiment(), so it must
# be defined at notebook scope before it is used here and by the captioning loop.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

image_ids = input_df['ID'].apply(jpg).tolist()
# Despite the name, these are the text-model scores stored in the 'pred' column.
actual_labels = input_df['pred'].tolist()
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
model_blip = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")
# Rebinding (instead of a bare expression) avoids dumping the model repr as cell output.
model_blip = model_blip.to(device)

In [None]:
captions = []

# Send inputs to whichever device the BLIP model is on, rather than relying on a
# notebook-level `device` variable that may not exist on a fresh kernel.
blip_device = model_blip.device

for image_id in tqdm(image_ids):

    image_path = os.path.join(image_dir, image_id)

    # Load the image and convert it to RGB; the context manager closes the file.
    try:
        with Image.open(image_path) as img:
            raw_image = img.convert('RGB')
    except Exception as e:
        print(f"이미지 로드 실패: {image_id}, 오류: {e}")
        captions.append("")  # empty placeholder keeps captions aligned with image_ids
        continue

    # Build the model inputs.
    inputs = processor(images=raw_image, return_tensors="pt").to(blip_device)

    # Generate the caption token ids.
    with torch.no_grad():
        out = model_blip.generate(**inputs)

    # Decode the first generated sequence into text.
    caption = processor.decode(out[0], skip_special_tokens=True)
    captions.append(caption)

  6%|▌         | 707/12850 [03:47<50:37,  4.00it/s]  

이미지 로드 실패: 3910.jpg, 오류: cannot identify image file '/content/data/3910.jpg'


  6%|▌         | 732/12850 [03:55<1:08:51,  2.93it/s]

In [96]:
# NOTE: this rebinds `sentiment_scores` (previously the text scores) to the caption scores.
sentiment_scores = checksentiment(captions, actual_labels)

# Count sign disagreements against the same references passed to checksentiment().
# The scores have one entry per caption, so iterating to a fixed 19600 and indexing
# merged_text (a different, unfiltered row set) would go out of range and compare
# unrelated rows. To compare against the averaged image labels instead, use
# input_df['label'].to_numpy() here.
reference = np.asarray(actual_labels)
count = int(np.sum(sentiment_scores * reference < 0))
count

Unnamed: 0,ID,label,pred
0,2499,0.666667,0.153497
1,2500,0.666667,0.096899
3,2502,1.000000,0.200500
5,2504,0.333333,0.875041
6,2505,0.333333,0.971453
...,...,...,...
19593,22885,1.000000,0.166340
19594,22886,0.333333,-0.049349
19596,22888,0.333333,0.024390
19597,22889,0.666667,0.069241


In [None]:
# Preview only the first few captions instead of dumping the whole list as output.
captions[:10]

## 멀티모달 이미지 감성-설명 학습

**TODO:** 로드되지 않는 깨진 이미지가 3개 있다. 위에서 ID 기준으로 데이터를 맞추므로, 학습 전에 해당 이미지의 ID를 미리 제거해 둘 것.