In [None]:
!pip install qwen-vl-utils
!pip install transformers==4.49
!pip install vllm==0.8.1

Kernel > Restart Kernel

## 1. 모델 로드

In [None]:
from transformers import AutoProcessor
from vllm import LLM, SamplingParams
from qwen_vl_utils import process_vision_info

MODEL_PATH = "iamjoon/Qwen2-VL-7B-Instruct-fashion-product-images-small-checkpoint-68"

llm = LLM(
    model=MODEL_PATH,
    trust_remote_code=True,
    limit_mm_per_prompt={"image": 10, "video": 10},
)

## 2. 테스트 데이터 로드 및 전처리

In [None]:
import io
import json
from PIL import Image
from datasets import load_dataset
from sklearn.model_selection import train_test_split

In [None]:
# 시스템(assistant)에게 주어진 역할
system_message = "당신은 이미지와 제품명(name)으로부터 패션/스타일 정보를 추론하는 분류 모델입니다."

# 실제로 사용자 입력 -> 모델이 답해야 하는 프롬프트
prompt = """입력 정보:
- name: {name}
- image: [image]

위 정보를 바탕으로, 아래 7가지 key에 대한 값을 JSON 형태로 추론해 주세요:
1) gender
2) masterCategory
3) subCategory
4) season
5) usage
6) baseColour
7) articleType

출력 시 **아래 JSON 예시 형태**를 반드시 지키세요:
{{
  "gender": "예시값",
  "masterCategory": "예시값",
  "subCategory": "예시값",
  "season": "예시값",
  "usage": "예시값",
  "baseColour": "예시값",
  "articleType": "예시값"
}}

# 예시
{{
  "gender": "Men",
  "masterCategory": "Accessories",
  "subCategory": "Eyewear",
  "season": "Winter",
  "usage": "Casual",
  "baseColour": "Blue",
  "articleType": "Sunglasses"
}}

# 주의
- 7개 항목 이외의 정보(텍스트, 문장 등)는 절대 포함하지 마세요.
"""

In [None]:
def combine_cols_to_label(example):
    # 실제 컬럼명에 맞게 수정
    label_dict = {
        "gender": example["gender"],
        "masterCategory": example["masterCategory"],
        "subCategory": example["subCategory"],
        "season": example["season"],
        "usage": example["usage"],
        "baseColour": example["baseColour"],
        "articleType": example["articleType"],
    }
    example["label"] = json.dumps(label_dict, ensure_ascii=False)
    return example

def format_data(sample):
   # Image.Image를 PngImageFile로 변환
   buffer = io.BytesIO()
   sample["image"].save(buffer, format='PNG')
   buffer.seek(0)
   image = Image.open(buffer)
   
   return {
       "messages": [
           {
               "role": "system",
               "content": [
                   {
                       "type": "text",
                       "text": system_message
                   }
               ],
           },
           {
               "role": "user",
               "content": [
                   {
                       "type": "text",
                       "text": prompt.format(name=sample["productDisplayName"]),
                   },
                   {
                       "type": "image",
                       "image": image,
                   }
               ],
           },
           {
               "role": "assistant",
               "content": [
                   {
                       "type": "text",
                       "text": sample["label"],
                   }
               ],
           },
       ],
   }

In [None]:
dataset = load_dataset("ashraq/fashion-product-images-small", split="train")
dataset_add_label = dataset.map(combine_cols_to_label)
dataset_add_label = dataset_add_label.shuffle(seed=4242)

In [None]:
formatted_dataset = [format_data(row) for row in dataset_add_label]

In [None]:
# test_size=0.9로 설정하여 전체 데이터의 90%를 테스트 세트로 분리
train_dataset, test_dataset = train_test_split(formatted_dataset,
                                             test_size=0.9,
                                             random_state=42)

## 3. 임의의 샘플 한 개 인퍼런스

In [None]:
from IPython.display import display

In [None]:
model_id = "Qwen/Qwen2-VL-7B-Instruct"
processor = AutoProcessor.from_pretrained(model_id)

In [None]:
sampling_params = SamplingParams(
    temperature=0.1,
    top_p=0.001,
    repetition_penalty=1.05,
    max_tokens=256,
    stop_token_ids=[],
)

테스트 데이터 32번에 대해서 모델의 예측을 얻어봅시다.

In [None]:
test_sample = test_dataset[32]["messages"]

In [None]:
test_sample_prompt = processor.apply_chat_template(
    test_sample,
    tokenize=False,
    add_generation_prompt=False,
)

In [None]:
def split_input_and_label(prompt):
    input = prompt.split('<|im_start|>assistant')[0] + '<|im_start|>assistant'
    label = prompt.split('<|im_start|>assistant')[1]
    return input, label

In [None]:
test_sample_input, test_sample_label = split_input_and_label(test_sample_prompt)

In [None]:
print(test_sample_input)

In [None]:
print(test_sample_label)

In [None]:
# 이미지만 따로 전처리
test_sample_image_inputs, test_sample_video_inputs = process_vision_info(test_sample)

In [None]:
test_sample_image_inputs

In [None]:
display(test_sample_image_inputs[0])

In [None]:
# 아무 값도 없음
video_inputs

In [None]:
mm_data = {}
if image_inputs is not None:
    mm_data["image"] = image_inputs
if video_inputs is not None:
    mm_data["video"] = video_inputs

llm_inputs = {
    "prompt": test_sample_input,
    "multi_modal_data": mm_data,
}

In [None]:
outputs = llm.generate([llm_inputs], sampling_params=sampling_params)
generated_text = outputs[0].outputs[0].text

print('모델의 예측:')
print(generated_text.strip())

print('레이블:')
print(test_sample_label.strip())

## 4. 다수의 데이터 호출

In [None]:
# 상위 50개 테스트 데이터 추출
test_samples = test_dataset[:50]

# 배치 인퍼런스를 위한 입력 데이터 준비
batch_inputs = []
batch_labels = []

for sample in test_samples:
    # 각 샘플의 메시지 처리
    messages = sample["messages"]
    
    # 프롬프트 생성
    prompt = processor.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=False,
    )
    
    # 입력과 레이블 분리
    input_part = prompt.split('<|im_start|>assistant')[0] + '<|im_start|>assistant'
    label_part = prompt.split('<|im_start|>assistant')[1]
    
    # 이미지/비디오 데이터 처리
    image_inputs, video_inputs = process_vision_info(messages)
    
    # 멀티모달 데이터 구성
    mm_data = {}
    if image_inputs is not None:
        mm_data["image"] = image_inputs
    if video_inputs is not None:
        mm_data["video"] = video_inputs
    
    # LLM 입력 데이터 구성
    llm_input = {
        "prompt": input_part,
        "multi_modal_data": mm_data,
    }
    
    batch_inputs.append(llm_input)
    batch_labels.append(label_part)

# 배치 인퍼런스 실행
print("배치 인퍼런스 시작...")
outputs = llm.generate(batch_inputs, sampling_params=sampling_params)

# 결과 출력
print("\n=== 배치 인퍼런스 결과 ===")
for i, output in enumerate(outputs):
    generated_text = output.outputs[0].text
    print(f"\n--- 샘플 {i+1} ---")
    print(f"모델의 예측: {generated_text.strip()}")
    print(f"레이블: {batch_labels[i].strip()}")
    print("-" * 50)

## 5. 모델 평가 (테스트 데이터 50건)

In [None]:
import json
from sklearn.metrics import f1_score
from collections import defaultdict

def calculate_f1_scores(outputs, batch_labels):
    """각 key별로 F1 score를 계산하는 함수"""
    
    # 각 키별로 예측값과 실제값을 저장할 딕셔너리
    predictions_by_key = defaultdict(list)
    labels_by_key = defaultdict(list)
    
    for i, output in enumerate(outputs):
        # 모델 예측 결과 파싱
        try:
            pred_text = output.outputs[0].text.strip()
            pred_json = json.loads(pred_text)
        except:
            print(f"샘플 {i+1}: 예측 결과 JSON 파싱 실패")
            continue
        
        # 실제 레이블 파싱
        try:
            label_text = batch_labels[i].strip()
            # <|im_end|> 제거
            if label_text.endswith('<|im_end|>'):
                label_text = label_text[:-10]
            label_json = json.loads(label_text)
        except:
            print(f"샘플 {i+1}: 레이블 JSON 파싱 실패")
            continue
        
        # 각 키별로 예측값과 실제값 저장
        for key in label_json.keys():
            if key in pred_json:
                predictions_by_key[key].append(pred_json[key])
                labels_by_key[key].append(label_json[key])
            else:
                print(f"샘플 {i+1}: 키 '{key}' 예측 결과에 없음")
    
    # 각 키별로 F1 score 계산
    f1_scores = {}
    for key in labels_by_key.keys():
        if len(labels_by_key[key]) > 0:
            # 고유한 라벨들 확인
            unique_labels = list(set(labels_by_key[key] + predictions_by_key[key]))
            
            if len(unique_labels) == 1:
                # 모든 값이 동일한 경우
                f1_scores[key] = 1.0
            else:
                # F1 score 계산 (macro average)
                f1_scores[key] = f1_score(
                    labels_by_key[key], 
                    predictions_by_key[key], 
                    labels=unique_labels,
                    average='macro',
                    zero_division=0
                )
    
    return f1_scores, predictions_by_key, labels_by_key

# F1 score 계산 실행
f1_results, preds_by_key, labels_by_key = calculate_f1_scores(outputs, batch_labels)

# 결과 출력
print("\n=== F1 Score 결과 ===")
print(f"{'Key':<15} {'F1 Score':<10} {'샘플 수':<8}")
print("-" * 35)

for key, score in f1_results.items():
    sample_count = len(labels_by_key[key])
    print(f"{key:<15} {score:.4f}     {sample_count}")

# 전체 평균 F1 score
if f1_results:
    avg_f1 = sum(f1_results.values()) / len(f1_results)
    print("-" * 35)
    print(f"{'평균 F1':<15} {avg_f1:.4f}")

# 각 키별 상세 결과 (옵션)
print("\n=== 상세 결과 ===")
for key in f1_results.keys():
    correct = sum(1 for p, l in zip(preds_by_key[key], labels_by_key[key]) if p == l)
    total = len(labels_by_key[key])
    accuracy = correct / total if total > 0 else 0
    print(f"{key}: 정확도 {accuracy:.4f} ({correct}/{total})")