# WangchanBERTa

# Installation

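Install the dependencies WangchanBERTa needs. Each package is installed only if it cannot be imported; `thai2transformers` and `tokenizers` are pinned to specific versions, the rest use whatever version pip resolves.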

In [None]:
# Install each dependency only when importing it fails, so re-running this cell
# in an environment that already provides the package does nothing.
# NOTE: the `%pip` lines are IPython magics; this cell runs only inside a
# notebook / IPython kernel, not as a plain Python script.
try:
    import transformers
except ImportError:
    %pip install transformers

try:
    import emoji
except ImportError:
    %pip install emoji

try:
    import pythainlp
except ImportError:
    %pip install pythainlp

try:
    import sefr_cut
except ImportError:
    %pip install sefr_cut

try:
    import tinydb
except ImportError:
    %pip install tinydb

try:
    import seqeval
except ImportError:
    %pip install seqeval

try:
    import sentencepiece
except ImportError:
    %pip install sentencepiece

try:
    import pydantic
except ImportError:
    %pip install pydantic

try:
    import jsonlines
except ImportError:
    %pip install jsonlines

# thai2transformers is pinned to 0.1.2 and installed with --no-deps, i.e. pip
# does not install its dependencies; they come from the other installs in this
# cell (presumably to avoid version conflicts with them — confirm).
try:
    import thai2transformers
except ImportError:
    %pip install --no-deps thai2transformers==0.1.2

try:
    import torch
except ImportError:
    %pip install torch

try:
    import tf_keras
except ImportError:
    %pip install tf_keras

# The import name (pytorch_lightning) differs from the pip package name
# (pytorch-lightning).
try:
    import pytorch_lightning
except ImportError:
    %pip install pytorch-lightning

try:
    import datasets
except ImportError:
    %pip install datasets

try:
    import matplotlib
except ImportError:
    %pip install matplotlib

# The 0.9.3 pin is applied only when no `tokenizers` package is importable.
# NOTE(review): an earlier install in this cell may already have provided a
# different `tokenizers` version, in which case this pin never applies — confirm.
try:
    import tokenizers
except ImportError:
    %pip install tokenizers==0.9.3

In [None]:
# Core libraries
import os
import numpy as np
import torch
import pandas as pd
import matplotlib.pyplot as plt

# Progress bar
from tqdm.auto import tqdm

# Hugging Face libraries
from huggingface_hub import login
from transformers import (
    AutoModelForSequenceClassification, 
    Trainer, 
    TrainingArguments, 
    AutoTokenizer, 
    default_data_collator,
    pipeline,
    CamembertTokenizer,
    AutoModel,
    AutoModelForMaskedLM,
    AutoModelForTokenClassification
)

# Datasets
from datasets import Dataset

# Scikit-learn metrics and model selection
from sklearn.metrics import (
    accuracy_score, 
    precision_score, 
    recall_score, 
    f1_score, 
    classification_report, 
    confusion_matrix, 
    ConfusionMatrixDisplay
)
from sklearn.model_selection import GroupKFold

# Thai2Transformers
import thai2transformers
from thai2transformers.preprocess import process_transformers
from thai2transformers.metrics import (
    classification_metrics,
    multilabel_classification_metrics,
)
from thai2transformers.tokenizers import (
    ThaiRobertaTokenizer,
    ThaiWordsNewmmTokenizer,
    ThaiWordsSyllableTokenizer,
    FakeSefrCutTokenizer,
    SEFR_SPLIT_TOKEN
)

# Setting Files

In [None]:
# Load the training split and the test split from CSV files that are expected
# in the notebook's working directory.
train_data, test_data = (
    pd.read_csv(csv_path)
    for csv_path in ('train_1200.csv', "newtestcase_1200.csv")
)

# Choose Pretrained Model

In [None]:
# Checkpoints selected for this run.
model_names = ['wangchanberta-base-att-spm-uncased']

# Tokenizer class to instantiate for each supported checkpoint name.
tokenizers = {
    # Checkpoints handled by the generic Hugging Face tokenizer loader
    'wangchanberta-base-att-spm-uncased': AutoTokenizer,
    'xlm-roberta-base': AutoTokenizer,
    'bert-base-multilingual-cased': AutoTokenizer,
    # Checkpoints that need a thai2transformers tokenizer
    'wangchanberta-base-wiki-newmm': ThaiWordsNewmmTokenizer,
    'wangchanberta-base-wiki-ssg': ThaiWordsSyllableTokenizer,
    'wangchanberta-base-wiki-sefr': FakeSefrCutTokenizer,
    'wangchanberta-base-wiki-spm': ThaiRobertaTokenizer,
}

# Checkpoints addressed by their bare name; every other checkpoint is looked
# up under the `airesearch/` namespace.
public_models = ['xlm-roberta-base', 'bert-base-multilingual-cased']
model_name = "wangchanberta-base-att-spm-uncased"

# Create the tokenizer for the selected checkpoint.
if model_name in public_models:
    tokenizer_source = f'{model_name}'
else:
    tokenizer_source = f'airesearch/{model_name}'

tokenizer = tokenizers[model_name].from_pretrained(
    tokenizer_source,
    revision='main',
    model_max_length=416,
)


# Sequence Classification

In [None]:
# Name of the fine-tuning dataset; it selects the `finetuned@<dataset>`
# revision of the checkpoint.
dataset_name = "wongnai_reviews"

# Classification pipeline built on the already fine-tuned checkpoint.
# The device is chosen at run time: GPU 0 when CUDA is available, otherwise the
# CPU (device=-1).  Hard-coding device=0 made this cell fail on machines
# without a GPU.
classify_multiclass = pipeline(
    task='sentiment-analysis',
    tokenizer=tokenizer,
    model=f'airesearch/{model_name}' if model_name not in public_models else f'{model_name}',
    revision=f'finetuned@{dataset_name}',
    device=0 if torch.cuda.is_available() else -1,
)

In [None]:
# Sample Thai review used to smoke-test the pipeline.
input_text = "พนักงานบริการแย่มาก พูดจาไม่ดี ไม่อยากกลับมาอีกแล้ว"

# Switch for the thai2transformers text preprocessing.  It is applied only to
# checkpoints that are not listed in `public_models`.
preprocess_input_text = True
if preprocess_input_text and model_name not in public_models:
    input_text = process_transformers(input_text)

# Last expression of the cell, so the notebook displays the prediction.
classify_multiclass(input_text)

# Test Base Model Performance

In [None]:
# Authenticate against the Hugging Face Hub.
# SECURITY: the access token used to be hard-coded on this line.  It is now
# read from the HF_TOKEN environment variable so that no credential lives in
# the notebook; the previously committed token must be revoked.
# When HF_TOKEN is unset, `token=None` is passed and `login` is expected to
# ask for the token interactively — TODO confirm for the installed version.
login(token=os.environ.get("HF_TOKEN"))

In [None]:
# Fail fast when the test CSV lacks the text column or the label column.
required_columns = ("review_body", "stars")
if not all(column in test_data.columns for column in required_columns):
    raise ValueError("CSV ต้องมีคอลัมน์ 'review_body' และ 'stars'")

In [None]:
# Accumulators for the base-model evaluation: gold labels, predicted labels and
# one formatted report string per review.
y_true_base, y_pred_base, results_base = [], [], []

In [None]:
# Classify every test review with the base pipeline, recording the gold label,
# the predicted label and a readable report entry for each review.

# The preprocessing decision does not depend on the row, so it is taken once.
apply_preprocessing = preprocess_input_text and model_name not in public_models

for index, row in tqdm(test_data.iterrows(), total=len(test_data), desc="Processing Reviews"):
    actual_label = str(row["stars"])
    review_text = row["review_body"]
    if apply_preprocessing:
        review_text = process_transformers(review_text)

    # The label of the first entry returned by the pipeline is the prediction.
    prediction = classify_multiclass(review_text)
    predicted_label = prediction[0]["label"]
    is_correct = predicted_label == actual_label

    y_true_base.append(actual_label)
    y_pred_base.append(predicted_label)
    results_base.append(
        f"text: {review_text}\n"
        f"label: {actual_label}\n"
        f"predict: {prediction}\n"
        f"result: {is_correct}\n"
        "--------------------------------------------"
    )

In [None]:
# Aggregate metrics of the base model, expressed as percentages.  Precision,
# recall and F1 are averaged over the classes weighted by class support.
accuracy = 100 * accuracy_score(y_true_base, y_pred_base)
precision, recall, f1 = (
    100 * metric(y_true_base, y_pred_base, average='weighted')
    for metric in (precision_score, recall_score, f1_score)
)

In [None]:
# Print the per-review reports collected during the base-model evaluation, one
# report after the other.  Nothing is printed when there are no reports.
if results_base:
    print(*results_base, sep="\n")

In [None]:
# Print the aggregate base-model metrics followed by the per-class report.
summary_lines = [
    "============= Model Performance Summary =============",
    f"Total Samples: {len(test_data)}",
    f"Accuracy: {accuracy:.2f}%",
    f"Precision: {precision:.2f}%",
    f"Recall: {recall:.2f}%",
    f"F1-score: {f1:.2f}%",
    "\n=============== Classification Report ===============",
]
print("\n".join(summary_lines))
print(classification_report(y_true_base, y_pred_base, digits=4))

In [None]:
# Confusion matrix of the base model.
# The class list is computed explicitly and handed to both calls, so the
# row/column order is known and the axes show the real class labels instead of
# the positional indices 0..n-1 that ConfusionMatrixDisplay uses by default.
class_labels = sorted(set(y_true_base) | set(y_pred_base))
cm = confusion_matrix(y_true_base, y_pred_base, labels=class_labels)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
disp.plot(cmap=plt.cm.Greens)
plt.title('Base Model')
plt.show()

# Fine-Tune

In [None]:
# Report the PyTorch build and the CUDA devices it can see.
for env_caption, env_value in (
    ("PyTorch Version:", torch.__version__),
    ("CUDA Available:", torch.cuda.is_available()),
    ("CUDA Version:", torch.version.cuda),
    ("Available GPUs:", torch.cuda.device_count()),
):
    print(env_caption, env_value)

In [None]:
import shutil

# Load the checkpoint as a sequence classifier with 5 output labels; both
# dropout probabilities are set to 0.2.
model = AutoModelForSequenceClassification.from_pretrained(
    f'airesearch/{model_name}' if model_name not in public_models else f'{model_name}',
    num_labels=5,
    hidden_dropout_prob=0.2,
    attention_probs_dropout_prob=0.2,
)

# Start from an empty checkpoint location.  The removal is done in Python
# rather than by shelling out to `rm -rf`, so it does not depend on a POSIX
# shell being available and a failed removal raises instead of being ignored.
if os.path.isdir("./results"):
    shutil.rmtree("./results")
elif os.path.exists("./results"):
    # `rm -rf` also removed a plain file of that name; keep that behaviour.
    os.remove("./results")

# Hyperparameters
training_config = {
    # Where checkpoints and logs are written, and how often to log.
    "output_dir": "./results",
    "logging_dir": "./logs",
    "logging_steps": 10,
    "report_to": "none",
    # Optimisation schedule.
    "num_train_epochs": 10,
    "warmup_steps": 200,
    "weight_decay": 0.01,
    # Batch sizes: 16 examples per step with gradients accumulated over 2 steps.
    "per_device_train_batch_size": 16,
    "per_device_eval_batch_size": 32,
    "gradient_accumulation_steps": 2,
    # Evaluate and checkpoint once per epoch, keep a single checkpoint, and
    # reload the checkpoint with the highest accuracy when training ends.
    "evaluation_strategy": "epoch",
    "save_strategy": "epoch",
    "save_total_limit": 1,
    "load_best_model_at_end": True,
    "metric_for_best_model": "accuracy",
    "greater_is_better": True,
    # Mixed-precision training.
    "fp16": True,
}
training_args = TrainingArguments(**training_config)

def compute_metrics(p):
    """Return the accuracy of the highest-scoring class per example.

    `p` is unpacked into two items: the per-class prediction scores and the
    reference labels.  The predicted class of each row is the index of its
    largest score along axis 1.

    Returns a dict with the single key ``'accuracy'``.
    """
    scores, references = p
    predicted_classes = np.asarray(scores).argmax(axis=1)
    return {'accuracy': accuracy_score(references, predicted_classes)}

# Use default_data_collator instead
data_collator = default_data_collator

In [None]:
# Use the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cpu")
if torch.cuda.is_available():
    device = torch.device("cuda")

# Last expression of the cell, so the notebook displays the model.
model.to(device)

## Cross-Validation

In [None]:
# Shift the star ratings down by one so the labels start at 0.
# NOTE: running this cell more than once shifts the labels again.
train_data['stars'] = train_data['stars'].sub(1)

In [None]:
# Sanity-check the shifted labels: show the rows whose label falls outside
# 0-4, then the number of examples per label.
label_in_range = train_data['stars'].between(0, 4)
print("ค่าของ stars ที่ไม่อยู่ในช่วง 0-4:", train_data[~label_in_range])
print("จำนวนตัวอย่างของแต่ละค่า:")
print(train_data['stars'].value_counts())

In [None]:
# Full identifier of the checkpoint being fine-tuned, and its tokenizer.
model_name = "airesearch/wangchanberta-base-att-spm-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Longest sequence fed to the model.  It matches the `model_max_length=416`
# this notebook uses when it first creates the tokenizer for the same
# checkpoint; the previous value of 512 allowed longer sequences than that.
MAX_SEQ_LENGTH = 416

# Preprocess the training text exactly as the evaluation loops preprocess the
# test text (same flag, same condition, same `process_transformers` call), so
# the model is trained on the kind of input it is later evaluated on.
train_texts = train_data["review_body"].tolist()
if preprocess_input_text and model_name not in public_models:
    train_texts = [process_transformers(text) for text in train_texts]

# Tokenization
train_encodings = tokenizer(
    train_texts,
    truncation=True,
    padding=True,
    max_length=MAX_SEQ_LENGTH,
)

# Store the encodings as DataFrame columns, then build a Dataset holding the
# model inputs with the `stars` column renamed to `label`.
train_data["input_ids"] = train_encodings["input_ids"]
train_data["attention_mask"] = train_encodings["attention_mask"]
train_dataset = Dataset.from_pandas(train_data[["input_ids", "attention_mask", "stars"]])
train_dataset = train_dataset.rename_column("stars", "label")
print(train_data.columns)

In [None]:
# Cross-validation configuration.
n_splits = 5
random_state = 42

# Shuffle the rows reproducibly, then renumber them from 0.
train_data = train_data.sample(frac=1, random_state=random_state)
train_data = train_data.reset_index(drop=True)

# The review text is the grouping key handed to the grouped k-fold splitter.
groups = train_data["review_body"]
group_kfold = GroupKFold(n_splits=n_splits)

# Container for cross-validation results.
cv_results = []

In [None]:
def convert_to_hf_dataset(df):
    """Build a Hugging Face ``Dataset`` from the tokenized columns of `df`.

    Only ``input_ids``, ``attention_mask`` and ``stars`` are kept, and
    ``stars`` is renamed to ``label``.

    The pandas index is not exported (``preserve_index=False``).  The fold
    subsets passed in here are index-based selections of a larger frame, and
    exporting their index would add an extra ``__index_level_0__`` column to
    the dataset.
    """
    model_columns = df[["input_ids", "attention_mask", "stars"]]
    renamed = model_columns.rename(columns={"stars": "label"})
    return Dataset.from_pandas(renamed, preserve_index=False)

# Set CUDA_LAUNCH_BLOCKING=1 for this process (presumably to make CUDA errors
# surface at the failing call while debugging — confirm).
# NOTE(review): the model has already been moved to the device earlier in the
# notebook; verify that setting the variable this late still has an effect.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

def plot_loss_curve(fold, train_losses, train_losses_epoch, val_losses, val_losses_epoch, accuracy, accuracy_epoch):
    """Plot training loss, validation loss and accuracy of one fold side by side.

    Each ``*_epoch`` sequence supplies the x values for the sequence it is
    paired with.  `fold` is the zero-based fold number; titles show it
    one-based.  The figure is displayed with ``plt.show()``; nothing is
    returned.
    """
    # One entry per panel: x values, y values, legend label, y-axis label,
    # title, and extra keyword arguments for the line.
    panels = (
        (train_losses_epoch, train_losses, 'Training Loss', 'Loss',
         f'Fold {fold + 1}: Training Loss', {}),
        (val_losses_epoch, val_losses, 'Validation Loss', 'Loss',
         f'Fold {fold + 1}: Validation Loss', {'color': 'red'}),
        (accuracy_epoch, accuracy, 'Accuracy', 'Accuracy',
         f'Fold {fold + 1}: Accuracy', {'color': 'green'}),
    )

    plt.figure(figsize=(18, 5))
    for position, (epochs, values, legend_label, y_label, title, line_kwargs) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.plot(epochs, values, marker='o', label=legend_label, **line_kwargs)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.title(title)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Grouped k-fold fine-tuning, with an evaluation on the test set after each fold.

# Clean the labels ONCE, before the split indices are computed.  Filtering
# inside the loop (as before) could drop rows after the positional indices had
# been generated, so `iloc[train_idx]` would no longer address the intended
# rows.  The dropped column name also had a typo (`_index_level_0__`).
train_data = train_data.drop(columns=["__index_level_0__"], errors="ignore")
train_data = train_data[train_data["stars"].between(0, 4)].reset_index(drop=True)
train_data["stars"] = train_data["stars"].astype(int)

# Snapshot of the model weights as they are when this cell starts.  Every fold
# is reset to this snapshot; otherwise fold k would continue from weights that
# were already trained on fold k's validation rows during earlier folds, and
# the per-fold scores would be inflated.
# NOTE: run this cell with a freshly loaded model; re-running it after training
# would snapshot the trained weights instead.
initial_state = {
    name: tensor.detach().clone().cpu()
    for name, tensor in model.state_dict().items()
}

# GPU 0 when CUDA is available, otherwise CPU (-1), instead of a hard-coded 0.
pipeline_device = 0 if torch.cuda.is_available() else -1

# The preprocessing decision is the same for every review; take it once.
apply_preprocessing = preprocess_input_text and model_name not in public_models

# The groups are read from the cleaned frame so they always match its rows.
fold_splits = group_kfold.split(train_data, train_data["stars"], train_data["review_body"])

for fold, (train_idx, val_idx) in enumerate(fold_splits):
    print(f"Training fold {fold + 1}/{n_splits}...")

    # Start this fold from the untouched weights.
    model.load_state_dict(initial_state)

    train_fold = convert_to_hf_dataset(train_data.iloc[train_idx])
    val_fold = convert_to_hf_dataset(train_data.iloc[val_idx])

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_fold,
        eval_dataset=val_fold,
        compute_metrics=compute_metrics,
        data_collator=data_collator,
    )

    train_result = trainer.train()
    print("Log history:", trainer.state.log_history)

    # Split the trainer log into the three curves that are plotted.  The
    # validation-accuracy list has its own name so it is not confused with the
    # test-set `accuracy` computed further down.
    train_losses, train_losses_epoch = [], []
    val_losses, val_losses_epoch = [], []
    val_accuracies, val_accuracies_epoch = [], []
    for log in trainer.state.log_history:
        if 'loss' in log:
            train_losses.append(log['loss'])
            train_losses_epoch.append(log['epoch'])
        if 'eval_loss' in log:
            val_losses.append(log['eval_loss'])
            val_losses_epoch.append(log['epoch'])
        if 'eval_accuracy' in log:
            val_accuracies.append(log['eval_accuracy'])
            val_accuracies_epoch.append(log['epoch'])

    plot_loss_curve(fold, train_losses, train_losses_epoch, val_losses, val_losses_epoch,
                    val_accuracies, val_accuracies_epoch)

    # Best validation accuracy of the fold, used in the directory name.
    # `default` keeps the fold alive if no evaluation was logged.
    accuracy_model = max(val_accuracies, default=float("nan"))

    final_model_dir = f"saved_model/Fold {fold + 1} - Accuracy {accuracy_model:.4f}"
    trainer.save_model(final_model_dir)
    print(f"Final model saved at {final_model_dir}")

    # Pipeline around the model that was just saved.
    classify_multiclass = pipeline(
        task='sentiment-analysis',
        model=final_model_dir,
        tokenizer=tokenizer,
        device=pipeline_device,
    )

    y_true, y_pred, results = [], [], []

    for index, row in tqdm(test_data.iterrows(), total=len(test_data), desc="Processing Reviews"):
        review_text = row["review_body"]
        actual_label = int(row["stars"])

        if apply_preprocessing:
            review_text = process_transformers(review_text)

        prediction = classify_multiclass(review_text)

        # The pipeline label has the form "LABEL_<k>"; training shifted the
        # stars down by one, so add one to get back to the CSV's star values.
        predicted_label = int(prediction[0]["label"].replace("LABEL_", "")) + 1

        y_true.append(actual_label)
        y_pred.append(predicted_label)

        is_correct = predicted_label == actual_label

        results.append(
            f"text: {review_text}\n"
            f"label: {actual_label}\n"
            f"predict: {prediction}\n"
            f"predicted_label: {predicted_label}\n"
            f"result: {is_correct}\n"
            "--------------------------------------------"
        )

    # Test-set metrics of this fold, as percentages.
    accuracy = accuracy_score(y_true, y_pred) * 100
    precision = precision_score(y_true, y_pred, average='weighted') * 100
    recall = recall_score(y_true, y_pred, average='weighted') * 100
    f1 = f1_score(y_true, y_pred, average='weighted') * 100

    # Keep the per-fold numbers so the folds can be compared afterwards;
    # `cv_results` was created for this purpose but never filled.
    cv_results.append({
        "fold": fold + 1,
        "best_val_accuracy": accuracy_model,
        "test_accuracy": accuracy,
        "test_precision": precision,
        "test_recall": recall,
        "test_f1": f1,
        "model_dir": final_model_dir,
    })

    print("============= Model Performance Summary =============")
    print(f"Total Samples: {len(test_data)}")
    print(f"Accuracy: {accuracy:.2f}%")
    print(f"Precision: {precision:.2f}%")
    print(f"Recall: {recall:.2f}%")
    print(f"F1-score: {f1:.2f}%")

    print("\n=============== Classification Report ===============")
    print(classification_report(y_true, y_pred, digits=4))

    # Label the axes with the star values instead of positional indices.
    class_labels = sorted(set(y_true) | set(y_pred))
    cm = confusion_matrix(y_true, y_pred, labels=class_labels)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
    disp.plot(cmap=plt.cm.Greens)
    plt.title(f'Fine-Tune Model Fold {fold + 1}')
    plt.show()