In [35]:
%pip install pyspark==3.5.1
%pip install pandas
%pip install findspark
%pip install joblib
%pip install scikit-learn
%pip install xgboost
%pip install pyarrow

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting pyarrow
  Downloading pyarrow-21.0.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Downloading pyarrow-21.0.0-cp311-cp311-manylinux_2_28_x86_64.whl (42.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 MB[0m [31m33.3 MB/s[0m  [33m0:00:01[0mm0:00:01[0m00:01[0m
[?25hInstalling collected packages: pyarrow
Successfully installed pyarrow-21.0.0
Note: you may need to restart the kernel to use updated packages.


In [41]:
import os
# PATH definitions
DATA_DIR = "./dataset"  # đổi nếu dùng Drive
MODEL_DIR = "./models"
TRAIN_PATH = os.path.join(DATA_DIR, "train_data.csv")
VAL_PATH   = os.path.join(DATA_DIR, "val_data.csv")
TEST_PATH  = os.path.join(DATA_DIR, "test_data.csv")

In [16]:
# Columns
TEXT_COL = "Review"
ASPECTS  = ["Price","Shipping","Outlook","Quality","Size","Shop_Service","General","Others"]

# Label mapping
SENT_ID2NAME = {-1: "None", 0: "Negative", 1: "Positive", 2: "Neutral"}
LABEL_VALUES = list(SENT_ID2NAME.keys())  # [-1, 0, 1, 2]
LABEL_NAMES  = list(SENT_ID2NAME.values())  # ["None","Negative","Positive","Neutral"]

In [17]:
# Utilities function 
import re, unicodedata
import pandas as pd

URL_RE = re.compile(r"https?://\S+|www\.\S+")
TAG_RE = re.compile(r"<[^>]+>")
MULTISPACE_RE = re.compile(r"\s+")
VIETNAMESE_BASIC_STOPWORDS = set("""
và hoặc nhưng là thì mà được bị của cho với về từ tới đến nỗi do vì nên nếu khi để bằng như lại đã đang sẽ không chưa chẳng rất quá lắm hơi
này kia nọ đó đây ấy vậy thế sao tại vì do đó tuy nhiên hơn kém chỉ mỗi một các những cái con chiếc đôi đc nhé nha ạ ơi
""".split())

def preprocess_xgb(text: str) -> str:
    if not isinstance(text, str):
        text = str(text)
    text = text.strip().lower()
    text = URL_RE.sub(" ", text)
    text = TAG_RE.sub(" ", text)
    text = text.replace("❤️", " yeu ").replace("❤", " yeu ").replace("😍", " yeu ")
    text = re.sub(r"[^\w\sáàảãạăắằẳẵặâấầẩẫậéèẻẽẹêếềểễệíìỉĩịóòỏõọôốồổỗộơớờởỡợúùủũụưứừửữựýỳỷỹỵđ]", " ", text)
    text = unicodedata.normalize("NFC", text)  # normalize accents
    text = MULTISPACE_RE.sub(" ", text).strip()
    tokens = [w for w in text.split() if w not in VIETNAMESE_BASIC_STOPWORDS]
    return " ".join(tokens)

In [18]:
def load_and_merge(train_path, val_path, test_path):
    train_df = pd.read_csv(train_path)
    val_df   = pd.read_csv(val_path)
    test_df  = pd.read_csv(test_path)

    # Merge train + val
    full_train = pd.concat([train_df, val_df], axis=0).reset_index(drop=True)

    # Preprocess review text
    full_train[TEXT_COL] = full_train[TEXT_COL].map(preprocess_xgb)
    test_df[TEXT_COL]    = test_df[TEXT_COL].map(preprocess_xgb)

    return full_train, test_df

In [21]:
from xgboost import XGBClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelEncoder

# ==== STORAGE ====

def train_xgb_models(train_df, test_df):
    models = {}
    encoders = {}
    reports = {}

    for aspect in ASPECTS:
        print(f"\n=== Training aspect: {aspect} ===")

        # Encode labels
        le = LabelEncoder()
        y_train = le.fit_transform(train_df[aspect])
        y_test  = le.transform(test_df[aspect])
        encoders[aspect] = le

        # Build pipeline: TF-IDF + XGBoost
        pipe = Pipeline([
            ("tfidf", TfidfVectorizer(max_features=5000, ngram_range=(1,2))),
            ("xgb", XGBClassifier(
                n_estimators=300,
                learning_rate=0.1,
                max_depth=6,
                subsample=0.8,
                colsample_bytree=0.8,
                random_state=42,
                n_jobs=-1,
                use_label_encoder=False,
                eval_metric="mlogloss"
            ))
        ])

        # Train
        pipe.fit(train_df[TEXT_COL], y_train)
        models[aspect] = pipe

        # Evaluate
        y_pred_encoded = pipe.predict(test_df[TEXT_COL])
        y_pred_original = le.inverse_transform(y_pred_encoded)
        y_test_original = le.inverse_transform(y_test)

        report = classification_report(y_test_original, y_pred_original, labels=LABEL_VALUES, target_names=LABEL_NAMES)
        print(report)

        models[aspect] = pipe
        reports[aspect] = report

    return models, encoders, reports

In [22]:
def predict_aspects(models, encoders, texts):
    texts = [preprocess_xgb(t) for t in texts]
    results = {}
    for aspect, model in models.items():
        pred_encoded = model.predict(texts)                # e.g. [0,1,2,3]
        pred_original = encoders[aspect].inverse_transform(pred_encoded)  # e.g. [-1,0,1,2]
        results[aspect] = [SENT_ID2NAME[int(x)] for x in pred_original]
    return pd.DataFrame(results, index=range(len(texts)))

In [None]:
# Example usage
train_df, test_df = load_and_merge(TRAIN_PATH, VAL_PATH, TEST_PATH)
xgb_models, xgb_encoders, xgb_reports = train_xgb_models(train_df, test_df)


=== Training aspect: Price ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])


              precision    recall  f1-score   support

        None       0.95      0.98      0.97      1999
    Negative       0.00      0.00      0.00         3
    Positive       0.91      0.82      0.86       247
     Neutral       0.61      0.36      0.46        91

    accuracy                           0.94      2340
   macro avg       0.62      0.54      0.57      2340
weighted avg       0.93      0.94      0.94      2340


=== Training aspect: Shipping ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


              precision    recall  f1-score   support

        None       0.98      0.98      0.98      1635
    Negative       0.82      0.85      0.83       124
    Positive       0.91      0.95      0.93       549
     Neutral       0.25      0.06      0.10        32

    accuracy                           0.95      2340
   macro avg       0.74      0.71      0.71      2340
weighted avg       0.94      0.95      0.95      2340


=== Training aspect: Outlook ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


              precision    recall  f1-score   support

        None       0.89      0.93      0.91      1069
    Negative       0.67      0.46      0.55        95
    Positive       0.91      0.93      0.92      1118
     Neutral       0.27      0.05      0.09        58

    accuracy                           0.89      2340
   macro avg       0.68      0.59      0.62      2340
weighted avg       0.87      0.89      0.88      2340


=== Training aspect: Quality ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


              precision    recall  f1-score   support

        None       0.91      0.95      0.93      1654
    Negative       0.70      0.33      0.44        98
    Positive       0.78      0.83      0.81       478
     Neutral       0.49      0.22      0.30       110

    accuracy                           0.87      2340
   macro avg       0.72      0.58      0.62      2340
weighted avg       0.85      0.87      0.86      2340


=== Training aspect: Size ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


              precision    recall  f1-score   support

        None       0.96      0.97      0.97      1953
    Negative       0.60      0.62      0.61       125
    Positive       0.74      0.77      0.76       165
     Neutral       0.36      0.23      0.28        97

    accuracy                           0.91      2340
   macro avg       0.67      0.65      0.65      2340
weighted avg       0.90      0.91      0.90      2340


=== Training aspect: Shop_Service ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


              precision    recall  f1-score   support

        None       0.92      0.97      0.95      1740
    Negative       0.70      0.49      0.58       140
    Positive       0.81      0.77      0.79       431
     Neutral       0.00      0.00      0.00        29

    accuracy                           0.89      2340
   macro avg       0.61      0.56      0.58      2340
weighted avg       0.88      0.89      0.89      2340


=== Training aspect: General ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


              precision    recall  f1-score   support

        None       0.88      0.96      0.92      1861
    Negative       0.00      0.00      0.00        11
    Positive       0.69      0.45      0.54       285
     Neutral       0.58      0.36      0.44       183

    accuracy                           0.85      2340
   macro avg       0.54      0.44      0.48      2340
weighted avg       0.83      0.85      0.83      2340


=== Training aspect: Others ===


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


              precision    recall  f1-score   support

        None       0.97      0.99      0.98      2151
    Negative       0.00      0.00      0.00         0
    Positive       0.00      0.00      0.00         0
     Neutral       0.88      0.65      0.75       189

    accuracy                           0.96      2340
   macro avg       0.46      0.41      0.43      2340
weighted avg       0.96      0.96      0.96      2340

     Price  Shipping   Outlook   Quality  Size Shop_Service General Others
0  Neutral  Negative  Positive      None  None         None    None   None
1     None      None      None  Positive  None     Positive    None   None


  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])


In [37]:

sample_texts = [
    "Giày rất đẹp, giá hợp lý nhưng giao hàng chậm",
    "Giá cao, giày xấu, shop giao hàng chậm"
]

print("=== XGBoost Predictions ===")
print(predict_aspects(xgb_models, xgb_encoders, sample_texts))

=== XGBoost Predictions ===
     Price  Shipping   Outlook Quality  Size Shop_Service General Others
0  Neutral  Negative  Positive    None  None         None    None   None
1     None  Negative  Negative    None  None         None    None   None


In [42]:
import joblib

# Save all models
for aspect, model in xgb_models.items():
    joblib.dump(model, os.path.join(MODEL_DIR, f"{aspect}_xgb.pkl"))

# Save all encoders
for aspect, encoder in xgb_encoders.items():
    joblib.dump(encoder, os.path.join(MODEL_DIR, f"{aspect}_encoder.pkl"))

print("✅ All existing models and encoders saved to disk")

✅ All existing models and encoders saved to disk


In [43]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Pre-load models/encoders and broadcast them
bc_models = {
    aspect: spark.sparkContext.broadcast(joblib.load(os.path.join(MODEL_DIR, f"{aspect}_xgb.pkl")))
    for aspect in ASPECTS
}
bc_encoders = {
    aspect: spark.sparkContext.broadcast(joblib.load(os.path.join(MODEL_DIR, f"{aspect}_encoder.pkl")))
    for aspect in ASPECTS
}


# ----------------------
# UDF factory for each aspect
# ----------------------
def make_predict_udf(aspect):
    model = bc_models[aspect].value
    encoder = bc_encoders[aspect].value

    @pandas_udf(StringType())
    def predict_udf(texts: pd.Series) -> pd.Series:
        texts_proc = [preprocess_xgb(t) for t in texts]
        preds_encoded = model.predict(texts_proc)
        preds_original = encoder.inverse_transform(preds_encoded)
        return pd.Series([SENT_ID2NAME[int(x)] for x in preds_original])

    return predict_udf

In [46]:
# Load some Spark dataframe (example: test data)
spark_df = spark.read.csv(TEST_PATH, header=True)
df = spark_df.limit(1000)

# Add predictions for each aspect
for aspect in ASPECTS:
    predict_udf = make_predict_udf(aspect)
    df = df.withColumn(f"{aspect}_pred", predict_udf(df["Review"]))  # check column name

# Show results
cols_to_show = ["Review"] + [f"{aspect}_pred" for aspect in ASPECTS]
df.select(*cols_to_show).limit(10).toPandas()

                                                                                

Unnamed: 0,Review,Price_pred,Shipping_pred,Outlook_pred,Quality_pred,Size_pred,Shop_Service_pred,General_pred,Others_pred
0,"Giày hơi có mùi nồng, lưu ý đôi LA không phải ...",,,,,,,Positive,
1,Hàng về đẹp lắm nha ship thân thiện đi giày vừ...,,Positive,Positive,,Positive,,,
2,Hàng ôk nên mua Dày rất đẹp,,,Positive,,,,,
3,Bun. GTI gửi Oke sớ ơ đi sidbd. Bởi đi được đ...,,,,,,,,
4,Màu đẹp giống trong hình mọi người nên mua nha...,,,Positive,,,,,
5,chất lượng phù hợp với giá tiền đi đúng sz như...,,,,Neutral,Neutral,,,
6,Giày trượt lắm huhu Đánh giải trường mà trượt ...,,,Positive,,,,,
7,Tr ơi dép đẹp vs dth lắm nha vs giá này mà chấ...,,,Positive,Positive,Positive,,,
8,Cũng tạm được thoi,,,,,,,Neutral,
9,Shop hỗ trợ rất tốt. Mn nên mua nhé,,,,,,,Positive,
