In [None]:
# Instalasi library yang diperlukan dari Hugging Face
!pip install -q transformers ftfy regex
!pip install --upgrade -q transformers
!pip install -q accelerate

# Import standard libraries and the Hugging Face libraries
import os
import json
import torch
import pandas as pd
import re
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
from torch.optim import AdamW
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
import torch.nn as nn
import torch.nn.functional as F

# Select the compute device: the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

In [None]:
import ast

# Path to the dataset directory on Kaggle
data_dir = "/kaggle/input/data-of-multimodal-sarcasm-detection"

# Function that loads the data from a .txt file
def load_data_from_txt(filepath):
    """Load one annotation file into a DataFrame.

    Every line of the file is expected to hold a Python list literal such as
    ``['id', 'text', 1, 1]``: the tweet id at position 0, the tweet text at
    position 1 and the sarcasm label at position 2.

    Loading is best-effort. A line is silently skipped when it is blank,
    cannot be parsed as a literal, does not contain at least three indexable
    items, or carries a label that cannot be converted to ``int``.

    Args:
        filepath: Path of the UTF-8 encoded text file to read.

    Returns:
        A ``pandas.DataFrame`` with the columns ``id``, ``text`` and
        ``sarcasm``. The columns exist even when no line could be loaded, so
        column lookups by callers keep working on an empty result.
    """
    columns = ['id', 'text', 'sarcasm']
    records = []
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue
            try:
                # ast.literal_eval turns the textual list into a real list,
                # e.g. "['id', 'text', 1, 1]" -> ['id', 'text', 1, 1]
                data_list = ast.literal_eval(stripped)

                # Fields are identified purely by their position in the list
                tweet_id = data_list[0]
                text = data_list[1]
                label = int(data_list[2])  # the sarcasm label is the 3rd item
            except (ValueError, SyntaxError, IndexError, TypeError, KeyError):
                # ValueError/SyntaxError: not a valid literal or label.
                # IndexError: fewer than three items.
                # TypeError/KeyError: literal is not a sequence (number, dict, ...)
                # or the label is None.
                continue
            records.append({'id': tweet_id, 'text': text, 'sarcasm': label})
    return pd.DataFrame(records, columns=columns)

# Load each split from its annotation file
train_df = load_data_from_txt(os.path.join(data_dir, 'text', 'train.txt'))
val_df = load_data_from_txt(os.path.join(data_dir, 'text', 'valid2.txt'))
test_df = load_data_from_txt(os.path.join(data_dir, 'text', 'test2.txt'))

# Build the full image path for every row (images are named "<id>.jpg")
image_folder = os.path.join(data_dir, 'dataset_image')
train_df['image_path'] = train_df['id'].apply(lambda x: os.path.join(image_folder, f"{x}.jpg"))
val_df['image_path'] = val_df['id'].apply(lambda x: os.path.join(image_folder, f"{x}.jpg"))
test_df['image_path'] = test_df['id'].apply(lambda x: os.path.join(image_folder, f"{x}.jpg"))

# Keep only rows whose image file exists on disk, then drop rows with missing values
train_df = train_df[train_df['image_path'].apply(os.path.exists)].dropna()
val_df = val_df[val_df['image_path'].apply(os.path.exists)].dropna()
test_df = test_df[test_df['image_path'].apply(os.path.exists)].dropna()

# Make sure the 'sarcasm' column is stored as integers
train_df['sarcasm'] = train_df['sarcasm'].astype(int)
val_df['sarcasm'] = val_df['sarcasm'].astype(int)
test_df['sarcasm'] = test_df['sarcasm'].astype(int)

# Few-shot training subset: 16 sarcastic + 16 non-sarcastic examples, sampled
# with a fixed random_state so the subset is the same on every run.
sarcastic_samples = train_df[train_df['sarcasm'] == 1].sample(n=16, random_state=42)
non_sarcastic_samples = train_df[train_df['sarcasm'] == 0].sample(n=16, random_state=42)
train_df_16shot = pd.concat([sarcastic_samples, non_sarcastic_samples])

# Fixed: this line used to read the undefined name `train_df_512shot`,
# which raised NameError on a fresh kernel.
print(f"Ukuran data latih (16-shot): {len(train_df_16shot)}")
print(f"Ukuran data validasi: {len(val_df)}")
print(f"Ukuran data uji: {len(test_df)}")

# Show a few example rows
train_df_16shot.head()

In [None]:
class SarcasmDataset(Dataset):
    """Dataset that yields one processed (text, image, label) example per row.

    The ``text``, ``image_path`` and ``sarcasm`` columns of ``dataframe`` are
    copied into plain Python lists at construction time, so items are looked
    up by position and not by the DataFrame index.

    Args:
        dataframe: DataFrame providing the ``text``, ``image_path`` and
            ``sarcasm`` columns.
        processor: Callable invoked with ``text=``, ``images=`` and
            tokenisation options; its result must expose ``input_ids``,
            ``attention_mask`` and ``pixel_values`` tensors.
    """

    def __init__(self, dataframe, processor):
        self.dataframe = dataframe
        self.processor = processor
        # Pull the three columns out once, as lists, for positional lookup
        self.texts, self.image_paths, self.labels = (
            dataframe[column].tolist()
            for column in ('text', 'image_path', 'sarcasm')
        )

    def __len__(self):
        """Return the number of rows in the wrapped DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load, process and return the example stored at position ``idx``.

        Returns:
            dict with ``input_ids``, ``attention_mask`` and ``pixel_values``
            (each with the processor's leading dimension squeezed away) and
            ``labels`` (a scalar ``torch.long`` tensor).
        """
        sentence = self.texts[idx]
        rgb_image = Image.open(self.image_paths[idx]).convert("RGB")
        target = torch.tensor(self.labels[idx], dtype=torch.long)

        # Pad/truncate the text to a fixed 77 tokens so that every text tensor
        # has the same size and examples can be stacked into a batch.
        encoded = self.processor(
            text=[sentence],
            images=rgb_image,
            return_tensors="pt",
            padding="max_length",
            max_length=77,
            truncation=True,
        )

        sample = {
            key: encoded[key].squeeze(0)
            for key in ('input_ids', 'attention_mask', 'pixel_values')
        }
        sample['labels'] = target
        return sample

In [None]:
class CueLearningSarcasmModel(nn.Module):
    """Prompt-tuned CLIP classifier for image + text sarcasm detection.

    Every CLIP weight is frozen. The only trainable tensors are the prompt
    parameters created in ``__init__``:

    * ``text_prompts``: prepended to the token embeddings of each input text.
    * ``sarcasm_prompt_embeds`` / ``non_sarcasm_prompt_embeds``: class prompts
      whose encoded features are compared with the fused image/text feature.
    * ``image_prompts``: registered, but not read by ``forward``.

    NOTE(review): the text encoder is fed token embeddings only; CLIP's
    position embeddings are never added in this class. Confirm that this is
    intended.
    """

    def __init__(self, clip_model_name="openai/clip-vit-large-patch14"):
        """Load CLIP, freeze it and create the learnable prompt parameters.

        Args:
            clip_model_name: Name or path given to ``from_pretrained`` for
                both the CLIP model and its processor.
        """
        super().__init__()
        self.clip = CLIPModel.from_pretrained(clip_model_name)
        self.processor = CLIPProcessor.from_pretrained(clip_model_name)

        # Freeze CLIP; parameters created after this loop remain trainable.
        for param in self.clip.parameters():
            param.requires_grad = False

        text_prompt_length = 12
        image_prompt_length = 20
        sarcasm_prompt_length = 8
        d_model = self.clip.text_embed_dim

        self.text_prompts = nn.Parameter(torch.randn(text_prompt_length, d_model))
        # NOTE(review): image_prompts is trainable but forward() never uses it.
        self.image_prompts = nn.Parameter(torch.randn(image_prompt_length, self.clip.vision_embed_dim))

        # Seed phrases used to initialise the tail of each class prompt
        sarcasm_texts = ["a sarcastic tweet", "this is sarcasm"]
        non_sarcasm_texts = ["a normal tweet", "this is not sarcasm"]
        sarcasm_tokens = self.processor(text=sarcasm_texts, return_tensors="pt", padding=True, truncation=True)
        non_sarcasm_tokens = self.processor(text=non_sarcasm_texts, return_tensors="pt", padding=True, truncation=True)

        with torch.no_grad():
            # Average the token embeddings position-wise across the phrases of a class
            sarcasm_word_embeds = self.clip.text_model.embeddings.token_embedding(sarcasm_tokens.input_ids).mean(dim=0)
            non_sarcasm_word_embeds = self.clip.text_model.embeddings.token_embedding(non_sarcasm_tokens.input_ids).mean(dim=0)

        # Class prompt = random learnable prefix followed by the averaged phrase embeddings
        self.sarcasm_prompt_embeds = nn.Parameter(torch.cat([
            torch.randn(sarcasm_prompt_length, d_model),
            sarcasm_word_embeds
        ], dim=0))
        self.non_sarcasm_prompt_embeds = nn.Parameter(torch.cat([
            torch.randn(sarcasm_prompt_length, d_model),
            non_sarcasm_word_embeds
        ], dim=0))

    def _prepare_4d_attention_mask(self, mask_2d, dtype, device):
        """Turn a 2D padding mask into a 4D additive mask.

        Args:
            mask_2d: ``[B, S]`` mask with 1 for tokens to keep and 0 for padding.
            dtype: Floating dtype of the returned mask.
            device: Unused; kept for signature parity with the causal helper.

        Returns:
            ``[B, 1, 1, S]`` tensor holding 0 at kept positions and
            ``torch.finfo(dtype).min`` at padded positions.
        """
        # [B, S] -> [B, 1, 1, S]
        mask_4d = mask_2d.to(dtype).unsqueeze(1).unsqueeze(1)
        # Invert: keep (1) -> 0, padding (0) -> 1
        inverted_mask = 1.0 - mask_4d
        # Additive form: 0 stays 0, 1 becomes the most negative finite value
        return inverted_mask * torch.finfo(dtype).min

    def _prepare_4d_causal_attention_mask(self, shape, dtype, device):
        """Build a 4D additive causal mask.

        Args:
            shape: Sequence whose first two entries are batch size and
                sequence length; further entries are ignored.
            dtype: Floating dtype of the returned mask.
            device: Device on which the mask is created.

        Returns:
            ``[B, 1, S, S]`` tensor with ``torch.finfo(dtype).min`` strictly
            above the diagonal (future positions) and 0 elsewhere.
        """
        bsz, seq_len = shape[0], shape[1]
        causal_mask = torch.full(
            (bsz, seq_len, seq_len), torch.finfo(dtype).min, dtype=dtype, device=device
        ).triu(diagonal=1)
        # Extra dimension that broadcasts over the attention heads
        return causal_mask.unsqueeze(1)

    def _combine_padding_and_causal_mask(self, mask_2d, dtype, device):
        """Merge the padding mask and the causal mask into one additive mask.

        Both component masks use ``torch.finfo(dtype).min``. Where a position
        is padded *and* in the future, their sum overflows to ``-inf``, so the
        sum is clamped back to the most negative finite value.

        Args:
            mask_2d: ``[B, S]`` padding mask (1 = keep, 0 = padding).
            dtype: Floating dtype of the returned mask.
            device: Device on which the causal part is created.

        Returns:
            ``[B, 1, S, S]`` additive mask containing only finite values.
        """
        padding_mask_4d = self._prepare_4d_attention_mask(mask_2d, dtype, device)
        causal_mask_4d = self._prepare_4d_causal_attention_mask(mask_2d.shape, dtype, device)
        combined = padding_mask_4d + causal_mask_4d
        return combined.clamp(min=torch.finfo(dtype).min)

    def forward(self, input_ids, attention_mask, pixel_values):
        """Compute sarcasm logits for a batch of text/image pairs.

        Args:
            input_ids: ``[B, S]`` token ids of the texts.
            attention_mask: ``[B, S]`` padding mask matching ``input_ids``.
            pixel_values: Image batch passed to the CLIP vision model.

        Returns:
            ``[B, 2]`` logits; column 0 is non-sarcasm and column 1 is sarcasm.
        """
        # 1. Initial embeddings (token embeddings only, no position embeddings)
        inputs_embeds = self.clip.text_model.embeddings.token_embedding(input_ids)
        vision_outputs = self.clip.vision_model(pixel_values=pixel_values)
        image_embeds = vision_outputs[1]

        # 2. Prepend the learnable text prompts to every sequence in the batch
        prompted_text_embeds = torch.cat([
            self.text_prompts.unsqueeze(0).expand(inputs_embeds.shape[0], -1, -1),
            inputs_embeds
        ], dim=1)

        # 3. Extend the 2D padding mask: prompt positions are always attended
        prompt_attention_mask = torch.ones(
            prompted_text_embeds.shape[0], self.text_prompts.shape[0],
            dtype=attention_mask.dtype, device=input_ids.device
        )
        extended_attention_mask_2d = torch.cat([prompt_attention_mask, attention_mask], dim=1)

        # Single additive mask that applies padding and causality together
        final_attention_mask = self._combine_padding_and_causal_mask(
            extended_attention_mask_2d, prompted_text_embeds.dtype, input_ids.device
        )

        # 4. Run the text encoder with that single mask
        text_encoder_outputs = self.clip.text_model.encoder(
            inputs_embeds=prompted_text_embeds,
            attention_mask=final_attention_mask,
        )
        last_hidden_state = text_encoder_outputs[0]
        normed_hidden_state = self.clip.text_model.final_layer_norm(last_hidden_state)

        # The text feature is read at the position of the largest token id,
        # shifted by the number of prepended prompt vectors.
        # NOTE(review): assumes the end-of-text token has the largest id - TODO confirm.
        eos_token_pos = input_ids.argmax(dim=-1)
        shifted_eos_pos = eos_token_pos + self.text_prompts.shape[0]
        batch_indices = torch.arange(normed_hidden_state.shape[0], device=input_ids.device)
        text_features = normed_hidden_state[batch_indices, shifted_eos_pos]

        # 5. Project both modalities with CLIP's projection layers
        image_features = self.clip.visual_projection(image_embeds)
        text_features_proj = self.clip.text_projection(text_features)

        # 6. Multi-modal fusion: average the two features, then L2-normalise
        multi_modal_features = (text_features_proj + image_features) / 2.0
        multi_modal_features = F.normalize(multi_modal_features, p=2, dim=-1)

        # 7. Encode the sarcasm / non-sarcasm class prompts
        def get_prompt_features(prompt_embeds):
            prompt_embeds_b1 = prompt_embeds.unsqueeze(0)
            # The class prompts carry no padding, so a causal mask is enough
            causal_mask = self._prepare_4d_causal_attention_mask(
                prompt_embeds_b1.shape, prompt_embeds_b1.dtype, prompt_embeds_b1.device
            )
            encoder_out = self.clip.text_model.encoder(
                inputs_embeds=prompt_embeds_b1,
                attention_mask=causal_mask
            )
            normed_out = self.clip.text_model.final_layer_norm(encoder_out[0])
            # Use the last position as the feature of the prompt
            features = normed_out[:, -1, :]
            return self.clip.text_projection(features)

        sarcasm_prompt_features = get_prompt_features(self.sarcasm_prompt_embeds)
        non_sarcasm_prompt_features = get_prompt_features(self.non_sarcasm_prompt_embeds)

        sarcasm_prompt_features = F.normalize(sarcasm_prompt_features, p=2, dim=-1)
        non_sarcasm_prompt_features = F.normalize(non_sarcasm_prompt_features, p=2, dim=-1)

        # 8. Similarity of the fused feature to each class prompt
        sim_sarcasm = F.cosine_similarity(multi_modal_features, sarcasm_prompt_features.squeeze(0))
        sim_non_sarcasm = F.cosine_similarity(multi_modal_features, non_sarcasm_prompt_features.squeeze(0))

        # Scale the similarities by exp(logit_scale) of the CLIP model
        logits = torch.stack([sim_non_sarcasm, sim_sarcasm], dim=1) * self.clip.logit_scale.exp()
        return logits

In [None]:
# Initialise the model and move it to the selected device
model = CueLearningSarcasmModel().to(device)
processor = model.processor

# Build the Dataset / DataLoader objects from the 16-shot training subset.
# Fixed: this used the undefined name `train_df_512shot`, which raised
# NameError on a fresh kernel; the sampled subset is `train_df_16shot`.
train_dataset = SarcasmDataset(train_df_16shot, processor)
val_dataset = SarcasmDataset(val_df, processor)

# Batch sizes for training and validation
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

# Optimizer: only parameters with requires_grad=True (the prompts) are trained
learnable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = AdamW(learnable_params, lr=2e-3)  # learning rate taken from the paper
criterion = nn.CrossEntropyLoss()

In [None]:
from sklearn.metrics import accuracy_score, f1_score

# Re-initialize the model to ensure the fix is applied
model = CueLearningSarcasmModel().to(device)
processor = model.processor

# Re-create the Datasets and DataLoaders; training uses the 16-shot subset
# (train_df_16shot), validation uses the full validation split.
train_dataset = SarcasmDataset(train_df_16shot, processor)
val_dataset = SarcasmDataset(val_df, processor)

# Batch sizes for training and validation
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

# Optimizer (only trains the learnable prompt parameters)
learnable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = AdamW(learnable_params, lr=2e-3)
criterion = nn.CrossEntropyLoss()

# --- Training and Evaluation Loop ---
num_epochs = 1 # You may want to adjust the number of epochs

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch in progress_bar:
        # Move batch data to the device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        pixel_values = batch['pixel_values'].to(device)
        labels = batch['labels'].to(device)

        # Forward pass: the model returns one logit per class for each example
        outputs = model(input_ids, attention_mask, pixel_values)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate the per-batch loss and show the current value on the progress bar
        total_loss += loss.item()
        progress_bar.set_postfix({'loss': loss.item()})

    # Mean of the per-batch losses over this epoch
    avg_train_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1} | Average Training Loss: {avg_train_loss:.4f}")

    # --- Evaluation ---
    # Runs once per epoch on the validation loader, without gradient tracking
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            pixel_values = batch['pixel_values'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask, pixel_values)
            # Predicted class = index of the largest logit
            preds = torch.argmax(outputs, dim=1)

            # Collect predictions and labels on the CPU for the sklearn metrics
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    print(f"Validation Accuracy: {acc:.4f} | Validation F1-Score: {f1:.4f}\n")