In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=edb2830286f142d4799ec02bae687e58f9ff994041c3b0aa7f353f829ccc71b1
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import FloatType, IntegerType
from pyspark.sql.functions import col, when
from pyspark.sql.functions import udf
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

In [4]:
spark = SparkSession.builder.appName("Model").config("spark.executor.memory","4g").getOrCreate()

In [5]:
import html
schema = " free_text string, label_id int"
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

IN_PATH_RAW = "/content/drive/MyDrive/yt_comment_sentiment/train.csv"
IN_PATH_TEST = "/content/drive/MyDrive/yt_comment_sentiment/test.csv"

spark_reader = spark.read.schema(schema)

user_regex = r"(@\w{1,15})"
hashtag_regex = "(#\w{1,})"
url_regex=r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www.)+([-\w+&@#/%=~|$?!:,.]*)"
email_regex=r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"


@f.udf
def html_unescape(s: str):
    if isinstance(s, str):
        return html.unescape(s)
    return s


def clean_data(df):
    df = (
        df
        .withColumn("original_text", f.col("free_text"))
        # Remove numbers and characters from text
        .withColumn("free_text",f.regexp_replace(f.col("free_text"), "[^a-zA-ZÀ-ỹà-ỹ']", " ",))
        .withColumn("free_text", f.regexp_replace(f.col("free_text"), "'", ""))
        # Remove white space
        .withColumn("free_text",f.regexp_replace(f.col("free_text"), " +", " "))
        .withColumn("free_text",f.trim(f.col("free_text")))
        # Lowercase
        .withColumn("free_text",f.lower(f.col("free_text")))
        .withColumn("free_text", f.regexp_replace(f.col("free_text"), url_regex, ""))
        .withColumn("free_text", f.regexp_replace(f.col("free_text"), email_regex, ""))
        .withColumn("free_text", f.regexp_replace(f.col("free_text"), user_regex, ""))
        .withColumn("free_text", f.regexp_replace(f.col("free_text"), "#", " "))
        .withColumn("free_text", html_unescape(f.col("free_text")))
        .filter("free_text != ''")
    )
    return df

df_train_raw = spark_reader.csv(IN_PATH_RAW)
df_train_clean = clean_data(df_train_raw)
df_train_clean = df_train_clean.na.drop()
df_test_raw = spark_reader.csv(IN_PATH_TEST)
df_test_clean = clean_data(df_test_raw)
df_test_clean = df_test_clean.na.drop()

In [6]:
%%time
from pyspark.ml.feature import (
    StopWordsRemover,
    Tokenizer,
    HashingTF,
    IDF,
    CountVectorizer,
)
from pyspark.sql.functions import udf

with open('/content/drive/MyDrive/yt_comment_sentiment/Stopword.txt', 'r', encoding='utf-8') as file:
    vietnamese_stopwords = file.read().splitlines()

tokenizer = Tokenizer(inputCol="free_text", outputCol="words1") # chuyển sang vector
vietnamese_stopwords_remover = StopWordsRemover(inputCol="words1", outputCol="words2", stopWords=vietnamese_stopwords)
hashing_tf = HashingTF(
    inputCol="words2",
    outputCol="term_frequency",
    numFeatures=20,
)
idf = IDF(
    inputCol="term_frequency",
    outputCol="features",
    minDocFreq=5,
)
(training_data, validation_data) = df_train_clean.randomSplit([0.8, 0.2], seed=42)

CPU times: user 73.4 ms, sys: 17.3 ms, total: 90.7 ms
Wall time: 6.32 s


In [7]:
df1 = tokenizer.transform(training_data)
df2 = vietnamese_stopwords_remover.transform(df1) # xóa những từ trong stop-word
df3 = hashing_tf.transform(df2)
df4 = idf.fit(df3).transform(df3)
df4 = df4.na.drop()

# **LogisticRegression**

In [8]:
lr = LogisticRegression(labelCol='label_id')
semantic_analysis_pipeline = Pipeline(
    stages=[tokenizer, vietnamese_stopwords_remover, hashing_tf, idf, lr]
)
semantic_analysis_model = semantic_analysis_pipeline.fit(training_data)

trained_df = semantic_analysis_model.transform(training_data)
val_df = semantic_analysis_model.transform(validation_data)
test_df = semantic_analysis_model.transform(df_test_clean)

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label_id", metricName="accuracy")

accuracy_val = evaluator.evaluate(val_df)
accuracy_test = evaluator.evaluate(test_df)

print("\nTesting Val Data:")
print(f"Accuracy: {accuracy_val*100:.5f}%")
print("\nTesting Data:")
print(f"Accuracy: {accuracy_test*100:.5f}%")


Testing Val Data:
Accuracy: 83.12566%

Testing Data:
Accuracy: 82.69318%


# **DecisionTreeClassifier**

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(labelCol='label_id')
semantic_analysis_pipeline = Pipeline(
    stages=[tokenizer, vietnamese_stopwords_remover, hashing_tf, idf, dt]
)
semantic_analysis_model = semantic_analysis_pipeline.fit(training_data)

trained_df = semantic_analysis_model.transform(training_data)
val_df = semantic_analysis_model.transform(validation_data)
test_df = semantic_analysis_model.transform(df_test_clean)

evaluator = MulticlassClassificationEvaluator(labelCol="label_id", metricName="accuracy")

accuracy_val = evaluator.evaluate(val_df)
accuracy_test = evaluator.evaluate(test_df)

print("\nTesting Val Data:")
print(f"Accuracy: {accuracy_val*100:.5f}%")
print("\nTesting Data:")
print(f"Accuracy: {accuracy_test*100:.5f}%")


Testing Val Data:
Accuracy: 82.57656%

Testing Data:
Accuracy: 82.43510%


# **RandomForestClassifier**

In [10]:
rf = RandomForestClassifier(labelCol='label_id')
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(labelCol='label_id')
semantic_analysis_pipeline = Pipeline(
    stages=[tokenizer, vietnamese_stopwords_remover, hashing_tf, idf, rf]
)
semantic_analysis_model = semantic_analysis_pipeline.fit(training_data)

trained_df = semantic_analysis_model.transform(training_data)
val_df = semantic_analysis_model.transform(validation_data)
test_df = semantic_analysis_model.transform(df_test_clean)

evaluator = MulticlassClassificationEvaluator(labelCol="label_id", metricName="accuracy")

accuracy_val = evaluator.evaluate(val_df)
accuracy_test = evaluator.evaluate(test_df)

print("\nTesting Val Data:")
print(f"Accuracy: {accuracy_val*100:.5f}%")
print("\nTesting Data:")
print(f"Accuracy: {accuracy_test*100:.5f}%")


Testing Val Data:
Accuracy: 82.87223%

Testing Data:
Accuracy: 82.82982%
