<a href="https://colab.research.google.com/github/zw2497/Twitter_Stream_Processing/blob/master/TopkAppJupyter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# In colab, please uncomment these commands

This installs Apache Spark 2.4.0, Java 8, and Findspark, a library that makes it easy for Python to find Spark.

In [0]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://www-us.apache.org/dist/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz
# !tar xf spark-2.4.2-bin-hadoop2.7.tgz
# !pip -q install findspark

# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "./spark-2.4.2-bin-hadoop2.7"

In [0]:
# !pip install -U textblob
# !python -m textblob.download_corpora

# Tweeter Topk 
Top k popular hashtags in two different windows, post sentiment analysis, trend detection

In [0]:
from pyspark.sql.functions import udf, get_json_object, explode, window
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import pandas_udf, PandasUDFType

from textblob import TextBlob

from sklearn import linear_model

import pandas as pd
import numpy as np

import time
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.2 pyspark-shell"

In [0]:
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
spark.sparkContext.setLogLevel('FATAL')

In [0]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "35.243.144.79:9092") \
  .option("subscribe", "tweepyv1") \
  .option("startingOffsets", "latest") \
  .option("failOnDataLoss", "false") \
  .load()

df.createOrReplaceTempView("raw")
df = spark.sql("""select decode(value, 'utf-8') as value, timestamp 
                  from raw""");

In [0]:
@udf(FloatType())
def senti(x):
    blob = TextBlob(x)
    s = []
    for sentence in blob.sentences:
        s.append(sentence.sentiment.polarity)
    return sum(s)/len(s)

In [0]:
df = df.select('timestamp',\
               get_json_object('value', '$.entities.hashtags[0].text').alias("hashtag"), \
               senti(get_json_object('value', '$.text')).alias("sentiment"))
df = df.filter(df.hashtag.isNotNull())
df.createOrReplaceTempView("datas")

## Trend detection

In [0]:
dftrend = spark.sql("""
select distinct hashtag, count(*) as count_num, avg(sentiment) as sentiment, now() as timestamp
from datas
group by hashtag, window(timestamp, "120 seconds", "30 seconds")
""")

In [0]:
@pandas_udf("key string, value double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
def trend_udf(key, pdf):
    reg = linear_model.LinearRegression()
    reg.fit(np.array(pd.to_datetime(pdf.timestamp).astype('int')).reshape(-1,1), np.array(pdf.count_num).reshape(-1,1))
    return pd.DataFrame([key + (reg.coef_[0][0],)])

dftrend = dftrend.groupby('hashtag').apply(trend_udf)

# Top k

In [0]:
dfslow = spark.sql("""
select distinct concat_ws(' ',hashtag, count(*), avg(sentiment)) as value, now() as key
from datas
group by hashtag, window(timestamp, "600 seconds", "120 seconds")
""")

In [0]:
dffast = spark.sql("""
select distinct concat_ws(' ',hashtag, count(*), avg(sentiment)) as value, now() as key
from datas
group by hashtag, window(timestamp, "30 seconds", "5 seconds")
""")

In [0]:
query = dftrend.writeStream.outputMode("complete").queryName("trend").format("memory").option("truncate", "False").start()
query1 = dfslow.writeStream.outputMode("complete").queryName("slow").format("memory").option("truncate", "False").start()
query2 = dffast.writeStream.outputMode("complete").queryName("fast").format("memory").option("truncate", "False").start()

In [0]:
print(query.status)
print(query1.status)
print(query2.status)

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}


In [0]:
res = spark.table("trend").toPandas()
res1 = spark.table("slow").toPandas()
res2 = spark.table("fast").toPandas()

In [0]:
res

Unnamed: 0,key,value
0,EndGame,0.0
1,KentuckyDerby,0.0
2,Germany,0.0
3,PrudentialCenterROCK,0.0
4,deznat,0.0
5,ATXWX,0.0
6,PeyTen,0.0
7,USGS303401097374700,0.0
8,LACED,0.0
9,7minutes,0.0


In [0]:
res1

Unnamed: 0,value,key
0,TedBundy 1 -0.03750000149011612,2019-05-04 02:26:00.852
1,SoundHound 1 0.0,2019-05-04 02:26:00.852
2,SSNCT 1 0.5,2019-05-04 02:26:00.852
3,foodphotography 1 0.0,2019-05-04 02:26:00.852
4,pomona 1 0.0,2019-05-04 02:26:00.852
5,BoomerSooner 1 0.0,2019-05-04 02:26:00.852
6,Houston 1 0.06818182021379471,2019-05-04 02:26:00.852
7,RealTime 1 0.10000000149011612,2019-05-04 02:26:00.852
8,7minutes 1 0.0,2019-05-04 02:26:00.852
9,goodNIGHTTwitterWorld 1 0.0,2019-05-04 02:26:00.852


In [0]:
res2

Unnamed: 0,value,key
0,USGS08178050 1 -0.10000000149011612,2019-05-04 02:25:15.238
1,goodNIGHTTwitterWorld 1 0.0,2019-05-04 02:25:15.238
2,USGS08104500 1 -0.1875,2019-05-04 02:25:15.238
3,Nanaimo 1 0.0,2019-05-04 02:25:15.238
4,KentuckyDerby 1 -0.13333334028720856,2019-05-04 02:25:15.238
5,USGS08018500 1 0.0,2019-05-04 02:25:15.238
6,ImpeachBarr 1 0.0,2019-05-04 02:25:15.238
7,LivePD 1 0.0,2019-05-04 02:25:15.238
8,AnuragBasu 1 0.0,2019-05-04 02:25:15.238
9,USGS08020000 1 0.0,2019-05-04 02:25:15.238


In [0]:
query = dftrend \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .outputMode("complete") \
  .option("kafka.bootstrap.servers", "35.243.144.79:9092") \
  .option("topic", "trend") \
  .option("checkpointLocation", "./logtrend") \
  .start()

query1 = dfslow \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .outputMode("update") \
  .option("kafka.bootstrap.servers", "35.243.144.79:9092") \
  .option("topic", "slow") \
  .option("checkpointLocation", "./logslow") \
  .start()

query2 = dffast \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .outputMode("update") \
  .option("kafka.bootstrap.servers", "35.243.144.79:9092") \
  .option("topic", "fast") \
  .option("checkpointLocation", "./logfast") \
  .start()

In [0]:
query.stop()
query1.stop()
query2.stop()