In [None]:
!pip install -U "transformers>=4.40.0" datasets accelerate peft bitsandbytes scikit-learn matplotlib pandas tqdm



In [None]:
import transformers
from transformers import TrainingArguments
import inspect

# Report which transformers build is active and what arguments
# TrainingArguments accepts in this environment.
for caption, value in (
    ("transformers version:", transformers.__version__),
    ("TrainingArguments module:", TrainingArguments.__module__),
):
    print(caption, value)
print(
    "TrainingArguments.__init__ signature:",
    inspect.signature(TrainingArguments.__init__),
    sep="\n",
)


transformers version: 4.57.1
TrainingArguments module: transformers.training_args
TrainingArguments.__init__ signature:


In [None]:
import torch
from datasets import load_dataset
import pandas as pd
import numpy as np
from tqdm.auto import tqdm

from sklearn.metrics import (
    classification_report,
    f1_score,
    roc_auc_score,
    average_precision_score,
    confusion_matrix
)

import matplotlib.pyplot as plt

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device


'cuda'

In [None]:
# Load the "dair-ai/emotion" dataset; the trailing expression displays its splits.
dataset = load_dataset("dair-ai/emotion")

dataset

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 16000
    })
    validation: Dataset({
        features: ['text', 'label'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 2000
    })
})

In [None]:
# Emotion names of the train split's "label" feature; an integer label is an
# index into this list (see add_risk).
label_list = dataset["train"].features["label"].names
label_list

['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']

In [None]:
# Mapping from emotion name to an integer risk level (0, 1 or 2).
# Downstream, risk_to_phigh treats level 2 as the highest risk (maps it to 1.0).
emotion_to_risk = {
    "joy": 0,
    "love": 0,
    "surprise": 0,
    "anger": 1,
    "fear": 1,
    "sadness": 2
}

def add_risk(example):
    """Attach the emotion name and its risk level to one dataset row.

    Reads the integer ``example["label"]``, resolves it through ``label_list``
    and ``emotion_to_risk``, stores the results under the ``"emotion"`` and
    ``"risk"`` keys of the same row, and returns that row.
    """
    emotion_name = label_list[example["label"]]
    example["emotion"], example["risk"] = emotion_name, emotion_to_risk[emotion_name]
    return example

# Apply add_risk to every row of every split, then show the first training row
# to check the new "emotion" and "risk" columns.
dataset = dataset.map(add_risk)
dataset["train"][0]


{'text': 'i didnt feel humiliated',
 'label': 0,
 'emotion': 'sadness',
 'risk': 2}

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

# Checkpoint used for the zero-shot / few-shot baselines and as the LoRA base.
model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

# 4-bit NF4 quantization with double quantization; compute dtype is bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

tokenizer = AutoTokenizer.from_pretrained(model_name)
# Batched tokenization with padding=True needs a pad token; reuse EOS if the
# checkpoint does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Quantized baseline model; device placement is delegated to device_map="auto".
base_model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto"
)
# Put the baseline model in evaluation mode (it is only used for inference here).
base_model.eval()


LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(32000, 2048)
    (layers): ModuleList(
      (0-21): 22 x LlamaDecoderLayer(
        (self_attn): LlamaAttention(
          (q_proj): Linear4bit(in_features=2048, out_features=2048, bias=False)
          (k_proj): Linear4bit(in_features=2048, out_features=256, bias=False)
          (v_proj): Linear4bit(in_features=2048, out_features=256, bias=False)
          (o_proj): Linear4bit(in_features=2048, out_features=2048, bias=False)
        )
        (mlp): LlamaMLP(
          (gate_proj): Linear4bit(in_features=2048, out_features=5632, bias=False)
          (up_proj): Linear4bit(in_features=2048, out_features=5632, bias=False)
          (down_proj): Linear4bit(in_features=5632, out_features=2048, bias=False)
          (act_fn): SiLUActivation()
        )
        (input_layernorm): LlamaRMSNorm((2048,), eps=1e-05)
        (post_attention_layernorm): LlamaRMSNorm((2048,), eps=1e-05)
      )
    )
    (norm): LlamaRMSNorm(

In [None]:
# Emotion vocabulary used for prompting, output decoding, and the integer
# encoding built later with enumerate(). NOTE: this order differs from the
# dataset's own label order in label_list, so indices derived from this list
# are not interchangeable with the dataset's "label" column.
EMOTION_LABELS = ["joy", "love", "surprise", "anger", "fear", "sadness"]

def build_zero_shot_prompt(text: str) -> str:
    """Return the instruction-only classification prompt for one post.

    The prompt lists the six allowed emotions, asks for a single-word reply,
    and ends with ``Emotion:`` so the model's continuation is the answer.
    """
    prompt_lines = (
        "You are an assistant that classifies the emotion of a short social media post.",
        "",
        "Given the post, choose ONE emotion from: joy, love, surprise, anger, fear, sadness.",
        "",
        "Reply with only the emotion word.",
        "",
        f"Post: {text}",
        "Emotion:",
    )
    return "\n".join(prompt_lines)

def build_few_shot_prompt(text: str, examples) -> str:
    """Return a classification prompt that shows labelled examples first.

    Args:
        text: The post to classify.
        examples: Iterable of dicts with ``"text"`` and ``"emotion"`` keys;
            each is rendered as a ``Post:`` / ``Emotion:`` pair followed by a
            blank line.

    Returns:
        The full prompt, ending with ``Emotion:`` for the post to classify.
    """
    demo_block = "".join(
        f"Post: {ex['text']}\nEmotion: {ex['emotion']}\n\n" for ex in examples
    )
    header = (
        "You are an assistant that classifies the emotion of a short social media post.\n"
        "\n"
        "Choose ONE emotion from: joy, love, surprise, anger, fear, sadness.\n"
        "\n"
        "Here are some examples:\n"
    )
    footer = (
        "\n"
        "Now classify the following post. Reply with only the emotion word.\n"
        "\n"
        f"Post: {text}\n"
        "Emotion:"
    )
    return header + demo_block + footer


In [None]:
def decode_emotion_from_output(output_text: str, labels=None, default: str = "joy") -> str:
    """Return the emotion label that appears EARLIEST in the model output.

    Matching is a case-insensitive substring search. The previous version
    returned the first label in list order that occurred anywhere in the text,
    so an output such as "sadness ... joy" was decoded as "joy" only because
    "joy" comes first in the label list.

    Args:
        output_text: Raw text produced by the model (may be empty or None).
        labels: Candidate labels; defaults to the module-level EMOTION_LABELS.
        default: Label returned when no candidate occurs in the text. The
            default "joy" keeps the original fallback (a low-risk emotion).

    Returns:
        The matched label exactly as spelled in ``labels``, or ``default``.
    """
    candidates = EMOTION_LABELS if labels is None else labels
    lowered = (output_text or "").lower()

    best_label = default
    best_pos = None
    for label in candidates:
        pos = lowered.find(label.lower())
        # Strict "<" keeps list order as the tie-breaker for equal positions.
        if pos != -1 and (best_pos is None or pos < best_pos):
            best_label, best_pos = label, pos
    return best_label

def emotion_to_risk_label(emotion: str) -> int:
    """Look up the risk level of an emotion name in ``emotion_to_risk``.

    Raises:
        KeyError: If ``emotion`` is not a key of ``emotion_to_risk``.
    """
    risk_level = emotion_to_risk[emotion]
    return risk_level


In [None]:
@torch.no_grad()
def generate_emotion(prompts, model, tokenizer, max_new_tokens=8):
    """Greedily generate a short continuation per prompt and decode an emotion.

    Args:
        prompts: list[str] of fully built prompts (each ending in "Emotion:").
        model: Causal LM exposing ``.device`` and ``.generate``.
        tokenizer: Tokenizer matching ``model``; must have a pad token.
        max_new_tokens: Maximum number of tokens generated per prompt.

    Returns:
        list[str] with one emotion label per prompt, in input order.
    """
    if len(prompts) == 0:
        # Nothing to tokenize; avoids a tokenizer error on an empty batch.
        return []

    # Decoder-only models must be LEFT-padded for batched generation, otherwise
    # pad tokens sit between the prompt and the generated continuation.
    # The caller's setting is restored so other uses of the tokenizer
    # (e.g. building training batches) are unaffected.
    original_padding_side = tokenizer.padding_side
    tokenizer.padding_side = "left"
    try:
        inputs = tokenizer(
            prompts, return_tensors="pt", padding=True, truncation=True
        ).to(model.device)
    finally:
        tokenizer.padding_side = original_padding_side

    outputs = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        pad_token_id=tokenizer.pad_token_id,
    )

    # generate() returns prompt + continuation; keep only the new tokens so the
    # answer cannot be confused with label words inside the prompt, the
    # few-shot examples, or a later "Emotion:" the model invents on its own.
    prompt_length = inputs["input_ids"].shape[1]
    continuations = tokenizer.batch_decode(
        outputs[:, prompt_length:], skip_special_tokens=True
    )

    emotions = []
    for continuation in continuations:
        # The answer is expected on the first non-empty line; later lines are
        # usually the model continuing with another "Post: ..." block.
        answer_lines = [line.strip() for line in continuation.splitlines() if line.strip()]
        answer = answer_lines[0] if answer_lines else ""
        emotions.append(decode_emotion_from_output(answer))
    return emotions


In [None]:
from math import ceil

test_data = dataset["test"]

# For the demo, evaluate only the first N test rows.
N = 500   # can be raised to 2000
texts, true_emotion, true_risk = (
    test_data[column][:N] for column in ("text", "emotion", "risk")
)

batch_size = 8
zero_pred_emotion = []

# Zero-shot predictions from the quantized base model, one batch at a time.
for start in tqdm(range(0, N, batch_size)):
    zero_shot_prompts = [
        build_zero_shot_prompt(post) for post in texts[start:start + batch_size]
    ]
    zero_pred_emotion += generate_emotion(zero_shot_prompts, base_model, tokenizer)

len(zero_pred_emotion), zero_pred_emotion[:5]


  0%|          | 0/63 [00:00<?, ?it/s]

(500, ['joy', 'joy', 'joy', 'joy', 'joy'])

In [None]:
# Encode emotion names as integer class ids (position in EMOTION_LABELS) and
# derive the matching risk labels for the zero-shot predictions.
emo_to_idx = dict(zip(EMOTION_LABELS, range(len(EMOTION_LABELS))))

y_true_emo = list(map(emo_to_idx.__getitem__, true_emotion))
y_pred_emo_zero = list(map(emo_to_idx.__getitem__, zero_pred_emotion))

y_true_risk = true_risk
y_pred_risk_zero = list(map(emotion_to_risk_label, zero_pred_emotion))


In [None]:
from sklearn.preprocessing import label_binarize

def evaluate_classification(y_true, y_pred, num_classes, average="macro", task_name=""):
    """Print and return classification metrics for integer-encoded labels.

    Args:
        y_true: Ground-truth class ids, each in ``range(num_classes)``.
        y_pred: Predicted class ids, same length as ``y_true``.
        num_classes: Total number of classes. The fixed label set
            ``0..num_classes-1`` is used for every metric, so the report, the
            F1 scores and the confusion matrix always cover the same classes
            even when a class never occurs in this sample.
        average: Averaging mode forwarded to ``roc_auc_score`` and
            ``average_precision_score`` (previously accepted but ignored).
            Must be a mode that yields a scalar, e.g. "macro", "micro",
            "weighted".
        task_name: Label used in the printed header.

    Returns:
        dict with keys ``f1_macro``, ``f1_weighted``, ``auroc``, ``pr_auc``
        and ``confusion_matrix``. ``auroc`` / ``pr_auc`` are NaN when
        scikit-learn raises ValueError for them.

    Note:
        AUROC and PR-AUC are computed from hard (one-hot) predictions rather
        than probabilities, so they are coarse indicators only.
    """
    class_ids = list(range(num_classes))

    print(f"=== {task_name} Classification Report ===")
    # zero_division=0 yields the same 0.0 scores as the default while
    # suppressing the warning for classes that are never predicted.
    print(classification_report(y_true, y_pred, labels=class_ids, digits=4, zero_division=0))

    # F1 (the returned keys are fixed, so these two averages stay hard-coded)
    f1_macro = f1_score(y_true, y_pred, labels=class_ids, average="macro", zero_division=0)
    f1_weighted = f1_score(y_true, y_pred, labels=class_ids, average="weighted", zero_division=0)

    # one-vs-rest ROC / PR on one-hot encoded labels
    y_true_bin = label_binarize(y_true, classes=class_ids)
    y_pred_bin = label_binarize(y_pred, classes=class_ids)

    try:
        roc = roc_auc_score(y_true_bin, y_pred_bin, average=average, multi_class="ovr")
    except ValueError:
        roc = np.nan

    try:
        pr_auc = average_precision_score(y_true_bin, y_pred_bin, average=average)
    except ValueError:
        pr_auc = np.nan

    print(f"F1-macro  : {f1_macro:.4f}")
    print(f"F1-weight : {f1_weighted:.4f}")
    print(f"AUROC     : {roc:.4f}")
    print(f"PR-AUC    : {pr_auc:.4f}")

    # Confusion matrix: rows are true classes, columns are predicted classes.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    print("Confusion Matrix:")
    print(cm)

    return {
        "f1_macro": f1_macro,
        "f1_weighted": f1_weighted,
        "auroc": roc,
        "pr_auc": pr_auc,
        "confusion_matrix": cm
    }

# Score the zero-shot predictions on the 6-way emotion task ...
metrics_zero_emo = evaluate_classification(
    y_true=y_true_emo,
    y_pred=y_pred_emo_zero,
    num_classes=len(EMOTION_LABELS),
    task_name="Zero-shot Emotion",
)

# ... and on the derived 3-level risk task.
metrics_zero_risk = evaluate_classification(
    y_true=y_true_risk,
    y_pred=y_pred_risk_zero,
    num_classes=3,
    task_name="Zero-shot Risk",
)


=== Zero-shot Emotion Classification Report ===
              precision    recall  f1-score   support

           0     0.3174    0.9669    0.4779       151
           1     0.0000    0.0000    0.0000        39
           2     0.0000    0.0000    0.0000        13
           3     0.4000    0.0506    0.0899        79
           4     1.0000    0.0429    0.0822        70
           5     0.4444    0.0541    0.0964       148

    accuracy                         0.3220       500
   macro avg     0.3603    0.1857    0.1244       500
weighted avg     0.4306    0.3220    0.1986       500

F1-macro  : 0.1244
F1-weight : 0.1986
AUROC     : 0.5128
PR-AUC    : 0.1787
Confusion Matrix:
[[146   1   1   1   0   2]
 [ 35   0   0   2   0   2]
 [ 13   0   0   0   0   0]
 [ 67   3   1   4   0   4]
 [ 61   0   1   3   3   2]
 [138   0   2   0   0   8]]
=== Zero-shot Risk Classification Report ===
              precision    recall  f1-score   support

           0     0.4179    0.9655    0.5833       20

In [None]:
# Pick the few-shot demonstrations from the train split: the first row seen
# for each emotion, one per emotion.
train_data = dataset["train"]

few_shot_examples = []
selected_emotions = set()

for ex in train_data:
    emo = ex["emotion"]
    if emo in selected_emotions:
        continue
    selected_emotions.add(emo)
    few_shot_examples.append({"text": ex["text"], "emotion": emo})
    # Stop scanning as soon as every emotion has a demonstration.
    if len(selected_emotions) == len(EMOTION_LABELS):
        break

few_shot_examples


[{'text': 'i didnt feel humiliated', 'emotion': 'sadness'},
 {'text': 'im grabbing a minute to post i feel greedy wrong',
  'emotion': 'anger'},
 {'text': 'i am ever feeling nostalgic about the fireplace i will know that it is still on the property',
  'emotion': 'love'},
 {'text': 'ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny',
  'emotion': 'surprise'},
 {'text': 'i feel as confused about life as a teenager or as jaded as a year old man',
  'emotion': 'fear'},
 {'text': 'i have been with petronas for years i feel that petronas has performed well and made a huge profit',
  'emotion': 'joy'}]

In [None]:
# Few-shot predictions from the base model on the same N test posts.
few_pred_emotion = []

for start in tqdm(range(0, N, batch_size)):
    few_shot_prompts = [
        build_few_shot_prompt(post, few_shot_examples)
        for post in texts[start:start + batch_size]
    ]
    few_pred_emotion += generate_emotion(few_shot_prompts, base_model, tokenizer)

len(few_pred_emotion), few_pred_emotion[:5]


  0%|          | 0/63 [00:00<?, ?it/s]

(500, ['joy', 'joy', 'joy', 'joy', 'joy'])

In [None]:
# Encode the few-shot predictions and score both tasks.
y_pred_emo_few = list(map(emo_to_idx.__getitem__, few_pred_emotion))
y_pred_risk_few = list(map(emotion_to_risk_label, few_pred_emotion))

metrics_few_emo = evaluate_classification(
    y_true=y_true_emo,
    y_pred=y_pred_emo_few,
    num_classes=len(EMOTION_LABELS),
    task_name="Few-shot Emotion",
)

metrics_few_risk = evaluate_classification(
    y_true=y_true_risk,
    y_pred=y_pred_risk_few,
    num_classes=3,
    task_name="Few-shot Risk",
)


=== Few-shot Emotion Classification Report ===
              precision    recall  f1-score   support

           0     0.3103    0.8940    0.4608       151
           1     0.0000    0.0000    0.0000        39
           2     0.0417    0.0769    0.0541        13
           3     0.5238    0.1392    0.2200        79
           4     1.0000    0.0857    0.1579        70
           5     0.7000    0.0473    0.0886       148

    accuracy                         0.3200       500
   macro avg     0.4293    0.2072    0.1636       500
weighted avg     0.5248    0.3200    0.2236       500

F1-macro  : 0.1636
F1-weight : 0.2236
AUROC     : 0.5246
PR-AUC    : 0.1921
Confusion Matrix:
[[135   3  12   1   0   0]
 [ 35   0   0   4   0   0]
 [ 12   0   1   0   0   0]
 [ 63   0   4  11   0   1]
 [ 57   0   3   2   6   2]
 [133   1   4   3   0   7]]
=== Few-shot Risk Classification Report ===
              precision    recall  f1-score   support

           0     0.4276    0.9754    0.5946       203


In [None]:
def build_train_example(text, emotion):
    """Return a ``(prompt, target)`` pair for supervised fine-tuning.

    The prompt is a minimal instruction followed by the post and a trailing
    ``Emotion:``; the target is the emotion passed in, unchanged.
    """
    prompt = (
        "You are an assistant that classifies the emotion of a short social media post.\n"
        "\n"
        f"Post: {text}\n"
        "Emotion:"
    )
    return prompt, emotion

def convert_to_sft_format(split):
    """Turn a dataset split into parallel lists of prompts and targets.

    Each row must provide ``"text"`` and ``"emotion"``; rows are converted with
    ``build_train_example`` and the original order is preserved.

    Returns:
        ``(prompts, targets)``: two lists of equal length.
    """
    pairs = [build_train_example(row["text"], row["emotion"]) for row in split]
    prompts = [prompt for prompt, _ in pairs]
    targets = [target for _, target in pairs]
    return prompts, targets

# Build prompt/target pairs for the train and validation splits, then show the
# number of training pairs and the first pair as a sanity check.
train_prompts, train_targets = convert_to_sft_format(dataset["train"])
val_prompts, val_targets = convert_to_sft_format(dataset["validation"])

len(train_prompts), train_prompts[0], train_targets[0]


(16000,
 'You are an assistant that classifies the emotion of a short social media post.\n\nPost: i didnt feel humiliated\nEmotion:',
 'sadness')

In [None]:
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

# LoRA adapter settings: rank 8, alpha 16, dropout 0.1, no bias terms trained.
lora_config = LoraConfig(
    r=8,
    lora_alpha=16,
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],  # attention projection layers only
    lora_dropout=0.1,
    bias="none",
    task_type="CAUSAL_LM"
)

# Load a second, separate copy of the quantized checkpoint for fine-tuning so
# that base_model stays untouched for the baselines.
lora_model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto"
)

# Prepare the quantized model for training, wrap it with the LoRA adapters,
# and print how many parameters are trainable.
lora_model = prepare_model_for_kbit_training(lora_model)
lora_model = get_peft_model(lora_model, lora_config)
lora_model.print_trainable_parameters()


trainable params: 2,252,800 || all params: 1,102,301,184 || trainable%: 0.2044


In [None]:
from torch.utils.data import Dataset

class EmotionSFTDataset(Dataset):
    """Causal-LM fine-tuning dataset built from parallel prompt/target strings.

    Each item is the tokenized text ``"<prompt> <target><eos>"`` padded or
    truncated to ``max_length``. Labels equal the input ids on real tokens and
    ``IGNORE_INDEX`` on padding, so the loss is not dominated by pad tokens.

    Args:
        prompts: Sequence of prompt strings.
        targets: Sequence of target strings, same length as ``prompts``.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors of shape (1, max_length).
        max_length: Fixed sequence length after padding / truncation.

    Raises:
        ValueError: If ``prompts`` and ``targets`` differ in length.
    """

    # Label value skipped by the cross-entropy loss of Hugging Face causal LMs.
    IGNORE_INDEX = -100

    def __init__(self, prompts, targets, tokenizer, max_length=256):
        if len(prompts) != len(targets):
            raise ValueError(
                f"prompts and targets must have the same length "
                f"(got {len(prompts)} and {len(targets)})"
            )
        self.prompts = prompts
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.prompts)

    def __getitem__(self, idx):
        prompt = self.prompts[idx]
        target = self.targets[idx]
        # Append EOS explicitly so the model learns to stop right after the
        # emotion word. Previously the only "stop" signal came from padding
        # positions leaking into the labels.
        eos = getattr(self.tokenizer, "eos_token", None) or ""
        full_text = prompt + " " + target + eos
        tokenized = self.tokenizer(
            full_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        input_ids = tokenized["input_ids"][0]
        attention_mask = tokenized["attention_mask"][0]
        # Standard SFT labels: predict every real token of prompt + target.
        labels = input_ids.clone()
        # Mask padding via the attention mask, not the token id: the pad token
        # may be the same id as EOS, and the real EOS must stay supervised.
        labels[attention_mask == 0] = self.IGNORE_INDEX
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels
        }

# Wrap the train / validation prompt-target pairs as torch datasets (default
# max_length) and show the number of training items.
train_sft_ds = EmotionSFTDataset(train_prompts, train_targets, tokenizer)
val_sft_ds = EmotionSFTDataset(val_prompts, val_targets, tokenizer)
len(train_sft_ds)


16000

In [None]:
import os
# Set the WANDB_DISABLED environment switch before the Trainer is created.
os.environ["WANDB_DISABLED"] = "true"

In [None]:
from transformers import TrainingArguments, Trainer

# Directory where checkpoints are written.
output_dir = "./tinyllama-emotion-lora"

# One epoch with an effective batch of 8 x 2 (gradient accumulation) per
# device; a checkpoint every 200 steps, keeping at most 2.
# NOTE(review): bnb_config sets the 4-bit compute dtype to bfloat16 while
# training here uses fp16; confirm the mismatch is intended.
training_args = TrainingArguments(
    output_dir=output_dir,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,
    learning_rate=2e-4,
    num_train_epochs=1,
    logging_steps=50,
    save_steps=200,
    save_total_limit=2,
    report_to="none",              # turn off wandb / tensorboard and other loggers
    fp16=torch.cuda.is_available() # bf16 would also work, but fp16 was judged more stable by the author
)

# NOTE(review): eval_dataset is supplied but no evaluation schedule is set in
# training_args, and the training log shows only the training loss; confirm
# whether periodic evaluation was intended.
trainer = Trainer(
    model=lora_model,
    args=training_args,
    train_dataset=train_sft_ds,
    eval_dataset=val_sft_ds,
)

trainer.train()





  return fn(*args, **kwargs)


Step,Training Loss
50,2.809
100,0.307
150,0.2873
200,0.2929
250,0.2899
300,0.281
350,0.2899
400,0.2885
450,0.2862
500,0.2813


  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)


TrainOutput(global_step=1000, training_loss=0.41119957065582274, metrics={'train_runtime': 2578.7289, 'train_samples_per_second': 6.205, 'train_steps_per_second': 0.388, 'total_flos': 2.5479541161984e+16, 'train_loss': 0.41119957065582274, 'epoch': 1.0})

In [None]:
# Evaluate the fine-tuned model with the SAME zero-shot prompt as the baseline,
# so only the model differs between the two runs.
# NOTE(review): this prompt contains instruction lines that the training
# prompt from build_train_example does not; confirm the mismatch is intended.
lora_model.eval()

lora_pred_emotion = []

for start in tqdm(range(0, N, batch_size)):
    lora_prompts = [
        build_zero_shot_prompt(post) for post in texts[start:start + batch_size]
    ]
    lora_pred_emotion += generate_emotion(lora_prompts, lora_model, tokenizer)

len(lora_pred_emotion), lora_pred_emotion[:5]


In [None]:
# Encode the LoRA predictions and score both tasks.
y_pred_emo_lora = list(map(emo_to_idx.__getitem__, lora_pred_emotion))
y_pred_risk_lora = list(map(emotion_to_risk_label, lora_pred_emotion))

metrics_lora_emo = evaluate_classification(
    y_true=y_true_emo,
    y_pred=y_pred_emo_lora,
    num_classes=len(EMOTION_LABELS),
    task_name="LoRA Emotion",
)

metrics_lora_risk = evaluate_classification(
    y_true=y_true_risk,
    y_pred=y_pred_risk_lora,
    num_classes=3,
    task_name="LoRA Risk",
)


In [None]:
# Crude mapping from a discrete risk label to a "high-risk probability".
def risk_to_phigh(risk_label: int) -> float:
    """Map risk level 2 to 1.0, level 1 to 0.5, and anything else to 0.0."""
    if risk_label == 2:
        return 1.0
    return 0.5 if risk_label == 1 else 0.0

p_high_lora = np.array(list(map(risk_to_phigh, y_pred_risk_lora)))

# Collect everything in one DataFrame to make plotting easier.
df_risk = pd.DataFrame(
    dict(
        index=np.arange(N),
        risk_true=y_true_risk,
        risk_pred_lora=y_pred_risk_lora,
        p_high_lora=p_high_lora,
    )
)

df_risk.head()


In [None]:
# Per-sample high-risk score predicted by the LoRA model, in test-set order.
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(df_risk["index"], df_risk["p_high_lora"], marker="o", linestyle="-", linewidth=1)
ax.set_xlabel("Sample index")
ax.set_ylabel("P(high_risk)")
ax.set_title("High Risk Probability over Samples (LoRA)")
ax.grid(True, alpha=0.3)
plt.show()


In [None]:
window_size = 50
df_risk["p_high_roll"] = (
    df_risk["p_high_lora"].rolling(window=window_size, min_periods=1).mean()
)

# Fold the rolling mean into a 2-D grid for the heatmap: one row per block of
# `step` consecutive samples (e.g. 50 per row); a short last row is padded
# with NaN.
step = window_size
num_rows = int(np.ceil(N / step))
rolling_values = df_risk["p_high_roll"].to_numpy()

heat_data = np.array([
    chunk if len(chunk) >= step
    else np.pad(chunk, (0, step - len(chunk)), constant_values=np.nan)
    for chunk in (rolling_values[row * step:(row + 1) * step] for row in range(num_rows))
])

fig, ax = plt.subplots(figsize=(10, 6))
image = ax.imshow(heat_data, aspect="auto", interpolation="nearest")
fig.colorbar(image, ax=ax, label="P(high_risk) (rolling mean)")
ax.set_xlabel("Index within window")
ax.set_ylabel("Window #")
ax.set_title(f"High Risk Heatmap (window={window_size})")
plt.show()


In [None]:
# One summary table per task, one row per method; each row carries the metric
# dict returned by evaluate_classification.
summary_emo = pd.DataFrame([
    {"method": method, "task": "emotion", **metrics}
    for method, metrics in (
        ("zero-shot", metrics_zero_emo),
        ("few-shot", metrics_few_emo),
        ("LoRA", metrics_lora_emo),
    )
])

summary_risk = pd.DataFrame([
    {"method": method, "task": "risk", **metrics}
    for method, metrics in (
        ("zero-shot", metrics_zero_risk),
        ("few-shot", metrics_few_risk),
        ("LoRA", metrics_lora_risk),
    )
])

summary_emo, summary_risk
