In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
from pyspark.ml import Pipeline # pipeline to transform data
from pyspark.sql import SparkSession # to initiate spark
from pyspark.sql.types import FloatType
from pyspark.ml.feature import RegexTokenizer # tokenizer
from pyspark.ml.feature import HashingTF, IDF # vectorizer
from pyspark.ml.feature import StopWordsRemover # to remove stop words
from pyspark.sql.functions import concat_ws, col # to concatinate cols
from pyspark.ml.classification import LogisticRegression # ml model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # to evaluate the model
from pyspark.mllib.evaluation import MulticlassMetrics # # performance metrics

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler


scala_version = '2.12'  # your scala version
spark_version = '3.5.1' # your spark version
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.7.0' #your kafka version
]

spark = SparkSession \
        .builder \
        .appName("Feedback classification") \
        .master("local[*]") \
        .config("spark.jars.packages", ','.join(packages)) \
        .getOrCreate()

spark

In [4]:
# %pip install pyspark

In [5]:
# Load training data
df = spark.read.csv("E:/University/VI BigData/DLSpark/StudentsFeedback\StudentFeedbackDataset.csv", inferSchema=True, header=True)
df.show(truncate=False)



+---------------------------------------------------------------------------------------------------------------------------------+-----+
|Feedbacks                                                                                                                        |senti|
+---------------------------------------------------------------------------------------------------------------------------------+-----+
|slide giáo trình đầy đủ .                                                                                                        |2    |
|nhiệt tình giảng dạy , gần gũi với sinh viên .                                                                                   |2    |
|đi học đầy đủ full điểm chuyên cần .                                                                                             |0    |
|chưa áp dụng công nghệ thông tin và các thiết bị hỗ trợ cho việc giảng dạy .                                                     |0    |
|thầy giảng bài hay , có nhiều bài

In [6]:
df = df.withColumnRenamed('senti', 'label')
df = df.withColumnRenamed('Feedbacks', 'Text')

# convert label string to integer
from pyspark.sql.functions import col
df = df.withColumn("label", col("label").cast("integer"))

# Shows top 10 rows
df.show(10)

+--------------------+-----+
|                Text|label|
+--------------------+-----+
|slide giáo trình ...|    2|
|nhiệt tình giảng ...|    2|
|đi học đầy đủ ful...|    0|
|chưa áp dụng công...|    0|
|thầy giảng bài ha...|    2|
|giảng viên đảm bả...|    2|
|em sẽ nợ môn này ...|    1|
|thời lượng học qu...|    0|
|nội dung môn học ...|    0|
|cần nói rõ hơn bằ...|    0|
+--------------------+-----+
only showing top 10 rows



In [7]:
# convert sentences to list of words
tokenizer = RegexTokenizer(inputCol="Text", outputCol="words", pattern=" ")

# remove white space in each list in words column

# adds a column 'words' to df after tokenization
df = tokenizer.transform(df)
# df = df.withColumn('Text', regexp_replace('Text', ' ', ''))

# df.select(['label','Text', 'words']).show(5)
df.show(5)

+--------------------+-----+--------------------+
|                Text|label|               words|
+--------------------+-----+--------------------+
|slide giáo trình ...|    2|[slide, giáo, trì...|
|nhiệt tình giảng ...|    2|[nhiệt, tình, giả...|
|đi học đầy đủ ful...|    0|[đi, học, đầy, đủ...|
|chưa áp dụng công...|    0|[chưa, áp, dụng, ...|
|thầy giảng bài ha...|    2|[thầy, giảng, bài...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [8]:
# read txt file and convert it to a list
with open('E:/University/VI BigData/DLSpark/StudentsFeedback/vietnamese-stopwords.txt','r',encoding='utf-8') as f:
    stopwords = f.readlines()

stopwords = [x.strip() for x in stopwords]



# to remove stop words like is, the, in, etc.
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered",stopWords=stopwords)
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered",stopWords=[", ","."," "," ,"," , ","!","?","(",")","[","]","{","}","-","_","@","#","$","%","^","&","*","/","\\","|","<",">","~","`","1","2","3","4","5","6","7","8","9","0","'",";","=","+","-","_",".",":",";","/","?","!","@","#","$","%","^","&","*","(",")","[","]","{","}","<",">","~","`","1","2","3","4","5","6","7","8","9","0","-","_",".",";",":","/","?","!","@","#","$","%","^","&","*","(",")","[","]","{","}","<",">","~","`","1","2","3","4","5","6","7","8","9","0","-","_",".",";",":","/","?","!","@","#","$","%","^","&","*","(",")","[","]","{","}","<",">","~","`","1","2","3","4","5","6","7","8","9","0","-","_",".",";",":","/","?","!","@","#","$","%","^","&","*","(",")","[","]","{","}","<",">","~","`","1","2","3","4","5","6","7","8","9","0","-","_",".",";",":","/","?","!","@","#","$","%","^","&","*","(",")","[","]","{","}","<",">","~","`","1","2","3","4","5","6","7","8","9","0","-","_",".",";",":","/","?","!","@","#","$","%","^","&","*","(",")","[","]","{","}","<",">","~","`","1","2","3","4","5","6","7","8","9","0","-","_",".",";",":","/","?","!","@","#","$","%","^","&"])

# adds a column 'filtered' to df without stopwords
df = stopwords_remover.transform(df)

# remove white spaces


df.select(['label','Text', 'words', 'filtered']).show(truncate=False)


+-----+---------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|Text                                                                                                                             |words                                                                                                                                                           |filtered                                                                                                                                                     |
+-----+-------------------------------------------------------------------------------

In [9]:
# Calculate term frequency in each article
hashing_tf = HashingTF(inputCol="filtered",
                       outputCol="raw_features", 
                       numFeatures=10000)

# adds raw tf features to df
featurized_data = hashing_tf.transform(df)

In [10]:
# Inverse document frequency
idf = IDF(inputCol="raw_features", outputCol="features")

idf_vectorizer = idf.fit(featurized_data)

# converting text to vectors
rescaled_data = idf_vectorizer.transform(featurized_data)

# top 20 rows
rescaled_data.select("label",'Text', 'words', 'filtered', "features").show()

# remove last element in each list in filtered column

+-----+--------------------+--------------------+--------------------+--------------------+
|label|                Text|               words|            filtered|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|    2|slide giáo trình ...|[slide, giáo, trì...|[slide, giáo, trì...|(10000,[1472,1833...|
|    2|nhiệt tình giảng ...|[nhiệt, tình, giả...|[nhiệt, tình, giả...|(10000,[2103,3267...|
|    0|đi học đầy đủ ful...|[đi, học, đầy, đủ...|[đi, học, đầy, đủ...|(10000,[373,572,1...|
|    0|chưa áp dụng công...|[chưa, áp, dụng, ...|[chưa, áp, dụng, ...|(10000,[598,874,1...|
|    2|thầy giảng bài ha...|[thầy, giảng, bài...|[thầy, giảng, bài...|(10000,[445,640,9...|
|    2|giảng viên đảm bả...|[giảng, viên, đảm...|[giảng, viên, đảm...|(10000,[325,445,1...|
|    1|em sẽ nợ môn này ...|[em, sẽ, nợ, môn,...|[em, sẽ, nợ, môn,...|(10000,[156,492,1...|
|    0|thời lượng học qu...|[thời, lượng, học...|[thời, lượng, học...|(10000,[32

In [11]:
# Split Train/Test data
(train, test) = rescaled_data.randomSplit([0.75, 0.25], seed = 202)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 8524
Test Dataset Count: 2902


In [14]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

len_features = len(train.select("features").first()[0])
layers = [len_features, 4, 2, 3]
train = train.filter(train.label.isNotNull() & ~isnan(train.label))


lr = MultilayerPerceptronClassifier(labelCol='label',
                                            featuresCol='features',
                                            maxIter=100,
                                            layers=layers,
                                            blockSize=128,
                                            seed=1234)

# train the model
lrModel = lr.fit(train)

# compute accuracy on the test set
predictions = lrModel.transform(test)
# remove all NULL or NaN values from all categorical columns
predictions = predictions.filter(predictions.label.isNotNull() & ~isnan(predictions.label))
predictions.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                Text|label|               words|            filtered|        raw_features|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|"chỉ một câu "" g...|    2|["chỉ, một, câu, ...|["chỉ, một, câu, ...|(10000,[1066,2346...|(10000,[1066,2346...|[5.09208191976221...|[0.99712822660144...|       0.0|
|"cả năm toàn học ...|    0|["cả, năm, toàn, ...|["cả, năm, toàn, ...|(10000,[92,125,37...|(10000,[92,125,37...|[-2.0115877639236...|[0.01062161009016...|       1.0|
|"dạy học sinh hỏi...|    0|["dạy, học, sinh,...|["dạy, học, sinh,...|(10000,[1094,1877...|(10000,[1094,1877...|[-0.3090696925464...|[0.10068488246831...|       2.0|
|"kh

In [17]:
# convert 0,1,2 to negative, neutral, positive
result2 = predictions.withColumn('prediction', when(predictions['prediction'] == 0, 'negative')
                                     .when(predictions['prediction'] == 1, 'neutral')
                                     .otherwise('positive'))

result2.select("Text", 'prediction').show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|Text                                                                                                                                                                                    |prediction|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|"chỉ một câu "" giấc mơ thành đại gia "" ."                                                                                                                                             |negative  |
|"cả năm toàn học vòng lặp "" if "" "" for "" "" while "" đến bữa cuối mới lơ mơ học được struct với tạo file header mà ra đề 5 điểm trong phần đó ."                                    |neutral   |
|"dạy học 

In [18]:
# Remove null values from label column
predictions = predictions.filter(predictions.label.isNotNull())

# Convert prediction column to DoubleType
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Evaluate the model
print("Test-set Accuracy is:", evaluator.evaluate(predictions))

Test-set Accuracy is: 0.874587831490776


In [None]:
# # export predictions to csv
# # predictions.select("Text", 'probability','prediction', 'label').toPandas().to_csv('E:/University/VI BigData/DLSpark/StudentsFeedback/predictions.csv')

# # export result to csv
# result.select("Text", 'prediction').toPandas().to_csv('E:/University/VI BigData/DLSpark/StudentsFeedback/result.csv')
