# **Mount Google Drive**

In [None]:
# Mount Google Drive at /content/drive; the tweet files and the saved
# classifier used further down are read from paths under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **Import Libraries**

In [None]:
# import os

# # Install java
# ! apt-get update -qq
# ! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
# ! java -version

In [None]:
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

In [None]:
!pip install emoji

In [None]:
import emoji
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.functions import col, when
from IPython.display import display, clear_output
from pyspark.ml import PipelineModel
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Spark NLP
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

In [None]:
import sparknlp

# SETTINGS
# Glob of tweet JSON files (on the mounted Drive) that the stream reads.
IN_PATH = "/content/drive/MyDrive/TwitterData/tw*.json"

# CPU-only Spark session started through Spark NLP.
spark = sparknlp.start(gpu=False)

# Pattern used to parse the tweets' `created_at` strings into timestamps.
# NOTE(review): presumably this pattern is why the legacy parser is enabled below - confirm.
timestampformat = "EEE MMM dd HH:mm:ss zzzz yyyy"

# Use the legacy datetime parser. Setting the conf once is enough; the extra
# `spark.sql("set ...")` call that used to precede this line set the same key.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [None]:
# Infer the tweet schema once from a single sample file (batch read) and give
# it to the streaming reader, so the stream itself does not have to infer it.
schema = (
    spark.read
    .json("/content/drive/MyDrive/TwitterData/tw1.json")
    .limit(10)
    .schema
)
spark_reader = spark.readStream.schema(schema)

# **Preprocess Streaming Data**

In [None]:
def emoji2text(text):
    """Replace every emoji in `text` with its textual name.

    Names are delimited by single spaces instead of the default colons.

    Args:
        text: The string to convert, or None (a null cell when used as a Spark UDF).

    Returns:
        The converted string, or None when `text` is None.
    """
    # Spark passes SQL NULL to Python UDFs as None; demojize() cannot handle
    # None, so pass it through instead of failing the whole streaming query.
    if text is None:
        return None
    return emoji.demojize(text, delimiters=(" ", " "))
# Wrap emoji2text as a Spark UDF returning a string column.
udf_emoji2text = udf(emoji2text,StringType())

In [None]:
# Patterns used by cleaning_process via regexp_replace.
# All are raw strings so the backslash escapes reach the regex engine untouched.

# @mention: '@' followed by 1-15 word characters.
user_regex = r"(@\w{1,15})"
# Hashtag: captures the word after '#' so the '#' alone can be dropped ("$1").
hashtag_replace_regex = r"#(\w{1,})"
# URL: anything starting with http://, https:// or www. up to the next whitespace.
url_regex = r"(https?:\/\/\S+|www\.\S+)"
# E-mail address. The dot before the TLD is escaped so it only matches a literal
# '.', not any character.
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
# RT_regex = r"RT"
# dot_regex = r"(\.){2,}"

def cleaning_process(data):
    """Normalise the `text` column of a tweet DataFrame and drop empty rows.

    Steps, in order: remove URLs, e-mail addresses and @mentions; keep hashtag
    words but drop the '#'; spell emojis out as words; replace every
    non-letter character with a space; collapse repeated spaces; trim;
    lowercase; finally keep only rows whose text is not the empty string.

    URLs and e-mail addresses are removed before @mentions on purpose: the
    mention pattern also matches the "@domain" part of an address, so running
    it first would leave address fragments behind and the e-mail pattern
    would never match.

    Args:
        data: DataFrame with a string column named "text".

    Returns:
        A DataFrame with the cleaned "text" column and empty-text rows removed.
    """
    data = (
        data
        # Remove URLs from the text
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
        # Remove e-mail addresses from the text
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
        # Remove @mentions from the text
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
        # Drop the '#' of hashtags but keep the tagged word
        .withColumn("text", f.regexp_replace(f.col("text"), hashtag_replace_regex, "$1"))
        # # Remove "RT" from the text
        # .withColumn("text",f.regexp_replace(f.col("text"), RT_regex, ""))
        # Convert emojis to their textual names
        .withColumn("text", udf_emoji2text(f.col("text")))
        # Replace digits and every other non-letter character with a space
        .withColumn("text", f.regexp_replace(f.col("text"), "[^a-zA-Z]", " "))
        # Collapse runs of spaces
        .withColumn("text", f.regexp_replace(f.col("text"), " +", " "))
        # Strip leading and trailing spaces
        .withColumn("text", f.trim(f.col("text")))
        # Normalise to lowercase
        .withColumn("text", f.lower(f.col("text")))
        # Keep only rows that still have some text
        .filter(f.col("text") != "")
    )
    return data

In [None]:
# Twitter: stream the tweet files, keeping the parsed timestamp, the author's
# screen name, the tweet text and the per-vaccine flag columns.
streaming_data_raw = (
    spark_reader.json(IN_PATH)
    .select(
        f.to_timestamp(f.col("created_at"), timestampformat).alias("timestamp"),
        f.col("user.screen_name").alias("user"),
        f.col("full_text").alias("text"),
        "Sinopharm", "Pfizer", "Sinovac", "Moderna", "AstraZeneca", "Covaxin", "Sputnik",
    )
    .coalesce(1)
    .dropDuplicates()
)

In [None]:
# Apply the text-cleaning steps to the raw tweet stream.
streaming_data_clean = cleaning_process(streaming_data_raw)

# **Use our pre-trained model to classify text sentiment**

In [None]:
!wget https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/models/labse_xx_2.6.0_2.4_1600858075633.zip

--2021-07-25 11:57:25--  https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/models/labse_xx_2.6.0_2.4_1600858075633.zip
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.161.64
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.161.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1772022983 (1.6G) [application/zip]
Saving to: ‘labse_xx_2.6.0_2.4_1600858075633.zip’


2021-07-25 11:59:14 (15.5 MB/s) - ‘labse_xx_2.6.0_2.4_1600858075633.zip’ saved [1772022983/1772022983]



In [None]:
!unzip /content/labse_xx_2.6.0_2.4_1600858075633.zip -d /content/pretrain_sen_emb

Archive:  /content/labse_xx_2.6.0_2.4_1600858075633.zip
  inflating: /content/pretrain_sen_emb/bert_sentence_tensorflow  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/part-00010  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/part-00011  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00006.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/._SUCCESS.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00007.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00011.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00005.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00004.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00010.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00000.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/.part-00001.crc  
  inflating: /content/pretrain_sen_emb/fields/vocabulary/part-000

In [None]:
# Define the Spark NLP pipeline: text -> document -> sentence embeddings -> prediction

# Turns the raw "text" column into a "document" annotation column.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Sentence embeddings loaded from the directory the downloaded archive was unzipped to.
embeddings = (
    BertSentenceEmbeddings.load("/content/pretrain_sen_emb")
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# Classifier loaded from Drive; consumes the embeddings and writes "prediction".
classifierdl = (
    ClassifierDLModel().load("/content/drive/MyDrive/bert_labse_model")
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("prediction")
)

nlpPipeline = Pipeline(stages=[documentAssembler, embeddings, classifierdl])

In [None]:
# Fit on a one-row dummy frame to obtain a PipelineModel that can transform
# the stream (the pipeline stages are built from loaded models, not trained here).
empty_df = spark.createDataFrame([['']]).toDF("text")
pipelineModel = nlpPipeline.fit(empty_df)

# Run the pipeline over the cleaned tweet stream.
sentiment_result = pipelineModel.transform(streaming_data_clean)

# Latency = prediction time minus tweet creation time, both as Unix seconds.
result = (
    sentiment_result
    .withColumn("predicted_at", f.unix_timestamp())
    .withColumn("created_at", f.unix_timestamp("timestamp"))
    .withColumn("latency", col("predicted_at") - col("created_at"))
)

# One row per (document, prediction) pair; field '1' of the zipped struct is
# taken as the sentiment and mapped to a label.
result = (
    result
    .select(
        "timestamp", "created_at", "predicted_at", "user", "text", "latency",
        f.explode(f.arrays_zip('document.result', 'prediction.result')).alias("cols"),
        "Pfizer", "Sinopharm", "Sinovac", "Moderna", "AstraZeneca", "Covaxin", "Sputnik",
    )
    .withColumn("sentiment", f.expr("cols['1']"))
    .withColumn(
        'sentiment',
        when(col('sentiment') == 0.0, 'Negative')
        .when(col('sentiment') == 1.0, 'Neutral')
        .otherwise('Positive'),
    )
)

# Running number of tweets per sentiment label.
result_count = result.groupBy("sentiment").agg(f.count("sentiment").alias("count"))

In [None]:
# Average latency over the rows whose latency is below 100.
result_latency = (
    result
    .filter(result.latency < 100)
    .agg(f.avg(col("latency")).alias("Avg latency"))
)

In [None]:
def _sentiment_counts_for(vaccine):
    """Count rows per sentiment among the rows whose `vaccine` column equals 1."""
    return (
        result
        .filter(result[vaccine] == 1)
        .groupBy("sentiment")
        .agg(f.count("sentiment").alias("count"))
    )


# One per-sentiment count frame for each vaccine flag column.
Pfizer = _sentiment_counts_for("Pfizer")
Sinopharm = _sentiment_counts_for("Sinopharm")
Sinovac = _sentiment_counts_for("Sinovac")
Moderna = _sentiment_counts_for("Moderna")
AstraZeneca = _sentiment_counts_for("AstraZeneca")
Covaxin = _sentiment_counts_for("Covaxin")
Sputnik = _sentiment_counts_for("Sputnik")

# **Write streaming data**

In [None]:
# Append every classified tweet to the in-memory table "sentiment5",
# triggered every 5 seconds.
stream_writer = (
    result
    .select("timestamp", "created_at", "predicted_at", "user", "text", "sentiment", "latency")
    .writeStream
    .queryName("sentiment5")
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .format("memory")
)
query = stream_writer.start()

In [None]:
# Overall per-sentiment counts; "complete" mode rewrites the whole in-memory
# table "sentiment_count" on each 5-second trigger.
stream_writer_count = (
    result_count
    .writeStream
    .queryName("sentiment_count")
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
    .format("memory")
)
query_count = stream_writer_count.start()

In [None]:
# Average latency, written in "complete" mode to the in-memory table
# "sentiment_latency" on each 5-second trigger.
stream_latency = (
    result_latency
    .writeStream
    .queryName("sentiment_latency")
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
    .format("memory")
)
query_latency = stream_latency.start()

In [None]:
def _complete_memory_writer(counts, query_name):
    """Build a 5-second, complete-mode writer to the in-memory table `query_name`."""
    return (
        counts
        .writeStream
        .queryName(query_name)
        .trigger(processingTime="5 seconds")
        .outputMode("complete")
        .format("memory")
    )


# One writer and one running query per vaccine, started in the same order as before.
stream_Pfizer = _complete_memory_writer(Pfizer, "Pfizer11111")
query_Pfizer = stream_Pfizer.start()

stream_Sinopharm = _complete_memory_writer(Sinopharm, "Sinopharm1")
query_Sinopharm = stream_Sinopharm.start()

stream_Sinovac = _complete_memory_writer(Sinovac, "Sinovac11")
query_Sinovac = stream_Sinovac.start()

stream_Moderna = _complete_memory_writer(Moderna, "Moderna1")
query_Moderna = stream_Moderna.start()

stream_AstraZeneca = _complete_memory_writer(AstraZeneca, "AstraZeneca1")
query_AstraZeneca = stream_AstraZeneca.start()

stream_Covaxin = _complete_memory_writer(Covaxin, "Covaxin11")
query_Covaxin = stream_Covaxin.start()

stream_Sputnik = _complete_memory_writer(Sputnik, "Sputnik11")
query_Sputnik = stream_Sputnik.start()

In [None]:
from time import sleep

# Seconds to wait between refreshes; same interval as the 5-second query triggers.
REFRESH_SECONDS = 5
# Maximum number of refreshes before the live view stops on its own.
MAX_REFRESHES = 1000


def _snapshot(streaming_query):
    """Return the current rows of a query's in-memory table as a pandas DataFrame."""
    return spark.sql(f"SELECT * from {streaming_query.name}").toPandas()


def _sentiment_share(streaming_query):
    """Snapshot a per-sentiment count table and add each row's share of the total.

    Returns a frame with columns sentiment, count, percentage, sorted by count
    in descending order.
    """
    counts = _snapshot(streaming_query)
    counts['percentage'] = counts['count'] / counts['count'].sum()
    return counts.sort_values(by='count', ascending=False)


def _show_dashboard(vaccine_queries):
    """Display one refresh of the live view.

    Shows, in order: the classified tweets (newest first), the overall
    sentiment counts, the average latency, one sentiment table per vaccine,
    the tweet total per vaccine, and a vaccine x sentiment share table.

    Snapshots are held in local names only, so the streaming DataFrames
    defined in earlier cells (`result`, `Pfizer`, ...) are not overwritten.

    Args:
        vaccine_queries: Mapping of display label -> running streaming query
            whose in-memory table holds (sentiment, count) rows.
    """
    tweets = _snapshot(query).sort_values(by='timestamp', ascending=False)
    display(tweets)
    print("\n")

    overall = (
        _snapshot(query_count)
        .rename(columns={"sentiment": "All vaccine"})
        .sort_values(by='count', ascending=False)
    )
    display(overall)
    print("\n")

    display(_snapshot(query_latency))
    print("\n")

    per_vaccine = {
        label: _sentiment_share(vaccine_query)
        for label, vaccine_query in vaccine_queries.items()
    }
    for label, counts in per_vaccine.items():
        # The sentiment column is headed with the vaccine name for display only.
        display(counts.rename(columns={"sentiment": label}))
        print("\n")

    totals = pd.DataFrame({
        'Vaccine': list(per_vaccine),
        'Count': [counts['count'].sum() for counts in per_vaccine.values()],
    })
    display(totals.sort_values(by=['Count'], ascending=False))
    print("\n")

    combined = pd.concat(
        [counts.assign(Vaccine=label) for label, counts in per_vaccine.items()],
        ignore_index=True,
    )
    if combined.empty:
        # Nothing has been classified yet; there is nothing to pivot.
        return
    share = combined.pivot_table(index="Vaccine", columns='sentiment', values="percentage")
    # Sum only the numeric count column (not the sentiment strings).
    share['total'] = combined.groupby('Vaccine')['count'].sum()
    display(share.sort_values(by=['total'], ascending=False))


if streaming_data_raw.isStreaming:
    # Display label -> running per-vaccine sentiment-count query.
    vaccine_queries = {
        "Pfizer": query_Pfizer,
        "Sinopharm": query_Sinopharm,
        "Sinovac": query_Sinovac,
        "Moderna": query_Moderna,
        "AstraZeneca": query_AstraZeneca,
        "Covaxin": query_Covaxin,
        "SputnikV": query_Sputnik,
    }
    for x in range(0, MAX_REFRESHES):
        try:
            if not query.isActive:
                print("Query not active")
                break
            print("Showing live new refreshed every 5 seconds")
            print(f"Second passed: {x * REFRESH_SECONDS}")

            _show_dashboard(vaccine_queries)

            # time.sleep takes seconds: the previous sleep(5000) paused for
            # 5000 seconds instead of the advertised 5.
            sleep(REFRESH_SECONDS)
            clear_output(wait=True)
        except KeyboardInterrupt:
            print("break")
            break
    print("Live view ended...")
else:
    print("Not streaming, showing static output instead")

Showing live new refreshed every 5 seconds
Second passed: 0


Unnamed: 0,timestamp,created_at,predicted_at,user,text,sentiment,latency
832,2021-07-25 17:15:40,1627233340,1627233407,theRealJZsag,in the final analysis and considering that lif...,Neutral,67
844,2021-07-25 17:15:39,1627233339,1627233407,Pearson_Warren,spot on jill instead of accepting their first ...,Neutral,68
809,2021-07-25 17:15:36,1627233336,1627233407,Chuppacadabra,for decades vaccines contained a dead or weake...,Neutral,71
857,2021-07-25 17:15:34,1627233334,1627233407,SkyNetPlatform,japan s shionogi has started human trials for ...,Neutral,73
879,2021-07-25 17:15:34,1627233334,1627233407,mammutly,the pfizer vaccine is about effective at preve...,Positive,73
...,...,...,...,...,...,...,...
3,2021-07-25 16:29:09,1627230549,1627230585,Jennatoolz1,very obvious from the picture sire look at all...,Positive,36
2,2021-07-25 16:29:05,1627230545,1627230585,peggymel2001,the found pfizer doses are the stock of nd dos...,Neutral,40
1,2021-07-25 16:29:04,1627230544,1627230585,kary_cee,am in australia the vast majority do want to g...,Neutral,41
5,2021-07-25 16:29:02,1627230542,1627230585,Brian7ins,oh look like we didn t already know,Neutral,43






Unnamed: 0,All vaccine,count
0,Neutral,530
1,Positive,262
2,Negative,96






Unnamed: 0,Avg latency
0,63.497976






Unnamed: 0,Pfizer,count,percentage
0,Neutral,261,0.564935
1,Positive,147,0.318182
2,Negative,54,0.116883






Unnamed: 0,Sinopharm,count,percentage
0,Neutral,10,0.833333
1,Positive,2,0.166667






Unnamed: 0,Sinovac,count,percentage
0,Neutral,16,0.842105
1,Positive,3,0.157895






Unnamed: 0,Moderna,count,percentage
0,Neutral,104,0.590909
1,Positive,61,0.346591
2,Negative,11,0.0625






Unnamed: 0,AstraZeneca,count,percentage
0,Neutral,43,0.811321
1,Positive,7,0.132075
2,Negative,3,0.056604






Unnamed: 0,Covaxin,count,percentage
0,Neutral,63,0.913043
1,Positive,5,0.072464
2,Negative,1,0.014493






Unnamed: 0,SputnikV,count,percentage
1,Positive,45,0.703125
0,Neutral,17,0.265625
2,Negative,2,0.03125






Unnamed: 0,Vaccine,Count
0,Pfizer,462
3,Moderna,176
5,Covaxin,69
6,SputnikV,64
4,AstraZeneca,53
2,Sinovac,19
1,Sinopharm,12






sentiment,Negative,Neutral,Positive,total
Vaccine,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
Pfizer,0.116883,0.564935,0.318182,462
Moderna,0.0625,0.590909,0.346591,176
Covaxin,0.014493,0.913043,0.072464,69
SputnikV,0.03125,0.265625,0.703125,64
AstraZeneca,0.056604,0.811321,0.132075,53
Sinovac,,0.842105,0.157895,19
Sinopharm,,0.833333,0.166667,12


break
Live view ended...
