In [None]:
# Installation on a PC with conda installed:
# conda create -n spark python=3.7 -y
# conda activate spark
# pip install PyYAML jupyter pyspark bigdl-dllib pandas nltk matplotlib wordcloud plotly

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !nvidia-smi

In [None]:
import os
os.chdir('/content/drive/MyDrive/Twitter')

In [None]:
# !apt-get install openjdk-8-jdk-headless
# !wget https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
# !tar xf spark-3.4.1-bin-hadoop3.tgz

In [None]:
!pip install pyspark



In [None]:
# !pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# import findspark
# findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# import pandas as pd
# import pyspark.pandas as ps
import numpy as np
import time

import pyspark.sql.functions as func

from pyspark.ml import Pipeline as PLine

from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, Word2Vec, HashingTF, VectorAssembler
from nltk.stem.snowball import SnowballStemmer

from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, MultilayerPerceptronClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from tqdm import tqdm

In [None]:
# from bigdl.dllib.utils.common import *
# from bigdl.dllib.nn.layer import *
# from bigdl.dllib.optim.optimizer import *
# from bigdl.dllib.nn.criterion import *
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

## 1. Khởi tạo môi trường và Đọc dữ liệu

In [None]:
# conf = create_spark_conf()
# conf.set("spark.memory.fraction", "1").set("spark.driver.memory", "15g").set('spark.driver.cores', '12')
# sc = get_spark_context(conf=conf)
# redire_spark_logs()
# show_bigdl_info_logs()
# init_engine()

In [None]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('Sentiment Analysis')\
        .getOrCreate()

In [None]:
# schema = StructType([\
#                     StructField("target", IntegerType(), True),\
#                     StructField("id", IntegerType(), True),\
#                     StructField("date", DateType(), True),\
#                     StructField("flag", StringType(), True),\
#                     StructField("user", StringType(), True),\
#                     StructField("text", StringType(), True)])

df = spark.read.csv('training.1600000.processed.noemoticon.csv', sep=',', header=False, inferSchema=True)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [None]:
df = df.withColumnRenamed('_c0','target').withColumnRenamed('_c1','id').withColumnRenamed('_c2','date')\
  .withColumnRenamed('_c3','flag').withColumnRenamed('_c4','user').withColumnRenamed('_c5','text')
df.show()

+------+----------+--------------------+--------+---------------+--------------------+
|target|        id|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

In [None]:
df.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D")

Tính vocab size

In [None]:
# df.withColumn('word', f.explode(f.split(f.col('text'), ' ')))\
#     .groupBy('word')\
#     .count()\
#     .sort('count', ascending=False)\
#     .count()

# # vocab size = 1.350.484

## 2. Phân tích, trực quan hóa dữ liệu:

#### 2.1. Tiền xử lý

In [None]:
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
from  nltk.stem import SnowballStemmer
import re
from pyspark.sql import functions as func
from wordcloud import WordCloud
from matplotlib import pyplot as plt
import plotly.express as px
import pandas as pd

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [None]:
stop_words = stopwords.words("english")
stemmer = SnowballStemmer("english")
text_cleaning_re = "@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"

In [None]:
def preprocess(text, stem=False):
    # Remove link,user and special characters
    text = re.sub(text_cleaning_re, ' ', str(text).lower()).strip()
    tokens = []
    for token in text.split():
        if token not in stop_words:
            if stem:
                tokens.append(stemmer.stem(token))
            else:
                tokens.append(token)
    return " ".join(tokens)

In [None]:
%%time
clean_text = func.udf(lambda x: preprocess(x), StringType())

In [None]:
df_da = df.withColumn('date', expr('substring(date, 5, 27)'))


df_da = df_da.select(col('target').cast('int'),
                         col('id').cast('int'),
                         to_timestamp(col('date'),'MMM dd HH:mm:ss zzz yyyy').alias('date'),
                         col('flag').cast('string'),
                         col('user').cast('string'),
                         col('text').cast('string'),
                        )
df_da = df_da.withColumn("date_minus_7_hours", expr("date - INTERVAL 7 HOURS"))
df_da = df_da.withColumn('weekday', func.date_format('date_minus_7_hours', 'EEEE'))
df_da = df_da.withColumn("hour", func.hour(func.col("date_minus_7_hours")))
df_da = df_da.withColumn('text_cleaned',clean_text(func.col("text")))
df_da = df_da.withColumn('wordCount_clean', func.size(func.split(func.col('text_cleaned'), ' ')))
df_da = df_da.withColumn('wordCount', func.size(func.split(func.col('text'), ' ')))

In [None]:
df_da.show()

#### 2.2. Phân tích dữ liệu

In [None]:
df_da.groupBy("target").count().show()

In [None]:
s_df = df_da.select('target','weekday','hour','text_cleaned','wordCount')
w_gr = s_df.groupBy('weekday','target').count()
pandasDF = w_gr.toPandas()

In [None]:
pandasDF.loc[pandasDF['target'] == 0, 'target'] = 'Negative'
pandasDF.loc[pandasDF['target'] == 4, 'target'] = 'Positive'
d = {'Monday':1, 'Tuesday':2, 'Wednesday':3, 'Thursday':4,'Friday':5,'Saturday':6,'Sunday':7}
pandasDF['Index'] = pandasDF['weekday'].map(d)
pandasDF.sort_values(by=['Index','target'], inplace=True)

In [None]:
fig = px.histogram(pandasDF, y='count', x='weekday', color='target', barmode='group', height=400 )
fig.show()

Số lượng tweet đầu tuần (Thứ 3) và cuối tuần (Thứ 7 và CN) lớn hơn số lượng tweet trong các ngày giữa tuần. Đồng thời vào những ngày này lượng tweet tích cực lớn hơn lượng tweet tiểu cực, trong khi những ngày giữa tuần, lượng tweet tích tực lại ít hơn tweet tiêu cực.

In [None]:
h_gr = s_df.groupBy('hour','target').count()
pandasDF = h_gr.toPandas()
pandasDF.loc[pandasDF['target'] == 0, 'target'] = 'Negative'
pandasDF.loc[pandasDF['target'] == 4, 'target'] = 'Positive'

In [None]:
fig = px.histogram(pandasDF, y='count', x='hour', color='target', barmode='group', height=400, nbins=24)
fig.show()

Lượng tweet thấp nhất trong khoảng 9h tới 21h, và trong khoảng thời gian này lượng tweet tiêu cực lơn hơn lượng tweet tích cực. Từ 22h tới 6h ngày hôm sau số lượng tweet tích cực lại lớn hơn lượng tweet tiêu cực.

In [None]:
word_count = s_df.filter(col("target") == 4).withColumn('word', func.explode(func.split(func.col('text_cleaned'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .take(50)
word_count_pd = pd.DataFrame(columns=['word', 'count'],data=word_count)
fig = px.histogram(word_count_pd, y='count', x='word', color='count', barmode='group', height=400, title='50 từ phổ biến nhất trong các tweet tích cực',)
fig.show()

In [None]:
word_count = s_df.filter(col("target") == 0).withColumn('word', func.explode(func.split(func.col('text_cleaned'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .take(50)
word_count_pd = pd.DataFrame(columns=['word', 'count'],data=word_count)
fig = px.histogram(word_count_pd, y='count', x='word', color='count', barmode='group', height=400, title='50 từ phổ biến nhất trong các tweet tiêu cực',)
fig.show()

In [None]:
filtered_df = s_df.filter(col("target") == 4)
word_cloud_text_df = filtered_df.select(concat_ws("", col("text_cleaned")).alias("word_cloud_text"))

# Collect the Aggregated Text
word_cloud_text_list = word_cloud_text_df.rdd.flatMap(lambda x: x).collect()
word_cloud_text = ''.join(word_cloud_text_list)

In [None]:
wordcloud = WordCloud(
    max_font_size=100,
    max_words=100,
    background_color="black",
    scale=10,
    width=1600,
    height=900
).generate(word_cloud_text)
#Figure properties
plt.figure(figsize=(15,12))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.title("Wordcloud for Positive Values")
plt.show()

In [None]:
filtered_df = s_df.filter(col("target") == 0)
word_cloud_text_df = filtered_df.select(concat_ws("", col("text_cleaned")).alias("word_cloud_text"))

# Collect the Aggregated Text
word_cloud_text_list = word_cloud_text_df.rdd.flatMap(lambda x: x).collect()
word_cloud_text = ''.join(word_cloud_text_list)
#Creation of wordcloud
wordcloud = WordCloud(
    max_font_size=100,
    max_words=100,
    background_color="black",
    scale=10,
    width=1600,
    height=900
).generate(word_cloud_text)
#Figure properties
plt.figure(figsize=(15,12))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.title("Wordcloud for Negative Values")
plt.show()

Có thể suy luận, liên quan tới công việc thì tỉ lệ tweet tiêu cực thường lớn hơn tweet tích cực:
- Ngày làm việc (giữa tuần)
- Khung thời gian làm việc (9h - 21h)
- Từ "work"

In [None]:
c_gr = s_df.groupBy('wordCount','target').count()
pandasDF = c_gr.toPandas()
pandasDF.loc[pandasDF['target'] == 4, 'target'] = 'Positive'
pandasDF.loc[pandasDF['target'] == 0, 'target'] = 'Negative'
fig = px.histogram(pandasDF, y='count', x='wordCount', color='target', barmode='group', height=400)
fig.show()

Những tweet có độ dài khoảng 15 chiếm tỉ lệ lớn và cân bằng giữa tweet tiêu cực và tích cực. Trong khi đó, những tweet ngắn có tỉ lệ tích cực cao hơn tiêu cực, ngược lại những tweet có độ dài lớn hơn thì tỉ lệ tiêu cực lại lớn hơn.

## 3. Tiền xử lý cho mô hình

Kiểm tra sự cân bằng dữ liệu giữa 2 nhãn

In [None]:
df.groupBy("target").count().show()

                                                                                

+------+------+
|target| count|
+------+------+
|     4|800000|
|     0|800000|
+------+------+



=> như vậy dữ liệu tích cực (target=4) và tiêu cực (target=0) là cân bằng, không có trường hợp nào target null cần xử lý

#### 3.1. Xử lý trường 'text'

Xóa các từ riêng bắt đầu bởi @..., http...

Chỉ giữ lại các kí tự chữ cái và dấu cách.

In [None]:
df_clean = df.select('*', lower(regexp_replace(regexp_replace('text', "(@|\\bhttp\\b)\S+\s", ""), "[^a-zA-Z\\s]", " ")).alias('newtext'))
df_clean.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d')

In [None]:
# filter to get rows with length of "newtext" > 10

df_clean2 = df_clean.filter(func.length(func.col('newtext'))>10)


In [None]:
# tokenize "newtext" and drop words with length < 1 (word "")

tokenizer = RegexTokenizer(inputCol='newtext', outputCol='tokens', minTokenLength=1)
df_tokens = tokenizer.transform(df_clean2)
df_tokens.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d', tokens=['awww', 'that', 's', 'a', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'of', 'third', 'day', 'to', 'do', 'it', 'd'])

In [None]:
# remove stop words

remover = StopWordsRemover(inputCol="tokens", outputCol="words_clean")
df_words_no_stopw = remover.transform(df_tokens)
df_words_no_stopw.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d', tokens=['awww', 'that', 's', 'a', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'of', 'third', 'day', 'to', 'do', 'it', 'd'], words_clean=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'])

Đưa các token về dạng từ gốc

In [None]:
locale = spark._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean"))
df_stemmed.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d', tokens=['awww', 'that', 's', 'a', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'of', 'third', 'day', 'to', 'do', 'it', 'd'], words_clean=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], words_stemmed=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'])

tính vocab size mới

In [None]:
# df_stemmed.withColumn('word', func.explode('words_stemmed'))\
#     .groupBy('word')\
#     .count()\
#     .count()

# # new vocab size = 240.627

#### 3.2 Xử lý trường target

In [None]:
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
label_df = label_stringIdx.fit(df_stemmed).transform(df_stemmed)
label_df.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d', tokens=['awww', 'that', 's', 'a', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'of', 'third', 'day', 'to', 'do', 'it', 'd'], words_clean=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], words_stemmed=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], label=0.0)

#### 3.3. Thêm trường:
- đếm số từ trong tweet (wordCount)
- ngày trong tuần (weekday)
- giờ trong ngày

In [None]:
label_df = label_df.withColumn('weekday', expr('substring(date, 1, 3)'))
label_df = label_df.withColumn("hour", expr('substring(date, 12, 2)'))
label_df = label_df.withColumn('wordCount', func.size(func.split(func.col('text'), ' ')))
label_df.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d', tokens=['awww', 'that', 's', 'a', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'of', 'third', 'day', 'to', 'do', 'it', 'd'], words_clean=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], words_stemmed=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], label=0.0, weekday='Mon', hour='22', wordCount=20)

In [None]:
weekday_stringIdx = StringIndexer(inputCol = "weekday", outputCol = "weekdayIdx")
wd_df = weekday_stringIdx.fit(label_df).transform(label_df)
wd_df.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d', tokens=['awww', 'that', 's', 'a', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'of', 'third', 'day', 'to', 'do', 'it', 'd'], words_clean=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], words_stemmed=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], label=0.0, weekday='Mon', hour='22', wordCount=20, weekdayIdx=2.0)

In [None]:
hour_stringIdx = StringIndexer(inputCol = "hour", outputCol = "hourIdx")
hour_df = hour_stringIdx.fit(wd_df).transform(wd_df)
hour_df.head()

Row(target=0, id=1467810369, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", newtext='  awww  that s a bummer   you shoulda got david carr of third day to do it   d', tokens=['awww', 'that', 's', 'a', 'bummer', 'you', 'shoulda', 'got', 'david', 'carr', 'of', 'third', 'day', 'to', 'do', 'it', 'd'], words_clean=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], words_stemmed=['awww', 'bummer', 'shoulda', 'got', 'david', 'carr', 'third', 'day', 'd'], label=0.0, weekday='Mon', hour='22', wordCount=20, weekdayIdx=2.0, hourIdx=5.0)

In [None]:
[train_df, test_df] = hour_df.randomSplit([0.8, 0.2], seed = 123)

## 4. Chọn phương pháp feature extraction cho trường text

#### 4.1. TF-IDF

In [None]:
t1 = time.time()

vectorizer = CountVectorizer(minDF=2, vocabSize = 2**17, inputCol="words_stemmed", outputCol="cntVec")
idf = IDF(inputCol="cntVec", outputCol="idfVec", minDocFreq=5)
vecAssembler = VectorAssembler(inputCols=["idfVec", "weekdayIdx", "hourIdx", "wordCount"], outputCol="features")

pipeline1 = PLine(stages=[vectorizer, idf, vecAssembler])
transformer_pipe1 = pipeline1.fit(train_df)
train_vecAss_df = transformer_pipe1.transform(train_df)

t2 = time.time()

print('time of extracting feature: {0:.2f}'.format(t2-t1))

time of extracting feature: 328.21


#### 4.2. HashTF

In [None]:
t1 = time.time()
hashtf = HashingTF(numFeatures=2**17, inputCol="words_stemmed", outputCol='hashTF')
idf2 = IDF(inputCol='hashTF', outputCol="hashTF_idf", minDocFreq=5)
vecAssembler2 = VectorAssembler(inputCols=["hashTF_idf", "weekdayIdx", "hourIdx", "wordCount"], outputCol="features2")

pipeline2 = PLine(stages=[hashtf, idf2, vecAssembler2])
transformer_pipe2 = pipeline2.fit(train_df)
train_hashAss_df = transformer_pipe2.transform(train_df)

t2 = time.time()

print('time of extracting feature: {0: .2f}'.format(t2-t1))


time of extracting feature:  168.57


#### 4.3. Word2Vec

In [None]:
t1 = time.time()
word2vec = Word2Vec(vectorSize=64, minCount=2, inputCol = "words_stemmed", outputCol = "w2v", seed = 123)
vecAssembler3 = VectorAssembler(inputCols=["w2v", "weekdayIdx", "hourIdx", "wordCount"], outputCol="features3")

pipeline3 = PLine(stages=[word2vec, vecAssembler3])
transformer_pipe3 = pipeline3.fit(train_df)
train_w2vAss_df = transformer_pipe3.transform(train_df)

t2 = time.time()
print('Time of extracting feature (word2Vec): {0:.2f}'.format(t2-t1))

# train_w2v_df = word2vec.fit(train_df).transform(train_df)
# vecAssembler = VectorAssembler(inputCols=["w2v", "weekdayIdx", "hourIdx", "wordCount"], outputCol="features3")
# train_w2vAss_df = vecAssembler.transform(train_w2v_df)

Time of extracting feature (word2Vec): 587.69


In [None]:
# t1 = time.time()
# word2vec = Word2Vec(vectorSize=64, minCount=2, inputCol = "words_stemmed", outputCol = "w2v", seed = 123)
# w2v_df = word2vec.fit(label_df).transform(hour_df)

# vecAssembler = VectorAssembler(inputCols=["w2v", "weekdayIdx", "hourIdx", "wordCount"], outputCol="features3")
# w2vAss_df = vecAssembler.transform(w2v_df)
# t2 = time.time()
# print('Time of extracting feature (word2Vec): {0:.2f}'.format(t2-t1))

# [train_w2vAss_df, test_w2vAss_df] = w2vAss_df.randomSplit([0.8, 0.2], seed = 123)



2023-08-18 14:18:44 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2023-08-18 14:18:44 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


                                                                                

Time of extracting feature (word2Vec): 158.10


In [None]:
train_w2vAss_df.rdd.saveAsPickleFile('train_w2vAss_df')

In [None]:
pickleRdd = sc.pickleFile("train_w2vAss_df").collect()
train_w2vAss_df = spark.createDataFrame(pickleRdd)

## 5. Xây dựng & đánh giá các mô hình

#### 5.1. CountVectorizer + IDF + Logistic Regression

In [None]:
### Use Assembler Vector as features to train

t3 = time.time()
lr = LogisticRegression(featuresCol="features", labelCol = "label")
lrModel = lr.fit(train_vecAss_df)
t4 = time.time()
print('time of training: {0:.2f}'.format(t4-t3))

t1 = time.time()
test_vecAss_df = transformer_pipe1.transform(test_df)
t2 = time.time()
print('time of extracting feature of test: {0:.2f}'.format(t2-t1))


predictions = lrModel.transform(test_vecAss_df)


evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", rawPredictionCol="rawPrediction", labelCol="label")
print('AUC: ', evaluator.evaluate(predictions))

accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test_vecAss_df.count())
print('accuracy: ', accuracy)


time of training: 404.67
time of extracting feature of test: 0.26
AUC:  0.8520007896804839
accuracy:  0.7787920279933059


#### 5.2. HashTF + IDF + Logistich Regression

In [None]:
t3 = time.time()
lr = LogisticRegression(featuresCol="features2")
lrModel = lr.fit(train_hashAss_df)
t4 = time.time()
print('time of training: {0:.2f}'.format(t4-t3))


t1 = time.time()
test_hashAss_df = transformer_pipe2.transform(test_df)
t2 = time.time()

print('time of extracting feature for test set: {0: .2f}'.format(t2-t1))


predictions = lrModel.transform(test_hashAss_df)


evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('AUC: ', evaluator.evaluate(predictions))

accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test_hashAss_df.count())
print('accuracy', accuracy)

time of training: 427.14
time of extracting feature for test set:  0.36
AUC:  0.8480430906677863
accuracy 0.775571783559004


#### 5.3. Word2Vec + Logistic Regression

In [None]:
t3= time.time()
lrModel_w2v = LogisticRegression(featuresCol="w2v").fit(train_w2vAss_df)
t4 = time.time()
print('Time process Logistic Regression : {0:.2f} seconds'.format(t4-t3) )


t1 = time.time()
test_w2vAss_df = transformer_pipe3.transform(test_df)
t2 = time.time()
print('Time of extracting feature (word2Vec) for test set: {0:.2f}'.format(t2-t1))


predictions_w2v = lrModel_w2v.transform(test_w2vAss_df)


evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('AUC: ', evaluator.evaluate(predictions_w2v))

accuracy = predictions_w2v.filter(predictions_w2v.label == predictions_w2v.prediction).count() / float(test_w2vAss_df.count())
print('accuracy: ', accuracy)

Time process Logistic Regression : 402.59 seconds
Time of extracting feature (word2Vec) for test set: 0.55
AUC:  0.7611014084408716
accuracy:  0.6914238805213246


Kết luận: như vậy cách dùng features với trường 'text' về dạng CountVectorizer-IDF cho kết quả tốt nhất. Ta sẽ dùng features kiểu này để dùng với các mô hình khác nhau. Từ đó chọn ra mô hình tốt nhất.

#### 5.4. Thử nghiệm các phương pháp phân loại khác với cùng feature của CountVectorizer + IDF**

##### 5.4.1. Naive Bayes

In [None]:
t1 = time.time()
nb_Model = NaiveBayes(featuresCol="features", modelType="multinomial").fit(train_vecAss_df)
t2 = time.time()

predictions_nb = nb_Model.transform(test_vecAss_df)


print('time of training : {0:.2f}'.format(t2-t1))

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('AUC: ', evaluator.evaluate(predictions_nb))

accuracy = predictions_nb.filter(predictions_nb.label == predictions_nb.prediction).count() / float(test_vecAss_df.count())
print('accuracy: ', accuracy)

time of training : 170.62
AUC:  0.5275415353719223
accuracy:  0.7543771235863888


##### 5.4.2. Decision Tree

In [None]:
t1 = time.time()
dt_Model = DecisionTreeClassifier(featuresCol="features").fit(train_vecAss_df)
t2 = time.time()
print('time of training : {0:.2f}'.format(t2-t1))

predictions_dt = dt_Model.transform(test_vecAss_df)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('AUC: ', evaluator.evaluate(predictions_dt))

accuracy = predictions_dt.filter(predictions_dt.label == predictions_dt.prediction).count() / float(test_vecAss_df.count())
print('accuracy: ', accuracy)

time of training : 5269.14
AUC:  0.5692042365025892
accuracy:  0.5910543130990416


##### 5.4.3. Linear SVM

In [None]:
# ### Use assembler vector as features to train

t1 = time.time()
svm = LinearSVC(featuresCol = 'features')
svm_model = svm.fit(train_vecAss_df)
t2 = time.time()
print('time of training : {0:.2f}'.format(t2-t1))

predictions_dt = svm_model.transform(test_vecAss_df)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('AUC: ', evaluator.evaluate(predictions_dt))

accuracy = predictions_dt.filter(predictions_dt.label == predictions_dt.prediction).count() / float(test_vecAss_df.count())
print('accuracy: ', accuracy)

time of training : 528.76
AUC:  0.8528554805962636
accuracy:  0.7791882194837466
