# clean the text

In [2]:
from datasets import load_dataset
dataset = load_dataset("tianharjuno/twitter-parse", cache_dir="cache/")
train_ds = dataset["train_sentiment"]
test_ds = dataset["test_sentiment"]

In [3]:
import re, unicodedata, jaconv, emoji

_URL      = re.compile(r'https?://\S+')
_MENTION  = re.compile(r'@\w+')
_WS       = re.compile(r'\s+')
_KUTI_CUT = re.compile(r'(?i)kutipan.*$', re.DOTALL)

# --- (MODIFIED) ---
# Catches "word" + "dari" + "domain.com" -> replaces with "word"
# Changed \w+ to \S+ to include punctuation like '!'
_DARI_URL_ATTACHED = re.compile(r'(\S+)dari\s+([a-z0-9.-]+\.[a-z]{2,})\b', re.I)

# Catches " dari " + "domain.com" -> replaces with empty string
_DARI_URL_SPACED = re.compile(r'\s+dari\s+([a-z0-9.-]+\.[a-z]{2,})\b', re.I)

# --- (NEW) ---
# Catches any word ending in "dari" (e.g., "anarko!dari", "negaradari")
_DARI_STUCK = re.compile(r'(\S+)dari\b', re.I)

def cleantext(row: str):
    text = row["content"] #type: ignore
    text = unicodedata.normalize('NFKC', text)
    text = jaconv.z2h(text, kana=False, digit=True, ascii=True)
    text = text.replace("tanya grok", " ")
    text = text.replace("grokproductivitypasang", " ")
    text = text.replace('\\n', ' ').replace('\\r', ' ')

    # Handle standard URLs first
    text = _URL.sub(' <url> ', text)
    text = text.replace('ini tidak tersedia', ' ')

    text = _MENTION.sub('@USER', text)
    text = re.sub(r'^rt\s+', '', text, flags=re.I)
    text = re.sub(r'(\b\d{4})(?=[a-zA-Z])', r'\1 ', text)
    text = _KUTI_CUT.sub('', text)

    # text = _DARI_URL_ATTACHED.sub(r'\1', text)
    # text = _DARI_URL_SPACED.sub('', text)
    # text = _DARI_STUCK.sub(r'\1', text)

    text = _WS.sub(' ', text).strip()
    row["content"] = text #type: ignore
    return row

In [4]:
train_ds = train_ds.map(cleantext, num_proc=12)
test_ds = test_ds.map(cleantext, num_proc=12)

train_ds = train_ds.rename_column("sentiment", "label")
test_ds = test_ds.rename_column("sentiment", "label")

Map (num_proc=12):   0%|          | 0/30000 [00:00<?, ? examples/s]

Map (num_proc=12):   0%|          | 0/10000 [00:00<?, ? examples/s]

# load the models and tokenize the data

In [5]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from datasets import ClassLabel
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.mps.is_available() else "cpu")
class_labels = ClassLabel(names=["Negative", "Neutral", "Positive"])
model = AutoModelForSequenceClassification.from_pretrained('indolem/indobertweet-base-uncased', cache_dir="cache/", num_labels=len(class_labels.names))
tokenizer = AutoTokenizer.from_pretrained('indolem/indobertweet-base-uncased', cache_dir="cache/")
model.to(device)

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/445M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at indolem/indobertweet-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


tokenizer_config.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/445M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(31923, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e

In [6]:
def tokenize(batch):
    return tokenizer(
        batch["content"],
        padding="max_length",
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )
train_ds = train_ds.map(tokenize, batched=True, num_proc=12)
test_ds = test_ds.map(tokenize, batched=True, num_proc=12)

Map (num_proc=12):   0%|          | 0/30000 [00:00<?, ? examples/s]

Map (num_proc=12):   0%|          | 0/10000 [00:00<?, ? examples/s]

In [7]:
def compute_metrics(class_names):
    num_classes = len(class_names)
    def callback(eval_pred):
        logits, labels = eval_pred
        if isinstance(logits, torch.Tensor):
            logits = logits.detach().cpu().numpy()
        if isinstance(labels, torch.Tensor):
            labels = labels.detach().cpu().numpy()
        preds = np.argmax(logits, axis=1)
        macro_p, macro_r, macro_f1, _ = precision_recall_fscore_support(
            labels, preds, average="macro", zero_division=0
        )
        acc = accuracy_score(labels, preds)
        p_cls, r_cls, f1_cls, support_cls = precision_recall_fscore_support(
            labels,
            preds,
            average=None,
            zero_division=0,
            labels=list(range(num_classes)),
        )
        metrics = {
            "accuracy": acc,
            "macro_f1": macro_f1,
            "macro_precision": macro_p,
            "macro_recall": macro_r,
        }
        for idx, name in enumerate(class_names):
            metrics[f"{name}_precision"] = p_cls[idx]  # type: ignore
            metrics[f"{name}_recall"] = r_cls[idx]  # type: ignore
            metrics[f"{name}_f1"] = f1_cls[idx]  # type: ignore
            metrics[f"{name}_support"] = int(support_cls[idx])  # type: ignore
        return metrics
    return callback

In [8]:
from transformers import Trainer
from torch import nn
from collections import Counter

class WeightedLossTrainer(Trainer):
    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, input_model, inputs, return_outputs=False, **kwargs):
        labels = inputs.pop("labels")
        outputs = input_model(**inputs)
        logits = outputs.get("logits")
        weights = self.class_weights.to(logits.device)
        loss_fct = nn.CrossEntropyLoss(weight=weights)
        loss = loss_fct(logits.view(-1, self.model.config.num_labels),
                        labels.view(-1))
        return (loss, outputs) if return_outputs else loss

In [9]:
import torch
from collections import Counter

# 1. Get counts
true_labels = train_ds["label"]
label_counts = Counter(true_labels)
n_classes = 3
total_samples = sum(label_counts.values())

# 2. Ensure weights are ordered by class index (0, 1, 2)
# This is critical for CrossEntropyLoss
class_indices = sorted(label_counts.keys()) # Assumes labels are 0, 1, 2
counts = [label_counts[i] for i in range(n_classes)]

# 3. Calculate Balanced Weights
# Formula: Total / (Num_Classes * Count_Class)
weights = [total_samples / (n_classes * c) for c in counts]

my_weights = torch.tensor(weights, dtype=torch.float)

print(f"Class Counts: {counts}")
print(f"Calculated Weights: {my_weights}")

Class Counts: [20793, 6317, 2890]
Calculated Weights: tensor([0.4809, 1.5830, 3.4602])


In [15]:
from transformers.training_args import TrainingArguments
from transformers.data.data_collator import default_data_collator
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
import numpy as np
training_args = TrainingArguments(
    overwrite_output_dir=True,
    eval_strategy="epoch",     # evaluate at the end of each epoch
    save_strategy="epoch",           # save checkpoint at the end of each epoch
    learning_rate=1e-5,
    per_device_train_batch_size=64,
    per_device_eval_batch_size=256,
    num_train_epochs=10,
    weight_decay=0.05,
    logging_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="eval_macro_f1",
    greater_is_better=True,
    warmup_ratio=0.01,
    fp16=True,
)
compute_callback = compute_metrics(class_labels.names)
train_ds_split = train_ds.train_test_split(test_size=0.2, seed=42)
trainer = WeightedLossTrainer(
    model = model,
    args = training_args,
    train_dataset = train_ds_split["train"],
    eval_dataset=train_ds_split["test"],
    data_collator=default_data_collator,
    compute_metrics=compute_callback,
    class_weights=my_weights
)
print("Starting training...")
trainer.train()

Starting training...


Epoch,Training Loss,Validation Loss


KeyboardInterrupt: 