In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier,LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.functions import lower, col,regexp_replace,rand
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF,Word2Vec
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark
from pyspark.sql import SparkSession
import os 

In [None]:
 
os.environ["PYTHONPATH"] = "./sparkenv/Lib/site-packages"  # replace path with your python env

scala_version = '2.12'  # Scala version
spark_version = '3.5.3' # Spark version

packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.6.0'  # Kafka version
]

PYTHON_EXECUTABLE = "//sparkenv/Scripts/python.exe"# replace path with your python env
# Replace path file that install in your folder
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Sentiment-Model") \
    .config("spark.jars.packages", ",".join(packages)) \
    .config("spark.pyspark.python", PYTHON_EXECUTABLE) \
    .config("spark.pyspark.driver.python", PYTHON_EXECUTABLE) \
    .config("spark.executorEnv.PYTHONPATH", PYTHON_EXECUTABLE) \
    .config("spark.executorEnv.PYSPARK_PYTHON", PYTHON_EXECUTABLE) \
    .config("spark.driver.extraClassPath", "./spark-3.5.3-bin-hadoop3/jars") \
    .config("spark.executor.extraClassPath", "./spark-3.5.3-bin-hadoop3/jars") \
    .config("spark.local.dir", "C:/sparktmp") \
    .config("spark.hadoop.io.native.lib", "false") \
    .getOrCreate()

spark


In [None]:
# prepare data for training and testing
train = spark.read.csv(r".\data\vihsd\train.csv", header=True, inferSchema=True)
test =spark.read.csv(r".\data\vihsd\test.csv", header=True, inferSchema=True)

In [None]:
train.show(5)

In [None]:
# Undersampling for the major label in data to handle imbalance data
train_major=train.filter(train.label_id==0).orderBy(rand())
train_major=train_major.limit(5000)
train=train.filter((train.label_id==1) | (train.label_id==2)).union(train_major)

In [None]:
#convert label to int type
train = train.withColumn("label_id", train["label_id"].cast(IntegerType()))
test = test.withColumn("label_id", test["label_id"].cast(IntegerType()))

In [None]:
train.printSchema()

In [None]:
#Rename columns
train=train.withColumnRenamed("free_text","Comment")\
    .withColumnRenamed("label_id","Sentiment")
test=test.withColumnRenamed("free_text","Comment")\
    .withColumnRenamed("label_id","Sentiment")

In [None]:
#Data processing retains letters, spaces
regex_pattern = "[^a-zA-ZÀÁÂÃÈÉÊÌÍÒÓÔÕÙÚĂĐĨŨƠàáâãèéêìíòóôõùúăđĩũơƯĂÂÊÔơưăâêô\s]"
train = train.withColumn("Comment", lower(regexp_replace(train["Comment"], regex_pattern, "")))
test = test.withColumn("Comment", lower(regexp_replace(test["Comment"], regex_pattern, "")))

In [None]:
# Drop null values 
train = train.na.drop(subset=["Comment","Sentiment"])
test=train.na.drop(subset=["Comment","Sentiment"])

In [None]:
train.select("Sentiment").distinct().show()

In [None]:
# Tokenize the "Comment" column by splitting text based on whitespace
tokenizer = RegexTokenizer(inputCol="Comment", outputCol="words", pattern="\\s+")
# Remove common stopwords from the tokenized words
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
# Convert words into numerical vectors using Word2Vec
word2vec = Word2Vec(vectorSize=1000, minCount=3, inputCol="filtered_words", outputCol="features")

In [None]:
# Metrics to evaluate Precision, Recall, F1 Score
precision_evaluator = MulticlassClassificationEvaluator(labelCol="Sentiment", predictionCol="prediction", metricName="precisionByLabel")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="Sentiment", predictionCol="prediction", metricName="recallByLabel")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="Sentiment", predictionCol="prediction", metricName="f1")

## Random Forest

In [None]:
# Create Pipeline and training model random forest
rf= RandomForestClassifier(featuresCol="features",labelCol="Sentiment")

pipeline_rf = Pipeline(stages=[tokenizer, stopwords_remover, word2vec, rf])

model_rf = pipeline_rf.fit(train)

In [None]:
# Predict in test dataset
predictions_rf = model_rf.transform(test)
predictions_rf.select("Sentiment","prediction").distinct().show()

In [None]:

# Evaluate model on test dataset
precision_rf = precision_evaluator.evaluate(predictions_rf)
recall_rf = recall_evaluator.evaluate(predictions_rf)
f1_rf = f1_evaluator.evaluate(predictions_rf)

print(f"Random Forest - Precision: {precision_rf:.4f}, Recall: {recall_rf:.4f}, F1-score: {f1_rf:.4f}")

Random Forest - Precision: 0.6574, Recall: 0.8942, F1-score: 0.5681


### Tuning

In [None]:
# Optimize param with CrossValidation for RandomForest
rf= RandomForestClassifier(featuresCol="features",labelCol="Sentiment")
pipeline_rf = Pipeline(stages=[tokenizer, stopwords_remover, word2vec, rf])
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 150])  # Num tree
             .addGrid(rf.maxDepth, [ 10, 15])  # Depth Tree
             .build())

# Cross Validator with 3 folds
cv = CrossValidator(estimator=pipeline_rf,
                    estimatorParamMaps=paramGrid,
                    evaluator=f1_evaluator,
                    numFolds=3)  

cvModel = cv.fit(train)

predictions = cvModel.transform(test)

f1 = f1_evaluator.evaluate(predictions)
print(f"f1 (Random Forest): {f1}")

best_model = cvModel.bestModel


f1 (Random Forest): 0.9237625958393878


In [None]:
predictions.select("Sentiment","prediction").distinct().show()

In [None]:
# Take best model Random Forest from CrossValidator
best_rf_model = cvModel.bestModel.stages[-1]  # RandomForestClassifier last stage in pipeline

# Show the param
print(f"Best numTrees: {best_rf_model.getNumTrees}")
print(f"Best maxDepth: {best_rf_model.getMaxDepth()}")


Best numTrees: 100
Best maxDepth: 15


In [17]:
predictions_rf=best_model.transform(test)

In [18]:
precision_rf = precision_evaluator.evaluate(predictions_rf)
recall_rf = recall_evaluator.evaluate(predictions_rf)
f1_rf = f1_evaluator.evaluate(predictions_rf)

print(f"Random Forest - Precision: {precision_rf:.4f}, Recall: {recall_rf:.4f}, F1-score: {f1_rf:.4f}")

Random Forest - Precision: 0.9155, Recall: 0.9790, F1-score: 0.9238


In [None]:
# Save model
best_model.save("models/randomforest_model")

## Logistic Regression

In [None]:
lr = LogisticRegression(labelCol="Sentiment", featuresCol="features")
pipeline_lr = Pipeline(stages=[tokenizer, stopwords_remover, word2vec, lr])


model_lr = pipeline_lr.fit(train)


In [None]:
predictions_lr = model_lr.transform(test)
predictions_lr.select("Sentiment","prediction").distinct().show()

In [None]:
precision_lr = precision_evaluator.evaluate(predictions_lr)
recall_lr = recall_evaluator.evaluate(predictions_lr)
f1_lr = f1_evaluator.evaluate(predictions_lr)

print(f"Logistic Regression - Precision: {precision_lr:.4f}, Recall: {recall_lr:.4f}, F1-score: {f1_lr:.4f}")

Logistic Regression - Precision: 0.6570, Recall: 0.8904, F1-score: 0.5969


### Tunning

In [None]:
lr = LogisticRegression(labelCol="Sentiment", featuresCol="features")
pipeline_lr = Pipeline(stages=[tokenizer, stopwords_remover, word2vec, lr])

paramGrid = (ParamGridBuilder()
             .addGrid(lr.maxIter, [10, 50, 100])       # Iter
             .addGrid(lr.regParam, [0.0, 0.1, 0.01])   #  regularization
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  #  L1 & L2
             .build())

cv = CrossValidator(estimator=pipeline_lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=f1_evaluator,
                    numFolds=3)  

cvModel = cv.fit(train)

predictions = cvModel.transform(test)

f1 = f1_evaluator.evaluate(predictions)
print(f"f1 (Logistic Regression): {f1}")

best_model_lr = cvModel.bestModel


f1 (Logistic Regression): 0.6101048121071495


In [None]:
# Take best model Logistic Regression from CrossValidation
best_lr_model = cvModel.bestModel.stages[-1] 

# Show optimize param
print(f"Best maxIter: {best_lr_model.getMaxIter()}")
print(f"Best regParam: {best_lr_model.getRegParam()}")
print(f"Best elasticNetParam: {best_lr_model.getElasticNetParam()}")

Best maxIter: 100
Best regParam: 0.0
Best elasticNetParam: 0.0


In [None]:
best_model_lr.save("models/logisticRegression_model")