# import


In [None]:
# Mount Google Drive; later cells read their CSV files from /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Fetch the emoji2vec repository; later cells load its pre-trained vectors from
# /content/emoji2vec/pre-trained/emoji2vec.bin.
!git clone https://github.com/uclnlp/emoji2vec.git
# NOTE(review): `!cd` presumably runs in a throwaway subshell and does not change
# the notebook's working directory (unlike `%cd` below) — confirm whether it is needed.
!cd emoji2vec
# NOTE(review): the GloVe archive is downloaded and unpacked here, but nothing
# visible in this notebook reads it — confirm it is still required.
!wget http://nlp.stanford.edu/data/glove.6B.zip
!unzip glove.6B.zip
!pip install gensim
# Step 1. Clone the ELCo repo and install its dependencies
!git clone https://github.com/WING-NUS/ELCo.git
%cd ELCo
!pip install -r /content/ELCo/scripts/requirements.txt

In [None]:
import pandas as pd
import numpy as np
# Load the training and test CSV files from the mounted Drive.
train_df = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
test_df = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
# Encode the three sentiment names as integer class ids.
label_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
train_df["Sentiment"] = train_df["Sentiment"].map(label_map)
test_df["Sentiment"] = test_df["Sentiment"].map(label_map)
# NOTE(review): labels missing from label_map presumably become NaN in map(), in
# which case astype(int) would raise — confirm the CSVs only hold the three names.
train_df["Sentiment"] = train_df["Sentiment"].astype(int)
test_df["Sentiment"] = test_df["Sentiment"].astype(int)

# only text


In [None]:
import os
import re
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.preprocessing import label_binarize
from tqdm import tqdm

# === Dataset definition ===
class TextOnlyDataset(Dataset):
    """Torch dataset yielding tokenized comment text with its sentiment label.

    Args:
        dataframe: frame providing a ``CommentText`` and a ``Sentiment`` column.
        tokenizer: callable applied to one text at a time; it is invoked with
            ``truncation``, ``padding``, ``max_length`` and ``return_tensors``.
        max_length: length passed to the tokenizer for padding/truncation.
    """

    def __init__(self, dataframe, tokenizer, max_length=128):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.texts = dataframe["CommentText"].tolist()
        self.labels = dataframe["Sentiment"].tolist()

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenizer outputs for row ``idx`` plus a ``labels`` tensor."""
        encoded = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        # squeeze(0) drops the leading size-one axis of every returned tensor.
        sample = {}
        for name, tensor in encoded.items():
            sample[name] = tensor.squeeze(0)
        sample["labels"] = torch.tensor(self.labels[idx])
        return sample

# === Evaluation metrics ===
def compute_metrics(eval_pred):
    """Compute accuracy, macro F1/precision/recall and macro AUC for one evaluation.

    Args:
        eval_pred: pair ``(logits, labels)``; ``logits`` is indexed as
            (sample, class) and ``labels`` holds the true class ids.

    Returns:
        dict with keys ``accuracy``, ``macro_f1``, ``precision``, ``recall`` and
        ``auc``; ``auc`` is 0.0 when the AUC computation raises ``ValueError``.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=1)

    metrics = {
        "accuracy": accuracy_score(labels, predicted),
        "macro_f1": f1_score(labels, predicted, average='macro'),
        "precision": precision_score(labels, predicted, average='macro'),
        "recall": recall_score(labels, predicted, average='macro'),
    }

    try:
        # One indicator column per class, in the same order as the logit columns.
        class_ids = list(range(logits.shape[1]))
        one_hot = label_binarize(labels, classes=class_ids)
        metrics["auc"] = roc_auc_score(y_true=one_hot, y_score=logits, average='macro', multi_class='ovr')
    except ValueError:
        metrics["auc"] = 0.0

    return metrics

# === Training arguments and model setup ===
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
model = RobertaForSequenceClassification.from_pretrained("roberta-base", num_labels=3)

# Load the data (train_df and test_df must be prepared beforehand)
# train_df = pd.read_csv("train.csv")
# test_df = pd.read_csv("test.csv")

train_dataset = TextOnlyDataset(train_df, tokenizer)
test_dataset = TextOnlyDataset(test_df, tokenizer)

os.environ["WANDB_DISABLED"] = "true"  # prevent automatic connection to wandb

training_args = TrainingArguments(
    output_dir="./results_text_only",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    logging_dir="./logs_text_only",
)

# The test set doubles as the evaluation set for this run.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics
)

# === Train and evaluate ===
trainer.train()
results = trainer.evaluate()
print(results)

# === Inference ===
# Put the model in evaluation mode and move it to the GPU when one is available.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

test_loader = DataLoader(test_dataset, batch_size=16)

# Accumulators filled by the prediction loop, plus the raw texts for the export.
true_labels = []
pred_labels = []
texts = test_df["CommentText"].tolist()

# Emoji extraction
def extract_emojis(text):
    """Return the runs of characters in ``text`` that are neither word
    characters, whitespace, nor one of the listed punctuation marks, joined
    by single spaces."""
    pattern = r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+"
    runs = re.findall(pattern, text)
    return ' '.join(runs)

# Emoji string for every test comment, aligned with `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Inference
for batch in tqdm(test_loader, desc="🔍 Predicting"):
    input_ids = batch["input_ids"].to(device)
    attention_mask = batch["attention_mask"].to(device)
    labels = batch["labels"].to(device)

    # No gradients are needed for prediction.
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Write out the results
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

df_result.to_csv("youtube_emoji_sentiment_onlytext.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_onlytext.csv")


# emoji2vec

In [None]:


# Import gensim's KeyedVectors and load the pre-trained emoji2vec embeddings
# from the cloned emoji2vec repository.
from gensim.models import KeyedVectors
import re
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)
# Re-read the CSVs so "Sentiment" holds the original string labels again.
train_df = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
test_df = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
# Example: look up the vector for 😂
vec = emoji_model['😂']
print(vec.shape)

import numpy as np

def get_emoji_vector(emojis):
    """Average the emoji2vec embeddings of the given emojis.

    Args:
        emojis: iterable of emoji strings; entries absent from ``emoji_model``
            are skipped.

    Returns:
        The element-wise mean of the found vectors, or a zero vector of length
        ``emoji_model.vector_size`` when none of the emojis is known.
    """
    known = []
    for symbol in emojis:
        if symbol in emoji_model:
            known.append(emoji_model[symbol])
    if not known:
        return np.zeros(emoji_model.vector_size)
    return np.mean(known, axis=0)

# Matches one code point at a time in the range U+10000..U+10FFFF.
# NOTE(review): symbols below U+10000 (presumably e.g. ❤ U+2764) are not matched
# by this range — confirm that is acceptable for the data.
emoji_pattern = re.compile("[\U00010000-\U0010ffff]", flags=re.UNICODE)


# Per comment: the list of matched characters, then their averaged emoji2vec vector.
train_df["emojis"] = train_df["CommentText"].apply(lambda text: emoji_pattern.findall(str(text)))
test_df["emojis"] = test_df["CommentText"].apply(lambda text: emoji_pattern.findall(str(text)))
train_df["emoji_vec"] = train_df["emojis"].apply(get_emoji_vector)
test_df["emoji_vec"] = test_df["emojis"].apply(get_emoji_vector)
# Encode the three sentiment names as integer class ids.
label_map = {"Negative": 0, "Neutral": 1, "Positive": 2}

train_df["Sentiment"] = train_df["Sentiment"].map(label_map)
test_df["Sentiment"] = test_df["Sentiment"].map(label_map)

print(train_df.head())
## roberta + emoji2vec
import torch
import torch.nn as nn
from transformers import RobertaModel
from sklearn.metrics import accuracy_score, f1_score , precision_score, recall_score, roc_auc_score
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer
import numpy as np # Import numpy

# Dataset pairing tokenized comment text with a pre-computed emoji vector.
class EmojiSentimentDataset(Dataset):
    """Torch dataset of (tokenized text, emoji vector, label) samples.

    Args:
        dataframe: frame providing ``CommentText``, ``emoji_vec`` and the
            label column named by ``task``.
        tokenizer: callable applied to one text at a time; it is invoked with
            ``truncation``, ``padding``, ``max_length`` and ``return_tensors``.
        task: name of the label column.
        max_length: length passed to the tokenizer for padding/truncation.
        emoji_dim: size of the zero vector substituted for an invalid
            ``emoji_vec`` entry.
    """

    def __init__(self, dataframe, tokenizer, task="Sentiment", max_length=128, emoji_dim=300):
        self.texts = dataframe["CommentText"].tolist()
        self.emoji_vecs = dataframe["emoji_vec"].tolist()
        self.labels = dataframe[task].tolist()
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.emoji_dim = emoji_dim

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask``, ``emoji_vec`` and ``labels``.

        Raises:
            ValueError: if the label at ``idx`` is not an integer (for example
                a NaN left behind by an unmapped label).
        """
        emoji_vector_data = self.emoji_vecs[idx]
        if isinstance(emoji_vector_data, (list, np.ndarray)):
            emoji_vector = torch.tensor(emoji_vector_data, dtype=torch.float32)
        else:
            print(f"Warning: Invalid emoji_vec at index {idx}. Using zero vector.")
            emoji_vector = torch.zeros(self.emoji_dim, dtype=torch.float32)

        label_value = self.labels[idx]
        # Accept NumPy integers as well as Python ints; anything else is an
        # error, reported through the exception rather than a "skipping" message.
        if not isinstance(label_value, (int, np.integer)):
            raise ValueError(f"Invalid label type at index {idx}: {type(label_value)}")
        label = torch.tensor(int(label_value), dtype=torch.long)

        encoding = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors="pt"
        )

        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "emoji_vec": emoji_vector,
            "labels": label
        }

# Classifier that concatenates RoBERTa's first-token state with an emoji vector.
class RobertaWithEmoji(nn.Module):
    """RoBERTa encoder followed by a linear classifier over [first token ; emoji vector].

    Args:
        num_labels: number of output classes.
        emoji_dim: length of the emoji vector concatenated to the text features.
    """

    def __init__(self, num_labels=3, emoji_dim=300):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained("roberta-base")
        self.dropout = nn.Dropout(0.1)
        fused_dim = self.roberta.config.hidden_size + emoji_dim
        self.classifier = nn.Linear(fused_dim, num_labels)

    def forward(self, input_ids, attention_mask, emoji_vec, labels=None):
        """Return ``(loss, logits)`` when ``labels`` is given, otherwise just ``logits``."""
        encoder_out = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        first_token = encoder_out.last_hidden_state[:, 0, :]  # state at sequence position 0
        fused = torch.cat((first_token, emoji_vec), dim=1)
        logits = self.classifier(self.dropout(fused))

        if labels is None:
            return logits

        criterion = nn.CrossEntropyLoss()
        loss = criterion(logits.view(-1, self.classifier.out_features), labels.view(-1))
        return (loss, logits)

from sklearn.preprocessing import label_binarize
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, precision_score, recall_score
import numpy as np

def compute_metrics(eval_pred):
    """Compute accuracy, macro F1/precision/recall and macro AUC for one evaluation.

    Args:
        eval_pred: pair ``(logits, labels)``; ``logits`` is indexed as
            (sample, class) and ``labels`` holds the true class ids.

    Returns:
        dict with keys ``accuracy``, ``macro_f1``, ``precision``, ``recall`` and
        ``auc``; ``auc`` is 0.0 (and the error is printed) when the AUC
        computation raises ``ValueError``.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=1)

    metrics = {
        "accuracy": accuracy_score(labels, predicted),
        "macro_f1": f1_score(labels, predicted, average='macro'),
        "precision": precision_score(labels, predicted, average='macro'),
        "recall": recall_score(labels, predicted, average='macro'),
    }

    try:
        label_array = np.array(labels)
        # One indicator column per class, in the same order as the logit columns.
        class_ids = list(range(logits.shape[1]))
        one_hot = label_binarize(label_array, classes=class_ids)
        metrics["auc"] = roc_auc_score(
            y_true=one_hot,
            y_score=logits,
            multi_class='ovr',
            average='macro',
        )
    except ValueError as e:
        print("⚠️ ROC AUC 計算錯誤:", e)
        metrics["auc"] = 0.0

    return metrics

def collate_fn(batch):
    """Stack each per-sample tensor into one batch tensor along a new first axis.

    Args:
        batch: list of dicts with ``input_ids``, ``attention_mask``,
            ``emoji_vec`` and ``labels`` tensors.

    Returns:
        dict with the same four keys, each holding the stacked tensor.
    """
    field_names = ("input_ids", "attention_mask", "emoji_vec", "labels")
    return {
        name: torch.stack([sample[name] for sample in batch])
        for name in field_names
    }


# Drop test rows whose emoji_vec is missing or is not a length-300 list/array.
# Only test_df is filtered here; train_df is passed to the dataset unchanged.
initial_test_rows = len(test_df)
test_df = test_df[test_df["emoji_vec"].notnull()]
test_df = test_df[test_df["emoji_vec"].apply(lambda x: isinstance(x, (list, np.ndarray)) and len(x) == 300)]
cleaned_test_rows = len(test_df)
print(f"Cleaned test_df: Removed {initial_test_rows - cleaned_test_rows} rows with invalid emoji_vec. Remaining rows: {cleaned_test_rows}")

tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
test_dataset = EmojiSentimentDataset(test_df, tokenizer, task="Sentiment")
train_dataset = EmojiSentimentDataset(train_df, tokenizer, task="Sentiment")


from transformers import TrainingArguments, Trainer
import os

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = RobertaWithEmoji(num_labels=3).to(device)

os.environ["WANDB_DISABLED"] = "true"

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    logging_dir="./logs",
    logging_steps=100,
    do_eval=True,
    do_train=True,
    learning_rate=2e-5,
    eval_strategy="epoch",  # evaluate at every epoch

)

# The custom collate function forwards emoji_vec alongside the token tensors.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=collate_fn
)

trainer.train()

results = trainer.evaluate()
print("📊 評估結果：", results)
# Switch to evaluation mode
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prepare the DataLoader
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)

# Columns to export
texts = test_df['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop: run the model over the test set without tracking gradients.
for i, batch in enumerate(tqdm(test_loader, desc="🔍 Predicting")):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    labels = batch['labels'].to(device)

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec)
        # Called without labels, RobertaWithEmoji.forward returns the bare
        # logits tensor (and a (loss, logits) tuple when labels are given), so
        # the result cannot be indexed with "logits". Dict outputs are still
        # accepted for models that return them.
        if isinstance(outputs, torch.Tensor):
            logits = outputs
        elif isinstance(outputs, dict):
            logits = outputs["logits"]
        else:
            logits = outputs[1]
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper
def extract_emojis(text):
    """Return the runs of characters in ``text`` that are neither word
    characters, whitespace, nor one of the listed punctuation marks, joined
    by single spaces."""
    pattern = r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+"
    runs = re.findall(pattern, text)
    return ' '.join(runs)

# Emoji string for every test comment, aligned with `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the result DataFrame and export it
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

df_result.to_csv("youtube_emoji_sentiment_emoji2vec.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_emoji2vec.csv")

# RoBERTa + emoji2vec(position)

In [None]:
import os
import re
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from gensim.models import KeyedVectors
from transformers import BertTokenizer, BertModel, Trainer, TrainingArguments
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Re-read the raw CSVs so this section starts from the original string labels.
train_df = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
test_df = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')

# Pre-trained emoji2vec embeddings from the cloned emoji2vec repository.
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)

def get_emoji_vector(emojis):
    """Average the emoji2vec embeddings of the given emojis.

    Args:
        emojis: iterable of emoji strings; entries absent from ``emoji_model``
            are skipped.

    Returns:
        The element-wise mean of the found vectors, or a zero vector of length
        ``emoji_model.vector_size`` when none of the emojis is known.
    """
    found = []
    for symbol in emojis:
        if symbol in emoji_model:
            found.append(emoji_model[symbol])
    if found:
        return np.mean(found, axis=0)
    return np.zeros(emoji_model.vector_size)

# Matches one code point at a time in the range U+10000..U+10FFFF.
emoji_pattern = re.compile("[\U00010000-\U0010ffff]", flags=re.UNICODE)


# Encode the three sentiment names as integer class ids.
sentiment_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
train_df["Sentiment"] = train_df["Sentiment"].map(sentiment_map)
test_df["Sentiment"] = test_df["Sentiment"].map(sentiment_map)

def preprocess_df(df):
    """Add an averaged emoji2vec vector and a coarse emoji-position flag per comment.

    Two columns are written onto ``df`` itself:

    * ``emoji2vec_vec``: ``get_emoji_vector`` applied to the characters matched
      by ``emoji_pattern`` in ``CommentText``.
    * ``emoji_pos``: 0 when the first matched emoji starts in the first half of
      the text (or the text has no emoji), 1 otherwise.

    Args:
        df: frame with ``CommentText`` and ``Sentiment`` columns.

    Returns:
        ``df`` without the rows whose ``Sentiment`` is missing.
    """
    emoji_vecs = []
    emoji_positions = []
    for raw_text in df["CommentText"]:
        text = str(raw_text)
        emojis = emoji_pattern.findall(text)
        emoji_vecs.append(get_emoji_vector(emojis))
        # Locate the first emoji directly. Searching for all emojis joined
        # together only succeeds when they are contiguous, and otherwise
        # yields -1, which always classified such comments as position 0.
        first_match = emoji_pattern.search(text)
        idx = first_match.start() if first_match else 0
        ratio = idx / max(1, len(text))
        emoji_positions.append(0 if ratio < 0.5 else 1)
    df["emoji2vec_vec"] = emoji_vecs
    df["emoji_pos"] = emoji_positions
    return df.dropna(subset=["Sentiment"])

# Add the emoji2vec_vec / emoji_pos columns and drop rows without a Sentiment value.
train_df = preprocess_df(train_df)
test_df = preprocess_df(test_df)

class EmojiSentimentDataset(Dataset):
    """Torch dataset of (tokenized text, emoji2vec vector, emoji position, label).

    Args:
        df: frame providing ``CommentText``, ``emoji2vec_vec``, ``emoji_pos``
            and ``Sentiment`` columns.
        tokenizer: callable applied to one text at a time; it is invoked with
            ``padding``, ``truncation``, ``max_length`` and ``return_tensors``.
        max_len: length passed to the tokenizer for padding/truncation.
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.texts = df["CommentText"].tolist()
        self.emoji2vec_vecs = df["emoji2vec_vec"].tolist()
        self.emoji_pos = df["emoji_pos"].tolist()
        self.labels = df["Sentiment"].tolist()
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the token tensors, emoji features and label for row ``idx``."""
        text = self.texts[idx]
        emoji2vec_vec = torch.tensor(self.emoji2vec_vecs[idx], dtype=torch.float32)
        emoji_pos = torch.tensor(self.emoji_pos[idx], dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        # Use the tokenizer handed to this dataset, not whatever global happens
        # to be named `tokenizer` when the item is fetched.
        encoding = self.tokenizer(text, padding="max_length", truncation=True, max_length=self.max_len, return_tensors="pt")
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "emoji2vec_vec": emoji2vec_vec,
            "emoji_pos": emoji_pos,
            "labels": label
        }

def collate_fn(batch):
    """Stack each per-sample tensor into one batch tensor along a new first axis.

    Args:
        batch: list of dicts with ``input_ids``, ``attention_mask``,
            ``emoji2vec_vec``, ``emoji_pos`` and ``labels`` tensors.

    Returns:
        dict with the same five keys, each holding the stacked tensor.
    """
    stacked = {}
    for name in ("input_ids", "attention_mask", "emoji2vec_vec", "emoji_pos", "labels"):
        stacked[name] = torch.stack([sample[name] for sample in batch])
    return stacked

class BertWithEmojiOnly(nn.Module):
    """Encoder + emoji2vec vector + emoji-position embedding, with a linear classifier.

    The classifier input is the concatenation of the encoder's state at
    sequence position 0, the emoji2vec vector and a learned embedding of the
    binary emoji position.

    Args:
        num_labels: number of output classes.
        emoji2vec_dim: length of the emoji2vec vector.
        pos_embed_dim: size of the emoji-position embedding.
    """

    def __init__(self, num_labels=3, emoji2vec_dim=300, pos_embed_dim=16):
        super().__init__()
        # "roberta-base" is a RoBERTa checkpoint (and the inputs are produced by
        # RobertaTokenizer), so load it with the matching RoBERTa class rather
        # than BertModel. Imported locally because this cell's import list only
        # brings in the BERT classes. The attribute keeps the name `bert` so
        # existing references keep working.
        from transformers import RobertaModel
        self.bert = RobertaModel.from_pretrained("roberta-base")
        self.pos_embedding = nn.Embedding(2, pos_embed_dim)
        self.dropout = nn.Dropout(0.1)
        combined_dim = self.bert.config.hidden_size + emoji2vec_dim + pos_embed_dim
        self.classifier = nn.Linear(combined_dim, num_labels)

    def forward(self, input_ids, attention_mask, emoji2vec_vec, emoji_pos, labels=None):
        """Return ``(loss, logits)`` when ``labels`` is given, otherwise just ``logits``."""
        bert_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls_vec = bert_out.last_hidden_state[:, 0, :]
        pos_vec = self.pos_embedding(emoji_pos)
        concat_vec = torch.cat([cls_vec, emoji2vec_vec, pos_vec], dim=1)
        output = self.dropout(concat_vec)
        logits = self.classifier(output)
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
            return loss, logits
        return logits

def compute_metrics(eval_pred):
    """Compute accuracy, macro F1/precision/recall and macro AUC for one evaluation.

    Args:
        eval_pred: pair ``(logits, labels)``; ``logits`` is indexed as
            (sample, class) and ``labels`` holds the true class ids (0, 1 or 2).

    Returns:
        dict with keys ``accuracy``, ``f1``, ``precision``, ``recall`` and
        ``auc``; ``auc`` is 0.0 when the AUC computation raises ``ValueError``.
    """
    # Imported outside the try block so an import problem is not silently
    # reported as an AUC of 0.0.
    from sklearn.preprocessing import label_binarize

    logits, labels = eval_pred
    preds = np.argmax(logits, axis=1)
    acc = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds, average="macro")
    precision = precision_score(labels, preds, average="macro")
    recall = recall_score(labels, preds, average="macro")
    try:
        y_true_bin = label_binarize(labels, classes=[0, 1, 2])
        auc = roc_auc_score(y_true_bin, logits, multi_class="ovr", average="macro")
    except ValueError:
        # Catch only ValueError here; other errors should surface rather than
        # be masked as an AUC of 0.0.
        auc = 0.0
    return {"accuracy": acc, "f1": f1, "precision": precision, "recall": recall, "auc": auc}

from transformers import RobertaTokenizer

# Disable Weights & Biases logging for this run.
os.environ["WANDB_DISABLED"] = "true"
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
train_dataset = EmojiSentimentDataset(train_df, tokenizer)
test_dataset = EmojiSentimentDataset(test_df, tokenizer)
model = BertWithEmojiOnly().to(device)

# Training configuration for the emoji2vec + position model.
training_args = TrainingArguments(
    output_dir="./finetuned-emote-roberta",
    per_device_train_batch_size=16,
    num_train_epochs=3,
    save_steps=500,
    logging_steps=100,
    do_train=True,
    do_eval=True,
    overwrite_output_dir=True,
    learning_rate=2e-5,
    eval_strategy="epoch",  # evaluate at every epoch
)

# The test set doubles as the evaluation set; collate_fn forwards the
# emoji2vec_vec and emoji_pos tensors alongside the token tensors.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=collate_fn
)

trainer.train()
eval_results = trainer.evaluate()
print("Evaluation results:", eval_results)

# Switch to evaluation mode
model.eval()

# Prepare the DataLoader
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)

# Columns to export
texts = test_df['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop
for i, batch in enumerate(test_loader):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    # The batch key matches the forward() keyword name `emoji2vec_vec`.
    emoji2vec_vec = batch['emoji2vec_vec'].to(device)
    labels = batch['labels'].to(device)
    with torch.no_grad():
        # Labels are not passed to the model here; they are only collected for the export.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji2vec_vec=emoji2vec_vec, emoji_pos=batch['emoji_pos'].to(device))
        # Accept either a bare logits tensor or a (loss, logits) pair.
        logits = outputs if isinstance(outputs, torch.Tensor) else outputs[1]
        preds = torch.argmax(logits, dim=1)
    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper
def extract_emojis(text):
    """Return the runs of characters in ``text`` that are neither word
    characters, whitespace, nor one of the listed punctuation marks, joined
    by single spaces."""
    pattern = r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+"
    runs = re.findall(pattern, text)
    return ' '.join(runs)

# Emoji string for every test comment, aligned with `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the result DataFrame and export it
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

df_result.to_csv("youtube_emoji_sentiment_position.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_position.csv")

# pretrain

In [None]:

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from datasets import Dataset
from gensim.models import KeyedVectors
from transformers import (
    RobertaTokenizer, RobertaForMaskedLM, DataCollatorForLanguageModeling,
    TrainingArguments, Trainer, RobertaModel
)
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import label_binarize
from torch.utils.data import Dataset as TorchDataset

# ========== 1. Load emoji2vec ==========
# Pre-trained emoji2vec embeddings from the cloned emoji2vec repository.
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)

def get_emoji_vector(text):
    """Average the emoji2vec embeddings of the characters of ``text``.

    Args:
        text: string scanned character by character; characters absent from
            ``emoji_model`` are ignored.

    Returns:
        The element-wise mean of the found vectors, or a zero vector of length
        ``emoji_model.vector_size`` when no character is known.
    """
    vectors = [emoji_model[ch] for ch in text if ch in emoji_model]
    if vectors:
        return np.mean(vectors, axis=0)
    return np.zeros(emoji_model.vector_size)

# ========== 2. ELCo MLM pre-training ==========
elco_df = pd.read_csv("/content/ELCo/ELCo.csv")
import random

# Sentence templates combining an "EM" value and an "EN" value from ELCo.csv.
templates = [
    lambda em, en: f"{em} usually means {en}.",
    lambda em, en: f"When people use {em}, they are likely feeling {en}.",
    lambda em, en: f"{em} is often used to show {en}.",
    lambda em, en: f"Using {em} might suggest that someone is experiencing {en}.",
    lambda em, en: f"The emoji {em} stands for {en} in many cases.",
]

# One randomly chosen template per row; no seed is set here, so the generated
# text differs between runs.
elco_df['text'] = [random.choice(templates)(em, en) for em, en in zip(elco_df["EM"], elco_df["EN"])]
elco_dataset = Dataset.from_pandas(elco_df[['text']])

tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

def tokenize_mlm(example):
    """Tokenize the ``text`` field with the module-level ``tokenizer``,
    truncating and padding to a length of 128."""
    texts = example["text"]
    return tokenizer(
        texts,
        truncation=True,
        padding="max_length",
        max_length=128,
    )

# Tokenize the generated sentences and expose only the tensors the model needs.
tokenized_elco = elco_dataset.map(tokenize_mlm, batched=True)
tokenized_elco.set_format(type="torch", columns=["input_ids", "attention_mask"])

# The collator is configured for masked language modelling with probability 0.15.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)
mlm_model = RobertaForMaskedLM.from_pretrained("roberta-base")

mlm_args = TrainingArguments(
    output_dir="./roberta_elco_mlm",
    num_train_epochs=20,
    per_device_train_batch_size=16,
    save_steps=500,
    logging_steps=100,
    prediction_loss_only=True,
    overwrite_output_dir=True,
    learning_rate=1e-4
)

mlm_trainer = Trainer(model=mlm_model, args=mlm_args, train_dataset=tokenized_elco, data_collator=data_collator)
mlm_trainer.train()
# Save model and tokenizer together; the EmoTE stage loads from this directory.
mlm_model.save_pretrained("./roberta_elco_mlm")
tokenizer.save_pretrained("./roberta_elco_mlm")



In [None]:

# ========== 3. EmoTE fine-tune with the self-built YouTube dataset ==========
from datasets import Dataset

# The CSV is expected to provide the "premise", "hypothesis" and "label"
# columns used by the following steps.
yt_emote_df = pd.read_csv("/content/drive/MyDrive/YouTube_Emote_Combined.csv")
yt_emote_dataset = Dataset.from_pandas(yt_emote_df)

def tokenize_emote(example):
    """Tokenize premise/hypothesis pairs with the module-level ``tokenizer``.

    Only the premise is truncated (``truncation="only_first"``) and every pair
    is padded to a length of 128. Overflowing tokens are not requested: the
    training setup keeps only ``input_ids``, ``attention_mask`` and ``labels``,
    and overflow output returned as extra rows would no longer line up
    one-to-one with the ``label`` column under a batched ``map``.
    """
    return tokenizer(
        example["premise"],
        example["hypothesis"],
        truncation="only_first",
        padding="max_length",
        max_length=128,
    )

# Tokenize, rename the target column to "labels" and expose only the model inputs.
encoded_emote = yt_emote_dataset.map(tokenize_emote, batched=True)
encoded_emote = encoded_emote.rename_column("label", "labels")
encoded_emote.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# Initialise from the model produced by the MLM pre-training stage above
from transformers import RobertaForSequenceClassification

emote_model = RobertaForSequenceClassification.from_pretrained("./roberta_elco_mlm", num_labels=2)

emote_args = TrainingArguments(
    output_dir="./roberta_emote_finetune_youtube",  # renamed output directory
    num_train_epochs=5,
    per_device_train_batch_size=5,
    learning_rate=2e-5,
    warmup_steps=300,
    lr_scheduler_type="linear",
    logging_steps=100,
    save_steps=500,
    overwrite_output_dir=True,
    report_to="none"  # turn off wandb
)

emote_trainer = Trainer(
    model=emote_model,
    args=emote_args,
    train_dataset=encoded_emote,
    tokenizer=tokenizer
)

emote_trainer.train()
# The final model is saved to ./roberta_emote_finetune (not the Trainer's
# output_dir); the sentiment stage loads it from that path.
emote_model.save_pretrained("./roberta_emote_finetune")
tokenizer.save_pretrained("./roberta_emote_finetune")


In [None]:
# ========== 4. YouTube sentiment classification with emoji2vec ==========
train_df = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
test_df = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
# Encode the three sentiment names as integer class ids.
sentiment_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
train_df['Sentiment'] = train_df['Sentiment'].map(sentiment_map)
test_df['Sentiment'] = test_df['Sentiment'].map(sentiment_map)

class YoutubeDatasetWithEmoji(TorchDataset):
    """Torch dataset yielding tokenized comments, labels and an emoji vector.

    Args:
        df: frame providing ``CommentText`` and ``Sentiment`` columns.
        tokenizer: callable applied to one text at a time; it is invoked with
            ``truncation``, ``padding``, ``max_length`` and ``return_tensors``.
        max_length: length passed to the tokenizer for padding/truncation.
    """

    def __init__(self, df, tokenizer, max_length=128):
        self.texts = df['CommentText'].tolist()
        self.labels = df['Sentiment'].tolist()
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenizer outputs plus ``labels`` and ``emoji_vec`` for row ``idx``."""
        text = self.texts[idx]
        encoding = self.tokenizer(
            text, truncation=True, padding="max_length", max_length=self.max_length, return_tensors='pt')
        # squeeze(0) removes only the leading size-one axis; a bare squeeze()
        # would also collapse a sequence axis of length one.
        item = {k: v.squeeze(0) for k, v in encoding.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        # The emoji vector is computed from the raw text on every access.
        item['emoji_vec'] = torch.tensor(get_emoji_vector(text), dtype=torch.float)
        return item

class RobertaWithEmoji(nn.Module):
    """RoBERTa encoder whose pooled output is fused with a projected emoji vector.

    The emoji vector is projected to the encoder's hidden size, concatenated
    with the encoder's ``pooler_output`` and classified by a linear layer.

    Args:
        model_path: checkpoint name or directory passed to
            ``RobertaModel.from_pretrained``.
        emoji_dim: length of the incoming emoji vector.
        num_labels: number of output classes.
    """

    def __init__(self, model_path, emoji_dim=300, num_labels=3):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained(model_path)
        # Size the layers from the loaded encoder instead of assuming 768, so
        # checkpoints with a different hidden size also work.
        hidden_size = self.roberta.config.hidden_size
        self.dropout = nn.Dropout(0.1)
        self.emoji_proj = nn.Linear(emoji_dim, hidden_size)
        self.classifier = nn.Linear(2 * hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, emoji_vec, labels=None):
        """Return ``{"logits": ...}``, plus ``"loss"`` when ``labels`` is given."""
        # NOTE(review): pooler_output relies on the encoder's pooling layer;
        # presumably that layer is not part of a sequence-classification
        # checkpoint and starts freshly initialised — confirm.
        roberta_out = self.roberta(input_ids=input_ids, attention_mask=attention_mask).pooler_output
        emoji_embed = self.emoji_proj(emoji_vec)
        concat = torch.cat([roberta_out, emoji_embed], dim=1)
        logits = self.classifier(self.dropout(concat))

        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
            return {"loss": loss, "logits": logits}
        return {"logits": logits}

def custom_collate_fn(batch):
    """Stack each per-sample tensor into one batch tensor along a new first axis.

    Args:
        batch: list of dicts with ``input_ids``, ``attention_mask``,
            ``emoji_vec`` and ``labels`` tensors.

    Returns:
        dict with the same four keys, each holding the stacked tensor.
    """
    collated = {}
    for field in ('input_ids', 'attention_mask', 'emoji_vec', 'labels'):
        collated[field] = torch.stack([sample[field] for sample in batch])
    return collated

def compute_metrics(eval_pred):
    """Compute accuracy, macro F1/precision/recall and macro AUC for one evaluation.

    Args:
        eval_pred: pair ``(logits, labels)``; ``logits`` is indexed as
            (sample, class) and ``labels`` holds the true class ids (0, 1 or 2).

    Returns:
        dict with keys ``accuracy``, ``macro_f1``, ``precision``, ``recall`` and
        ``auc``; ``auc`` is 0.0 when the AUC computation raises ``ValueError``.
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=1)
    acc = accuracy_score(labels, preds)
    macro_f1 = f1_score(labels, preds, average='macro')
    precision = precision_score(labels, preds, average='macro')
    recall = recall_score(labels, preds, average='macro')
    try:
        y_true_bin = label_binarize(labels, classes=[0, 1, 2])
        auc = roc_auc_score(y_true_bin, logits, multi_class='ovr', average='macro')
    except ValueError:
        # Catch only ValueError here; other errors should surface rather than
        # be masked as an AUC of 0.0.
        auc = 0.0
    return {"accuracy": acc, "macro_f1": macro_f1, "precision": precision, "recall": recall, "auc": auc}

# Start from the encoder saved by the EmoTE fine-tuning stage.
model = RobertaWithEmoji("./roberta_emote_finetune")
train_dataset = YoutubeDatasetWithEmoji(train_df, tokenizer)
test_dataset = YoutubeDatasetWithEmoji(test_df, tokenizer)

finetune_args = TrainingArguments(
    output_dir="./roberta_youtube_emoji_sentiment",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    logging_dir="./logs_youtube",
    logging_steps=50,
    save_steps=500,
    save_total_limit=2,
    do_eval=True,
    do_train=True,
    learning_rate=5e-5,
    warmup_steps=300,
    lr_scheduler_type="linear"
)

# The test set doubles as the evaluation set; custom_collate_fn forwards
# emoji_vec alongside the token tensors.
trainer = Trainer(
    model=model,
    args=finetune_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=custom_collate_fn
)

trainer.train()
results = trainer.evaluate()
print("\n🎯 Final Evaluation:", results)

# This section's import cells do not bring in DataLoader or tqdm, so import
# them here to keep the cell runnable without the earlier sections.
from torch.utils.data import DataLoader
from tqdm import tqdm

# Disable dropout explicitly for inference, and use the device the model's
# parameters are on instead of relying on a `device` variable from another cell.
model.eval()
device = next(model.parameters()).device

# Prepare the DataLoader
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=custom_collate_fn)

# Columns to export
texts = test_df['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop
for i, batch in enumerate(tqdm(test_loader, desc="🔍 Predicting")):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    labels = batch['labels'].to(device)

    with torch.no_grad():
        # Labels are not passed, so the model returns {"logits": ...} only.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec)
        logits = outputs["logits"]
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper
def extract_emojis(text):
    """Return the runs of characters in ``text`` that are neither word
    characters, whitespace, nor one of the listed punctuation marks, joined
    by single spaces."""
    # Imported locally: this section's import cells do not import `re`.
    import re

    return ' '.join(re.findall(r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+", text))

# Emoji string for every test comment, aligned with `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the result DataFrame and export it
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

df_result.to_csv("youtube_emoji_sentiment_pretrain.csv", index=False)
print("✅ 結果已匯出至 youtube_emoji_sentiment_pretrain.csv")

# pretrain (experimental version)

In [None]:
# Upgrade transformers to the latest release.
# NOTE(review): if transformers was already imported in this session, a runtime
# restart is presumably needed before the upgraded version is used — confirm.
!pip install transformers --upgrade

In [None]:
import pandas as pd
import numpy as np
import torch
import random
import math
from datasets import Dataset
from sklearn.model_selection import train_test_split
from transformers import (
    RobertaTokenizer, RobertaForMaskedLM, DataCollatorForLanguageModeling,
    TrainingArguments, Trainer
)

# ========== 1. Load the ELCo data ==========
elco_df = pd.read_csv("/content/ELCo/ELCo.csv")

# Build varied sentence patterns for MLM from the "EM" and "EN" columns
templates = [
    lambda em, en: f"{em} usually means {en}.",
    lambda em, en: f"When people use {em}, they are likely feeling {en}.",
    lambda em, en: f"{em} is often used to show {en}.",
    lambda em, en: f"Using {em} might suggest that someone is experiencing {en}.",
    lambda em, en: f"The emoji {em} stands for {en} in many cases.",
]
# One randomly chosen template per row; `random` is not seeded here, so the
# generated text differs between runs even though the split below is seeded.
elco_df['text'] = [random.choice(templates)(em, en) for em, en in zip(elco_df["EM"], elco_df["EN"])]

# ========== 2. Split into train / val ==========
train_texts, val_texts = train_test_split(elco_df['text'], test_size=0.1, random_state=42)
train_dataset = Dataset.from_pandas(pd.DataFrame({'text': train_texts}))
val_dataset = Dataset.from_pandas(pd.DataFrame({'text': val_texts}))

# ========== 3. Tokenizer ==========
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

def tokenize_mlm(example):
    """Tokenize the ``text`` field with the module-level ``tokenizer``.

    Sequences are truncated and padded to a fixed length of 128 tokens.
    """
    encode_options = dict(truncation=True, padding="max_length", max_length=128)
    return tokenizer(example["text"], **encode_options)

# Tokenize both splits in batches.
tokenized_train = train_dataset.map(tokenize_mlm, batched=True)
tokenized_val = val_dataset.map(tokenize_mlm, batched=True)

# Expose only the model inputs as torch tensors.
tokenized_train.set_format(type="torch", columns=["input_ids", "attention_mask"])
tokenized_val.set_format(type="torch", columns=["input_ids", "attention_mask"])

# ========== 4. Train the model ==========
mlm_model = RobertaForMaskedLM.from_pretrained("roberta-base")
# The collator is configured for masked-LM training with mlm_probability=0.15.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)

mlm_args = TrainingArguments(
    output_dir="./roberta_elco_mlm",
    num_train_epochs=20,
    per_device_train_batch_size=16,
    save_steps=500,
    logging_steps=100,
    learning_rate=1e-4,
    eval_strategy="epoch",  # evaluate at the end of every epoch
    save_total_limit=2
)

mlm_trainer = Trainer(
    model=mlm_model,
    args=mlm_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    data_collator=data_collator,
)

mlm_trainer.train()

# ========== 5. Evaluate: compute perplexity ==========
# Perplexity is exp(eval loss) on the validation split.
eval_results = mlm_trainer.evaluate()
perplexity = math.exp(eval_results["eval_loss"])
print(f"🔍 Evaluation Perplexity: {perplexity:.2f}")

# ========== 6. Save the model ==========
# Saved into the same directory that is used as the Trainer's output_dir.
mlm_model.save_pretrained("./roberta_elco_mlm")
tokenizer.save_pretrained("./roberta_elco_mlm")


In [None]:
import pandas as pd
import math
from datasets import Dataset
from transformers import (
    RobertaTokenizer, RobertaForSequenceClassification,
    TrainingArguments, Trainer, EvalPrediction
)
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# ========== 1. Load the dataset ==========
df = pd.read_csv("/content/drive/MyDrive/____YouTube_Emote____.csv")
df = df.dropna(subset=["premise", "hypothesis", "label"])  # drop rows with missing fields

# Train / validation split, stratified on the label column.
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df["label"])

train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)

# ========== 2. Tokenizer ==========
# Loaded from the directory written by the MLM pre-training step.
tokenizer = RobertaTokenizer.from_pretrained("./roberta_elco_mlm")

def tokenize_emote(example):
    """Encode each (premise, hypothesis) pair with the module-level ``tokenizer``.

    Pairs are truncated and padded to a fixed length of 128 tokens.
    """
    first_segment = example["premise"]
    second_segment = example["hypothesis"]
    return tokenizer(
        first_segment,
        second_segment,
        truncation=True,
        padding="max_length",
        max_length=128,
    )

# Tokenize both splits in batches.
tokenized_train = train_dataset.map(tokenize_emote, batched=True)
tokenized_val = val_dataset.map(tokenize_emote, batched=True)

# `rename_column` returns a NEW dataset instead of modifying in place, so the
# result has to be bound back to the real variables. Rebinding a loop variable
# (as the previous version did) left both datasets unrenamed and unformatted.
tokenized_train = tokenized_train.rename_column("label", "labels")
tokenized_val = tokenized_val.rename_column("label", "labels")
for dataset in (tokenized_train, tokenized_val):
    # `set_format` works in place: expose only the model inputs as torch tensors.
    dataset.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# ========== 3. Load the MLM-pretrained checkpoint and fine-tune ==========
model = RobertaForSequenceClassification.from_pretrained("./roberta_elco_mlm", num_labels=2)

def compute_metrics(p: EvalPrediction):
    """Return accuracy, F1, precision and recall for an evaluation pass.

    Predicted classes are the argmax over the last axis of ``p.predictions``;
    the scikit-learn scorers are called with their default averaging.
    """
    y_true = p.label_ids
    y_pred = p.predictions.argmax(-1)
    scorers = (
        ("accuracy", accuracy_score),
        ("f1", f1_score),
        ("precision", precision_score),
        ("recall", recall_score),
    )
    return {metric: scorer(y_true, y_pred) for metric, scorer in scorers}

# Hyper-parameters for the premise/hypothesis fine-tuning stage.
training_args = TrainingArguments(
    output_dir="./roberta_emote_finetune_youtube",
    num_train_epochs=5,
    per_device_train_batch_size=5,
    per_device_eval_batch_size=16,
    eval_strategy="epoch",  # evaluate at the end of every epoch
    learning_rate=2e-5,
    warmup_steps=300,
    lr_scheduler_type="linear",
    logging_steps=100,
    save_steps=500,
    overwrite_output_dir=True,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# ========== 4. Train and evaluate ==========
trainer.train()
eval_result = trainer.evaluate()
print("📊 Evaluation Results:", eval_result)

# ========== 5. Save the model ==========
# NOTE: the final model goes to "./roberta_emote_finetune", which is a
# different directory from the Trainer's checkpoint output_dir above.
model.save_pretrained("./roberta_emote_finetune")
tokenizer.save_pretrained("./roberta_emote_finetune")


In [None]:
# ========== 4. YouTube sentiment classification with emoji2vec ==========
train_df = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
test_df = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
# Map the string sentiment labels to integer class ids.
# NOTE(review): values not present in the map become NaN under Series.map;
# presumably the CSVs only contain these three labels — confirm.
sentiment_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
train_df['Sentiment'] = train_df['Sentiment'].map(sentiment_map)
test_df['Sentiment'] = test_df['Sentiment'].map(sentiment_map)

class YoutubeDatasetWithEmoji(TorchDataset):
    """Dataset yielding tokenized comments with their label and emoji vector.

    Reads the ``CommentText`` and ``Sentiment`` columns of ``df`` once at
    construction time and tokenizes lazily in ``__getitem__``.
    """

    def __init__(self, df, tokenizer, max_length=128):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.texts = df['CommentText'].tolist()
        self.labels = df['Sentiment'].tolist()

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        comment = self.texts[idx]
        encoded = self.tokenizer(
            comment,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors='pt',
        )
        # Drop the size-1 dimensions added by return_tensors='pt'.
        sample = {}
        for field, tensor in encoded.items():
            sample[field] = tensor.squeeze()
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        # Emoji feature vector comes from the module-level get_emoji_vector().
        sample['emoji_vec'] = torch.tensor(get_emoji_vector(comment), dtype=torch.float)
        return sample

class RobertaWithEmoji(nn.Module):
    """RoBERTa encoder fused with a projected emoji vector for classification.

    The encoder's ``pooler_output`` is concatenated with a linear projection
    of ``emoji_vec`` (to 768 dims) and fed through dropout and a linear head.
    """

    def __init__(self, model_path, emoji_dim=300, num_labels=3):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained(model_path)
        self.dropout = nn.Dropout(0.1)
        self.emoji_proj = nn.Linear(emoji_dim, 768)
        self.classifier = nn.Linear(768 + 768, num_labels)

    def forward(self, input_ids, attention_mask, emoji_vec, labels=None):
        """Return ``{"logits": ...}``, plus ``"loss"`` when ``labels`` is given."""
        encoder_out = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        text_features = encoder_out.pooler_output
        emoji_features = self.emoji_proj(emoji_vec)
        fused = torch.cat([text_features, emoji_features], dim=1)
        logits = self.classifier(self.dropout(fused))

        if labels is None:
            return {"logits": logits}

        criterion = nn.CrossEntropyLoss()
        return {"loss": criterion(logits, labels), "logits": logits}

def custom_collate_fn(batch):
    """Stack the per-sample tensors of ``batch`` into one tensor per field."""
    fields = ('input_ids', 'attention_mask', 'emoji_vec', 'labels')
    return {
        field: torch.stack([sample[field] for sample in batch])
        for field in fields
    }

def compute_metrics(eval_pred):
    """Compute accuracy, macro F1/precision/recall and macro one-vs-rest AUC.

    Args:
        eval_pred: a ``(logits, labels)`` pair as passed by the Trainer.

    Returns:
        dict with keys ``accuracy``, ``macro_f1``, ``precision``, ``recall``
        and ``auc``. ``auc`` falls back to ``0.0`` when it cannot be computed.
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=1)
    acc = accuracy_score(labels, preds)
    macro_f1 = f1_score(labels, preds, average='macro')
    precision = precision_score(labels, preds, average='macro')
    recall = recall_score(labels, preds, average='macro')
    try:
        y_true_bin = label_binarize(labels, classes=[0, 1, 2])
        auc = roc_auc_score(y_true_bin, logits, multi_class='ovr', average='macro')
    except ValueError:
        # Catch only ValueError (e.g. AUC undefined for the given labels)
        # rather than a bare `except`, which would also swallow
        # KeyboardInterrupt and genuine programming errors.
        auc = 0.0
    return {"accuracy": acc, "macro_f1": macro_f1, "precision": precision, "recall": recall, "auc": auc}

# Use the RoBERTa wrapper defined in this cell: the checkpoint being loaded is
# a RoBERTa model, so wrapping it in DebertaWithEmoji (as before) was wrong.
model = RobertaWithEmoji("./roberta_emote_finetune")
train_dataset = YoutubeDatasetWithEmoji(train_df, tokenizer)
test_dataset = YoutubeDatasetWithEmoji(test_df, tokenizer)

finetune_args = TrainingArguments(
    output_dir="./roberta_youtube_emoji_sentiment",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    logging_dir="./logs_youtube",
    logging_steps=50,
    save_steps=500,
    save_total_limit=2,
    do_eval=True,
    do_train=True,
    learning_rate=5e-5,
    warmup_steps=300,
    lr_scheduler_type="linear",
    eval_strategy="epoch",  # evaluate at the end of every epoch
)

# NOTE(review): the test set doubles as the per-epoch evaluation set here.
trainer = Trainer(
    model=model,
    args=finetune_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=custom_collate_fn
)

trainer.train()
results = trainer.evaluate()
print("\n🎯 Final Evaluation:", results)

# Put the model in inference mode and choose the device explicitly, as the
# sibling prediction cells do; this cell previously relied on `device` having
# been defined elsewhere and never disabled dropout itself.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Build the DataLoader (no shuffling, so predictions stay aligned with `texts`)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=custom_collate_fn)

# Columns to export
texts = test_df['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop
for batch in tqdm(test_loader, desc="🔍 Predicting"):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    labels = batch['labels'].to(device)

    with torch.no_grad():
        # Labels are deliberately not passed: only the logits are needed.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec)
        logits = outputs["logits"]
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper
def extract_emojis(text):
    """Collect emoji-like character runs from ``text`` as one space-separated string.

    Anything that is not a word character, whitespace or one of the
    punctuation marks enumerated in the pattern counts as part of a run.
    """
    pattern = r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+"
    matches = re.findall(pattern, text)
    return ' '.join(matches)

# Emoji-like substrings for every test comment, in the same order as `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the result DataFrame and export it.
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

# NOTE(review): same output file name as the other export cells in this
# notebook, so each run overwrites the previous CSV.
df_result.to_csv("youtube_emoji_sentiment_pretrain.csv", index=False)
print("✅ 結果已匯出至 youtube_emoji_sentiment_pretrain.csv")

# pretrain (deberta)

In [None]:
!pip install transformers --upgrade

In [None]:
# ✅ 完整整合流程：使用 DeBERTa + emoji2vec，無位置特徵
import pandas as pd
import numpy as np
import torch
import random
import re
import math
from datasets import Dataset
from transformers import (
    DebertaTokenizer, DebertaForMaskedLM, DebertaModel,
    DebertaForSequenceClassification,
    DataCollatorForLanguageModeling,
    TrainingArguments, Trainer, EvalPrediction
)
from torch.utils.data import Dataset as TorchDataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.preprocessing import label_binarize
from gensim.models import KeyedVectors
from tqdm import tqdm
import os

# ========== STEP 1: MLM pre-training ==========
# Import here so the cell runs on a fresh kernel: `train_test_split` is used
# below, but this cell's own import of it only appears further down.
from sklearn.model_selection import train_test_split

elco_df = pd.read_csv("/content/ELCo/ELCo.csv")
# Each template renders one sentence from a row's "EM" and "EN" values.
templates = [
    lambda em, en: f"{em} usually means {en}.",
    lambda em, en: f"When people use {em}, they are likely feeling {en}.",
    lambda em, en: f"{em} is often used to show {en}.",
    lambda em, en: f"Using {em} might suggest that someone is experiencing {en}.",
    lambda em, en: f"The emoji {em} stands for {en} in many cases."
]
# NOTE(review): `random` is unseeded, so the chosen templates vary per run.
elco_df['text'] = [random.choice(templates)(em, en) for em, en in zip(elco_df["EM"], elco_df["EN"])]

train_texts, val_texts = train_test_split(elco_df['text'], test_size=0.1, random_state=42)
train_dataset = Dataset.from_pandas(pd.DataFrame({'text': train_texts}))
val_dataset = Dataset.from_pandas(pd.DataFrame({'text': val_texts}))

tokenizer = DebertaTokenizer.from_pretrained("microsoft/deberta-base")
mlm_model = DebertaForMaskedLM.from_pretrained("microsoft/deberta-base")

def tokenize_mlm(example):
    """Tokenize ``example['text']`` with the module-level ``tokenizer``.

    Output is truncated and padded to exactly 128 tokens per sequence.
    """
    sentences = example['text']
    return tokenizer(
        sentences,
        truncation=True,
        padding="max_length",
        max_length=128,
    )

# Tokenize both splits in batches.
tokenized_train = train_dataset.map(tokenize_mlm, batched=True)
tokenized_val = val_dataset.map(tokenize_mlm, batched=True)

# Expose only the model inputs as torch tensors.
tokenized_train.set_format("torch", columns=["input_ids", "attention_mask"])
tokenized_val.set_format("torch", columns=["input_ids", "attention_mask"])

# The collator is configured for masked-LM training with mlm_probability=0.15.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)

mlm_args = TrainingArguments(
    output_dir="./deberta_elco_mlm",
    num_train_epochs=20,
    per_device_train_batch_size=16,
    save_steps=500,
    logging_steps=100,
    learning_rate=1e-4,
    eval_strategy="epoch",
    save_total_limit=2
)

mlm_trainer = Trainer(
    model=mlm_model,
    args=mlm_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    data_collator=data_collator
)

mlm_trainer.train()
# Persist model and tokenizer together so the next stage can load both from
# "./deberta_elco_mlm".
mlm_model.save_pretrained("./deberta_elco_mlm")
tokenizer.save_pretrained("./deberta_elco_mlm")


# Step 2: premise/hypothesis fine-tuning
# ========== 1. Load the dataset ==========
df = pd.read_csv("/content/drive/MyDrive/____YouTube_Emote____.csv")
df = df.dropna(subset=["premise", "hypothesis", "label"])  # drop rows with missing fields

# Train / validation split, stratified on the label column.
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df["label"])

train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)

# ========== 2. Tokenizer ==========
# Loaded from the directory written by the MLM pre-training step.
tokenizer = DebertaTokenizer.from_pretrained("./deberta_elco_mlm")



def tokenize_emote(example):
    """Encode (premise, hypothesis) pairs with the module-level ``tokenizer``.

    Each pair is truncated and padded to a fixed 128-token length.
    """
    pair_options = {"truncation": True, "padding": "max_length", "max_length": 128}
    return tokenizer(example["premise"], example["hypothesis"], **pair_options)

# Tokenize both splits in batches.
tokenized_train = train_dataset.map(tokenize_emote, batched=True)
tokenized_val = val_dataset.map(tokenize_emote, batched=True)

# `rename_column` returns a NEW dataset instead of modifying in place, so the
# result has to be bound back to the real variables. Rebinding a loop variable
# (as the previous version did) left both datasets unrenamed and unformatted.
tokenized_train = tokenized_train.rename_column("label", "labels")
tokenized_val = tokenized_val.rename_column("label", "labels")
for dataset in (tokenized_train, tokenized_val):
    # `set_format` works in place: expose only the model inputs as torch tensors.
    dataset.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# ========== 3. Load the MLM-pretrained checkpoint and fine-tune ==========
model = DebertaForSequenceClassification.from_pretrained("./deberta_elco_mlm", num_labels=2)

def compute_metrics(p: EvalPrediction):
    """Score an evaluation pass with accuracy, F1, precision and recall.

    The predicted class is the argmax over the last axis of
    ``p.predictions``; scorers use scikit-learn's default averaging.
    """
    predicted = p.predictions.argmax(-1)
    expected = p.label_ids
    report = {}
    report["accuracy"] = accuracy_score(expected, predicted)
    report["f1"] = f1_score(expected, predicted)
    report["precision"] = precision_score(expected, predicted)
    report["recall"] = recall_score(expected, predicted)
    return report

# Hyper-parameters for the premise/hypothesis fine-tuning stage.
training_args = TrainingArguments(
    # DeBERTa-specific checkpoint directory. The previous value was the
    # copy-pasted "./roberta_emote_finetune_youtube"; together with
    # overwrite_output_dir=True that made this run overwrite the RoBERTa
    # run's output directory.
    output_dir="./deberta_emote_finetune_youtube",
    num_train_epochs=10,
    per_device_train_batch_size=5,
    per_device_eval_batch_size=16,
    eval_strategy="epoch",  # evaluate at the end of every epoch
    learning_rate=2e-5,
    warmup_steps=300,
    lr_scheduler_type="linear",
    logging_steps=100,
    save_steps=500,
    overwrite_output_dir=True,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# ========== 4. Train and evaluate ==========
trainer.train()
eval_result = trainer.evaluate()
print("📊 Evaluation Results:", eval_result)

# ========== 5. Save the model ==========
# The final model is what the next stage loads from "./deberta_emote_finetune".
model.save_pretrained("./deberta_emote_finetune")
tokenizer.save_pretrained("./deberta_emote_finetune")
# ========== STEP 2: sentiment classification (with emoji2vec) ==========
# Load the emoji2vec vectors from the binary word2vec file in the cloned repo.
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)

def get_emoji_vector(text):
    """Return the mean emoji2vec embedding of the emojis found in ``text``.

    Only characters in the range U+10000..U+10FFFF are considered, and only
    those present in the module-level ``emoji_model``. When nothing usable is
    found a 300-dimensional zero vector is returned.
    """
    candidates = re.findall(r"[\U00010000-\U0010ffff]", text)
    known_vectors = [emoji_model[ch] for ch in candidates if ch in emoji_model]
    if known_vectors:
        return np.mean(known_vectors, axis=0)
    return np.zeros(300)

# Load the YouTube sentiment train/test CSVs from Drive.
train_df = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
test_df = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
# Map the string sentiment labels to integer class ids.
# NOTE(review): values not present in the map become NaN under Series.map;
# presumably the CSVs only contain these three labels — confirm.
sentiment_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
train_df['Sentiment'] = train_df['Sentiment'].map(sentiment_map)
test_df['Sentiment'] = test_df['Sentiment'].map(sentiment_map)

class YoutubeDatasetWithEmoji(TorchDataset):
    """Dataset yielding tokenized comments with their label and emoji vector.

    Reads the ``CommentText`` and ``Sentiment`` columns of ``df`` once at
    construction time and tokenizes lazily in ``__getitem__``.
    """

    def __init__(self, df, tokenizer, max_length=128):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.texts = df['CommentText'].tolist()
        self.labels = df['Sentiment'].tolist()

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        comment = self.texts[idx]
        encoded = self.tokenizer(
            comment,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors='pt',
        )
        # Drop the size-1 dimensions added by return_tensors='pt'.
        sample = {}
        for field, tensor in encoded.items():
            sample[field] = tensor.squeeze()
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        # Emoji feature vector comes from the module-level get_emoji_vector().
        sample['emoji_vec'] = torch.tensor(get_emoji_vector(comment), dtype=torch.float)
        return sample

class DebertaWithEmoji(nn.Module):
    """DeBERTa encoder fused with a projected emoji vector for classification.

    The hidden state at sequence position 0 is concatenated with a linear
    projection of ``emoji_vec`` (to 768 dims) and fed through dropout and a
    linear head.
    """

    def __init__(self, model_path, emoji_dim=300, num_labels=3):
        super().__init__()
        self.deberta = DebertaModel.from_pretrained(model_path)
        self.dropout = nn.Dropout(0.1)
        self.emoji_proj = nn.Linear(emoji_dim, 768)
        self.classifier = nn.Linear(768 + 768, num_labels)

    def forward(self, input_ids, attention_mask, emoji_vec, labels=None):
        """Return ``{"logits": ...}``, plus ``"loss"`` when ``labels`` is given."""
        encoder_out = self.deberta(input_ids=input_ids, attention_mask=attention_mask)
        text_features = encoder_out.last_hidden_state[:, 0]
        emoji_features = self.emoji_proj(emoji_vec)
        fused = torch.cat([text_features, emoji_features], dim=1)
        logits = self.classifier(self.dropout(fused))

        if labels is None:
            return {"logits": logits}

        criterion = nn.CrossEntropyLoss()
        return {"loss": criterion(logits, labels), "logits": logits}

def collate_fn(batch):
    """Stack the per-sample tensors of ``batch`` into one tensor per field."""
    collated = {}
    for field in ('input_ids', 'attention_mask', 'emoji_vec', 'labels'):
        collated[field] = torch.stack([sample[field] for sample in batch])
    return collated

def compute_metrics(pred):
    """Compute accuracy, macro F1/precision/recall and macro one-vs-rest AUC.

    ``pred`` unpacks to ``(logits, labels)``; predicted classes are the argmax
    over axis 1 and the AUC is computed from the raw logits against the
    labels binarized over classes 0, 1 and 2.
    """
    scores, targets = pred
    predicted = np.argmax(scores, axis=1)

    metrics = {"accuracy": accuracy_score(targets, predicted)}
    metrics["macro_f1"] = f1_score(targets, predicted, average='macro')
    metrics["precision"] = precision_score(targets, predicted, average='macro')
    metrics["recall"] = recall_score(targets, predicted, average='macro')

    one_hot_targets = label_binarize(targets, classes=[0, 1, 2])
    metrics["auc"] = roc_auc_score(one_hot_targets, scores, average='macro', multi_class='ovr')
    return metrics

# ========== Training ==========
# Encoder weights and tokenizer both come from the EMOTE fine-tuning stage.
model = DebertaWithEmoji("./deberta_emote_finetune")
tokenizer = DebertaTokenizer.from_pretrained("./deberta_emote_finetune")
train_dataset = YoutubeDatasetWithEmoji(train_df, tokenizer)
test_dataset = YoutubeDatasetWithEmoji(test_df, tokenizer)

# NOTE(review): the test set doubles as the per-epoch evaluation set here.
trainer = Trainer(
    model=model,
    args=TrainingArguments(
        output_dir="./deberta_youtube_emoji_sentiment",
        num_train_epochs=3,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        learning_rate=2e-5,
        eval_strategy="epoch",
        save_total_limit=2,
        logging_steps=50,
    ),
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=collate_fn
)

trainer.train()
print("✅ 評估結果：", trainer.evaluate())

# ========== Prediction output ==========
# Inference mode (disables dropout) on GPU when available, else CPU.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# No shuffling, so predictions stay aligned with `texts` below.
loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)
texts = test_df['CommentText'].tolist()
true_labels, pred_labels = [], []

for batch in tqdm(loader, desc="🔍 Predicting"):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    labels = batch['labels'].to(device)

    with torch.no_grad():
        # Labels are not passed to the model: only the logits are needed.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec)
        preds = torch.argmax(outputs["logits"], dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji-like substrings per comment: runs of characters that are not word
# characters, whitespace, or the punctuation listed in the pattern.
emoji_lists = [' '.join(re.findall(r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+", t)) for t in texts]

# Export one row per test comment.
pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
}).to_csv("youtube_emoji_sentiment_pretrain.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_pretrain.csv")


In [None]:

# ====== STEP 3: YouTube sentiment analysis with emoji features ======
def get_emoji_vec(text):
    """Return the mean emoji2vec embedding of the emojis found in ``text``.

    Only characters in the range U+10000..U+10FFFF are considered, and only
    those present in the module-level ``emoji_model``. When nothing usable is
    found a 300-dimensional zero vector is returned.
    """
    candidates = re.findall(r"[\U00010000-\U0010ffff]", text)
    known_vectors = [emoji_model[ch] for ch in candidates if ch in emoji_model]
    if known_vectors:
        return np.mean(known_vectors, axis=0)
    return np.zeros(300)

def get_emoji_position(text):
    """Return 1 if the first emoji sits in the second half of ``text``, else 0.

    Emojis are characters in the range U+10000..U+10FFFF. The position is the
    index of the first emoji divided by the text length; a ratio above 0.5
    yields 1. Text without any emoji yields 0.

    The previous implementation searched for all emojis joined into one
    string, which is only found when they form a single contiguous run;
    scattered emojis made ``str.find`` return -1 and the result was always 0.
    Locating the first emoji directly gives the same answer for contiguous
    runs and a correct one for scattered emojis.
    """
    first_emoji = re.search(r"[\U00010000-\U0010ffff]", text)
    if first_emoji is None:
        return 0
    # max(1, ...) mirrors the original guard against a zero-length text.
    relative_position = first_emoji.start() / max(1, len(text))
    return 1 if relative_position > 0.5 else 0

# Load the emoji2vec vectors from the binary word2vec file in the cloned repo.
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)
# Load the YouTube sentiment train/test CSVs from Drive.
yt_train = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
yt_test = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
# Map the string sentiment labels to integer class ids.
label_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
yt_train["Sentiment"] = yt_train["Sentiment"].map(label_map)
yt_test["Sentiment"] = yt_test["Sentiment"].map(label_map)

# NOTE(review): this tokenizer comes from the base "microsoft/deberta-base"
# checkpoint, whereas the model below is loaded from
# "./deberta_emote_finetune" — presumably the vocabularies match; confirm.
tokenizer = DebertaTokenizer.from_pretrained("microsoft/deberta-base")
# Position feature removed; only emoji_vec is kept
def get_emoji_vec(text):
    """Average the emoji2vec embeddings of the emojis contained in ``text``.

    Characters in U+10000..U+10FFFF that exist in the module-level
    ``emoji_model`` contribute to the mean; otherwise the result is a
    300-dimensional zero vector.
    """
    matched = re.findall(r"[\U00010000-\U0010ffff]", text)
    embeddings = [emoji_model[symbol] for symbol in matched if symbol in emoji_model]
    if not embeddings:
        return np.zeros(300)
    return np.mean(embeddings, axis=0)

# Position feature removed
class YouTubeEmojiDataset(TorchDataset):
    """Dataset over a comment DataFrame yielding model inputs per row.

    Each item holds the tokenized ``CommentText``, the ``Sentiment`` label
    and the emoji vector produced by the module-level ``get_emoji_vec``.
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.df = df
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        comment = record['CommentText']
        target = record['Sentiment']
        encoded = self.tokenizer(
            comment,
            truncation=True,
            padding="max_length",
            max_length=self.max_len,
            return_tensors="pt",
        )
        emoji_features = get_emoji_vec(comment)
        # squeeze() drops the size-1 dimensions added by return_tensors="pt".
        sample = {
            'input_ids': encoded['input_ids'].squeeze(),
            'attention_mask': encoded['attention_mask'].squeeze(),
        }
        sample['labels'] = torch.tensor(target, dtype=torch.long)
        sample['emoji_vec'] = torch.tensor(emoji_features, dtype=torch.float32)
        return sample

# Position-feature embedding layer removed
class DebertaWithEmoji(nn.Module):
    """DeBERTa encoder combined with a projected emoji vector.

    The hidden state at sequence position 0 and the 768-dim projection of
    ``emoji_vec`` are concatenated, passed through dropout and classified by
    a linear layer.
    """

    def __init__(self, model_path, emoji_dim=300, num_labels=3):
        super().__init__()
        self.deberta = DebertaModel.from_pretrained(model_path)
        self.emoji_proj = nn.Linear(emoji_dim, 768)
        self.classifier = nn.Linear(768 + 768, num_labels)
        self.dropout = nn.Dropout(0.1)

    def forward(self, input_ids, attention_mask, emoji_vec, labels=None):
        """Return ``{"logits": ...}``, plus ``"loss"`` when ``labels`` is given."""
        encoder_out = self.deberta(input_ids=input_ids, attention_mask=attention_mask)
        text_features = encoder_out.last_hidden_state[:, 0]
        emoji_features = self.emoji_proj(emoji_vec)
        fused = torch.cat([text_features, emoji_features], dim=1)
        logits = self.classifier(self.dropout(fused))

        if labels is None:
            return {"logits": logits}

        criterion = nn.CrossEntropyLoss()
        return {"loss": criterion(logits, labels), "logits": logits}

# Collate function (no position feature)
def collate_fn(batch):
    """Stack the per-sample tensors of ``batch`` into one tensor per field."""
    field_names = ['input_ids', 'attention_mask', 'emoji_vec', 'labels']
    stacked = [torch.stack([sample[name] for sample in batch]) for name in field_names]
    return dict(zip(field_names, stacked))

# Evaluation and training flow is unchanged


def compute_metrics(pred):
    """Compute accuracy, macro F1/precision/recall and macro one-vs-rest AUC.

    ``pred`` unpacks to ``(logits, labels)``; predicted classes are the argmax
    over axis 1 and the AUC is computed from the raw logits against the
    labels binarized over classes 0, 1 and 2.
    """
    scores, targets = pred
    predicted = np.argmax(scores, axis=1)

    metrics = {"accuracy": accuracy_score(targets, predicted)}
    metrics["macro_f1"] = f1_score(targets, predicted, average='macro')
    metrics["precision"] = precision_score(targets, predicted, average='macro')
    metrics["recall"] = recall_score(targets, predicted, average='macro')

    one_hot_targets = label_binarize(targets, classes=[0,1,2])
    metrics["auc"] = roc_auc_score(one_hot_targets, scores, average='macro', multi_class='ovr')
    return metrics

# Wrap the DataFrames as datasets and load the encoder from the EMOTE
# fine-tuning stage.
train_set = YouTubeEmojiDataset(yt_train, tokenizer)
test_set = YouTubeEmojiDataset(yt_test, tokenizer)
model = DebertaWithEmoji("./deberta_emote_finetune")

args = TrainingArguments(
    output_dir="./deberta_youtube_sentiment",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    logging_dir="./logs",
    learning_rate=2e-5,
    logging_steps=100,
    eval_strategy="epoch"
)

# NOTE(review): the test set doubles as the per-epoch evaluation set here.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_set,
    eval_dataset=test_set,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)

trainer.train()
eval_result = trainer.evaluate()
print("Evaluation:", eval_result)

# ========== Predict and export results ==========
import pandas as pd
from torch.utils.data import DataLoader
from tqdm import tqdm
import re

# Switch to evaluation mode
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Build the DataLoader (no shuffling, so predictions stay aligned with `texts`)
test_loader = DataLoader(test_set, batch_size=16, collate_fn=collate_fn)

# Columns to export
texts = yt_test['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop
for batch in tqdm(test_loader, desc="🔍 Predicting"):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    labels = batch['labels'].to(device)

    with torch.no_grad():
        # The position feature was removed from the dataset, collate_fn and
        # model in this cell, so no `emoji_pos` is read or passed: the batch
        # has no such key and forward() does not accept that argument.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec)
        logits = outputs["logits"]
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper
def extract_emojis(text):
    """Return the emoji-like character runs of ``text`` joined by single spaces.

    A run is a maximal sequence of characters that are neither word
    characters, whitespace, nor one of the punctuation marks listed in the
    pattern.
    """
    found_runs = re.findall(r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+", text)
    joined = ' '.join(found_runs)
    return joined

# Emoji-like substrings for every test comment, in the same order as `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the result DataFrame and export it.
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

# NOTE(review): the file name still says "+position" although the dataset and
# model in this cell no longer use a position feature.
df_result.to_csv("youtube_emoji_sentiment_pretrain+position.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_pretrain+position.csv")


# pretrain (bert)

In [None]:
!pip install transformers --upgrade

In [None]:
# ✅ 完整整合流程：使用 BERT + emoji2vec，無位置特徵
import pandas as pd
import numpy as np
import torch
import random
import re
import math
from datasets import Dataset
from transformers import (
    BertTokenizer, BertForMaskedLM, BertModel,
    BertForSequenceClassification,
    DataCollatorForLanguageModeling,
    TrainingArguments, Trainer, EvalPrediction
)
from torch.utils.data import Dataset as TorchDataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.preprocessing import label_binarize
from gensim.models import KeyedVectors
from tqdm import tqdm
import os

# ========== STEP 1: MLM pre-training ==========
# Import here so the cell runs on a fresh kernel: `train_test_split` is used
# below, but this cell's own import of it only appears further down.
from sklearn.model_selection import train_test_split

elco_df = pd.read_csv("/content/ELCo/ELCo.csv")
# Each template renders one sentence from a row's "EM" and "EN" values.
templates = [
    lambda em, en: f"{em} usually means {en}.",
    lambda em, en: f"When people use {em}, they are likely feeling {en}.",
    lambda em, en: f"{em} is often used to show {en}.",
    lambda em, en: f"Using {em} might suggest that someone is experiencing {en}.",
    lambda em, en: f"The emoji {em} stands for {en} in many cases."
]
# NOTE(review): `random` is unseeded, so the chosen templates vary per run.
elco_df['text'] = [random.choice(templates)(em, en) for em, en in zip(elco_df["EM"], elco_df["EN"])]

train_texts, val_texts = train_test_split(elco_df['text'], test_size=0.1, random_state=42)
train_dataset = Dataset.from_pandas(pd.DataFrame({'text': train_texts}))
val_dataset = Dataset.from_pandas(pd.DataFrame({'text': val_texts}))

mlm_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
mlm_model = BertForMaskedLM.from_pretrained("bert-base-uncased")

def tokenize_mlm(example):
    """Tokenize ``example['text']`` with the module-level ``mlm_tokenizer``.

    Output is truncated and padded to exactly 128 tokens per sequence.
    """
    sentences = example['text']
    return mlm_tokenizer(
        sentences,
        truncation=True,
        padding="max_length",
        max_length=128,
    )

# Tokenize both splits in batches.
tokenized_train = train_dataset.map(tokenize_mlm, batched=True)
tokenized_val = val_dataset.map(tokenize_mlm, batched=True)

# Expose only the model inputs as torch tensors.
tokenized_train.set_format("torch", columns=["input_ids", "attention_mask"])
tokenized_val.set_format("torch", columns=["input_ids", "attention_mask"])

# The collator is configured for masked-LM training with mlm_probability=0.15.
data_collator = DataCollatorForLanguageModeling(tokenizer=mlm_tokenizer, mlm=True, mlm_probability=0.15)

mlm_args = TrainingArguments(
    output_dir="./bert_elco_mlm",
    num_train_epochs=20,
    per_device_train_batch_size=16,
    save_steps=500,
    logging_steps=100,
    learning_rate=1e-4,
    eval_strategy="epoch",
    save_total_limit=2
)

mlm_trainer = Trainer(
    model=mlm_model,
    args=mlm_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    data_collator=data_collator
)

mlm_trainer.train()
# Persist model and tokenizer together so the next stage can load both from
# "./bert_elco_mlm".
mlm_model.save_pretrained("./bert_elco_mlm")
mlm_tokenizer.save_pretrained("./bert_elco_mlm")

# ========== 1. Load the dataset ==========
df = pd.read_csv("/content/drive/MyDrive/____YouTube_Emote____.csv")
df = df.dropna(subset=["premise", "hypothesis", "label"])  # drop rows with missing fields

# Train / validation split, stratified on the label column.
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df["label"])

train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)

# ========== 2. Tokenizer ==========
# Loaded from the directory written by the MLM pre-training step.
tokenizer = BertTokenizer.from_pretrained("./bert_elco_mlm")


def tokenize_emote(example):
    """Encode (premise, hypothesis) pairs with the module-level ``tokenizer``.

    Each pair is truncated and padded to a fixed 128-token length.
    """
    premises, hypotheses = example["premise"], example["hypothesis"]
    return tokenizer(
        premises,
        hypotheses,
        truncation=True,
        padding="max_length",
        max_length=128,
    )

# Tokenize both splits in batches.
tokenized_train = train_dataset.map(tokenize_emote, batched=True)
tokenized_val = val_dataset.map(tokenize_emote, batched=True)

# `rename_column` returns a NEW dataset instead of modifying in place, so the
# result has to be bound back to the real variables. Rebinding a loop variable
# (as the previous version did) left both datasets unrenamed and unformatted.
tokenized_train = tokenized_train.rename_column("label", "labels")
tokenized_val = tokenized_val.rename_column("label", "labels")
for dataset in (tokenized_train, tokenized_val):
    # `set_format` works in place: expose only the model inputs as torch tensors.
    dataset.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# ========== 3. Load the MLM-pretrained checkpoint and fine-tune ==========
model = BertForSequenceClassification.from_pretrained("./bert_elco_mlm", num_labels=2)

def compute_metrics(p: EvalPrediction):
    """Return accuracy, F1, precision and recall for an evaluation pass.

    Predicted classes are the argmax over the last axis of ``p.predictions``;
    the scikit-learn scorers are called with their default averaging.
    """
    y_true = p.label_ids
    y_pred = p.predictions.argmax(-1)
    scorers = (
        ("accuracy", accuracy_score),
        ("f1", f1_score),
        ("precision", precision_score),
        ("recall", recall_score),
    )
    return {metric: scorer(y_true, y_pred) for metric, scorer in scorers}

# Hyper-parameters for the premise/hypothesis fine-tuning stage.
training_args = TrainingArguments(
    # BERT-specific checkpoint directory. The previous value was the
    # copy-pasted "./roberta_emote_finetune_youtube"; together with
    # overwrite_output_dir=True that made this run overwrite the RoBERTa
    # run's output directory.
    output_dir="./bert_emote_finetune_youtube",
    num_train_epochs=10,
    per_device_train_batch_size=5,
    per_device_eval_batch_size=16,
    eval_strategy="epoch",  # evaluate at the end of every epoch
    learning_rate=2e-5,
    warmup_steps=300,
    lr_scheduler_type="linear",
    logging_steps=100,
    save_steps=500,
    overwrite_output_dir=True,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# ========== 4. Train and evaluate ==========
trainer.train()
eval_result = trainer.evaluate()
print("📊 Evaluation Results:", eval_result)

# ========== 5. Save the model ==========
# The final model is what the next stage loads from "./bert_emote_finetune".
model.save_pretrained("./bert_emote_finetune")
tokenizer.save_pretrained("./bert_emote_finetune")

# ========== STEP 2: sentiment classification (with emoji2vec) ==========
# Load the emoji2vec vectors from the binary word2vec file in the cloned repo.
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)

def get_emoji_vector(text):
    """Average the emoji2vec embeddings of the emojis contained in ``text``.

    Characters in U+10000..U+10FFFF that exist in the module-level
    ``emoji_model`` contribute to the mean; otherwise the result is a
    300-dimensional zero vector.
    """
    matched = re.findall(r"[\U00010000-\U0010ffff]", text)
    embeddings = [emoji_model[symbol] for symbol in matched if symbol in emoji_model]
    if not embeddings:
        return np.zeros(300)
    return np.mean(embeddings, axis=0)

# Load the YouTube sentiment train/test CSVs from Drive.
train_df = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
test_df = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
# Map the string sentiment labels to integer class ids.
# NOTE(review): values not present in the map become NaN under Series.map;
# presumably the CSVs only contain these three labels — confirm.
sentiment_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
train_df['Sentiment'] = train_df['Sentiment'].map(sentiment_map)
test_df['Sentiment'] = test_df['Sentiment'].map(sentiment_map)

class YoutubeDatasetWithEmoji(TorchDataset):
    """Dataset yielding tokenized comments with their label and emoji vector.

    Reads the ``CommentText`` and ``Sentiment`` columns of ``df`` once at
    construction time and tokenizes lazily in ``__getitem__``.
    """

    def __init__(self, df, tokenizer, max_length=128):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.texts = df['CommentText'].tolist()
        self.labels = df['Sentiment'].tolist()

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        comment = self.texts[idx]
        encoded = self.tokenizer(
            comment,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors='pt',
        )
        # Drop the size-1 dimensions added by return_tensors='pt'.
        sample = {}
        for field, tensor in encoded.items():
            sample[field] = tensor.squeeze()
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        # Emoji feature vector comes from the module-level get_emoji_vector().
        sample['emoji_vec'] = torch.tensor(get_emoji_vector(comment), dtype=torch.float)
        return sample

class BertWithEmoji(nn.Module):
    """BERT encoder whose pooled output is fused with a projected emoji vector.

    Args:
        model_path: Checkpoint passed to ``BertModel.from_pretrained``.
        emoji_dim: Size of the incoming emoji feature vector.
        num_labels: Number of output classes.
    """

    def __init__(self, model_path, emoji_dim=300, num_labels=3):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_path)
        self.dropout = nn.Dropout(p=0.1)
        # The emoji vector is projected to 768 features before concatenation.
        self.emoji_proj = nn.Linear(in_features=emoji_dim, out_features=768)
        self.classifier = nn.Linear(in_features=768 + 768, out_features=num_labels)

    def forward(self, input_ids, attention_mask, emoji_vec, labels=None):
        """Return ``{"logits"}``, plus a cross-entropy ``"loss"`` when ``labels`` is given."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        fused = torch.cat((encoded.pooler_output, self.emoji_proj(emoji_vec)), dim=1)
        logits = self.classifier(self.dropout(fused))
        if labels is None:
            return {"logits": logits}
        return {"loss": nn.functional.cross_entropy(logits, labels), "logits": logits}

def collate_fn(batch):
    """Stack the per-sample tensors of ``batch`` into batch tensors, field by field."""
    fields = ('input_ids', 'attention_mask', 'emoji_vec', 'labels')
    return {field: torch.stack([sample[field] for sample in batch]) for field in fields}

def compute_metrics(pred):
    """Compute accuracy, macro F1/precision/recall and macro one-vs-rest AUC.

    Args:
        pred: A pair that unpacks into ``(logits, labels)``.

    Returns:
        Dict with the keys "accuracy", "macro_f1", "precision", "recall", "auc".
    """
    scores, gold = pred
    predicted = np.argmax(scores, axis=1)
    metrics = {"accuracy": accuracy_score(gold, predicted)}
    macro_scorers = (("macro_f1", f1_score), ("precision", precision_score), ("recall", recall_score))
    for key, scorer in macro_scorers:
        metrics[key] = scorer(gold, predicted, average='macro')
    # AUC is computed from the raw logits against one-hot labels for classes 0, 1, 2.
    one_hot = label_binarize(gold, classes=[0, 1, 2])
    metrics["auc"] = roc_auc_score(one_hot, scores, average='macro', multi_class='ovr')
    return metrics

# ========== Training ==========
# Tokenizer and encoder weights both come from the checkpoint saved by the previous step.
tokenizer = BertTokenizer.from_pretrained("./bert_emote_finetune")
model = BertWithEmoji("./bert_emote_finetune")
train_dataset = YoutubeDatasetWithEmoji(train_df, tokenizer)
test_dataset = YoutubeDatasetWithEmoji(test_df, tokenizer)

# NOTE(review): the test split is passed as eval_dataset, so per-epoch evaluation runs on test data.
trainer = Trainer(
    model=model,
    args=TrainingArguments(
        output_dir="./bert_youtube_emoji_sentiment",
        num_train_epochs=3,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        learning_rate=2e-5,
        eval_strategy="epoch",
        save_total_limit=2,
        logging_steps=50,
    ),
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    data_collator=collate_fn
)

# Train, then print the metrics from a final evaluation pass.
trainer.train()
print("✅ 評估結果：", trainer.evaluate())

# ========== Prediction output ==========
# Switch to inference mode and move the model to GPU when one is available.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# The loader is not shuffled, so predictions line up with the rows of `texts`.
loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)
texts = test_df['CommentText'].tolist()
true_labels, pred_labels = [], []

for batch in tqdm(loader, desc="🔍 Predicting"):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    labels = batch['labels'].to(device)

    # Labels are not passed to the model, so only logits are returned.
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec)
        preds = torch.argmax(outputs["logits"], dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Collect, per comment, every run of characters outside the word/whitespace/listed-punctuation set.
emoji_lists = [' '.join(re.findall(r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+", t)) for t in texts]

# Write text, gold label, predicted label and extracted symbols to CSV.
pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
}).to_csv("youtube_emoji_sentiment_pretrain.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_pretrain.csv")


# pretrain+position


In [None]:
import pandas as pd
import numpy as np
import torch
import random
import math
from datasets import Dataset
from sklearn.model_selection import train_test_split
from transformers import (
    RobertaTokenizer, RobertaForMaskedLM, DataCollatorForLanguageModeling,
    TrainingArguments, Trainer
)

# ========== 1. Load the ELCo data ==========
elco_df = pd.read_csv("/content/ELCo/ELCo.csv")

# Build varied MLM-style sentences from each (EM, EN) pair.
templates = [
    lambda em, en: f"{em} usually means {en}.",
    lambda em, en: f"When people use {em}, they are likely feeling {en}.",
    lambda em, en: f"{em} is often used to show {en}.",
    lambda em, en: f"Using {em} might suggest that someone is experiencing {en}.",
    lambda em, en: f"The emoji {em} stands for {en} in many cases.",
]
# One template is drawn at random per row.
# NOTE(review): no random seed is set in this cell, so the generated sentences differ between runs.
elco_df['text'] = [random.choice(templates)(em, en) for em, en in zip(elco_df["EM"], elco_df["EN"])]

# ========== 2. Split into train / val ==========
train_texts, val_texts = train_test_split(elco_df['text'], test_size=0.1, random_state=42)
train_dataset = Dataset.from_pandas(pd.DataFrame({'text': train_texts}))
val_dataset = Dataset.from_pandas(pd.DataFrame({'text': val_texts}))

# ========== 3. Tokenizer ==========
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

def tokenize_mlm(example):
    """Tokenize ``example["text"]`` with the module-level tokenizer, truncated/padded to 128 tokens."""
    options = {"truncation": True, "padding": "max_length", "max_length": 128}
    return tokenizer(example["text"], **options)

# Tokenize both splits in batches and expose only the tensor columns.
tokenized_train = train_dataset.map(tokenize_mlm, batched=True)
tokenized_val = val_dataset.map(tokenize_mlm, batched=True)

tokenized_train.set_format(type="torch", columns=["input_ids", "attention_mask"])
tokenized_val.set_format(type="torch", columns=["input_ids", "attention_mask"])

# ========== 4. Train the model ==========
mlm_model = RobertaForMaskedLM.from_pretrained("roberta-base")
# The collator is configured for masked-LM training with mlm_probability=0.15.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)

mlm_args = TrainingArguments(
    output_dir="./roberta_elco_mlm",
    num_train_epochs=20,
    per_device_train_batch_size=16,
    save_steps=500,
    logging_steps=100,
    learning_rate=1e-4,
    eval_strategy="epoch",  # evaluate after every epoch
    save_total_limit=2
)

mlm_trainer = Trainer(
    model=mlm_model,
    args=mlm_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    data_collator=data_collator,
)

mlm_trainer.train()

# ========== 5. Evaluate: compute perplexity ==========
# Perplexity is exp(eval_loss) of the final evaluation pass.
eval_results = mlm_trainer.evaluate()
perplexity = math.exp(eval_results["eval_loss"])
print(f"🔍 Evaluation Perplexity: {perplexity:.2f}")

# ========== 6. Save the model ==========
# Saved next to the tokenizer so the following step can load both from "./roberta_elco_mlm".
mlm_model.save_pretrained("./roberta_elco_mlm")
tokenizer.save_pretrained("./roberta_elco_mlm")

import pandas as pd
import math
from datasets import Dataset
from transformers import (
    RobertaTokenizer, RobertaForSequenceClassification,
    TrainingArguments, Trainer, EvalPrediction
)
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# ========== 1. Read the dataset ==========
df = pd.read_csv("/content/drive/MyDrive/____YouTube_Emote____.csv")
df = df.dropna(subset=["premise", "hypothesis", "label"])  # drop rows missing any required field

# Split into training / validation sets, stratified on the label column.
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df["label"])

train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)

# ========== 2. Tokenizer ==========
# Reload the tokenizer saved alongside the MLM checkpoint.
tokenizer = RobertaTokenizer.from_pretrained("./roberta_elco_mlm")

def tokenize_emote(example):
    """Tokenize (premise, hypothesis) pairs with the module-level tokenizer, truncated/padded to 128 tokens."""
    options = {"truncation": True, "padding": "max_length", "max_length": 128}
    first, second = example["premise"], example["hypothesis"]
    return tokenizer(first, second, **options)

# Tokenize both splits in batches.
tokenized_train = train_dataset.map(tokenize_emote, batched=True)
tokenized_val = val_dataset.map(tokenize_emote, batched=True)

# rename_column returns a new Dataset, so its result must be bound back to the
# variables handed to the Trainer; rebinding a loop variable would leave
# tokenized_train / tokenized_val unrenamed and unformatted.
tokenized_train = tokenized_train.rename_column("label", "labels")
tokenized_val = tokenized_val.rename_column("label", "labels")
tokenized_train.set_format("torch", columns=["input_ids", "attention_mask", "labels"])
tokenized_val.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# ========== 3. Load the MLM-pretrained checkpoint and fine-tune ==========
model = RobertaForSequenceClassification.from_pretrained("./roberta_elco_mlm", num_labels=2)

def compute_metrics(p: EvalPrediction):
    """Compute accuracy, F1, precision and recall from arg-max predictions.

    Args:
        p: Evaluation output providing ``predictions`` and ``label_ids``.

    Returns:
        Dict with the keys "accuracy", "f1", "precision", "recall".
    """
    predicted = p.predictions.argmax(-1)
    gold = p.label_ids
    scorers = (
        ("accuracy", accuracy_score),
        ("f1", f1_score),
        ("precision", precision_score),
        ("recall", recall_score),
    )
    return {name: scorer(gold, predicted) for name, scorer in scorers}

# Fine-tuning configuration for the entailment (premise/hypothesis) task.
training_args = TrainingArguments(
    output_dir="./roberta_emote_finetune_youtube",
    num_train_epochs=10,
    per_device_train_batch_size=5,
    per_device_eval_batch_size=16,
    eval_strategy="epoch",  # evaluate after every epoch
    learning_rate=2e-5,
    warmup_steps=300,
    lr_scheduler_type="linear",
    logging_steps=100,
    save_steps=500,
    overwrite_output_dir=True,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

# ========== 4. Train and evaluate ==========
trainer.train()
eval_result = trainer.evaluate()
print("📊 Evaluation Results:", eval_result)

# ========== 5. Save the model ==========
# Note the save directory differs from output_dir above; STEP 3 loads from "./roberta_emote_finetune".
model.save_pretrained("./roberta_emote_finetune")
tokenizer.save_pretrained("./roberta_emote_finetune")


# ✅ STEP 3: YouTube 留言情緒分析（emoji2vec + emoji位置）
import torch
import torch.nn as nn
from torch.utils.data import Dataset as TorchDataset
from gensim.models import KeyedVectors
from transformers import RobertaTokenizer, TrainingArguments, Trainer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import label_binarize
import numpy as np
import pandas as pd
import re

# Load the emoji2vec vector model (binary word2vec format).
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)

# Average the emoji2vec vectors of the emoji in a comment.
def get_emoji_vec(text):
    """Return the mean emoji2vec vector of the emoji found in ``text``.

    Only characters in the range U+10000-U+10FFFF are considered; characters
    missing from the module-level ``emoji_model`` are ignored.

    Returns:
        The element-wise mean of the matched vectors, or a zero vector of
        length ``emoji_model.vector_size`` when nothing usable is found.
    """
    known = []
    for ch in re.findall(r"[\U00010000-\U0010ffff]", text):
        if ch in emoji_model:
            known.append(emoji_model[ch])
    if not known:
        return np.zeros(emoji_model.vector_size)
    return np.mean(known, axis=0)

# Decide whether the emoji appear in the second half of the comment.
def get_emoji_position(text):
    """Return 1 if the first emoji starts in the second half of ``text``, else 0.

    An emoji is any character in the range U+10000-U+10FFFF. The position is
    the index of the first such character divided by the text length. Text
    without emoji yields 0 (treated as "first half").

    The first emoji is located directly with ``re.search``. Searching for the
    concatenation of all emoji would fail (index -1, hence always 0) whenever
    the emoji are separated by other characters.
    """
    first = re.search(r"[\U00010000-\U0010ffff]", text)
    if first is None:
        return 0  # no emoji: default to the first half
    return 1 if first.start() / max(1, len(text)) > 0.5 else 0

# Load the data and map string sentiment labels to integer class ids.
yt_train = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
yt_test = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
label_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
yt_train["Sentiment"] = yt_train["Sentiment"].map(label_map)
yt_test["Sentiment"] = yt_test["Sentiment"].map(label_map)

# NOTE(review): the tokenizer is loaded from the hub name, while the encoder below is loaded
# from the local fine-tuned checkpoint directory.
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

class YouTubeEmojiDataset(TorchDataset):
    """Torch dataset yielding tokenized comments with emoji vector and emoji position features.

    Args:
        df: Frame providing the 'CommentText' and 'Sentiment' columns.
        tokenizer: Callable tokenizer, invoked with truncation/padding options.
        max_len: Sequence length used for truncation and padding.
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.df, self.tokenizer, self.max_len = df, tokenizer, max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return one sample: 'input_ids', 'attention_mask', 'labels', 'emoji_vec', 'emoji_pos'."""
        record = self.df.iloc[idx]
        comment = record['CommentText']
        tokens = self.tokenizer(
            comment,
            truncation=True,
            padding="max_length",
            max_length=self.max_len,
            return_tensors="pt",
        )
        sample = {name: tokens[name].squeeze() for name in ('input_ids', 'attention_mask')}
        sample['labels'] = torch.tensor(record['Sentiment'], dtype=torch.long)
        sample['emoji_vec'] = torch.tensor(get_emoji_vec(comment), dtype=torch.float32)
        sample['emoji_pos'] = torch.tensor(get_emoji_position(comment), dtype=torch.long)
        return sample

class RobertaWithEmoji(nn.Module):
    """RoBERTa encoder fused with a projected emoji vector and an emoji-position embedding.

    Args:
        model_path: Checkpoint passed to ``RobertaModel.from_pretrained``.
        emoji_dim: Size of the incoming emoji feature vector.
        pos_dim: Size of the embedding for the binary emoji-position flag.
        num_labels: Number of output classes.
    """

    def __init__(self, model_path, emoji_dim=300, pos_dim=16, num_labels=3):
        super().__init__()
        from transformers import RobertaModel
        self.roberta = RobertaModel.from_pretrained(model_path)
        self.emoji_proj = nn.Linear(in_features=emoji_dim, out_features=768)
        # Two embedding rows: position flag 0 and position flag 1.
        self.pos_embedding = nn.Embedding(num_embeddings=2, embedding_dim=pos_dim)
        self.classifier = nn.Linear(in_features=768 + 768 + pos_dim, out_features=num_labels)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, input_ids, attention_mask, emoji_vec, emoji_pos, labels=None):
        """Return ``{"logits"}``, plus a cross-entropy ``"loss"`` when ``labels`` is given."""
        encoded = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        parts = (encoded.pooler_output, self.emoji_proj(emoji_vec), self.pos_embedding(emoji_pos))
        logits = self.classifier(self.dropout(torch.cat(parts, dim=1)))
        if labels is None:
            return {"logits": logits}
        return {"loss": nn.functional.cross_entropy(logits, labels), "logits": logits}

def collate_fn(batch):
    """Stack the per-sample tensors of ``batch`` into batch tensors, field by field."""
    fields = ('input_ids', 'attention_mask', 'emoji_vec', 'emoji_pos', 'labels')
    return {field: torch.stack([sample[field] for sample in batch]) for field in fields}

def compute_metrics(pred):
    """Compute accuracy, macro F1/precision/recall and macro one-vs-rest AUC.

    Args:
        pred: A pair that unpacks into ``(logits, labels)``.

    Returns:
        Dict with the keys "accuracy", "macro_f1", "precision", "recall", "auc".
    """
    scores, gold = pred
    predicted = np.argmax(scores, axis=1)
    metrics = {"accuracy": accuracy_score(gold, predicted)}
    macro_scorers = (("macro_f1", f1_score), ("precision", precision_score), ("recall", recall_score))
    for key, scorer in macro_scorers:
        metrics[key] = scorer(gold, predicted, average='macro')
    # AUC is computed from the raw logits against one-hot labels for classes 0, 1, 2.
    one_hot = label_binarize(gold, classes=[0,1,2])
    metrics["auc"] = roc_auc_score(one_hot, scores, average='macro', multi_class='ovr')
    return metrics

# Prepare for training: build datasets and load the encoder from the entailment fine-tuned checkpoint.
train_set = YouTubeEmojiDataset(yt_train, tokenizer)
test_set = YouTubeEmojiDataset(yt_test, tokenizer)
model = RobertaWithEmoji("./roberta_emote_finetune")

args = TrainingArguments(
    output_dir="./roberta_youtube_sentiment",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    logging_dir="./logs",
    learning_rate=2e-5,
    logging_steps=100,
    eval_strategy="epoch"
)

# NOTE(review): the test split is passed as eval_dataset, so per-epoch evaluation runs on test data.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_set,
    eval_dataset=test_set,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)

trainer.train()
# The returned metrics are not stored or printed here (a notebook shows them only as a cell's last expression).
trainer.evaluate()
# ========== Predict and export results ==========
import pandas as pd
from torch.utils.data import DataLoader
from tqdm import tqdm
import re

# Switch to evaluation mode and move the model to GPU when one is available.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prepare the DataLoader (not shuffled, so predictions line up with `texts`).
test_loader = DataLoader(test_set, batch_size=16, collate_fn=collate_fn)

# Output columns.
texts = yt_test['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop (the enumerate index `i` is not used in the body).
for i, batch in enumerate(tqdm(test_loader, desc="🔍 Predicting")):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    emoji_pos = batch['emoji_pos'].to(device) # position flag required by the model's forward
    labels = batch['labels'].to(device)

    with torch.no_grad():
        # Labels are not passed, so the model returns logits only.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec, emoji_pos=emoji_pos)
        logits = outputs["logits"]
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper.
def extract_emojis(text):
    """Return the space-joined runs of characters treated as emoji in ``text``.

    A run is any sequence of characters that are neither word characters,
    whitespace, nor one of the punctuation marks listed in the pattern.
    Other symbols outside that list (for example "-") are therefore captured too.
    """
    runs = re.findall(r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+", text)
    separator = ' '
    return separator.join(runs)

# One extracted-symbol string per comment, in the same order as `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the DataFrame and export it.
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

df_result.to_csv("youtube_emoji_sentiment_pretrain+position.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_pretrain+position.csv")


# pretrain+position


In [None]:
# ✅ STEP 3: YouTube 留言情緒分析（emoji2vec + emoji位置）
import torch
import torch.nn as nn
from torch.utils.data import Dataset as TorchDataset
from gensim.models import KeyedVectors
from transformers import RobertaTokenizer, TrainingArguments, Trainer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import label_binarize
import numpy as np
import pandas as pd
import re

# Load the emoji2vec vector model (binary word2vec format).
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)

# Average the emoji2vec vectors of the emoji in a comment.
def get_emoji_vec(text):
    """Return the mean emoji2vec vector of the emoji found in ``text``.

    Only characters in the range U+10000-U+10FFFF are considered; characters
    missing from the module-level ``emoji_model`` are ignored.

    Returns:
        The element-wise mean of the matched vectors, or a zero vector of
        length ``emoji_model.vector_size`` when nothing usable is found.
    """
    known = []
    for ch in re.findall(r"[\U00010000-\U0010ffff]", text):
        if ch in emoji_model:
            known.append(emoji_model[ch])
    if not known:
        return np.zeros(emoji_model.vector_size)
    return np.mean(known, axis=0)

# Decide whether the emoji appear in the second half of the comment.
def get_emoji_position(text):
    """Return 1 if the first emoji starts in the second half of ``text``, else 0.

    An emoji is any character in the range U+10000-U+10FFFF. The position is
    the index of the first such character divided by the text length. Text
    without emoji yields 0 (treated as "first half").

    The first emoji is located directly with ``re.search``. Searching for the
    concatenation of all emoji would fail (index -1, hence always 0) whenever
    the emoji are separated by other characters.
    """
    first = re.search(r"[\U00010000-\U0010ffff]", text)
    if first is None:
        return 0  # no emoji: default to the first half
    return 1 if first.start() / max(1, len(text)) > 0.5 else 0

# Load the data and map string sentiment labels to integer class ids.
yt_train = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
yt_test = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
label_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
yt_train["Sentiment"] = yt_train["Sentiment"].map(label_map)
yt_test["Sentiment"] = yt_test["Sentiment"].map(label_map)

# NOTE(review): the tokenizer is loaded from the hub name, while the encoder below is loaded
# from the local fine-tuned checkpoint directory.
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

class YouTubeEmojiDataset(TorchDataset):
    """Torch dataset yielding tokenized comments with emoji vector and emoji position features.

    Args:
        df: Frame providing the 'CommentText' and 'Sentiment' columns.
        tokenizer: Callable tokenizer, invoked with truncation/padding options.
        max_len: Sequence length used for truncation and padding.
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.df, self.tokenizer, self.max_len = df, tokenizer, max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return one sample: 'input_ids', 'attention_mask', 'labels', 'emoji_vec', 'emoji_pos'."""
        record = self.df.iloc[idx]
        comment = record['CommentText']
        tokens = self.tokenizer(
            comment,
            truncation=True,
            padding="max_length",
            max_length=self.max_len,
            return_tensors="pt",
        )
        sample = {name: tokens[name].squeeze() for name in ('input_ids', 'attention_mask')}
        sample['labels'] = torch.tensor(record['Sentiment'], dtype=torch.long)
        sample['emoji_vec'] = torch.tensor(get_emoji_vec(comment), dtype=torch.float32)
        sample['emoji_pos'] = torch.tensor(get_emoji_position(comment), dtype=torch.long)
        return sample

class RobertaWithEmoji(nn.Module):
    """RoBERTa encoder fused with a projected emoji vector and an emoji-position embedding.

    Args:
        model_path: Checkpoint passed to ``RobertaModel.from_pretrained``.
        emoji_dim: Size of the incoming emoji feature vector.
        pos_dim: Size of the embedding for the binary emoji-position flag.
        num_labels: Number of output classes.
    """

    def __init__(self, model_path, emoji_dim=300, pos_dim=16, num_labels=3):
        super().__init__()
        from transformers import RobertaModel
        self.roberta = RobertaModel.from_pretrained(model_path)
        self.emoji_proj = nn.Linear(in_features=emoji_dim, out_features=768)
        # Two embedding rows: position flag 0 and position flag 1.
        self.pos_embedding = nn.Embedding(num_embeddings=2, embedding_dim=pos_dim)
        self.classifier = nn.Linear(in_features=768 + 768 + pos_dim, out_features=num_labels)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, input_ids, attention_mask, emoji_vec, emoji_pos, labels=None):
        """Return ``{"logits"}``, plus a cross-entropy ``"loss"`` when ``labels`` is given."""
        encoded = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        parts = (encoded.pooler_output, self.emoji_proj(emoji_vec), self.pos_embedding(emoji_pos))
        logits = self.classifier(self.dropout(torch.cat(parts, dim=1)))
        if labels is None:
            return {"logits": logits}
        return {"loss": nn.functional.cross_entropy(logits, labels), "logits": logits}

def collate_fn(batch):
    """Stack the per-sample tensors of ``batch`` into batch tensors, field by field."""
    fields = ('input_ids', 'attention_mask', 'emoji_vec', 'emoji_pos', 'labels')
    return {field: torch.stack([sample[field] for sample in batch]) for field in fields}

def compute_metrics(pred):
    """Compute accuracy, macro F1/precision/recall and macro one-vs-rest AUC.

    Args:
        pred: A pair that unpacks into ``(logits, labels)``.

    Returns:
        Dict with the keys "accuracy", "macro_f1", "precision", "recall", "auc".
    """
    scores, gold = pred
    predicted = np.argmax(scores, axis=1)
    metrics = {"accuracy": accuracy_score(gold, predicted)}
    macro_scorers = (("macro_f1", f1_score), ("precision", precision_score), ("recall", recall_score))
    for key, scorer in macro_scorers:
        metrics[key] = scorer(gold, predicted, average='macro')
    # AUC is computed from the raw logits against one-hot labels for classes 0, 1, 2.
    one_hot = label_binarize(gold, classes=[0,1,2])
    metrics["auc"] = roc_auc_score(one_hot, scores, average='macro', multi_class='ovr')
    return metrics

# Prepare for training: build datasets and load the encoder from the entailment fine-tuned checkpoint.
train_set = YouTubeEmojiDataset(yt_train, tokenizer)
test_set = YouTubeEmojiDataset(yt_test, tokenizer)
model = RobertaWithEmoji("./roberta_emote_finetune")

args = TrainingArguments(
    output_dir="./roberta_youtube_sentiment",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    logging_dir="./logs",
    learning_rate=2e-5,
    logging_steps=100,
    eval_strategy="epoch"
)

# NOTE(review): the test split is passed as eval_dataset, so per-epoch evaluation runs on test data.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_set,
    eval_dataset=test_set,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)

trainer.train()
# The returned metrics are not stored or printed here (a notebook shows them only as a cell's last expression).
trainer.evaluate()
# ========== Predict and export results ==========
import pandas as pd
from torch.utils.data import DataLoader
from tqdm import tqdm
import re

# Switch to evaluation mode and move the model to GPU when one is available.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prepare the DataLoader (not shuffled, so predictions line up with `texts`).
test_loader = DataLoader(test_set, batch_size=16, collate_fn=collate_fn)

# Output columns.
texts = yt_test['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop (the enumerate index `i` is not used in the body).
for i, batch in enumerate(tqdm(test_loader, desc="🔍 Predicting")):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    emoji_pos = batch['emoji_pos'].to(device) # position flag required by the model's forward
    labels = batch['labels'].to(device)

    with torch.no_grad():
        # Labels are not passed, so the model returns logits only.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec, emoji_pos=emoji_pos)
        logits = outputs["logits"]
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper.
def extract_emojis(text):
    """Return the space-joined runs of characters treated as emoji in ``text``.

    A run is any sequence of characters that are neither word characters,
    whitespace, nor one of the punctuation marks listed in the pattern.
    Other symbols outside that list (for example "-") are therefore captured too.
    """
    runs = re.findall(r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+", text)
    separator = ' '
    return separator.join(runs)

# One extracted-symbol string per comment, in the same order as `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the DataFrame and export it.
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

df_result.to_csv("youtube_emoji_sentiment_pretrain+position.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_pretrain+position.csv")


# pretrain+position (deberta)


In [None]:
# ✅ 整合後：使用 microsoft/deberta-base 實作完整流程
import pandas as pd
import numpy as np
import torch
import random
import math
import re
from datasets import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import label_binarize
from transformers import (
    DebertaTokenizer, DebertaForMaskedLM, DebertaModel,
    DebertaForSequenceClassification,
    DataCollatorForLanguageModeling,
    TrainingArguments, Trainer, EvalPrediction
)
from torch.utils.data import Dataset as TorchDataset, DataLoader
import torch.nn as nn
from gensim.models import KeyedVectors
from tqdm import tqdm

# ====== STEP 1: MLM pre-training (ELCo) ======
elco_df = pd.read_csv("/content/ELCo/ELCo.csv")
# Sentence templates that turn an (EM, EN) pair into an MLM training sentence.
templates = [
    lambda em, en: f"{em} usually means {en}.",
    lambda em, en: f"When people use {em}, they are likely feeling {en}.",
    lambda em, en: f"{em} is often used to show {en}.",
    lambda em, en: f"Using {em} might suggest that someone is experiencing {en}.",
    lambda em, en: f"The emoji {em} stands for {en} in many cases."
]
# One template is drawn at random per row.
# NOTE(review): no random seed is set in this cell, so the generated sentences differ between runs.
elco_df['text'] = [random.choice(templates)(em, en) for em, en in zip(elco_df['EM'], elco_df['EN'])]
train_texts, val_texts = train_test_split(elco_df['text'], test_size=0.1, random_state=42)
train_dataset = Dataset.from_pandas(pd.DataFrame({'text': train_texts}))
val_dataset = Dataset.from_pandas(pd.DataFrame({'text': val_texts}))

tokenizer = DebertaTokenizer.from_pretrained("microsoft/deberta-base")
def tokenize_mlm(example):
    """Tokenize ``example['text']`` with the module-level tokenizer, truncated/padded to 128 tokens."""
    options = {"truncation": True, "padding": "max_length", "max_length": 128}
    return tokenizer(example['text'], **options)

# Tokenize both splits in batches and expose only the tensor columns.
tokenized_train = train_dataset.map(tokenize_mlm, batched=True)
tokenized_val = val_dataset.map(tokenize_mlm, batched=True)

tokenized_train.set_format(type="torch", columns=["input_ids", "attention_mask"])
tokenized_val.set_format(type="torch", columns=["input_ids", "attention_mask"])

mlm_model = DebertaForMaskedLM.from_pretrained("microsoft/deberta-base")
# The collator is configured for masked-LM training with mlm_probability=0.15.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)

mlm_args = TrainingArguments(
    output_dir="./deberta_elco_mlm",
    num_train_epochs=20,
    per_device_train_batch_size=16,
    save_steps=500,
    logging_steps=100,
    learning_rate=1e-4,
    eval_strategy="epoch",
    save_total_limit=2
)

mlm_trainer = Trainer(
    model=mlm_model,
    args=mlm_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    data_collator=data_collator,
)
mlm_trainer.train()
# Model and tokenizer are saved together so STEP 2 can load both from "./deberta_elco_mlm".
mlm_model.save_pretrained("./deberta_elco_mlm")
tokenizer.save_pretrained("./deberta_elco_mlm")

# ====== STEP 2: Entailment fine-tuning ======
# dropna() without a subset drops every row that has a missing value in any column.
df = pd.read_csv("/content/drive/MyDrive/____YouTube_Emote____.csv").dropna()
train_df, val_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df["label"])
train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)

tokenizer = DebertaTokenizer.from_pretrained("./deberta_elco_mlm")
def tokenize_emote(example):
    """Tokenize (premise, hypothesis) pairs with the module-level tokenizer, truncated/padded to 128 tokens."""
    options = {"truncation": True, "padding": "max_length", "max_length": 128}
    first, second = example["premise"], example["hypothesis"]
    return tokenizer(first, second, **options)

# Tokenize both splits, rename the target column to "labels" and expose tensor columns only.
tokenized_train = train_dataset.map(tokenize_emote, batched=True)
tokenized_val = val_dataset.map(tokenize_emote, batched=True)
# rename_column's result is re-assigned to the same variable on each line.
tokenized_train = tokenized_train.rename_column("label", "labels")
tokenized_val = tokenized_val.rename_column("label", "labels")
tokenized_train.set_format("torch", columns=["input_ids", "attention_mask", "labels"])
tokenized_val.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

# Initialize the 2-class classifier from the MLM checkpoint saved in STEP 1.
model = DebertaForSequenceClassification.from_pretrained("./deberta_elco_mlm", num_labels=2)

def compute_metrics(p: EvalPrediction):
    """Compute accuracy, F1, precision and recall from arg-max predictions.

    Args:
        p: Evaluation output providing ``predictions`` and ``label_ids``.

    Returns:
        Dict with the keys "accuracy", "f1", "precision", "recall".
    """
    predicted = p.predictions.argmax(-1)
    gold = p.label_ids
    scorers = (
        ("accuracy", accuracy_score),
        ("f1", f1_score),
        ("precision", precision_score),
        ("recall", recall_score),
    )
    return {name: scorer(gold, predicted) for name, scorer in scorers}

# Fine-tuning configuration for the entailment (premise/hypothesis) task.
training_args = TrainingArguments(
    output_dir="./deberta_emote_finetune",
    num_train_epochs=10,
    per_device_train_batch_size=5,
    per_device_eval_batch_size=16,
    eval_strategy="epoch",
    learning_rate=2e-5,
    warmup_steps=300,
    lr_scheduler_type="linear",
    logging_steps=100,
    save_steps=500,
    overwrite_output_dir=True,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)
trainer.train()
# Only the model is saved here; the tokenizer is not written to this directory by this line.
model.save_pretrained("./deberta_emote_finetune")

# ====== STEP 3: YouTube sentiment analysis with emoji features ======
def get_emoji_vec(text):
    """Return the mean emoji2vec vector of the emoji found in ``text``.

    Only characters in the range U+10000-U+10FFFF are considered; characters
    missing from the module-level ``emoji_model`` (assigned later in this
    cell, looked up at call time) are ignored.

    Returns:
        The element-wise mean of the matched vectors, or ``np.zeros(300)``
        when no character matches or none is known to ``emoji_model``.
    """
    known = []
    for ch in re.findall(r"[\U00010000-\U0010ffff]", text):
        if ch in emoji_model:
            known.append(emoji_model[ch])
    if known:
        return np.mean(known, axis=0)
    return np.zeros(300)

def get_emoji_position(text):
    """Return 1 if the first emoji starts in the second half of ``text``, else 0.

    An emoji is any character in the range U+10000-U+10FFFF. The position is
    the index of the first such character divided by the text length. Text
    without emoji yields 0 (treated as "first half").

    The first emoji is located directly with ``re.search``. Searching for the
    concatenation of all emoji would fail (index -1, hence always 0) whenever
    the emoji are separated by other characters.
    """
    first = re.search(r"[\U00010000-\U0010ffff]", text)
    if first is None:
        return 0
    return 1 if first.start() / max(1, len(text)) > 0.5 else 0

# Load the emoji2vec vectors (binary word2vec format) used by get_emoji_vec.
emoji_model = KeyedVectors.load_word2vec_format("/content/emoji2vec/pre-trained/emoji2vec.bin", binary=True)
# Load the YouTube comment splits and map string sentiment labels to integer class ids.
yt_train = pd.read_csv('/content/drive/MyDrive/final_balanced_training_set(1).csv')
yt_test = pd.read_csv('/content/drive/MyDrive/final_test_set_processed(1).csv')
label_map = {"Negative": 0, "Neutral": 1, "Positive": 2}
yt_train["Sentiment"] = yt_train["Sentiment"].map(label_map)
yt_test["Sentiment"] = yt_test["Sentiment"].map(label_map)

# The tokenizer is reloaded from the hub name rather than from a local checkpoint directory.
tokenizer = DebertaTokenizer.from_pretrained("microsoft/deberta-base")

class YouTubeEmojiDataset(TorchDataset):
    """Torch dataset yielding tokenized comments with emoji vector and emoji position features.

    Args:
        df: Frame providing the 'CommentText' and 'Sentiment' columns.
        tokenizer: Callable tokenizer, invoked with truncation/padding options.
        max_len: Sequence length used for truncation and padding.
    """

    def __init__(self, df, tokenizer, max_len=128):
        self.df, self.tokenizer, self.max_len = df, tokenizer, max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return one sample: 'input_ids', 'attention_mask', 'labels', 'emoji_vec', 'emoji_pos'."""
        record = self.df.iloc[idx]
        comment = record['CommentText']
        tokens = self.tokenizer(
            comment,
            truncation=True,
            padding="max_length",
            max_length=self.max_len,
            return_tensors="pt",
        )
        sample = {name: tokens[name].squeeze() for name in ('input_ids', 'attention_mask')}
        sample['labels'] = torch.tensor(record['Sentiment'], dtype=torch.long)
        sample['emoji_vec'] = torch.tensor(get_emoji_vec(comment), dtype=torch.float32)
        sample['emoji_pos'] = torch.tensor(get_emoji_position(comment), dtype=torch.long)
        return sample

class DebertaWithEmoji(nn.Module):
    """DeBERTa encoder fused with a projected emoji vector and an emoji-position embedding.

    The text feature is the encoder's last hidden state at sequence index 0.

    Args:
        model_path: Checkpoint passed to ``DebertaModel.from_pretrained``.
        emoji_dim: Size of the incoming emoji feature vector.
        pos_dim: Size of the embedding for the binary emoji-position flag.
        num_labels: Number of output classes.
    """

    def __init__(self, model_path, emoji_dim=300, pos_dim=16, num_labels=3):
        super().__init__()
        self.deberta = DebertaModel.from_pretrained(model_path)
        self.emoji_proj = nn.Linear(in_features=emoji_dim, out_features=768)
        # Two embedding rows: position flag 0 and position flag 1.
        self.pos_embedding = nn.Embedding(num_embeddings=2, embedding_dim=pos_dim)
        self.classifier = nn.Linear(in_features=768 + 768 + pos_dim, out_features=num_labels)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, input_ids, attention_mask, emoji_vec, emoji_pos, labels=None):
        """Return ``{"logits"}``, plus a cross-entropy ``"loss"`` when ``labels`` is given."""
        encoded = self.deberta(input_ids=input_ids, attention_mask=attention_mask)
        first_token = encoded.last_hidden_state[:, 0]
        parts = (first_token, self.emoji_proj(emoji_vec), self.pos_embedding(emoji_pos))
        logits = self.classifier(self.dropout(torch.cat(parts, dim=1)))
        if labels is None:
            return {"logits": logits}
        return {"loss": nn.functional.cross_entropy(logits, labels), "logits": logits}

def collate_fn(batch):
    """Stack the per-sample tensors of ``batch`` into batch tensors, field by field."""
    fields = ('input_ids', 'attention_mask', 'emoji_vec', 'emoji_pos', 'labels')
    return {field: torch.stack([sample[field] for sample in batch]) for field in fields}

def compute_metrics(pred):
    """Compute accuracy, macro F1/precision/recall and macro one-vs-rest AUC.

    Args:
        pred: A pair that unpacks into ``(logits, labels)``.

    Returns:
        Dict with the keys "accuracy", "macro_f1", "precision", "recall", "auc".
    """
    scores, gold = pred
    predicted = np.argmax(scores, axis=1)
    metrics = {"accuracy": accuracy_score(gold, predicted)}
    macro_scorers = (("macro_f1", f1_score), ("precision", precision_score), ("recall", recall_score))
    for key, scorer in macro_scorers:
        metrics[key] = scorer(gold, predicted, average='macro')
    # AUC is computed from the raw logits against one-hot labels for classes 0, 1, 2.
    one_hot = label_binarize(gold, classes=[0,1,2])
    metrics["auc"] = roc_auc_score(one_hot, scores, average='macro', multi_class='ovr')
    return metrics

# Build the datasets and load the encoder from the entailment fine-tuned checkpoint.
train_set = YouTubeEmojiDataset(yt_train, tokenizer)
test_set = YouTubeEmojiDataset(yt_test, tokenizer)
model = DebertaWithEmoji("./deberta_emote_finetune")

args = TrainingArguments(
    output_dir="./deberta_youtube_sentiment",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    logging_dir="./logs",
    learning_rate=2e-5,
    logging_steps=100,
    eval_strategy="epoch"
)

# NOTE(review): the test split is passed as eval_dataset, so per-epoch evaluation runs on test data.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_set,
    eval_dataset=test_set,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)

# Train, then print the metrics from a final evaluation pass.
trainer.train()
eval_result = trainer.evaluate()
print("Evaluation:", eval_result)

# ========== Predict and export results ==========
import pandas as pd
from torch.utils.data import DataLoader
from tqdm import tqdm
import re

# Switch to evaluation mode and move the model to GPU when one is available.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prepare the DataLoader (not shuffled, so predictions line up with `texts`).
test_loader = DataLoader(test_set, batch_size=16, collate_fn=collate_fn)

# Output columns.
texts = yt_test['CommentText'].tolist()
true_labels = []
pred_labels = []
emoji_lists = []

# Prediction loop (the enumerate index `i` is not used in the body).
for i, batch in enumerate(tqdm(test_loader, desc="🔍 Predicting")):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    emoji_vec = batch['emoji_vec'].to(device)
    emoji_pos = batch['emoji_pos'].to(device) # position flag required by the model's forward
    labels = batch['labels'].to(device)

    with torch.no_grad():
        # Labels are not passed, so the model returns logits only.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, emoji_vec=emoji_vec, emoji_pos=emoji_pos)
        logits = outputs["logits"]
        preds = torch.argmax(logits, dim=1)

    true_labels.extend(labels.cpu().tolist())
    pred_labels.extend(preds.cpu().tolist())

# Emoji extraction helper.
def extract_emojis(text):
    """Return the space-joined runs of characters treated as emoji in ``text``.

    A run is any sequence of characters that are neither word characters,
    whitespace, nor one of the punctuation marks listed in the pattern.
    Other symbols outside that list (for example "-") are therefore captured too.
    """
    runs = re.findall(r"[^\w\s,.!?\'\"@#$%^&*()<>+=:;~`]+", text)
    separator = ' '
    return separator.join(runs)

# One extracted-symbol string per comment, in the same order as `texts`.
emoji_lists = [extract_emojis(t) for t in texts]

# Build the DataFrame and export it.
df_result = pd.DataFrame({
    "text": texts,
    "true_label": true_labels,
    "pred_label": pred_labels,
    "emoji_list": emoji_lists
})

# NOTE(review): this is the same CSV filename the RoBERTa "pretrain+position" cells write,
# so running this DeBERTa cell in the same working directory overwrites that file — confirm intended.
df_result.to_csv("youtube_emoji_sentiment_pretrain+position.csv", index=False)
print("✅ 匯出完成：youtube_emoji_sentiment_pretrain+position.csv")
