# Spark Streaming

## Import libraries and packages for Spark

---


In [1]:
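import os

# Kafka connector for Structured Streaming (Scala 2.12 build, version 3.0.1).
packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"

os.environ["PYSPARK_PYTHON"] = '/usr/bin/python3'
# Must be set before the SparkSession is created so the package gets loaded.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)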
from pyspark.sql.functions import *
import json
import sys
import re

from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession


from textblob import TextBlob


spark = (
    SparkSession.builder
    .master('spark://spark-master:7077')
    .config('spark.cores.max', '1')
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

## Structured Streaming from Kafka

---

Build two streaming DataFrames from the Kafka source, one for each subscribed topic: **Trump** and **Biden**.



In [2]:
trumpDF = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "Trump")\
.option('failOnDataLoss', 'false') \
.load()


bidenDF = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "Biden")\
.option('failOnDataLoss', 'false') \
.load()
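
The two readers above differ only in the topic name. The Kafka source's `subscribe` option accepts a comma-separated list of topics, and each row it produces carries a `topic` column, so a single reader could in principle cover both topics. The sketch below only builds the option dictionary; `kafka_source_options` is a hypothetical helper name, not part of the original notebook.

```python
def kafka_source_options(topics, servers="kafka:9092"):
    """Build reader options for one or more Kafka topics (illustrative helper)."""
    return {
        "kafka.bootstrap.servers": servers,
        # "subscribe" takes a comma-separated list of topic names.
        "subscribe": ",".join(topics),
        "failOnDataLoss": "false",
    }


opts = kafka_source_options(["Trump", "Biden"])
print(opts["subscribe"])

# With a live SparkSession, the options could be applied as:
# tweetsDF = spark.readStream.format("kafka").options(**opts).load()
```

This notebook keeps one DataFrame per topic, which makes it simpler to write each candidate's tweets to a separate location later.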

Clean the data with the [tweet-preprocessor](https://github.com/s/preprocessor) library:
- Replace HTML entities (if any) with the ordinary characters they represent
- Turn each hashtag into a plain word in the tweet
- Remove emoji, URLs, and user tags from the tweet

In [3]:
import preprocessor as p
p.set_options(p.OPT.URL, p.OPT.EMOJI)

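def preprocess(text):
    # Drop @user mentions.
    tweet_text = re.sub(r'@\w+', '', text)
    # Decode common HTML entities, then blank out stray ';' and literal '\u' sequences.
    tweet_text = (tweet_text.replace('&amp;', '&').replace('&lt;', '<')
                  .replace('&gt;', '>').replace('&quot;', '"')
                  .replace('&#39;', "'").replace(';', " ")
                  .replace(r'\u', " "))
    # Turn hashtags into plain words: strip '#' and split on underscores.
    tweet_text = tweet_text.replace("#", "").replace("_", " ")
    # Remove URLs and emoji (the options set above).
    tweet_text = p.clean(tweet_text)

    return tweet_text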


In [4]:
print(preprocess('Preprocessor is #awesome 👍 https://github.com/s/preprocessor @username @hello'))

Preprocessor is awesome
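
For readers without tweet-preprocessor installed, the same cleaning steps can be approximated with the standard library alone. This is a rough sketch, not the library's implementation: `preprocess_stdlib` and the three regular expressions are assumptions made for illustration, the emoji pattern covers only the most common Unicode blocks, and `html.unescape` decodes all HTML entities rather than the five handled in `preprocess`.

```python
import html
import re

URL_RE = re.compile(r"https?://\S+|www\.\S+")
MENTION_RE = re.compile(r"@\w+")
# Approximate emoji coverage: main pictograph and symbol blocks plus the variation selector.
EMOJI_RE = re.compile("[\U0001F000-\U0001FAFF\u2600-\u27BF\uFE0F]")


def preprocess_stdlib(text):
    """Approximate the cleaning steps using only the standard library."""
    text = html.unescape(text)          # &amp; -> &, &lt; -> <, ...
    text = URL_RE.sub("", text)         # remove URLs before touching '#' or '_'
    text = MENTION_RE.sub("", text)     # remove @user tags
    text = EMOJI_RE.sub("", text)       # remove (most) emoji
    text = text.replace("#", "").replace("_", " ")  # hashtags become plain words
    return " ".join(text.split())       # collapse leftover whitespace


print(preprocess_stdlib(
    "Preprocessor is #awesome \U0001F44D https://github.com/s/preprocessor @username @hello"
))
```

URLs are removed first so that the later `#` and `_` replacements cannot alter them before they are matched.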


---
Next, use the **TextBlob** library to classify each tweet as Negative, Neutral, or Positive.

TextBlob is an NLP library with many features, such as part-of-speech tagging, noun phrase extraction, sentiment analysis, classification, and translation. Here we use only its sentiment analysis.

The **sentiment** property of a **TextBlob** object returns a namedtuple of the form `Sentiment(polarity, subjectivity)`. Polarity lies in [-1.0, 1.0], and subjectivity lies in [0.0, 1.0], where 1.0 is very subjective and 0.0 is very objective.

To classify a tweet, we split the polarity range into three intervals:
- [-1.0, -0.1) is **Negative**
- [-0.1, 0.1] is **Neutral**
- (0.1, 1.0] is **Positive**
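
The boundary values are the part that is easy to get wrong, so the threshold logic can be checked on its own, without TextBlob. In this sketch, `label_from_polarity` and its `threshold` parameter are illustrative names, not part of the notebook; the comparisons mirror the ones used in `predict_sentiment`.

```python
def label_from_polarity(polarity, threshold=0.1):
    """Map a polarity score in [-1.0, 1.0] to a sentiment label."""
    if polarity > threshold:
        return "Positive"
    if polarity < -threshold:
        return "Negative"
    # Both boundary values, -threshold and +threshold, count as Neutral.
    return "Neutral"


for value in (-1.0, -0.1, 0.0, 0.1, 0.5):
    print(value, label_from_polarity(value))
```

Because the comparisons are strict, a polarity of exactly 0.1 or -0.1 is labelled Neutral.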

In [5]:
def predict_sentiment(tweet_text):
    tweet = TextBlob(tweet_text)
    if tweet.sentiment.polarity > 0.1:
        return "Positive"
    elif tweet.sentiment.polarity < -0.1:
        return "Negative"
    else:
        return "Neutral"

In [6]:
predict_sentiment("Trump is a good president")

'Positive'

---
Define the schema for the data streamed from Kafka:
- **time**: When the tweet was created
- **text**: The tweet content
- **retweet_count**: How many times the tweet has been retweeted
- **favorite_count**: How many likes the tweet has
- **user_id**: ID of the tweet's author
- **location**: Geographic location of the tweet's author
- **place**: Geographic location where the tweet was created (if the user has location enabled)
- **user_followers_count**: Number of followers of the tweet's author

In [7]:
schema = StructType([   
        StructField("time", StringType(), True),
        StructField("text", StringType(), True),
        StructField("retweet_count", DoubleType(), True),
        StructField("location", StringType(), True),
        StructField("favorite_count", DoubleType(), True),
        StructField("user_id", StringType(), True),
        StructField("place", StringType(), True),
        StructField("user_followers_count", StringType(), True),
    
])

---
Build the data processing pipeline

Parse the data into the defined schema => Clean the text => Classify the text

In [8]:
def castData(schema, df):
    df = df.selectExpr("CAST(value AS STRING)")
    df = df.select(from_json(col("value"), schema).alias("data")).select("data.*")
    pre_udf = udf(preprocess, StringType())
    df = df.withColumn('text', pre_udf(col('text')))
    one_row_udf = udf(predict_sentiment, StringType())
    df = df.withColumn('sentiment', one_row_udf(col('text')))
    
    return df
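
To make the first step of `castData` concrete, the sketch below mimics, for a single message and in plain Python, roughly what `from_json(...)` followed by `select("data.*")` produces: one column per schema field, with `None` for fields missing from the JSON and for messages that cannot be parsed. It is an approximation only; `parse_value` and the placeholder sample values are assumptions for illustration, and Spark's type coercion is not reproduced.

```python
import json

# Field names taken from the schema defined above.
FIELDS = [
    "time", "text", "retweet_count", "location",
    "favorite_count", "user_id", "place", "user_followers_count",
]


def parse_value(raw):
    """Return one dict per message, with None for missing or unparseable data."""
    try:
        record = json.loads(raw)
    except (TypeError, ValueError):
        record = {}
    if not isinstance(record, dict):
        record = {}
    return {name: record.get(name) for name in FIELDS}


# Placeholder message, not real data.
sample = json.dumps({"text": "example tweet", "retweet_count": 0})
print(parse_value(sample))
print(parse_value("not json"))
```

In the real pipeline, the cleaned `text` column then passes through the two UDFs, which add the `sentiment` column.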

In [9]:
trumpDF = castData(schema, trumpDF)
bidenDF = castData(schema, bidenDF)

---
Results after applying the pipeline above

In [10]:
query = trumpDF.writeStream.queryName("trump").format("memory")\
    .start()
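
While the in-memory query runs, its progress can be inspected before querying the table. In PySpark, `StreamingQuery.lastProgress` returns a dictionary describing the most recent micro-batch, or `None` if no batch has completed yet. The helper below is a hypothetical convenience for condensing that dictionary; the sample values passed to it are made up for illustration.

```python
def summarize_progress(progress):
    """Condense a StreamingQuery.lastProgress dict into one line."""
    if not progress:
        return "no batch has completed yet"
    return "batch {0}: {1} input rows, {2} rows/s processed".format(
        progress.get("batchId"),
        progress.get("numInputRows"),
        progress.get("processedRowsPerSecond"),
    )


# Made-up progress dict with the same key names Spark reports.
print(summarize_progress({"batchId": 3, "numInputRows": 120,
                          "processedRowsPerSecond": 40.0}))
print(summarize_progress(None))

# With the running query from the cell above:
# print(summarize_progress(query.lastProgress))
```

If the summary keeps reporting zero input rows, the `trump` table will be empty, which usually means no messages are arriving on the topic.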

In [24]:
spark.sql('SELECT * FROM trump').show()

+--------------------+--------------------+-------------+--------------------+--------------+-------------------+-----+--------------------+---------+
|                time|                text|retweet_count|            location|favorite_count|            user_id|place|user_followers_count|sentiment|
+--------------------+--------------------+-------------+--------------------+--------------+-------------------+-----+--------------------+---------+
|Wed Dec 30 09:42:...|GEORGIANS: Loeffl...|          0.0|       United States|           0.0|1241202797296504800| null|                  34|  Neutral|
|Wed Dec 30 09:42:...|Trump WINS $2000 ...|          0.0|        Florida, USA|           0.0|          814121478| null|               31281| Positive|
|Wed Dec 30 09:42:...|I agree. Still, w...|          0.0|         Boulder, co|           0.0|           14742275| null|                 463| Positive|
|Wed Dec 30 09:42:...|Trump Iran Threat...|          0.0|                null|           0.0|1

---
## Storing the processed data

Finally, write the data to Hadoop (HDFS) as CSV files

In [12]:
trumpDF.writeStream.trigger(processingTime='5 seconds').queryName("trump_tweets")\
.format("csv").outputMode("append")\
.option("checkpointLocation", "hdfs://namenode:9000/checkpoints_Trump")\
.option('path', 'hdfs://namenode:9000/data/trump.csv').start()


bidenDF.writeStream.trigger(processingTime='5 seconds').queryName("biden_tweets")\
.format("csv").outputMode("append").option("checkpointLocation", "hdfs://namenode:9000/checkpoints_Biden")\
.option('path', 'hdfs://namenode:9000/data/biden.csv').start()

<pyspark.sql.streaming.StreamingQuery at 0x7f7dcc6ae7d0>
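
The two writers differ only in the topic name, and each streaming query needs its own checkpoint location. A small helper can derive both paths from the topic, as sketched below; `sink_options` is a hypothetical name, and the default namenode address is the one used in the cell above.

```python
def sink_options(topic, namenode="hdfs://namenode:9000"):
    """Return the CSV sink options for one topic (illustrative helper)."""
    return {
        # One checkpoint directory per query; queries must not share it.
        "checkpointLocation": "{0}/checkpoints_{1}".format(namenode, topic),
        "path": "{0}/data/{1}.csv".format(namenode, topic.lower()),
    }


for topic in ("Trump", "Biden"):
    print(topic, sink_options(topic))

# With the streaming DataFrames from this notebook, the options could be applied as:
# trumpDF.writeStream.format("csv").outputMode("append") \
#     .options(**sink_options("Trump")).start()
```

Note that the file sink treats `path` as a directory and writes part files into it, so `trump.csv` and `biden.csv` are directories on HDFS rather than single files.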