In [1]:
# Install the notebook's third-party dependencies into the running kernel's
# environment. If a package was installed or upgraded, the kernel may need to
# be restarted before the imports below pick it up.
%pip install "transformers[torch]"
%pip install "underthesea"

import pandas as pd
import torch
import torch.nn as nn
import re
import numpy as np

# from torch.utils.data import Dataset, DataLoader, random_split, DataLoader
# from transformers import DistilBertForSequenceClassification, DistilBertTokenizerFast
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline, AutoModelForTokenClassification
from underthesea import text_normalize, word_tokenize

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


  from .autonotebook import tqdm as notebook_tqdm


# 1. Tiền xử lý text

In [None]:
# Restored
class VietnameseDiacriticRestorer:
    """Put Vietnamese diacritics back onto accent-stripped text.

    A token-classification model predicts one tag index per sub-word token.
    Each tag (one non-empty line of the tags file) has the form
    ``"<raw>-<accented>"`` and is applied to a word as a plain substring
    replacement of ``<raw>`` by ``<accented>``.
    """

    # NOTE: the original author left a disabled sanity check expecting the
    # tags file to contain exactly 528 entries.

    def __init__(self, model_path='peterhung/vietnamese-accent-marker-xlm-roberta',
                 tags_path="selected_tags_names.txt"):
        """Load the model, its tokenizer and the tag list.

        Args:
            model_path: Hub id or local directory passed to ``from_pretrained``.
            tags_path: Text file holding one tag per line; the line order must
                match the model's label indices. Defaults to the file name the
                notebook originally hard-coded (resolved against the current
                working directory).
        """
        self.model = AutoModelForTokenClassification.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, add_prefix_space=True)
        # Marker the tokenizer puts in front of the first piece of every word
        # (U+2581). Written as an escape so that re-encoding the notebook
        # cannot silently corrupt it.
        self.TOKENIZER_WORD_PREFIX = "\u2581"

        # Run on the GPU when one is available, otherwise on the CPU.
        self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        self.model.to(self.device)
        self.model.eval()

        # Tag names, indexed by the model's predicted label id.
        self.label_list = self._load_tags_set(tags_path)

    def insert_accents(self, text):
        """Run the model on ``text`` and return ``(tokens, predictions)``.

        ``tokens`` are the tokenizer's sub-word pieces and ``predictions`` the
        arg-max label index for each piece; the first and last positions (the
        special start/end tokens) are stripped from both.

        NOTE: ``truncation=True`` means input longer than the tokenizer's
        maximum length is cut, so the tail of a very long text is not returned.
        """
        our_tokens = text.strip().split()

        # The tokenizer may further split our whitespace-separated words.
        inputs = self.tokenizer(
            our_tokens,
            is_split_into_words=True,
            truncation=True,
            padding=True,
            return_tensors="pt",
        )

        input_ids = inputs['input_ids']
        tokens = self.tokenizer.convert_ids_to_tokens(input_ids[0])
        tokens = tokens[1:-1]

        with torch.no_grad():
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            outputs = self.model(**inputs)

        predictions = outputs["logits"].cpu().numpy()
        predictions = np.argmax(predictions, axis=2)

        # Exclude index 0 and the last index, which correspond to '<s>' and '</s>'.
        predictions = predictions[0][1:-1]

        return tokens, predictions

    def _load_tags_set(self, fpath):
        """Read the tag names from ``fpath``, one per non-blank line.

        The file is read as UTF-8 explicitly: the tags contain accented
        characters and the platform default encoding is not always UTF-8.
        """
        with open(fpath, 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip()]

    def merge_tokens_and_preds(self, tokens, predictions):
        """Glue sub-word pieces back into words and collect their labels.

        Returns a list of ``(word, label_indexes)`` pairs. ``label_indexes``
        holds the distinct label indices predicted for the word's pieces in
        the order they were first seen, so that "the first label" is well
        defined for ``get_accented_words``.
        """
        prefix = self.TOKENIZER_WORD_PREFIX
        merged_tokens_preds = []
        i = 0
        while i < len(tokens):
            tok = tokens[i]
            label_indexes = [predictions[i]]
            if tok.startswith(prefix):  # start of a new word
                cur_word_toks = [tok[len(prefix):]]
                # Absorb the following pieces that continue this word.
                j = i + 1
                while j < len(tokens) and not tokens[j].startswith(prefix):
                    cur_word_toks.append(tokens[j])
                    if predictions[j] not in label_indexes:
                        label_indexes.append(predictions[j])
                    j += 1
                merged_tokens_preds.append((''.join(cur_word_toks), label_indexes))
                i = j
            else:
                # A piece without the prefix that does not follow a word start
                # is kept as a word of its own.
                merged_tokens_preds.append((tok, label_indexes))
                i += 1

        return merged_tokens_preds

    def get_accented_words(self, merged_tokens_preds, label_list):
        """Apply the predicted tags and return the words joined by spaces.

        For each word the first label whose ``<raw>`` part is non-empty and
        occurs in the word is applied (every occurrence is replaced); if no
        label applies the word is kept unchanged. Tags without a ``-``
        separator are ignored instead of raising.
        """
        accented_words = []
        for word_raw, label_indexes in merged_tokens_preds:
            for label_index in label_indexes:
                tag_name = label_list[int(label_index)]
                raw, sep, vowel = tag_name.partition("-")
                if sep and raw and raw in word_raw:
                    word_accented = word_raw.replace(raw, vowel)
                    break
            else:
                word_accented = word_raw

            accented_words.append(word_accented)

        return " ".join(accented_words)

    def restore(self, text):
        """Return ``text`` with diacritics restored.

        Non-string or blank input yields ``""`` without touching the model.
        """
        if not isinstance(text, str) or not text.strip():
            return ""
        tokens, predictions = self.insert_accents(text)
        merged_tokens_preds = self.merge_tokens_and_preds(tokens, predictions)
        return self.get_accented_words(merged_tokens_preds, self.label_list)

In [None]:
# Clear text
class VietnameseTextStandardizer:
    """Rule-based clean-up of informal Vietnamese text.

    The pipeline (see ``standardize``) normalises the text, maps emoticons and
    emoji to sentiment words, splits a few known run-together words, tokenizes
    and finally expands abbreviations / common English words.
    """

    def __init__(self):
        # Abbreviations and common (English) words -> standard Vietnamese.
        # Looked up per lower-cased token in ``standardize``.
        self.normalization_dict = {
            "sp": "sản phẩm", "dk": "được", "dc": "được", "ko": "không",
            "k": "không", "bt": "bình thường", "ok": "tốt", "oke": "tốt",
            "okela": "tốt", "sg": "sài gòn", "hn": "hà nội", "tks": "cảm ơn",
            "thank": "cảm ơn", "please": "làm ơn", "thanks": "cảm ơn",
            "good": "tốt", "bad": "tệ", "very": "rất", "like": "thích",
            "hate": "ghét", "du": "đủ"
        }

        # Words typed without spaces (and without accents) -> spaced form.
        self.joined_words_dict = {
            "toithich": "tôi thích",
            "toimuon": "tôi muốn",
            "toicamthay": "tôi cảm thấy",
            "ratthich": "rất thích",
            "quathich": "quá thích",
            "thichqua": "thích quá",
            "banthat": "bạn thật",
            "spnay": "sản phẩm này",
            "dichvunay": "dịch vụ này",
            "toikhong": "tôi không",
            "toiko": "tôi không",
            "toicung": "tôi cũng",
            "toiratthich": "tôi rất thích",
        }

        # Emoticons / emoji -> sentiment word (padded with spaces so the
        # replacement never fuses with neighbouring text).
        self.emoticon_sentiment_dict = {
            # Positive
            ":)": " tích_cực ", ":-)": " tích_cực ", "=)": " tích_cực ",
            ":D": " rất_tích_cực ", ":-D": " rất_tích_cực ", "=D": " rất_tích_cực ",
            "😊": " tích_cực ", "😍": " rất_tích_cực ",
            "🤩": " rất_tích_cực ", "👍": " tốt ",
            "\u2764\ufe0f": " yêu_thích ",  # red heart + variation selector
            "💖": " yêu_thích ", "😘": " yêu_thích ", "🥰": " yêu_thích ",
            "😁": " vui ", "😄": " vui ", "😆": " vui ", "😂": " vui ",

            # Negative
            ":(": " tiêu_cực ", ":-(": " tiêu_cực ", "=(": " tiêu_cực ",
            ":'(": " buồn ", "😞": " buồn ", "😔": " buồn ", "😟": " lo_lắng ",
            "😠": " tức_giận ", "😡": " rất_tức_giận ", "🤬": " rất_tức_giận ",
            "👎": " tệ ", "💔": " thất_vọng ", "😢": " khóc ", "😭": " khóc_nhiều ",

            # Neutral / sarcastic
            ":|": " bình_thường ", ":-|": " bình_thường ", "😐": " bình_thường ",
            "😑": " không_hài_lòng ", "🤨": " nghi_ngờ ", "😒": " chán ",
            "🙄": " mắt_đảo ", "😏": " mỉa_mai "
        }

        # Matches runs of emoji in the listed code-point ranges. It is not
        # used by any method of this class; kept for callers that read it.
        self.emoji_pattern = re.compile(
            "["
            "\U0001F600-\U0001F64F"  # emoticons
            "\U0001F300-\U0001F5FF"  # symbols & pictographs
            "\U0001F680-\U0001F6FF"  # transport & map symbols
            "\U0001F1E0-\U0001F1FF"  # flags (iOS)
            "]+", flags=re.UNICODE
        )

    def split_joined_words(self, text):
        """Split known run-together words using ``joined_words_dict``.

        All keys are matched in a single pass with the longest key tried
        first, so a key that contains another key (``"toiratthich"`` contains
        ``"ratthich"``) is replaced as a whole. Matching is case-sensitive and
        works on substrings, not only on whole words.
        """
        if not self.joined_words_dict:
            return text
        keys = sorted(self.joined_words_dict, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(key) for key in keys))
        return pattern.sub(lambda m: self.joined_words_dict[m.group(0)], text)

    def handle_emoticons(self, text):
        """Replace every emoticon/emoji key with its sentiment word."""
        for emoticon, sentiment_word in self.emoticon_sentiment_dict.items():
            text = text.replace(emoticon, sentiment_word)
        return text

    def standardize(self, text):
        """Return the cleaned, lower-cased form of ``text``.

        Empty or non-string input yields ``""``.
        """
        if not text or not isinstance(text, str):
            return ""

        # Step 1: normalise with underthesea's ``text_normalize``.
        normalized = text_normalize(text)

        # Step 2: collapse whitespace runs and trim.
        text = re.sub(r'\s+', ' ', normalized).strip()

        # Step 3: emoticons / emoji -> sentiment words. Done before any
        # lower-casing because keys such as ":D" are case-sensitive.
        text = self.handle_emoticons(text)

        # Step 4: split run-together words.
        text = self.split_joined_words(text)

        # Step 5: word segmentation.
        tokens = word_tokenize(text)

        # Step 6: lower-case each token and expand abbreviations.
        standardized_tokens = [
            self.normalization_dict.get(token.lower(), token.lower())
            for token in tokens
        ]

        # Step 7: join back into one string.
        return " ".join(standardized_tokens)

In [4]:
# Sentiment Analysis
class VietnameseSentimentAnalyzer:
    """Sentiment analysis for (possibly accent-less, informal) Vietnamese text.

    Input is first passed through ``VietnameseDiacriticRestorer`` and
    ``VietnameseTextStandardizer`` and then classified with a Hugging Face
    ``sentiment-analysis`` pipeline.
    """

    def __init__(self, model_name="wonrax/phobert-base-vietnamese-sentiment", max_length=256):
        """Load the classifier and the two pre-processing helpers.

        Args:
            model_name: Hub id or local path of a sequence-classification
                model. Options listed by the original author:
                - "wonrax/phobert-base-vietnamese-sentiment" (PhoBERT sentiment)
                - "vinai/phobert-base" (PhoBERT base)
                - "FPTAI/vibert-base-cased" (ViBERT)
                NOTE(review): the two base checkpoints presumably have no
                fine-tuned sentiment head - confirm before relying on them.
            max_length: Maximum number of tokens sent to the model; longer
                inputs are truncated. The default of 256 is the limit
                commonly used for PhoBERT; pass a different value (or
                ``None`` to fall back to the tokenizer's own limit) for other
                models.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.sentiment_pipeline = pipeline(
            "sentiment-analysis",
            model=self.model,
            tokenizer=self.tokenizer
        )
        self.max_length = max_length

        # Text pre-processing helpers.
        self.standardizer = VietnameseTextStandardizer()
        self.restored = VietnameseDiacriticRestorer()

    def analyze_sentiment(self, text):
        """Classify ``text`` and return a result dictionary.

        Returns:
            dict with keys ``original_text``, ``cleaned_text``, ``sentiment``
            (the pipeline label) and ``confidence`` (the pipeline score).
            For non-string or blank input nothing is run and ``sentiment`` is
            ``None`` with ``confidence`` 0.0.
        """
        if not isinstance(text, str) or not text.strip():
            return {
                'original_text': text,
                'cleaned_text': "",
                'sentiment': None,
                'confidence': 0.0
            }

        # 1. Restore diacritics.
        restored_text = self.restored.restore(text)

        # 2. Clean / standardise.
        cleaned_text = self.standardizer.standardize(restored_text)

        # 3. Classify. Truncate explicitly: without it an input longer than
        #    the model's position limit makes the forward pass fail.
        result = self.sentiment_pipeline(
            cleaned_text,
            truncation=True,
            max_length=self.max_length
        )

        return {
            'original_text': text,
            'cleaned_text': cleaned_text,
            'sentiment': result[0]['label'],
            'confidence': result[0]['score']
        }

In [6]:
# Usage: build the analyzer once, then report the result for each sample text.
analyzer = VietnameseSentimentAnalyzer()

test_texts = ["toi that bai that roi"]

for text in test_texts:
    result = analyzer.analyze_sentiment(text)
    report_lines = (
        f"Text: {result['original_text']}",
        f"Cleaned: {result['cleaned_text']}",
        f"Sentiment: {result['sentiment']} (Confidence: {result['confidence']:.4f})",
        "-" * 50,
    )
    # One line per entry, each terminated by a newline.
    print(*report_lines, sep="\n")

Device set to use mps:0


Text: toi that bai that roi
Cleaned: t√¥i th·∫•t b·∫°i th·∫≠t r·ªìi
Sentiment: NEG (Confidence: 0.9872)
--------------------------------------------------
