In [1]:
import re
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, udf, array_join
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import StopWordsRemover
from pyvi.ViTokenizer import ViTokenizer

In [2]:
# init spark session
spark = SparkSession.builder.master('local[*]').config('spark.ui.port', '4040').getOrCreate()

In [11]:
review_fp = "hdfs://namenode:9000/review_data/reviews.csv"
df = spark.read.csv(review_fp, header=True, inferSchema=True)
df.show()

+--------------------+------+-----------+
|              review|rating|place_index|
+--------------------+------+-----------+
|Gà ướp vừa vị , m...|     5|          0|
|Quán sạch, đẹp, t...|     5|          0|
|Nhân viên bự con ...|     5|          0|
|Gà giòn tan, nóng...|     5|          0|
|Gà giòn rụm, thấm...|     5|          0|
|Nhân viên thân th...|     5|          0|
|Phục vụ nhanh, nh...|     5|          0|
|Gà rán thì ở đâu ...|     5|          0|
|Nhân viên phục vụ...|     5|          0|
|Mình là fan KFC, ...|     5|          0|
|Gà giòn ngon dù đ...|     5|          0|
|Mình có ghé đây ă...|     5|          0|
|Ghé quán vào 1 bu...|     5|          0|
|Nhân viên thân th...|     5|          0|
|Ăn nhiều brand gà...|     5|          0|
|Mới ghé tại nhà h...|     5|          0|
|Đồ ăn nóng hổi, c...|     5|          0|
|Ngon lắm mn ơi. Q...|     5|          0|
|Ghé vào buổi tối,...|     5|          0|
|Ghé ăn trưa cùng ...|     5|          0|
+--------------------+------+-----

# Encoding Rating into sentiments
Ratings above 3 stars are considered positive, below 3 stars are considered negative and neutral otherwise.

In [None]:
df_encoded = df.withColumn("sentiment",
    when(col("rating") > 3, "positive")
    .when(col("rating") == 3, "neutral")
    .otherwise("negative")
)
df_encoded.show()

+--------------------+------+-----------+---------+
|              review|rating|place_index|sentiment|
+--------------------+------+-----------+---------+
|Gà ướp vừa vị , m...|     5|          0| positive|
|Quán sạch, đẹp, t...|     5|          0| positive|
|Nhân viên bự con ...|     5|          0| positive|
|Gà giòn tan, nóng...|     5|          0| positive|
|Gà giòn rụm, thấm...|     5|          0| positive|
|Nhân viên thân th...|     5|          0| positive|
|Phục vụ nhanh, nh...|     5|          0| positive|
|Gà rán thì ở đâu ...|     5|          0| positive|
|Nhân viên phục vụ...|     5|          0| positive|
|Mình là fan KFC, ...|     5|          0| positive|
|Gà giòn ngon dù đ...|     5|          0| positive|
|Mình có ghé đây ă...|     5|          0| positive|
|Ghé quán vào 1 bu...|     5|          0| positive|
|Nhân viên thân th...|     5|          0| positive|
|Ăn nhiều brand gà...|     5|          0| positive|
|Mới ghé tại nhà h...|     5|          0| positive|
|Đồ ăn nóng 

In [16]:
df_encoded.filter(df_encoded['sentiment'] != 'positive').show()

+--------------------+------+-----------+---------+
|              review|rating|place_index|sentiment|
+--------------------+------+-----------+---------+
|Khi tôi đặt 5 cán...|     1|          0| negative|
|Gọi gà nướng, bắt...|     1|          0| negative|
|Thanh gà trong co...|     2|          0| negative|
|Đặt hàng trên Sho...|     1|          0| negative|
|Làm ăn cà chớn, đ...|     1|          0| negative|
|Làm ăn tệ hại,bỏ ...|     1|          0| negative|
|Thái độ nhân viên...|     1|          0| negative|
|Nhân viên ăn nói ...|     1|          0| negative|
|Đồ ăn ngon bác bả...|     2|          0| negative|
|Lần thứ 3 mua hàn...|     1|          0| negative|
|Đặt grab mua milo...|     1|          0| negative|
|Thời gian chờ lâu...|     1|          0| negative|
|29/11/2021 mình c...|     1|          0| negative|
|Nóng nực, thái độ...|     2|          0| negative|
|Khuyên các bạn mu...|     1|          0| negative|
|Muốn ăn 1 miệng g...|     1|          0| negative|
|nhân viên c

# Text Preprocessing

In [None]:
# Read stopwords from .txt
url = 'https://drive.google.com/uc?id=1ZyLTTnY0fiLmZo66pcl_6AhJ8TJyjXqt'
response = requests.get(url)

with open("stopwords.txt", "wb") as f:
    f.write(response.content)

with open("stopwords.txt.txt", "r", encoding="utf-8") as f:
    stopwords_list = [line.strip() for line in f.readlines()]

In [None]:
# Define UDF to tokenize using PyVi
def pyvi_tokenize(text):
    try:
        if text and isinstance(text, str):
            text = re.sub(r'[^\w\s\.]', ' ', text)
            tokens = ViTokenizer.tokenize(text).split()
            return [token for token in tokens if token.strip()]
        return []
    except Exception as e:
        print(f"Error tokenizing text: {e}")
        return []

In [None]:
# Define UDF to clean and normalize tokens
def normalize_tokens(tokens):
    try:
        if not tokens:
            return []

        normalized = []
        for token in tokens:
            token = token.replace('_', ' ')            
            clean_token = re.sub(r'[^\w\s]', '', token.lower())
            clean_token = clean_token.strip()
            if clean_token:
                sub_tokens = clean_token.split()
                normalized.extend(sub_tokens)

        return normalized
    except Exception as e:
        print(f"Error normalizing tokens: {e}")
        return []

In [None]:
# Register UDFs
tokenize_udf = udf(pyvi_tokenize, ArrayType(StringType()))
normalize_udf = udf(normalize_tokens, ArrayType(StringType()))

Processing 

In [None]:
# Tokenize
df_with_tokens = df_encoded.withColumn("tokens", tokenize_udf(col("review")))

In [None]:
# Remove stopwords
stopwords_remover = StopWordsRemover() \
    .setInputCol("tokens") \
    .setOutputCol("filtered_tokens") \
    .setStopWords(stopwords_list) \
    .setCaseSensitive(False)

df_no_stopwords = stopwords_remover.transform(df_with_tokens)

In [None]:
# Normalize
df_normalized = df_no_stopwords.withColumn("normalized_tokens", normalize_udf(col("filtered_tokens")))

In [None]:
# Join tokens into a text column
df_join = df_normalized.withColumn("cleaned_reviews", array_join(col("normalized_tokens"), " "))

In [None]:
# Drop intermediate columns
df_final = df_join.drop("tokens", "filtered_tokens", "normalized_tokens")

In [None]:
# Show final result
print("Processed results:")
df_final.show(truncate=False)

In [17]:
# Stop session
spark.stop()