

*   Võ Đình Tứ 
*   Hồ Anh Dũng
*   Nguyễn Hữu Trường



# Dataset

In [None]:
!pip install pyspark
!pip install findspark
!pip install sparknlp
!sudo apt install openjdk-8-jdk
!sudo update-alternatives --config java
! java -version

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f 
import pandas as pd
import html
from pyspark.sql.types import StructField,IntegerType, StructType,StringType, FloatType
from pyspark.sql.functions import col, when , regexp_replace
from pyspark.sql.functions import udf

In [None]:
import sparknlp

# Start a Spark session through Spark NLP's helper and report the versions in use.
spark = sparknlp.start()

for version_label, version_value in (
    ("Spark NLP version: ", sparknlp.version()),
    ("Apache Spark version: ", spark.version),
):
    print(version_label, version_value)

In [None]:
# Location of the raw IMDB review dump on the mounted Google Drive.
path = "/content/drive/MyDrive/bigdata/IMDB_review/part-06.json"

sc = spark.sparkContext
data = spark.read.json(path)

In [None]:
# Drop every row that has a null in any column.
data = data.dropna("any")

In [None]:
# Derive a three-class label from the numeric rating:
#   rating < 5 -> negative, 5 <= rating < 7 -> neutral, rating >= 7 -> positive.
rating = f.col("rating")
rating_label = (
    f.when(rating < 5, "negative")
    .when((rating >= 5) & (rating < 7), "neutral")
    .when(rating >= 7, "positive")
)
data = data.withColumn("sentiment", rating_label)

In [None]:
# Only the review text and its label are needed downstream.
kept_columns = ["review_detail", "sentiment"]
data_clean = data.select(*kept_columns)

In [None]:
# A fixed seed keeps the split (and the CSV files exported from it) reproducible
# across kernel restarts. The weights sum to 10, i.e. 20% train, 5% test and 75% `val`.
train, test, val = data_clean.randomSplit([2.0, 0.5, 7.5], seed=42)

In [None]:
user_regex = r"(@\w{1,15})"
hashtag_replace_regex = r"#(\w{1,})"
# The dot after "www" is escaped so that words merely starting with "www" are not treated as URLs.
url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
i_regex = r"i "


def cleaning_process(data):
    """Normalise the ``review_detail`` text column of a Spark DataFrame.

    Steps, in order: remove URLs, remove e-mail addresses, remove @mentions,
    strip the leading ``#`` from hashtags (the word itself is kept), replace
    every non-letter character with a space, collapse runs of spaces, trim,
    lower-case, and finally drop rows whose text ends up empty.

    Args:
        data: DataFrame containing a ``review_detail`` column.

    Returns:
        A DataFrame with the same columns, ``review_detail`` cleaned and rows
        with empty text removed.
    """
    text = f.col("review_detail")

    # URLs and e-mails are removed before @mentions: stripping "@host" first
    # would turn "user@host.com" into "user.com", which the e-mail pattern can
    # no longer recognise.
    regex_steps = (
        (url_regex, ""),
        (email_regex, ""),
        (user_regex, ""),
        (hashtag_replace_regex, "$1"),
        ("[^a-zA-Z]", " "),
        (" +", " "),
    )
    for pattern, replacement in regex_steps:
        text = f.regexp_replace(text, pattern, replacement)

    text = f.lower(f.trim(text))

    return (
        data.withColumn("review_detail", text)
        # Keep only the rows that still contain some text.
        .filter(f.col("review_detail") != "")
    )

In [None]:
# Clean the train and test splits with the same routine.
dl_train, dl_test = (cleaning_process(split) for split in (train, test))

In [None]:
# Collect each cleaned split to the driver and write it to Drive as CSV
# (header row included, no pandas index column).
for split_df, csv_path in (
    (dl_train, "/content/drive/MyDrive/bigdata/df_train.csv"),
    (dl_test, "/content/drive/MyDrive/bigdata/df_test.csv"),
):
    split_df.toPandas().to_csv(csv_path, index=False, header=True)

# Train model

## Setup colab

In [None]:
# Mount Google Drive so that paths under /content/drive are available.
from google.colab import drive

drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#! pip install --ignore-installed pyspark==2.4.4
#! pip install --ignore-installed spark-nlp==2.6.2

## Cài đặt các thư viện và java

In [None]:
!pip install pyspark
!pip install findspark
!pip install sparknlp
!sudo apt install openjdk-8-jdk
!sudo update-alternatives --config java
! java -version

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 61 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 64.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=fd20a713a3bf8c3f63538fe1512854e180da00cb166b1257e5b4c97b7d3482eb
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2
Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2
Collecting sparknlp
  Downloading sparknlp-1.0.0-py3-none-any.w

In [None]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd


## Khai báo spark

In [None]:
# gpu=True is the setting for GPU training; for Spark 2.3 use
# sparknlp.start(spark23=True) instead.
spark = sparknlp.start(gpu=True)

for version_label, version_value in (
    ("Spark NLP version", sparknlp.version()),
    ("Apache Spark version:", spark.version),
):
    print(version_label, version_value)

spark

Spark NLP version 3.1.1
Apache Spark version: 3.1.2


## Load Data

In [None]:
# Both exported CSV files have a header row and the same two string columns.
schema1 = "review_detail STRING, sentiment STRING"
csv_options = {"header": True, "schema": schema1}

data_train = spark.read.csv("/content/drive/MyDrive/bigdata/df_train.csv", **csv_options)
data_test = spark.read.csv("/content/drive/MyDrive/bigdata/df_test.csv", **csv_options)

In [None]:
# Preview the first rows of the training set.
data_train.show(n=20)

+--------------------+---------+
|       review_detail|sentiment|
+--------------------+---------+
|spoilers ahead a ...| positive|
|spoilers a hot sh...|  neutral|
|spoilers for thos...|  neutral|
|mild spoilers das...|  neutral|
|spoilers cop jack...| negative|
|spoilers and no i...| negative|
|spoilers this ins...| positive|
|spoilers the very...| positive|
|may contain mild ...| positive|
|a lot like love i...| positive|
|a mighty wind was...| positive|
|bought it watched...| positive|
|heist directed by...| positive|
|documentary filmm...| negative|
|i m not worth it ...| positive|
|did not return fr...| positive|
|and her love for ...| positive|
|the beat is too s...| positive|
|all the marbles i...|  neutral|
+--------------------+---------+
only showing top 20 rows



## Train model

In [None]:
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.sql.functions import col, when , regexp_replace
import shutil

In [None]:
%%time

document_assembler = DocumentAssembler() \
      .setInputCol("review_detail") \
      .setOutputCol("document")
    
tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")
      
normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCol("stem")

finisher = Finisher() \
      .setInputCols(["stem"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)

hashing_tf = HashingTF(inputCol = "token_features",
                       outputCol = "raw_feature")

idf = IDF(inputCol = "raw_feature",
          outputCol = "features",
          minDocFreq = 5) 

label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")

CPU times: user 25.6 ms, sys: 13.1 ms, total: 38.6 ms
Wall time: 213 ms


### Machine learning

#### TF-IDF vectorizer + Logistic Regression Classifier

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic-regression classifier trained against the indexed `label` column.
lr = LogisticRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0)

# Preprocessing and feature stages followed by the classifier.
lr_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    stemmer,
    finisher,
    hashing_tf,
    idf,
    label_stringIdx,
    lr,
]
nlp_pipeline = Pipeline(stages=lr_stages)

lr_model = nlp_pipeline.fit(data_train)

In [None]:
# Score the test set with the fitted logistic-regression pipeline.
pred_lr = lr_model.transform(dataset=data_test)

In [None]:
# Inspect the first 100 scored rows.
pred_lr.show(n=100)

In [None]:
from sklearn.metrics import classification_report, accuracy_score

# Collect true labels and predictions to the driver and print per-class metrics.
df_lr = pred_lr.select("review_detail", "label", "prediction").toPandas()
lr_report = classification_report(df_lr["label"], df_lr["prediction"])
print(lr_report)

              precision    recall  f1-score   support

         0.0       0.80      0.97      0.88     10371
         1.0       0.79      0.56      0.66      3014
         2.0       0.40      0.07      0.12      1771

    accuracy                           0.79     15156
   macro avg       0.66      0.54      0.55     15156
weighted avg       0.75      0.79      0.74     15156



In [None]:
# Saving to an existing directory raises an error, so overwrite explicitly;
# this lets the cell be re-run after the model has been exported once.
lr_model_dir = "/content/drive/MyDrive/bigdata/NLP_model_lr"
lr_model.write().overwrite().save(lr_model_dir)

# Zip the saved model directory; the archive is written as "model.zip"
# in the current working directory.
shutil.make_archive("model", "zip", lr_model_dir)

'/content/model.zip'

#### TF-IDF vectorizer + Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes

# NOTE(review): smoothing=111 is an unusually large value — confirm it is intended.
Nb = NaiveBayes(smoothing=111)

# Same preprocessing and feature stages as the logistic-regression pipeline,
# with Naive Bayes as the final estimator.
nb_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    stemmer,
    finisher,
    hashing_tf,
    idf,
    label_stringIdx,
    Nb,
]
nlp_pipeline_NB = Pipeline(stages=nb_stages)

NB_model = nlp_pipeline_NB.fit(data_train)

pred_NB = NB_model.transform(data_test)

In [None]:
from sklearn.metrics import classification_report, accuracy_score

# Collect true labels and predictions to the driver and print per-class metrics.
df_NB = pred_NB.select("review_detail", "label", "prediction").toPandas()
nb_report = classification_report(df_NB["label"], df_NB["prediction"])
print(nb_report)

              precision    recall  f1-score   support

         0.0       0.68      1.00      0.81     10371
         1.0       1.00      0.00      0.00      3014
         2.0       0.00      0.00      0.00      1771

    accuracy                           0.68     15156
   macro avg       0.56      0.33      0.27     15156
weighted avg       0.67      0.68      0.56     15156



  _warn_prf(average, modifier, msg_start, len(result))


### Deep learning

#### BERT embeddings + ClassifierDLApproach

In [None]:

# Token-level embeddings from the pretrained small BERT model.
bert_embeddings = (
    BertEmbeddings.pretrained(name='small_bert_L4_256', lang='en')
    .setInputCols(["document", 'token'])
    .setOutputCol("embeddings")
)

# Average the token embeddings into one vector per document.
embeddingsSentence = (
    SentenceEmbeddings()
    .setInputCols(["document", "embeddings"])
    .setOutputCol("sentence_embeddings")
    .setPoolingStrategy("AVERAGE")
)

# Trainable classifier on top of the document vectors; labels are read from
# the string `sentiment` column.
classsifierdl = (
    ClassifierDLApproach()
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")
    .setLabelColumn("sentiment")
    .setMaxEpochs(10)
    .setLr(0.001)
    .setBatchSize(8)
    .setEnableOutputLogs(True)
)

dl_stages = [
    document_assembler,
    tokenizer,
    bert_embeddings,
    embeddingsSentence,
    classsifierdl,
]
Dl_model = Pipeline(stages=dl_stages)

small_bert_L4_256 download started this may take some time.
Approximate size to download 40.5 MB
[OK!]


In [None]:
%%time
bert_model= Dl_model.fit(data_train)

In [None]:
# Score the test set with the fitted BERT pipeline.
pred_bert = bert_model.transform(dataset=data_test)

In [None]:
# Preview the scored rows, including the annotation columns.
pred_bert.show(n=20)

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       review_detail|sentiment|            document|               token|          embeddings| sentence_embeddings|               class|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|possible mild spo...| negative|[{document, 0, 72...|[{token, 0, 7, po...|[{word_embeddings...|[{sentence_embedd...|[{category, 0, 72...|
|a lot like love i...| positive|[{document, 0, 80...|[{token, 0, 0, a,...|[{word_embeddings...|[{sentence_embedd...|[{category, 0, 80...|
|a worthy sequel b...| negative|[{document, 0, 21...|[{token, 0, 0, a,...|[{word_embeddings...|[{sentence_embedd...|[{category, 0, 21...|
|th hour is a very...| positive|[{document, 0, 35...|[{token, 0, 1, th...|[{word_embeddings...|[{sentence_embedd...|[{category, 0, 35...|
|femmes is such a ...| negative|[{

In [None]:
# Collect the text, the true label and the predicted-label array to the driver.
bert_columns = ['review_detail', 'sentiment', "class.result"]
preds_df = pred_bert.select(*bert_columns).toPandas()

In [None]:
# `result` holds a sequence per row; keep its first element as the predicted label.
preds_df['result'] = preds_df['result'].map(lambda labels: labels[0])

In [None]:
from sklearn.metrics import classification_report

# classification_report expects (y_true, y_pred): the ground-truth `sentiment`
# comes first and the model output `result` second, matching the argument order
# used for the logistic-regression and Naive Bayes reports.
print(classification_report(preds_df['sentiment'], preds_df['result']))

  _warn_prf(average, modifier, msg_start, len(result))


              precision    recall  f1-score   support

    negative       0.51      0.57      0.54      2682
     neutral       0.00      0.00      0.00         0
    positive       0.93      0.76      0.84     12291

    accuracy                           0.73     14973
   macro avg       0.48      0.44      0.46     14973
weighted avg       0.85      0.73      0.78     14973



#### USE / GloVe + pretrained SentimentDLModel

In [None]:

# Document-level embeddings from the pretrained Universal Sentence Encoder.
use = (
    UniversalSentenceEncoder.pretrained('tfhub_use', lang="en")
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# Pretrained sentiment model: nothing is fitted in this cell, so no label
# column is configured (the previous dangling `.setLabelColumn(...)` line was
# what raised the IndentationError).
classifier = (
    SentimentDLModel.pretrained('sentimentdl_use_imdb')
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")
)

nlp_pipeline = Pipeline(stages=[
    document_assembler,
    use,
    classifier,
])

IndentationError: ignored

In [None]:
# Pretrained GloVe word embeddings. The token input must be 'token', the
# output column produced by the `tokenizer` stage.
embeddings = (
    WordEmbeddingsModel.pretrained("glove_100d")
    .setInputCols(['document', 'token'])
    .setOutputCol('word_embeddings')
)

# Average the word vectors into one vector per document.
sentence_embeddings = (
    SentenceEmbeddings()
    .setInputCols(["document", "word_embeddings"])
    .setOutputCol("sentence_embeddings")
    .setPoolingStrategy("AVERAGE")
)

# NOTE(review): the output column is named "sentiment", the same name as the
# label column of data_train/data_test — confirm this is intended before
# applying the model to those frames.
classifier = (
    SentimentDLModel.pretrained('sentimentdl_glove_imdb')
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("sentiment")
)

# Real-time sentiment

## Crawl IMDB review data

In [None]:
! pip install selenium

In [None]:
from selenium import webdriver
import time, urllib.parse
from bs4 import BeautifulSoup as soup

In [None]:
def _text(node, selector):
    """Return the stripped text of the first element under `node` matching
    `selector`, or None when no element matches."""
    match = node.select_one(selector)
    return match.get_text(strip=True) if match is not None else None


d = webdriver.Chrome(r'C:\Program Files\crawldata\chromedriver')
l = 'https://www.imdb.com/title/tt5034838/reviews/?ref_=tt_ql_urv'
d.get(l)

# Number of reviews announced in the page header: first whitespace-separated
# token of the header text, with thousands separators removed.
total_reviews = int(
    d.execute_script("return document.querySelector('.header span').textContent")
    .split()[0]
    .replace(',', '')
)

# Click "load more" until every announced review is on the page. The loop also
# stops when the button is gone or when the number of loaded reviews has not
# grown for several rounds, so it cannot raise on a missing button or spin
# forever if the announced total is never reached.
max_stalled_rounds = 5
stalled_rounds = 0
loaded = 0
while stalled_rounds < max_stalled_rounds:
    previous = loaded
    loaded = int(d.execute_script(
        "return document.querySelectorAll('#main .review-container').length"
    ))
    if loaded >= total_reviews:
        break
    stalled_rounds = stalled_rounds + 1 if loaded == previous else 0
    clicked = d.execute_script(
        'var b = document.querySelector(".ipl-load-more__button");'
        ' if (!b) { return false; } b.click(); return true;'
    )
    if not clicked:
        break
    time.sleep(3)

# One record per review container. Score, title and reviewer link are present
# in the page as well but are not collected.
page = soup(d.page_source, 'html.parser')
r = [
    {
        'reviewer_name': _text(i, '.display-name-link > a'),
        'date': _text(i, '.review-date'),
        'text': _text(i, '.content > .text'),
    }
    for i in page.select('#main .review-container')
]

len(r)

In [None]:
import json

# Serialise the scraped review records as a single JSON array on disk.
with open('Downloads/GodzillavsKong.json', 'w') as json_file:
    json_file.write(json.dumps(r))

## Real-time

In [None]:
import pandas as pd
from IPython.display import display, clear_output
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.ml import PipelineModel
from pyspark.sql.functions import udf
from pyspark.sql.streaming import DataStreamReader
import html
from pyspark.sql.functions import col, when , regexp_replace

### Load data

In [None]:
IN_PATH = '/content/drive/MyDrive/bigdata/BigData_Project/CrawldataIMDB'
#timestampformat='EEE MMM dd HH:mm:ss zzzz yyyy'

# Obtain the session first (getOrCreate reuses a running one), then configure
# it once; this way the cell does not rely on a `spark` variable left over
# from an earlier section.
spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()
spark.conf.set('spark.sql.legacy.timeParserPolicy', "LEGACY")

# Infer the schema from a one-off batch read of the folder and hand it to the
# streaming reader.
schema = spark.read.json(IN_PATH).schema
spark_reader = spark.readStream.schema(schema)

In [None]:
# Stream the crawled JSON files, renaming the columns to the names used by the
# rest of the notebook.
streaming_data_raw = (
    spark_reader.json(IN_PATH)
    .select(
        f.col('date').alias('timestamp'),
        f.col('reviewer_name').alias('user'),
        f.col('text').alias('review_detail'),
    )
    .coalesce(1)
)

# One-shot query that writes the stream into the in-memory table "data".
stream_writer = (
    streaming_data_raw.writeStream
    .queryName('data')
    .trigger(once=True)
    .outputMode('append')
    .format('memory')
)
query = stream_writer.start()

# start() returns immediately; wait for the single batch to finish so the
# in-memory table is populated before it is queried.
query.awaitTermination()

# show() prints the table itself and returns None, so it is not wrapped in display().
spark.sql(f"SELECT * from {query.name}").show()

### Load model

In [None]:
# Fitted pipeline previously saved on Drive.
DL_MODEL_PATH = '/content/drive/MyDrive/bigdata/DL_model'
sentiment_model = PipelineModel.load(DL_MODEL_PATH)

In [None]:
# Apply the same text cleaning that was used on the training data before
# scoring the streamed reviews (cleaning_process is defined in the Dataset
# section of this notebook).
streaming_data_clean = cleaning_process(streaming_data_raw)

raw_sentiment = sentiment_model.transform(streaming_data_clean)
sentiment = raw_sentiment.select('timestamp', 'user', 'review_detail', 'probability')


In [None]:
from pyspark.ml.functions import vector_to_array

# Reduce the probability vector to its last element.
last_probability = f.element_at(vector_to_array(f.col("probability")), -1)
final = (
    sentiment
    .select('timestamp', 'user', 'review_detail', "probability")
    .withColumn("probability", last_probability)
)

# Bucket the score: below 0.475 -> negative, 0.475..0.675 -> neutral,
# above 0.675 -> positive; anything unmatched falls back to the score itself.
score = f.col("probability")
score_label = (
    f.when(score < 0.475, 'negative')
    .when((score >= 0.475) & (score <= 0.675), 'neutral')
    .when(score > 0.675, 'positive')
    .otherwise(score)
)
result = final.withColumn("sentiment", score_label)

In [None]:
# Keep the reporting columns, then count reviews per sentiment label
# (sorted by count, ascending).
report_columns = ['timestamp', 'user', 'review_detail', 'sentiment']
result = result.select(*report_columns)

review_count = f.count("sentiment").alias("count")
sentiment_count_result = result.groupBy("sentiment").agg(review_count).sort("count")

In [None]:
# Per-review results, appended to the in-memory table "result" every 5 seconds.
stream_writer1 = (
    result.writeStream
    .queryName('result')
    .trigger(processingTime='5 seconds')
    .outputMode('append')
    .format('memory')
)
query1 = stream_writer1.start()

# Running counts per sentiment. The query gets its own name instead of reusing
# 'data', the name of the raw-review query started earlier in the notebook,
# so the two in-memory tables cannot clash.
stream_writer2 = (
    sentiment_count_result.writeStream
    .queryName('sentiment_count')
    .trigger(processingTime='5 seconds')
    .outputMode('complete')
    .format('memory')
)
query2 = stream_writer2.start()

### Demo real-time

In [None]:
if raw_sentiment.isStreaming:
    from time import sleep

    # Refresh interval of the live view; 5 seconds matches the processing-time
    # trigger of the two streaming queries, and the printed messages are
    # derived from it so they stay consistent with the actual sleep.
    refresh_seconds = 5

    for x in range(0, 2000):
        try:
            if not query1.isActive:
                print('Query not active')
                break
            print(f'Showing live view refreshed every {refresh_seconds} seconds')
            print(f"Seconds passed: {x*refresh_seconds}")
            # Collect each in-memory table once per refresh and reuse the frame.
            result1 = spark.sql(f"SELECT * from {query1.name}").toPandas()
            result2 = spark.sql(f"SELECT * from {query2.name}").toPandas()
            print(len(result1))
            display(result1)
            display(result2)
            sleep(refresh_seconds)
            clear_output(wait=True)
        except KeyboardInterrupt:
            # Allow the user to stop the live view from the notebook UI.
            print('break')
            break
    print('Live view ended...')
else:
    print("Not streaming")