# Unzip Image Dataset

In [1]:
!unzip dataset-new-1000.zip

Archive:  dataset-new-1000.zip
   creating: dataset-new-1000/
   creating: dataset-new-1000/dataset/
  inflating: dataset-new-1000/dataset/.DS_Store  
   creating: dataset-new-1000/dataset/product-main/
  inflating: dataset-new-1000/dataset/product-main/baju-0.png  
  inflating: dataset-new-1000/dataset/product-main/baju-1.png  
  inflating: dataset-new-1000/dataset/product-main/baju-10.png  
  inflating: dataset-new-1000/dataset/product-main/baju-11.png  
  inflating: dataset-new-1000/dataset/product-main/baju-12.png  
  inflating: dataset-new-1000/dataset/product-main/baju-13.png  
  inflating: dataset-new-1000/dataset/product-main/baju-14.png  
  inflating: dataset-new-1000/dataset/product-main/baju-15.png  
  inflating: dataset-new-1000/dataset/product-main/baju-16.png  
  inflating: dataset-new-1000/dataset/product-main/baju-17.png  
  inflating: dataset-new-1000/dataset/product-main/baju-18.png  
  inflating: dataset-new-1000/dataset/product-main/baju-19.png  
  inflating: datase

# Install Dependencies

In [2]:
!pip install catboost -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
[?25h

# Import Dependencies

In [3]:
import os
import io
import math
import random
import time
import inspect
from pathlib import Path
import pickle

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score, f1_score
from sklearn.decomposition import PCA
from catboost import CatBoostClassifier
from xgboost import XGBClassifier

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torchvision import transforms

from transformers import AutoModel, AutoTokenizer, AutoImageProcessor
from huggingface_hub import hf_hub_download
from peft import get_peft_model, LoraConfig, TaskType, PeftModel

from PIL import Image

import matplotlib.pyplot as plt
from tqdm.auto import tqdm

# Load Data

In [4]:
dataset_raw = pd.read_csv('multilang_dataset.csv')

In [5]:
dataset_raw.head()

Unnamed: 0,product-main,product-review,caption,label
0,baju-0.png,baju-0-0.png,Halo kak! Aku mau jualan baju online murah mer...,0
1,baju-0.png,baju-0-1.png,PROMO GILA!! Baju branded original hanya 50rb!...,0
2,baju-0.png,baju-0-2.png,Saya mau refund baju hitam ini. Barang yang di...,1
3,baju-0.png,baju-0-3.png,Mohon refund segera. Baju yang dikirim berbeda...,1
4,baju-1.png,baju-1-0.png,DISKON AKHIR TAHUN! Semua baju cuma 25rb! Kual...,0


In [6]:
# Function to load image data
def load_image(row, name):
    image_path = os.path.join('dataset-new-1000/dataset', name, row[name])
    with open(image_path, 'rb') as f:
        # Read the image file content
        image_data = f.read()
        return image_data

dataset = dataset_raw.copy()
dataset['img-prod-main'] = dataset_raw.apply(load_image, axis=1, name='product-main')
dataset['img-prod-review'] = dataset_raw.apply(load_image, axis=1, name='product-review')

dataset.head()

Unnamed: 0,product-main,product-review,caption,label,img-prod-main,img-prod-review
0,baju-0.png,baju-0-0.png,Halo kak! Aku mau jualan baju online murah mer...,0,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
1,baju-0.png,baju-0-1.png,PROMO GILA!! Baju branded original hanya 50rb!...,0,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
2,baju-0.png,baju-0-2.png,Saya mau refund baju hitam ini. Barang yang di...,1,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
3,baju-0.png,baju-0-3.png,Mohon refund segera. Baju yang dikirim berbeda...,1,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
4,baju-1.png,baju-1-0.png,DISKON AKHIR TAHUN! Semua baju cuma 25rb! Kual...,0,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...


In [7]:
dataset_img = dataset[['img-prod-main','img-prod-review']]
dataset_cap = dataset[['caption']]

In [8]:
dataset_cap.head()

Unnamed: 0,caption
0,Halo kak! Aku mau jualan baju online murah mer...
1,PROMO GILA!! Baju branded original hanya 50rb!...
2,Saya mau refund baju hitam ini. Barang yang di...
3,Mohon refund segera. Baju yang dikirim berbeda...
4,DISKON AKHIR TAHUN! Semua baju cuma 25rb! Kual...


In [9]:
dataset_img.head()

Unnamed: 0,img-prod-main,img-prod-review
0,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
1,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
2,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
3,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...
4,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...


# Feature Engineering

## Embedding Image Features

### Embedding Function

In [None]:
def bytes_to_image(image_bytes):
    try:
        return Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except Exception as e:
        return None

# Batching function
@torch.no_grad()
def get_batch_embeddings(images, model_name):
    if model_name == "dino_finetune":
        inputs = processor_dino(images=images, return_tensors="pt").to(device)
        pixel_values = inputs["pixel_values"]

        with torch.no_grad():
            outputs = model_dino_finetune.encode(pixel_values)
        return outputs.cpu().numpy()

In [None]:
def extract_embeddings_from_column(df, column_name, model_name,batch_size=32):
    embeddings = []
    batch = []

    for image_data in tqdm(df[column_name], desc=f"Processing {column_name}"):
        image = bytes_to_image(image_data)
        if image:
            batch.append(image)
        else:
            embeddings.append(None)
            continue

        # Kalau sudah cukup batch
        if len(batch) == batch_size:
            batch_features = get_batch_embeddings(batch, model_name)
            embeddings.extend(batch_features)
            batch = []

    # Sisa batch terakhir
    if batch:
        batch_features = get_batch_embeddings(batch, model_name)
        embeddings.extend(batch_features)

    return embeddings

## Embedding Text Features

### Embedding Function

In [10]:
@torch.no_grad()
def embed_batch(texts, model_name, batch_size=32):
    embeddings = []
    for i in tqdm(range(0, len(texts), batch_size), desc="Embedding captions"):
        batch = texts[i:i + batch_size]

        if model_name == 'qwen_embed':
            tokenizer = tokenizer_qwen
            model = model_qwen_lora.base_model

        # Tokenisasi
        tokens = tokenizer(batch, padding=True, truncation=True, return_tensors="pt").to(device)

        # Forward
        output = model(**tokens)

        # Ambil hidden states (hanya kalau model punya last_hidden_state)
        if hasattr(output, "last_hidden_state"):
            last_hidden_state = output.last_hidden_state
        else:
            raise ValueError(f"Model {model_name} tidak mengembalikan last_hidden_state")

        # Mean pooling
        attention_mask = tokens['attention_mask']
        mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size())
        summed = torch.sum(last_hidden_state * mask_expanded, 1)
        counts = torch.clamp(mask_expanded.sum(1), min=1e-9)
        mean_pooled = summed / counts

        embeddings.append(mean_pooled.cpu())

    return torch.cat(embeddings, dim=0)


# Finetune Image Best Model

## Finetune Siamese Model

In [11]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)


Device: cuda


In [None]:
model_checkpoint = "facebook/dinov2-large"
processor = AutoImageProcessor.from_pretrained(model_checkpoint)
backbone = AutoModel.from_pretrained(model_checkpoint).to(device)
backbone.eval()

Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

Dinov2Model(
  (embeddings): Dinov2Embeddings(
    (patch_embeddings): Dinov2PatchEmbeddings(
      (projection): Conv2d(3, 1024, kernel_size=(14, 14), stride=(14, 14))
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (encoder): Dinov2Encoder(
    (layer): ModuleList(
      (0-23): 24 x Dinov2Layer(
        (norm1): LayerNorm((1024,), eps=1e-06, elementwise_affine=True)
        (attention): Dinov2Attention(
          (attention): Dinov2SelfAttention(
            (query): Linear(in_features=1024, out_features=1024, bias=True)
            (key): Linear(in_features=1024, out_features=1024, bias=True)
            (value): Linear(in_features=1024, out_features=1024, bias=True)
          )
          (output): Dinov2SelfOutput(
            (dense): Linear(in_features=1024, out_features=1024, bias=True)
            (dropout): Dropout(p=0.0, inplace=False)
          )
        )
        (layer_scale1): Dinov2LayerScale()
        (drop_path): Identity()
        (norm2): LayerNorm((1024,),

In [None]:


def try_inject(backbone):
    cfg = LoraConfig(
        r=8,
        lora_alpha=16,
        target_modules=["query", "value"],
        lora_dropout=0.1,
        bias="none",
        task_type="FEATURE_EXTRACTION",
    )
    return get_peft_model(backbone, cfg)

backbone.to(device)

# tampilkan proporsi params yang bisa dilatih
trainable = sum(p.numel() for p in backbone.parameters() if p.requires_grad)
total = sum(p.numel() for p in backbone.parameters())
print(f"\nTrainable params: {trainable:,} / {total:,} ({100*trainable/total:.6f}%)")



Trainable params: 304,368,640 / 304,368,640 (100.000000%)


In [None]:
class ProductMatchingDataset(Dataset):
    def __init__(self, csv_file, main_img_dir, review_img_dir, processor):
        self.data_frame = pd.read_csv(csv_file)
        self.main_img_dir = main_img_dir
        self.review_img_dir = review_img_dir
        self.processor = processor

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        main_img_name = f"{self.main_img_dir}/{self.data_frame.iloc[idx, 0]}"
        review_img_name = f"{self.review_img_dir}/{self.data_frame.iloc[idx, 1]}"
        label = int(self.data_frame.iloc[idx, 3])

        main_image = Image.open(main_img_name).convert("RGB")
        review_image = Image.open(review_img_name).convert("RGB")

        processed_main = self.processor(images=main_image, return_tensors="pt")['pixel_values'].squeeze(0)
        processed_review = self.processor(images=review_image, return_tensors="pt")['pixel_values'].squeeze(0)

        # Ubah label untuk CosineEmbeddingLoss:
        # Label 0 (cocok) -> target 1 (ingin cosine similarity mendekati 1)
        # Label 1 (tidak cocok) -> target -1 (ingin cosine similarity mendekati -1)
        target_label = 1 if label == 0 else -1

        return processed_main, processed_review, torch.tensor(target_label, dtype=torch.float)

# Inisialisasi model dan processor
processor_dino = AutoImageProcessor.from_pretrained("facebook/dinov2-large")
model_dino_base = AutoModel.from_pretrained("facebook/dinov2-large")

# Buat dataset dan dataloader
MAIN_IMG_DIR = '/content/dataset-new-1000/dataset/product-main'
REVIEW_IMG_DIR = '/content/dataset-new-1000/dataset/product-review'
CSV_PATH = '/content/multilang_dataset.csv'

dataset = ProductMatchingDataset(
    csv_file=CSV_PATH,
    main_img_dir=MAIN_IMG_DIR,
    review_img_dir=REVIEW_IMG_DIR,
    processor=processor_dino
)

train_dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
# Colab cell 5
CSV_PATH = "/content/multilang_dataset.csv"
ROOT = "/content/dataset-new-1000/dataset"  # contains product-main and product-review

df = pd.read_csv(CSV_PATH)
df.head()

Unnamed: 0,product-main,product-review,caption,label
0,baju-0.png,baju-0-0.png,Halo kak! Aku mau jualan baju online murah mer...,0
1,baju-0.png,baju-0-1.png,PROMO GILA!! Baju branded original hanya 50rb!...,0
2,baju-0.png,baju-0-2.png,Saya mau refund baju hitam ini. Barang yang di...,1
3,baju-0.png,baju-0-3.png,Mohon refund segera. Baju yang dikirim berbeda...,1
4,baju-1.png,baju-1-0.png,DISKON AKHIR TAHUN! Semua baju cuma 25rb! Kual...,0


In [None]:
class DualImageDataset(Dataset):
    def __init__(self, df, root, processor, split="train", transform=None):
        self.df = df.reset_index(drop=True)
        self.root = Path(root)
        self.proc = processor
        self.transform = transform
    def __len__(self):
        return len(self.df)
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        main_path = self.root / "product-main" / row["product-main"]
        rev_path  = self.root / "product-review" / row["product-review"]
        img1 = Image.open(main_path).convert("RGB")
        img2 = Image.open(rev_path).convert("RGB")

        p1 = self.proc(img1, return_tensors="pt")
        p2 = self.proc(img2, return_tensors="pt")

        pixel1 = p1['pixel_values'].squeeze(0)
        pixel2 = p2['pixel_values'].squeeze(0)
        label = int(row["label"])
        return pixel1, pixel2, label

from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.12, stratify=df["label"], random_state=42)
train_ds = DualImageDataset(train_df, ROOT, processor)
val_ds   = DualImageDataset(val_df, ROOT, processor)

def collate_fn(batch):
    p1 = torch.stack([b[0] for b in batch])
    p2 = torch.stack([b[1] for b in batch])
    labels = torch.tensor([b[2] for b in batch], dtype=torch.float32)
    return {"pixel1": p1, "pixel2": p2, "labels": labels}

train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, collate_fn=collate_fn, num_workers=2)
val_loader   = DataLoader(val_ds,   batch_size=32, shuffle=False, collate_fn=collate_fn, num_workers=2)


In [None]:
import inspect
import torch
import torch.nn as nn

class SiameseModel(nn.Module):
    def __init__(self, backbone, emb_dim=256, pool="cls", normalize=True):
        super().__init__()
        self.backbone = backbone
        self.pool = pool
        self.normalize = normalize

        hidden_size = getattr(backbone.config, "hidden_size", 1024)

        self.proj = nn.Sequential(
            nn.Linear(hidden_size, max(hidden_size // 2, emb_dim)),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(max(hidden_size // 2, emb_dim), emb_dim)
        )

    def _get_inner_model(self, module):
        """Ambil base model kalau ada wrapper PEFT."""
        for attr in ("base_model", "model", "get_base_model"):
            if hasattr(module, attr):
                inner = getattr(module, attr)
                if callable(inner) and attr == "get_base_model":
                    inner = inner()
                return inner
        return module

    def _safe_forward(self, module, **kwargs):
        return module.forward(**kwargs)

    def encode(self, pixel: torch.Tensor):
        out = self._safe_forward(self.backbone, pixel_values=pixel, return_dict=True)

        if hasattr(out, "pooler_output") and out.pooler_output is not None:
            h = out.pooler_output
        else:
            h = out.last_hidden_state[:, 0, :]  # CLS token

        emb = self.proj(h)
        if self.normalize:
            emb = nn.functional.normalize(emb, dim=-1)
        return emb

    def forward(self, p1, p2):
        e1 = self.encode(p1)
        e2 = self.encode(p2)
        sim = (e1 * e2).sum(dim=-1)
        return e1, e2, sim

In [None]:
model = SiameseModel(backbone, emb_dim=256, pool="cls", normalize=True).to(device)

optimizer = torch.optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=5e-4, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)

bce_loss = nn.BCEWithLogitsLoss()

temperature = torch.tensor(10.0, device=device, requires_grad=False)

def train_epoch(model, loader, opt):
    model.train()
    total_loss = 0.0
    for batch in tqdm(loader):
        p1 = batch["pixel1"].to(device)
        p2 = batch["pixel2"].to(device)
        labels = batch["labels"].to(device)
        _, _, sim = model(p1, p2)  # sim in [-1,1]
        logits = sim * temperature  # scale
        loss = bce_loss(logits, labels)
        opt.zero_grad()
        loss.backward()
        opt.step()
        total_loss += loss.item() * p1.size(0)
    return total_loss / len(loader.dataset)

def eval_epoch(model, loader):
    model.eval()
    import sklearn.metrics as skm
    ys, preds = [], []
    with torch.no_grad():
        for batch in tqdm(loader):
            p1 = batch["pixel1"].to(device)
            p2 = batch["pixel2"].to(device)
            labels = batch["labels"].cpu().numpy()
            _, _, sim = model(p1, p2)
            probs = torch.sigmoid(sim * temperature).cpu().numpy()
            ys.append(labels)
            preds.append(probs)
    ys = np.concatenate(ys)
    preds = np.concatenate(preds)
    # compute ROC AUC & accuracy@0.5
    auc = skm.roc_auc_score(ys, preds)
    acc = ( (preds>=0.5).astype(int) == ys.astype(int) ).mean()
    return {"roc_auc": auc, "acc": acc}

# training run
num_epochs = 6
for epoch in range(num_epochs):
    t0 = time.time()
    train_loss = train_epoch(model, train_loader, optimizer)
    metrics = eval_epoch(model, val_loader)
    scheduler.step()
    print(f"Epoch {epoch+1}/{num_epochs}  loss={train_loss:.4f}  val_auc={metrics['roc_auc']:.4f}  val_acc={metrics['acc']:.4f}  time={time.time()-t0:.1f}s")


  0%|          | 0/60 [00:00<?, ?it/s]

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/6  loss=1.1192  val_auc=0.6019  val_acc=0.5385  time=226.6s


  0%|          | 0/60 [00:00<?, ?it/s]

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch 2/6  loss=0.4100  val_auc=0.6459  val_acc=0.5615  time=233.8s


  0%|          | 0/60 [00:00<?, ?it/s]

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x792bcfdcab60>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1664, in __del__
    self._shutdown_workers()
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1647, in _shutdown_workers
    if w.is_alive():
       ^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
       Exception ignored in:  <function _MultiProcessingDataLoaderIter.__del__ at 0x792bcfdcab60> 
 Traceback (most recent call last):
 ^^^  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1664, in __del__
    ^self._shutdown_workers()^^^
^  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1647, in _shutdown_workers
^    ^if w.is_alive():^
^ ^ ^ ^ ^ ^ ^ ^^^^^^^

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch 3/6  loss=0.1546  val_auc=0.6954  val_acc=0.6077  time=224.9s


  0%|          | 0/60 [00:00<?, ?it/s]

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch 4/6  loss=0.0581  val_auc=0.6876  val_acc=0.6154  time=232.7s


  0%|          | 0/60 [00:00<?, ?it/s]

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch 5/6  loss=0.0348  val_auc=0.6914  val_acc=0.6077  time=233.6s


  0%|          | 0/60 [00:00<?, ?it/s]

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch 6/6  loss=0.0304  val_auc=0.6914  val_acc=0.6077  time=233.3s


In [None]:
SAVE_DIR = "/content/drive/MyDrive/dinov2_lora_siamese"
os.makedirs(SAVE_DIR, exist_ok=True)
backbone.save_pretrained(SAVE_DIR)
# save projection head separately
torch.save(model.proj.state_dict(), os.path.join(SAVE_DIR, "proj_head.pt"))
print("Saved to", SAVE_DIR)

In [None]:
SAVE_PATH = "/content/drive/MyDrive/siamese_model.pt"
torch.save(model.state_dict(), SAVE_PATH)

## Use Model

In [None]:
class SiameseModel(nn.Module):
    def __init__(self, backbone, emb_dim=256, pool="cls", normalize=True):
        super().__init__()
        self.backbone = backbone
        self.pool = pool
        self.normalize = normalize

        hidden_size = getattr(backbone.config, "hidden_size", 1024)

        self.proj = nn.Sequential(
            nn.Linear(hidden_size, max(hidden_size // 2, emb_dim)),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(max(hidden_size // 2, emb_dim), emb_dim)
        )

    def _get_inner_model(self, module):
        """Ambil base model kalau ada wrapper PEFT."""
        for attr in ("base_model", "model", "get_base_model"):
            if hasattr(module, attr):
                inner = getattr(module, attr)
                if callable(inner) and attr == "get_base_model":
                    inner = inner()
                return inner
        return module

    def _safe_forward(self, module, **kwargs):
        return module.forward(**kwargs)

    def encode(self, pixel: torch.Tensor):
        out = self._safe_forward(self.backbone, pixel_values=pixel, return_dict=True)

        if hasattr(out, "pooler_output") and out.pooler_output is not None:
            h = out.pooler_output
        else:
            h = out.last_hidden_state[:, 0, :]  # CLS token

        emb = self.proj(h)
        if self.normalize:
            emb = nn.functional.normalize(emb, dim=-1)
        return emb

    def forward(self, p1, p2):
        e1 = self.encode(p1)
        e2 = self.encode(p2)
        sim = (e1 * e2).sum(dim=-1)
        return e1, e2, sim

In [None]:
from peft import LoraConfig, get_peft_model

def try_inject(backbone):
    cfg = LoraConfig(
        r=8,
        lora_alpha=16,
        target_modules=["query", "value"],
        lora_dropout=0.1,
        bias="none",
        task_type="FEATURE_EXTRACTION",
    )
    return get_peft_model(backbone, cfg)


In [None]:
SAVE_PATH = "/content/drive/MyDrive/siamese_model.pt"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
backbone = AutoModel.from_pretrained("facebook/dinov2-large")
processor_dino = AutoImageProcessor.from_pretrained("facebook/dinov2-large")

backbone = try_inject(backbone)

model_dino_finetune = SiameseModel(backbone).to(device)

model_dino_finetune.load_state_dict(torch.load(SAVE_PATH, map_location=device))
model_dino_finetune.eval()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/549 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.22G [00:00<?, ?B/s]

Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

preprocessor_config.json:   0%|          | 0.00/436 [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


SiameseModel(
  (backbone): PeftModelForFeatureExtraction(
    (base_model): LoraModel(
      (model): Dinov2Model(
        (embeddings): Dinov2Embeddings(
          (patch_embeddings): Dinov2PatchEmbeddings(
            (projection): Conv2d(3, 1024, kernel_size=(14, 14), stride=(14, 14))
          )
          (dropout): Dropout(p=0.0, inplace=False)
        )
        (encoder): Dinov2Encoder(
          (layer): ModuleList(
            (0-23): 24 x Dinov2Layer(
              (norm1): LayerNorm((1024,), eps=1e-06, elementwise_affine=True)
              (attention): Dinov2Attention(
                (attention): Dinov2SelfAttention(
                  (query): lora.Linear(
                    (base_layer): Linear(in_features=1024, out_features=1024, bias=True)
                    (lora_dropout): ModuleDict(
                      (default): Dropout(p=0.1, inplace=False)
                    )
                    (lora_A): ModuleDict(
                      (default): Linear(in_features=1024, 

In [None]:
dataset_dino_finetune = dataset_img.copy()
dataset_dino_finetune['img-prod-main-features'] = extract_embeddings_from_column(dataset_dino_finetune, 'img-prod-main', model_name='dino_finetune')
dataset_dino_finetune['img-prod-review-features'] = extract_embeddings_from_column(dataset_dino_finetune, 'img-prod-review',model_name='dino_finetune')

Processing img-prod-main:   0%|          | 0/1080 [00:00<?, ?it/s]

Processing img-prod-review:   0%|          | 0/1080 [00:00<?, ?it/s]

In [None]:
dataset_embed_img_only = {}
dataset_embed_test_img_only = {}

# Only process the 'dino_finetune' image dataset
for img_name, dataset_img in [("dino_finetune", dataset_dino_finetune)]:
    #convert menjadi matrix
    img_main_matrix = np.stack(dataset_img['img-prod-main-features'])
    img_review_matrix = np.stack(dataset_img['img-prod-review-features'])

    #numpy row menjadi column
    df_img_main_embed = pd.DataFrame(img_main_matrix, columns=[f"img_main_{i}" for i in range(img_main_matrix.shape[1])])
    df_img_review_embed = pd.DataFrame(img_review_matrix, columns=[f"img_review_{i}" for i in range(img_review_matrix.shape[1])])
    label = pd.DataFrame(dataset['label'])

    #concat
    combine_dataset_img_only = pd.concat([df_img_main_embed, df_img_review_embed, label], axis=1)

    X_train_img, X_test_img, y_train_img, y_test_img = train_test_split(combine_dataset_img_only.drop('label', axis=1),
                                                                        combine_dataset_img_only['label'], test_size=0.2, random_state=42, stratify=combine_dataset_img_only['label'])

    dataset_embed_img_only[f'{img_name}_img_only'] = pd.concat([X_train_img, y_train_img], axis=1)
    dataset_embed_test_img_only[f'{img_name}_img_only'] = pd.concat([X_test_img, y_test_img], axis=1)

In [None]:
for name, data in dataset_embed_img_only.items():
    # Split training data
    X = data.drop('label', axis=1)
    y = data['label']
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Train model
    model = CatBoostClassifier(
        iterations=100,
        learning_rate=0.1,
        depth=6,
        loss_function='Logloss',
        eval_metric='Accuracy',
        random_seed=42,
        verbose=10
    )
    model.fit(X_train, y_train, eval_set=(X_val, y_val), verbose=False)

    # Validation evaluation
    y_pred_val = model.predict(X_val)
    y_pred_val_proba = model.predict_proba(X_val)[:, 1]

    # TEST SET: Ambil dari dataset_embed_test_img_only
    test_data = dataset_embed_test_img_only[name]
    X_test = test_data.drop('label', axis=1)
    y_test = test_data['label']

    y_pred_test = model.predict(X_test)
    y_pred_test_proba = model.predict_proba(X_test)[:, 1]

    acc_test = accuracy_score(y_test, y_pred_test)
    roc_test = roc_auc_score(y_test, y_pred_test_proba)
    f1_test = f1_score(y_test, y_pred_test)

    print(f"\n{name} - TEST")
    print(f"Accuracy : {acc_test:.4f}")
    print(f"ROC AUC  : {roc_test:.4f}")
    print(f"F1 Score : {f1_test:.4f}")


dino_finetune_img_only - TEST
Accuracy : 0.7176
ROC AUC  : 0.7978
F1 Score : 0.7215


# Finetune Text Best Model

## Finetune Qwen

In [None]:
class QwenEmbeddingForClassification(nn.Module):
    def __init__(self, model_name="Qwen/Qwen3-Embedding-0.6B", num_labels=2, emb_dim=1024):
        super().__init__()
        backbone = AutoModel.from_pretrained(model_name, torch_dtype="auto")

        lora_config = LoraConfig(
            task_type=TaskType.FEATURE_EXTRACTION,  # fokus embedding
            r=16,
            lora_alpha=32,
            lora_dropout=0.05,
            bias="none"
        )
        self.backbone = get_peft_model(backbone, lora_config)
        self.classifier = nn.Linear(emb_dim, num_labels)

    def mean_pooling(self, hidden_states, attention_mask):
        mask_expanded = attention_mask.unsqueeze(-1).expand(hidden_states.size())
        summed = torch.sum(hidden_states * mask_expanded, 1)
        counts = torch.clamp(mask_expanded.sum(1), min=1e-9)
        return summed / counts

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        pooled = self.mean_pooling(outputs.last_hidden_state, attention_mask)

        logits = self.classifier(pooled)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)

        return {"loss": loss, "logits": logits, "embeddings": pooled}


In [None]:
class CaptionDataset(Dataset):
    def __init__(self, dataframe):
        self.df = dataframe

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        return {
            "caption": str(self.df.iloc[idx]["caption"]),
            "label": int(self.df.iloc[idx]["label"])
        }


In [None]:
model_name = "Qwen/Qwen3-Embedding-0.6B"
tokenizer = AutoTokenizer.from_pretrained(model_name)

def collate_fn(batch):
    texts = [x["caption"] for x in batch]
    labels = torch.tensor([x["label"] for x in batch])
    enc = tokenizer(texts, padding=True, truncation=True, return_tensors="pt", max_length=128)
    return enc["input_ids"], enc["attention_mask"], labels

train_df, val_df = train_test_split(dataset, test_size=0.2, random_state=42, stratify=dataset["label"])

train_dataset = CaptionDataset(train_df)
val_dataset   = CaptionDataset(val_df)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
val_loader   = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)


device = "cuda" if torch.cuda.is_available() else "cpu"
model = QwenEmbeddingForClassification(model_name, num_labels=len(set(dataset["label"]))).to(device)
optimizer = optim.AdamW(model.parameters(), lr=2e-4)

num_epochs = 3

for epoch in range(num_epochs):
    # --- Training ---
    model.train()
    train_loss, correct, total = 0, 0, 0
    loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")
    for input_ids, attn_mask, labels in loop:
        input_ids, attn_mask, labels = input_ids.to(device), attn_mask.to(device), labels.to(device)

        out = model(input_ids, attn_mask, labels=labels)
        loss, logits = out["loss"], out["logits"]

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        train_loss += loss.item()

        # hitung accuracy batch
        preds = torch.argmax(logits, dim=-1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

        loop.set_postfix(loss=loss.item(), acc=correct/total)

    avg_train_loss = train_loss / len(train_loader)
    train_acc = correct / total

    # --- Validation ---
    model.eval()
    val_loss, correct, total = 0, 0, 0
    with torch.no_grad():
        loop_val = tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]")
        for input_ids, attn_mask, labels in loop_val:
            input_ids, attn_mask, labels = input_ids.to(device), attn_mask.to(device), labels.to(device)

            out = model(input_ids, attn_mask, labels=labels)
            loss, logits = out["loss"], out["logits"]

            val_loss += loss.item()

            preds = torch.argmax(logits, dim=-1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            loop_val.set_postfix(loss=loss.item(), acc=correct/total)

    avg_val_loss = val_loss / len(val_loader)
    val_acc = correct / total

    print(f"Epoch {epoch+1}: "
          f"Train Loss = {avg_train_loss:.4f} | Train Acc = {train_acc:.4f} | "
          f"Val Loss = {avg_val_loss:.4f} | Val Acc = {val_acc:.4f}")


Epoch 1/3 [Train]:   0%|          | 0/108 [00:00<?, ?it/s]

Epoch 1/3 [Val]:   0%|          | 0/27 [00:00<?, ?it/s]

Epoch 1: Train Loss = 0.4153 | Train Acc = 0.8183 | Val Loss = 0.2546 | Val Acc = 0.8935


Epoch 2/3 [Train]:   0%|          | 0/108 [00:00<?, ?it/s]

Epoch 2/3 [Val]:   0%|          | 0/27 [00:00<?, ?it/s]

Epoch 2: Train Loss = 0.1661 | Train Acc = 0.9375 | Val Loss = 0.2183 | Val Acc = 0.9306


Epoch 3/3 [Train]:   0%|          | 0/108 [00:00<?, ?it/s]

Epoch 3/3 [Val]:   0%|          | 0/27 [00:00<?, ?it/s]

Epoch 3: Train Loss = 0.0787 | Train Acc = 0.9803 | Val Loss = 0.2711 | Val Acc = 0.8981


In [None]:
save_dir = "/content/drive/MyDrive/qwen_embed_lora"

model.backbone.save_pretrained(save_dir)   # backbone + LoRA adapter
tokenizer.save_pretrained(save_dir)

print(f"Model saved to {save_dir}")

Model saved to /content/drive/MyDrive/qwen_embed_lora


## Use Model

In [14]:
model_name = "Qwen/Qwen3-Embedding-0.6B"
save_dir = "/content/drive/MyDrive/qwen_embed_lora"

# load backbone
tokenizer_qwen = AutoTokenizer.from_pretrained(save_dir)
base_model = AutoModel.from_pretrained(model_name, torch_dtype="auto")

# load lora adapter
model_qwen_lora = PeftModel.from_pretrained(base_model, save_dir).to(device)
model_qwen_lora.eval()

PeftModelForFeatureExtraction(
  (base_model): LoraModel(
    (model): Qwen3Model(
      (embed_tokens): Embedding(151669, 1024)
      (layers): ModuleList(
        (0-27): 28 x Qwen3DecoderLayer(
          (self_attn): Qwen3Attention(
            (q_proj): lora.Linear(
              (base_layer): Linear(in_features=1024, out_features=2048, bias=False)
              (lora_dropout): ModuleDict(
                (default): Dropout(p=0.05, inplace=False)
              )
              (lora_A): ModuleDict(
                (default): Linear(in_features=1024, out_features=16, bias=False)
              )
              (lora_B): ModuleDict(
                (default): Linear(in_features=16, out_features=2048, bias=False)
              )
              (lora_embedding_A): ParameterDict()
              (lora_embedding_B): ParameterDict()
              (lora_magnitude_vector): ModuleDict()
            )
            (k_proj): Linear(in_features=1024, out_features=1024, bias=False)
            (v_proj

In [15]:
dataset_qwen_finetune = dataset_cap.copy()
captions = dataset_qwen_finetune["caption"].astype(str).tolist()

caption_embeddings_qwen_finetune = embed_batch(captions, "qwen_embed", batch_size=32)
dataset_qwen_finetune["caption_embeddings"] = list(caption_embeddings_qwen_finetune.numpy())


Embedding captions:   0%|          | 0/34 [00:00<?, ?it/s]

In [16]:
dataset_embed_text_only = {}
dataset_embed_test_text_only = {}
for text_name, dataset_text in [("qwen_finetune", dataset_qwen_finetune)]:
    # convert menjadi matrix
    text_matrix = np.stack(dataset_text['caption_embeddings'])
    # numpy row menjadi column
    df_text_embed = pd.DataFrame(text_matrix, columns=[f"text_{i}" for i in range(text_matrix.shape[1])])
    label = pd.DataFrame(dataset['label'])

    # concat
    combine_dataset_text_only = pd.concat([df_text_embed, label], axis=1)

    X_train_text, X_test_text, y_train_text, y_test_text = train_test_split(combine_dataset_text_only.drop('label', axis=1),
                                                                        combine_dataset_text_only['label'], test_size=0.1, random_state=42, stratify=combine_dataset_text_only['label'])

    dataset_embed_text_only[f'{text_name}_text_only'] = pd.concat([X_train_text, y_train_text], axis=1)
    dataset_embed_test_text_only[f'{text_name}_text_only'] = pd.concat([X_test_text, y_test_text], axis=1)

In [17]:
for name, data in dataset_embed_text_only.items():
    # Split training data
    X = data.drop('label', axis=1)
    y = data['label']
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Train model
    model = CatBoostClassifier(
        iterations=100,
        learning_rate=0.1,
        depth=6,
        loss_function='Logloss',
        eval_metric='Accuracy',
        random_seed=42,
        verbose=10
    )
    model.fit(X_train, y_train, eval_set=(X_val, y_val), verbose=False)

    # TEST SET: Ambil dari dataset_embed_test_text_only
    test_data = dataset_embed_test_text_only[name]
    X_test = test_data.drop('label', axis=1)
    y_test = test_data['label']

    y_pred_test = model.predict(X_test)
    y_pred_test_proba = model.predict_proba(X_test)[:, 1]

    acc_test = accuracy_score(y_test, y_pred_test)
    roc_test = roc_auc_score(y_test, y_pred_test_proba)
    f1_test = f1_score(y_test, y_pred_test)

    print(f"\n{name} - TEST")
    print(f"Accuracy : {acc_test:.4f}")
    print(f"ROC AUC  : {roc_test:.4f}")
    print(f"F1 Score : {f1_test:.4f}")



qwen_finetune_text_only - TEST
Accuracy : 0.8981
ROC AUC  : 0.9691
F1 Score : 0.8972


# Combine All Finetune

In [None]:
dataset_embed = {}
dataset_embed_test = {}

# list dataset
list_dataset_img = [("dino_finetune", dataset_dino_finetune)]
list_dataset_cap = [("qwen_finetune", dataset_qwen_finetune)]

for img_name, dataset_img in list_dataset_img:
  for cap_name, dataset_cap in list_dataset_cap:
    #convert menjadi matrix
    img_main_matrix = np.stack(dataset_img['img-prod-main-features'])
    img_review_matrix = np.stack(dataset_img['img-prod-review-features'])
    text_matrix = np.stack(dataset_cap['caption_embeddings'])
    #numpy row menjadi column
    df_img_main_embed = pd.DataFrame(img_main_matrix, columns=[f"img_main_{i}" for i in range(img_main_matrix.shape[1])])
    df_img_review_embed = pd.DataFrame(img_review_matrix, columns=[f"img_review_{i}" for i in range(img_review_matrix.shape[1])])
    df_text_embed = pd.DataFrame(text_matrix, columns=[f"text_feat_{i}" for i in range(text_matrix.shape[1])])
    label = pd.DataFrame(dataset['label'])
    #concat
    combine_dataset = pd.concat([df_img_main_embed,df_img_review_embed,df_text_embed,label],axis=1)
    X_train, X_test, y_train, y_test = train_test_split(combine_dataset.drop('label', axis=1),
                                                        combine_dataset['label'], test_size=0.2, random_state=42,stratify=combine_dataset['label'])
    dataset_embed[f'{img_name} x {cap_name}'] = pd.concat([X_train,y_train],axis=1)
    dataset_embed_test[f'{img_name} x {cap_name}'] = pd.concat([X_test,y_test],axis=1)

In [None]:
for name, data in dataset_embed.items():
    # Split training data
    X = data.drop('label', axis=1)
    y = data['label']
    # Train model
    model = CatBoostClassifier(
        iterations=100,
        learning_rate=0.1,
        loss_function='Logloss',
        eval_metric='Accuracy',
        random_seed=42,
        verbose=10
    )
    # TEST SET: Ambil dari dataset_embed_test
    test_data = dataset_embed_test[name]
    X_test = test_data.drop('label', axis=1)
    y_test = test_data['label']

    model.fit(X, y, eval_set=(X_test,y_test), verbose=False)


    y_pred_test = model.predict(X_test)
    y_pred_test_proba = model.predict_proba(X_test)[:, 1]

    acc_test = accuracy_score(y_test, y_pred_test)
    roc_test = roc_auc_score(y_test, y_pred_test_proba)
    f1_test = f1_score(y_test, y_pred_test)

    print(f"\n{name} - TEST")
    print(f"Accuracy : {acc_test:.4f}")
    print(f"ROC AUC  : {roc_test:.4f}")
    print(f"F1 Score : {f1_test:.4f}")


dino_finetune x qwen_finetune - TEST
Accuracy : 0.9120
ROC AUC  : 0.9691
F1 Score : 0.9116


## Save Best Finetune Model

In [None]:
best_data = dataset_embed['dino_finetune x qwen_finetune']
X = best_data.drop('label', axis=1)
y = best_data['label']

best_model = CatBoostClassifier(
    iterations=100,
    learning_rate=0.1,
    depth=6,
    loss_function='Logloss',
    eval_metric='Accuracy',
    random_seed=42,
    verbose=0
)

best_model.fit(X, y) # Train full dataset

# Simpan model menggunakan pickle
model_filename = 'best_model_finetune.pkl'
with open(model_filename, 'wb') as file:
    pickle.dump(best_model, file)

# Inference Model

In [None]:
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer, AutoImageProcessor
from peft import get_peft_model, PeftModel, LoraConfig
from PIL import Image
import numpy as np
import joblib
import catboost

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# -------------------------
# 1. Helper load image
# -------------------------
def load_image(path):
    return Image.open(path).convert("RGB")

# -------------------------
# 2. Define SiameseModel (projection head + backbone)
# -------------------------
class SiameseModel(nn.Module):
    def __init__(self, backbone, emb_dim=256, pool="cls", normalize=True):
        super().__init__()
        self.backbone = backbone
        self.pool = pool
        self.normalize = normalize
        hidden_size = getattr(backbone.config, "hidden_size", 1024)
        self.proj = nn.Sequential(
            nn.Linear(hidden_size, max(hidden_size // 2, emb_dim)),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(max(hidden_size // 2, emb_dim), emb_dim)
        )

    def _safe_forward(self, module, **kwargs):
        return module.forward(**kwargs)

    def encode(self, pixel: torch.Tensor):
        out = self._safe_forward(self.backbone, pixel_values=pixel, return_dict=True)
        if hasattr(out, "pooler_output") and out.pooler_output is not None:
            h = out.pooler_output
        else:
            h = out.last_hidden_state[:, 0, :]
        emb = self.proj(h)
        if self.normalize:
            emb = nn.functional.normalize(emb, dim=-1)
        return emb

# -------------------------
# 3. Load DINO backbone + LoRA + projection head
# -------------------------
backbone_dino = AutoModel.from_pretrained("facebook/dinov2-large")
lora_cfg = LoraConfig(
    r=8, lora_alpha=16, target_modules=["query","value"], lora_dropout=0.1,
    bias="none", task_type="FEATURE_EXTRACTION"
)
backbone_dino = get_peft_model(backbone_dino, lora_cfg)
backbone_dino.to(device)

# Load projection head + backbone weights
model_dino = SiameseModel(backbone_dino).to(device)
state_dict = torch.load("/content/drive/MyDrive/siamese_model.pt", map_location=device)
model_dino.load_state_dict(state_dict)
model_dino.eval()

processor_dino = AutoImageProcessor.from_pretrained("facebook/dinov2-large")

# -------------------------
# 4. Load Qwen LoRA from HuggingFace
# -------------------------
tokenizer_qwen = AutoTokenizer.from_pretrained("shidqii/qwen-embed-lora")
base_model_qwen = AutoModel.from_pretrained("Qwen/Qwen3-Embedding-0.6B")
model_qwen_lora = PeftModel.from_pretrained(base_model_qwen, "shidqii/qwen-embed-lora")
model_qwen_lora.to(device)
model_qwen_lora.eval()

# -------------------------
# 5. Load CatBoost
# -------------------------
cat_model = joblib.load("best_model_finetune.pkl")

# -------------------------
# 6. Helpers embed image & text
# -------------------------
@torch.no_grad()
def embed_image(paths):
    imgs = [load_image(p) for p in paths]
    inputs = processor_dino(images=imgs, return_tensors="pt", padding=True).to(device)
    emb = model_dino.encode(inputs["pixel_values"])
    return emb.cpu().numpy()

@torch.no_grad()
def embed_caption(texts):
    embs = []
    for i in range(0, len(texts), 32):
        batch = texts[i:i+32]
        tokens = tokenizer_qwen(batch, padding=True, truncation=True, return_tensors="pt").to(device)
        out = model_qwen_lora.base_model(**tokens)
        last_hidden = out.last_hidden_state
        mask = tokens["attention_mask"].unsqueeze(-1).expand(last_hidden.size())
        mean_pooled = torch.sum(last_hidden * mask, 1) / torch.clamp(mask.sum(1), min=1e-9)
        embs.append(mean_pooled.cpu())
    return np.vstack(embs)

# -------------------------
# 7. Predict function
# -------------------------
def predict(img_main_path, img_review_path, caption):
    img_emb = embed_image([img_main_path, img_review_path])
    caption_emb = embed_caption([caption])[0]

    features = np.concatenate([img_emb[0], img_emb[1], caption_emb])
    return cat_model.predict([features])[0]

# -------------------------
# 8. Example usage
# -------------------------
result = predict(
    "image-main.png",
    "image-review.png",
    "caption"
)
print("Predicted label:", result)