# CLIP 기반 객체 분석 파이프라인

In [1]:
!pip -q install matplotlib seaborn scikit-learn kaggle tqdm open_clip_torch >/dev/null 2>&1 || true

In [26]:
import ast
import os
import random
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import open_clip
import pandas as pd
import seaborn as sns
import torch
from IPython.display import display
from PIL import Image
from sklearn.decomposition import PCA
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Seed every random number generator used in this notebook (Python, NumPy,
# PyTorch) with the same value so repeated runs draw the same numbers.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Use CUDA when PyTorch reports it as available; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cpu')

In [28]:
import shutil

# Install the Kaggle API token at ~/.kaggle/kaggle.json.
# A local copy at api/kaggle.json is preferred; without one, an interactive
# upload through google.colab is attempted. Every failure is reported and
# skipped (best effort) so the rest of the notebook still runs without a token.
src = Path('api/kaggle.json')
target = Path('~/.kaggle/kaggle.json').expanduser()
target.parent.mkdir(parents=True, exist_ok=True)
if src.exists():
    shutil.copyfile(src, target)
    os.chmod(target, 0o600)  # credential file: owner read/write only
else:
    try:
        from google.colab import files
    except ImportError:
        # The google.colab package is not importable in this environment.
        print('skip kaggle token upload (not on Colab)')
    else:
        try:
            uploaded = files.upload()
            if 'kaggle.json' not in uploaded:
                raise RuntimeError('kaggle.json missing')
            target.write_bytes(uploaded['kaggle.json'])
            os.chmod(target, 0o600)
        except Exception as exc:
            # Still best effort, but report the real cause instead of
            # mislabelling every failure as "not on Colab".
            print(f'skip kaggle token upload ({exc})')

In [29]:
# Storage switch: set USE_DRIVE to True to mount Google Drive and make sure
# DRIVE_DIR exists there.
DRIVE_DIR = '/content/drive/MyDrive/school/딥러닝실습/1117/data'
USE_DRIVE = False

if USE_DRIVE:
    # Imported lazily so the notebook does not need google.colab otherwise.
    from google.colab import drive

    drive.mount('/content/drive')
    os.makedirs(DRIVE_DIR, exist_ok=True)

In [30]:
# Dataset root: DRIVE_DIR when USE_DRIVE is set, otherwise ./data resolved to
# an absolute path. Images and the annotation CSV live directly below it.
if USE_DRIVE:
    DATA_DIR = Path(DRIVE_DIR)
else:
    DATA_DIR = Path('data').resolve()
ANNO_CSV = DATA_DIR / 'annotations.csv'
IMAGES_DIR = DATA_DIR / 'images'

# Create the images folder (and any missing parents) up front.
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
print(DATA_DIR, IMAGES_DIR, ANNO_CSV, sep='\n')

/Users/gabriel/Documents/school/딥러닝실습/1117/data
/Users/gabriel/Documents/school/딥러닝실습/1117/data/images
/Users/gabriel/Documents/school/딥러닝실습/1117/data/annotations.csv


## 데이터 로드 및 전처리

In [31]:
# Read the annotation table; '#' is passed to pandas as the comment character.
df = pd.read_csv(ANNO_CSV, comment='#')

# Accept several column spellings: copy an alias into its canonical column
# unless the canonical column is already present.
aliases = {'image_id': 'filename', 'image_path': 'filename', 'file': 'filename', 'filepath': 'filename', 'class': 'label', 'category': 'label'}
for old, new in aliases.items():
    if new in df.columns or old not in df.columns:
        continue
    df[new] = df[old]

# Everything downstream needs these three columns.
required = {'filename', 'geometry', 'label'}
if required - set(df.columns):
    raise ValueError('annotations.csv must include filename/image_id, geometry, class/label columns')

# Normalise both key columns to plain strings.
df['label'] = df['label'].astype(str)
df['filename'] = df['filename'].astype(str)

def resolve_path(value):
    """Map an annotation filename to the image path on disk.

    Absolute paths are returned as given. A relative path whose first
    component is ``images`` is taken relative to ``DATA_DIR``; every other
    relative path is taken relative to ``IMAGES_DIR``.
    """
    candidate = Path(value)
    if candidate.is_absolute():
        return candidate
    starts_in_images = bool(candidate.parts) and candidate.parts[0] == 'images'
    base = DATA_DIR if starts_in_images else IMAGES_DIR
    return base / candidate

# Resolve every annotation to an on-disk path and drop the rows whose image
# file does not exist, printing up to five of the missing filenames.
df['abs_path'] = df['filename'].apply(resolve_path)
exists_mask = df['abs_path'].map(Path.exists)
if not exists_mask.all():
    missing = df.loc[~exists_mask, 'filename'].unique().tolist()[:5]
    print('dropping entries with missing files:', missing)
    # Take an explicit copy: later statements add columns to df, and assigning
    # into a filtered slice triggers pandas' SettingWithCopyWarning.
    df = df[exists_mask].copy()

def to_box(value):
    """Convert a polygon into its axis-aligned bounding box.

    Args:
        value: Either the string form of a sequence of points, e.g.
            ``"[(x0, y0), (x1, y1), ...]"``, or the already-parsed sequence.
            Only the first two entries of each point are used.

    Returns:
        ``[min_x, min_y, max_x, max_y]`` as floats.

    Raises:
        ValueError: If the polygon contains no points, or the string is not a
            valid literal (``ast.literal_eval`` may also raise SyntaxError).
    """
    # literal_eval parses Python literals only; it never executes code.
    pts = ast.literal_eval(value) if isinstance(value, str) else value
    pts = list(pts)
    if not pts:
        raise ValueError(f'geometry has no points: {value!r}')
    xs = [float(pt[0]) for pt in pts]
    ys = [float(pt[1]) for pt in pts]
    return [min(xs), min(ys), max(xs), max(ys)]

# Bounding box of every annotated polygon.
df['bbox'] = df['geometry'].apply(to_box)

if 'split' not in df.columns:
    # No split supplied: partition the unique filenames 70/15/15 so that all
    # annotations of one image end up in the same split.
    files = df['filename'].unique()
    train_ids, tmp_ids = train_test_split(files, test_size=0.3, random_state=42)
    val_ids, test_ids = train_test_split(tmp_ids, test_size=0.5, random_state=42)
    split_map = {
        **dict.fromkeys(train_ids, 'train'),
        **dict.fromkeys(val_ids, 'val'),
        **dict.fromkeys(test_ids, 'test'),
    }
    df['split'] = df['filename'].map(split_map)

df['split'] = df['split'].str.lower()

# Class indices follow the alphabetical order of the label names.
class_names = sorted(df['label'].unique())
label_to_idx = dict(zip(class_names, range(len(class_names))))
df['label_id'] = df['label'].map(label_to_idx)

print('총 샘플 수:', len(df))
print('클래스:', class_names)
display(df[['filename', 'label', 'bbox', 'split']].head())

총 샘플 수: 3425
클래스: ['Airplane', 'Truncated_airplane']


Unnamed: 0,filename,label,bbox,split
0,4f833867-273e-4d73-8bc3-cb2d9ceb54ef.jpg,Airplane,"[135.0, 522.0, 245.0, 600.0]",test
1,4f833867-273e-4d73-8bc3-cb2d9ceb54ef.jpg,Airplane,"[1025.0, 284.0, 1125.0, 384.0]",test
2,4f833867-273e-4d73-8bc3-cb2d9ceb54ef.jpg,Airplane,"[1058.0, 1503.0, 1130.0, 1568.0]",test
3,4f833867-273e-4d73-8bc3-cb2d9ceb54ef.jpg,Airplane,"[813.0, 1518.0, 885.0, 1604.0]",test
4,4f833867-273e-4d73-8bc3-cb2d9ceb54ef.jpg,Airplane,"[594.0, 938.0, 657.0, 1012.0]",test


## Dataset / DataLoader 구성

In [32]:
SPLIT_TO_USE = 'val'  # one of 'train' / 'val' / 'test'
MAX_SAMPLES_PER_CLASS = None  # set to an integer to cap the rows kept per class

# Rows of the chosen split, optionally capped to the first N rows per label,
# re-indexed from zero.
subset = df.loc[df['split'] == SPLIT_TO_USE].copy()
if MAX_SAMPLES_PER_CLASS:
    subset = subset.groupby('label').head(MAX_SAMPLES_PER_CLASS)
subset = subset.reset_index(drop=True)
print('사용할 split:', SPLIT_TO_USE, '샘플 수:', len(subset))

class ClipRegionDataset(Dataset):
    """Dataset that yields one preprocessed bounding-box crop per table row.

    Each item is ``(tensor, label_id, label)``, where ``tensor`` is whatever
    ``preprocess`` returns for the cropped region. Rows are expected to carry
    ``abs_path``, ``bbox``, ``label_id`` and ``label`` entries.
    """

    def __init__(self, frame, preprocess):
        # Drop the caller's index so the stored rows are numbered 0..n-1.
        self.preprocess = preprocess
        self.frame = frame.reset_index(drop=True)

    def __len__(self):
        return self.frame.shape[0]

    def __getitem__(self, idx):
        record = self.frame.iloc[idx]
        picture = Image.open(record['abs_path']).convert('RGB')
        max_x, max_y = picture.size

        def clamp(coord, upper):
            # Restrict a coordinate to the closed range [0, upper].
            return max(0, min(upper, coord))

        left, top, right, bottom = record['bbox']
        left = clamp(left, max_x)
        top = clamp(top, max_y)
        right = clamp(right, max_x)
        bottom = clamp(bottom, max_y)

        # A box with no area after clamping falls back to the whole image.
        has_area = not (right <= left or bottom <= top)
        region = picture.crop((left, top, right, bottom)) if has_area else picture
        return self.preprocess(region), record['label_id'], record['label']

# Load the pretrained CLIP model together with its image preprocessing
# transform and move the model to the selected device.
MODEL_NAME = 'ViT-B-32'
PRETRAINED = 'laion2b_s34b_b79k'
model, _, preprocess = open_clip.create_model_and_transforms(MODEL_NAME, pretrained=PRETRAINED)
model = model.to(device)
# Switch to inference mode before anything is encoded, so the text features
# below and predict_single() do not depend on evaluate_clip() having run first.
model.eval()

# One text prompt per class; underscores in class names become spaces.
text_prompts = [f"a photo of a {name.replace('_', ' ')}" for name in class_names]
text_tokens = open_clip.tokenize(text_prompts).to(device)
with torch.no_grad():
    text_features = model.encode_text(text_tokens)
    # L2-normalise so the later dot product with normalised image features
    # is a cosine similarity.
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)

BATCH_SIZE = 32
PIN_MEMORY = torch.cuda.is_available()
# Default of 0 avoids multiprocessing compatibility issues (it can be raised
# manually on a Colab GPU). The recorded traceback further down shows spawned
# workers failing to unpickle ClipRegionDataset from __main__.
NUM_WORKERS = 0
dataset = ClipRegionDataset(subset, preprocess)
loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
len(dataset)

사용할 split: val 샘플 수: 528


open_clip_model.safetensors:   0%|          | 0.00/605M [00:00<?, ?B/s]

528

## CLIP 추론 및 지표 산출

## 모델 구성

In [33]:
def evaluate_clip(model, loader, text_features):
    """Classify every batch of ``loader`` against ``text_features``.

    The model is put in eval mode and run without gradients on the
    module-level ``device``. Image features are L2-normalised, multiplied by
    ``model.logit_scale.exp()`` and matched against ``text_features``; a
    softmax over the last dimension gives the class probabilities.

    Args:
        model: Object exposing ``encode_image`` and ``logit_scale``.
        loader: Iterable of ``(images, labels, label_names)`` batches.
        text_features: Class text embeddings
            (presumably already normalised, one row per class - confirm).

    Returns:
        dict of NumPy arrays with one entry per sample: ``preds`` (argmax
        class index), ``labels``, ``label_names``, ``confidences`` (maximum
        probability), ``probs`` (all class probabilities) and ``embeds``
        (normalised image features).
    """
    model.eval()
    keys = ('preds', 'labels', 'label_names', 'confidences', 'probs', 'embeds')
    collected = {key: [] for key in keys}
    with torch.no_grad():
        for images, labels, label_names in tqdm(loader):
            images = images.to(device)
            labels = labels.to(device)

            image_features = model.encode_image(images)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

            # Same evaluation order as `scale * features @ text.T`.
            scaled = model.logit_scale.exp() * image_features
            probs = (scaled @ text_features.T).softmax(dim=-1)

            collected['preds'].extend(probs.argmax(dim=-1).cpu().tolist())
            collected['labels'].extend(labels.cpu().tolist())
            collected['label_names'].extend(label_names)
            collected['confidences'].extend(probs.max(dim=-1).values.cpu().tolist())
            collected['probs'].extend(probs.cpu().tolist())
            collected['embeds'].extend(image_features.cpu().tolist())
    return {key: np.array(values) for key, values in collected.items()}

# Stop early when the selected split produced no samples.
if not len(dataset):
    raise ValueError('선택한 split에 샘플이 없습니다. SPLIT_TO_USE 또는 MAX_SAMPLES_PER_CLASS를 조정하세요.')

# Run the evaluation and unpack the arrays used by the cells below.
outputs = evaluate_clip(model, loader, text_features)
preds, labels, confidences, embeds = (
    outputs[key] for key in ('preds', 'labels', 'confidences', 'embeds')
)
print('samples evaluated:', len(preds))

  0%|          | 0/17 [00:00<?, ?it/s]Traceback (most recent call last):
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "/opt/anaconda3/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
  File "/opt/anaconda3/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)    
exitcode = _main(fd, parent_sentinel)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/anaconda3/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
  File "/opt/anaconda3/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'ClipRegionDataset' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
           ^^^^^^^^^^^^^^^^^^^^^^^^^^

RuntimeError: DataLoader worker (pid(s) 21857) exited unexpectedly

## 모델 구성

## 분류 리포트 및 통계

In [None]:
# Build the per-class metric table and a per-sample results frame.
if len(labels) == 0:
    raise ValueError('No samples available for the selected split. Adjust SPLIT_TO_USE or MAX_SAMPLES_PER_CLASS.')
idx_to_label = {idx: name for name, idx in label_to_idx.items()}
class_names_ordered = [idx_to_label[i] for i in range(len(class_names))]
# Pass the full list of class indices explicitly. With target_names alone,
# scikit-learn raises when the evaluated split or the predictions do not
# contain every class, because the class count is inferred from the data.
report = classification_report(
    labels,
    preds,
    labels=list(range(len(class_names))),
    target_names=class_names_ordered,
    zero_division=0,
    output_dict=True,
)
report_df = pd.DataFrame(report).transpose()
display(report_df)
acc = (preds == labels).mean()
print('overall accuracy:', round(acc, 4))
# One row per evaluated sample, with both index and name of the true and
# predicted class plus the probability of the predicted class.
results_df = pd.DataFrame({
    'true_idx': labels,
    'pred_idx': preds,
    'true_label': [idx_to_label[i] for i in labels],
    'pred_label': [idx_to_label[i] for i in preds],
    'confidence': confidences
})
results_df.head()

## 그래프 1: 클래스별 정확도

In [None]:
# Fraction of correct predictions per true class. A boolean "hit" series is
# grouped directly instead of using DataFrameGroupBy.apply, which is
# deprecated in recent pandas when it operates on the grouping column.
per_class = (
    results_df['true_label']
    .eq(results_df['pred_label'])
    .groupby(results_df['true_label'])
    .mean()
    .reset_index(name='accuracy')
)
plt.figure(figsize=(8, 4))
plt.bar(per_class['true_label'], per_class['accuracy'])
plt.xticks(rotation=45, ha='right')
plt.ylim(0, 1)
plt.ylabel('Accuracy')
plt.title('Per-class CLIP Accuracy')
plt.show()

## 그래프 2: 혼동 행렬 히트맵

In [None]:
# Confusion matrix over all known classes, normalised per true class (row).
cm = confusion_matrix(labels, preds, labels=list(range(len(class_names))))
row_totals = cm.sum(axis=1, keepdims=True)
# Divide only where a row has samples; rows for classes that never occur in
# this split stay at 0 instead of becoming NaN with a divide-by-zero warning.
cm_norm = np.divide(
    cm,
    row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals > 0,
)
plt.figure(figsize=(6, 5))
sns.heatmap(cm_norm, xticklabels=class_names_ordered, yticklabels=class_names_ordered, cmap='Blues', annot=True, fmt='.2f')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Normalized Confusion Matrix')
plt.tight_layout()
plt.show()

## 그래프 3: 신뢰도 분포

In [None]:
# Box plot of the prediction confidence, grouped by the true label.
plt.figure(figsize=(8, 4))
sns.boxplot(x='true_label', y='confidence', data=results_df)
plt.title('Confidence Distribution per Class (True Label 기준)')
plt.ylabel('Confidence')
plt.xticks(rotation=45, ha='right')
plt.show()

## 그래프 4: 임베딩 PCA 시각화

In [None]:
# Project at most the first 2000 image embeddings onto two principal
# components and colour every point by its true class index.
sample_count = min(2000, len(embeds))
embed_subset, label_subset = embeds[:sample_count], labels[:sample_count]
pca = PCA(n_components=2, random_state=42)
proj = pca.fit_transform(embed_subset)

plt.figure(figsize=(6, 6))
plt.scatter(proj[:, 0], proj[:, 1], alpha=0.6, cmap='tab10', c=label_subset)
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.title('CLIP Image Embeddings (PCA)')
plt.show()

## 샘플 크롭 예측 시각화

In [None]:
def predict_single(crop_image):
    """Classify one image crop with the module-level CLIP model.

    The crop is preprocessed, encoded, L2-normalised and compared with the
    module-level ``text_features``; a softmax turns the scaled similarities
    into class probabilities.

    Returns:
        ``(pred_idx, confidence)``: the index of the most probable class and
        that class's probability.
    """
    with torch.no_grad():
        batch = preprocess(crop_image).unsqueeze(0).to(device)
        embedding = model.encode_image(batch)
        embedding = embedding / embedding.norm(dim=-1, keepdim=True)
        # Same evaluation order as `scale * feat @ text.T`.
        scaled = model.logit_scale.exp() * embedding
        class_probs = (scaled @ text_features.T).softmax(dim=-1)
        class_probs = class_probs.cpu().numpy()[0]
        best = class_probs.argmax()
        return best, class_probs[best]

# Show up to four randomly chosen crops with their true and predicted labels.
if len(subset) == 0:
    print('subset is empty, skip visualization')
else:
    samples = subset.sample(n=min(4, len(subset)), random_state=0)
    fig, axes = plt.subplots(1, len(samples), figsize=(14, 4))
    if len(samples) == 1:
        # plt.subplots returns a bare Axes (not an array) for a single plot.
        axes = [axes]
    for ax, (_, row) in zip(axes, samples.iterrows()):
        image = Image.open(row['abs_path']).convert('RGB')
        width, height = image.size
        x0, y0, x1, y1 = row['bbox']
        # Clamp and fall back exactly as ClipRegionDataset does, so the crop
        # shown here is the same region the evaluation scored.
        x0 = max(0, min(width, x0))
        y0 = max(0, min(height, y0))
        x1 = max(0, min(width, x1))
        y1 = max(0, min(height, y1))
        if x1 <= x0 or y1 <= y0:
            crop = image
        else:
            crop = image.crop((x0, y0, x1, y1))
        pred_idx, conf = predict_single(crop)
        pred_label = idx_to_label[pred_idx]
        ax.imshow(crop)
        ax.axis('off')
        # The title spans two lines; the break must be an escaped \n inside
        # the string literal, not a raw newline in the source.
        ax.set_title(f"true: {row['label']}\npred: {pred_label} ({conf:.2f})")
    plt.tight_layout()
    plt.show()

## Kaggle 다운로드 (옵션)

In [None]:
# Settings for the optional Kaggle download below.
# MODE selects the downloader ('dataset' or 'competition'); TARGET is the slug.
MODE, TARGET = 'dataset', 'zynicide/wine-reviews'
# Names of individual files to fetch; an empty list downloads everything.
SELECT_FILES = list()
# UNZIP is passed on as `unzip`, OVERWRITE as `force`.
UNZIP = OVERWRITE = True
print(MODE, TARGET)

In [None]:
from kaggle import KaggleApi
from requests.exceptions import HTTPError
# Create the Kaggle API client used by the download helpers below and
# authenticate it.
# NOTE(review): authenticate() presumably reads the token from
# ~/.kaggle/kaggle.json written by the token cell earlier - confirm.
api = KaggleApi()
api.authenticate()
def download_dataset(slug, dest, files, unzip, force):
    """Download a Kaggle dataset into ``dest`` through the module-level ``api``.

    With a non-empty ``files`` list only those files are fetched, one call
    per file, and ``unzip`` is not used. Otherwise the whole dataset is
    downloaded and ``unzip`` is forwarded to the API. ``force`` is forwarded
    in both cases.
    """
    if not files:
        api.dataset_download_files(slug, path=dest, unzip=unzip, force=force, quiet=False)
        return
    for name in files:
        api.dataset_download_file(slug, name, path=dest, force=force, quiet=False)
def download_competition(slug, dest, files, unzip, force):
    """Download Kaggle competition data into ``dest`` via the module-level ``api``.

    Args:
        slug: Competition identifier passed to the Kaggle API.
        dest: Directory that receives the downloads.
        files: Names of individual files to fetch; when empty, the complete
            competition archive is downloaded instead.
        unzip: When true, every ``*.zip`` directly inside ``dest`` is
            extracted into ``dest`` after the download.
        force: Forwarded to the Kaggle API as ``force`` for both the
            per-file and the whole-archive download.
    """
    # Local import keeps this block self-contained; zipfile is stdlib.
    import zipfile

    if files:
        for f in files:
            api.competition_download_file(slug, f, path=dest, force=force, quiet=False)
    else:
        # `force` was previously dropped on this path.
        api.competition_download_files(slug, path=dest, force=force, quiet=False)
    if unzip:
        for archive in Path(dest).glob('*.zip'):
            # Extract with the standard library rather than an external
            # `unzip` binary. Still best effort: a broken archive is
            # reported and skipped, not raised.
            try:
                with zipfile.ZipFile(archive) as zf:
                    zf.extractall(dest)
            except zipfile.BadZipFile as exc:
                print(f'could not extract {archive}: {exc}')
# Dispatch on MODE. HTTP errors from the download are printed; an unknown
# MODE raises ValueError, which is not caught here.
try:
    if MODE == 'competition':
        download_competition(TARGET, str(DATA_DIR), SELECT_FILES, UNZIP, OVERWRITE)
    elif MODE == 'dataset':
        download_dataset(TARGET, str(DATA_DIR), SELECT_FILES, UNZIP, OVERWRITE)
    else:
        raise ValueError('invalid mode')
except HTTPError as err:
    print('error', err)
else:
    print('done')