# Spark Streaming

## Import thư viện và packages cho Spark

---


In [1]:
import os 
packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"

os.environ["PYSPARK_PYTHON"] = '/usr/bin/python3'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)
from pyspark.sql.functions import *
import json
import sys
import re

from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession


from textblob import TextBlob


spark = SparkSession.builder.master('spark://spark-master:7077').config('spark.cores.max','1').config("spark.executor.memory", "1g").getOrCreate()

## Structured Streaming từ Kafka

---

Xây dựng 2 streaming DataFrame từ Kafka source tương ứng với 2 topic được subscribe là **Trump** và **Biden**.



In [2]:
trumpDF = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "Trump")\
.option('failOnDataLoss', 'false') \
.load()


bidenDF = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "Biden")\
.option('failOnDataLoss', 'false') \
.load()

Clean dữ liệu với thư viện [tweet-preprocessor](https://github.com/s/preprocessor)
- Thay thế các kí tự HTML Entities (nếu có) bằng các kí tự thông thường
- Chuyển hashtag thành 1 từ trong tweet
- Loại bỏ emoji, URL và user tag có trong tweet

In [3]:
import preprocessor as p
p.set_options(p.OPT.URL, p.OPT.EMOJI)

def preprocess(text):
    tweet_text = re.sub('@[\w]+','',text)
    tweet_text = (tweet_text.replace('&amp;', '&').replace('&lt;', '<')\
                  .replace('&gt;', '>').replace('&quot;', '"')\
                  .replace('&#39;', "'").replace(';', " ")\
                  .replace(r'\u', " "))
    tweet_text = tweet_text.replace("#", "").replace("_", " ")
    tweet_text = p.clean(tweet_text)
    
    return tweet_text


In [4]:
print(preprocess('Preprocessor is #awesome 👍 https://github.com/s/preprocessor @username @hello'))

Preprocessor is awesome


---
Tiếp theo, sử dụng thư viện **TextBlob** để phục vụ cho việc phân loại tweet về Negative, Neutral và Positive.

TextBlob là một thư viện về NLP với nhiều chức năng như: part-of-speech tagging, noun phrase extraction, sentiment analysis, classification, translation,.... Ở trong bài toán này, chúng ta sẽ tập trung sử dụng chức năng sentiment analysis của TextBlob.

Phương thức **sentiment** trong **TextBlob** trả về  1 namedtuple có dạng `Sentiment(polarity, subjectivity)`. Polarity có khoảng giá trị từ [-1.0, 1.0], còn subjectivity nằm trong khoảng [0.0, 1.0] với 1.0 là rất chủ quan còn 0.0 là rất khách quan.

Để thực hiện phân loại tweet, ta chia polarity làm 3 khoảng:
- [-1.0, -0.1] là **Negative**
- [-0.1, 0.1] là **Neutral**
- [0.1, 1.0] là **Positive**

In [5]:
def predict_sentiment(tweet_text):
    tweet = TextBlob(tweet_text)
    if tweet.sentiment.polarity > 0.1:
        return "Positive"
    elif tweet.sentiment.polarity < -0.1:
        return "Negative"
    else:
        return "Neutral"

In [6]:
predict_sentiment("Trump is a good president")

'Positive'

---
Định nghĩa schema cho dữ liệu được stream từ Kafka:
- **time**: Thời gian tạo tweet
- **text**: Nội dung tweet
- **retweet_count**: Số lần tweet được retweet
- **favorite_count**: Số lượt like của tweet
- **user_id**: ID của người tạo tweet
- **location**: Vị trí địa lí của người tạo tweet
- **place**: Vị trí địa lí khi tweet được tạo (nếu người dùng bật định vị)
- **user_followers_count**: Số lượng follower của người tạo tweet

In [7]:
schema = StructType([   
        StructField("time", StringType(), True),
        StructField("text", StringType(), True),
        StructField("retweet_count", DoubleType(), True),
        StructField("location", StringType(), True),
        StructField("favorite_count", DoubleType(), True),
        StructField("user_id", StringType(), True),
        StructField("place", StringType(), True),
        StructField("user_followers_count", StringType(), True),
    
])

---
Xây dựng pipeline xử lí dữ liệu

Đưa dữ liệu về schema đã được định nghĩa => Clean dữ liệu text => Phân loại text

In [8]:
def castData(schema, df):
    df = df.selectExpr("CAST(value AS STRING)")
    df = df.select(from_json(col("value"), schema).alias("data")).select("data.*")
    pre_udf = udf(preprocess, StringType())
    df = df.withColumn('text', pre_udf(col('text')))
    one_row_udf = udf(predict_sentiment, StringType())
    df = df.withColumn('sentiment', one_row_udf(col('text')))
    
    return df

In [9]:
trumpDF = castData(schema, trumpDF)
bidenDF = castData(schema, bidenDF)

---
Kết quả sau khi áp dụng pipeline trên

In [18]:
query = trumpDF.writeStream.queryName("trump").format("memory")\
    .start()

In [21]:
spark.sql('SELECT * FROM trump').show()

+--------------------+--------------------+-------------+--------------------+--------------+-------------------+-----+--------------------+---------+
|                time|                text|retweet_count|            location|favorite_count|            user_id|place|user_followers_count|sentiment|
+--------------------+--------------------+-------------+--------------------+--------------+-------------------+-----+--------------------+---------+
|Wed Dec 30 05:32:...|He will betray Trump|          0.0|                null|           0.0|1072693591414333400| null|                 117|  Neutral|
|Wed Dec 30 05:32:...|This is CRAZY! Ju...|          0.0|                null|           0.0|1344096097166389200| null|                   0| Negative|
|Wed Dec 30 05:32:...|Doug bro, you are...|          0.0|                null|           0.0|1266941931856236500| null|                  14|  Neutral|
|Wed Dec 30 05:32:...|You wont believe ...|          0.0|                null|           0.0|1

---
## Lưu trữ dữ liệu sau khi xử lí

Cuối cùng, thực hiện ghi dữ liệu lên Hadoop với format là file CSV

In [10]:
trumpDF.writeStream.trigger(processingTime='5 seconds').queryName("trump_tweets")\
.format("csv").outputMode("append")\
.option("checkpointLocation", "hdfs://namenode:9000/checkpoints_Trump")\
.option('path', 'hdfs://namenode:9000/data/trump.csv').start()


bidenDF.writeStream.trigger(processingTime='5 seconds').queryName("biden_tweets")\
.format("csv").outputMode("append").option("checkpointLocation", "hdfs://namenode:9000/checkpoints_Biden")\
.option('path', 'hdfs://namenode:9000/data/biden.csv').start()

<pyspark.sql.streaming.StreamingQuery at 0x7f37a44224d0>