<a href="https://colab.research.google.com/github/zw2497/Twitter_Stream_Processing/blob/master/PySpark_Structured_Streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Java, Spark, and Findspark
This cell (commented out; uncomment it to run on Colab) installs Apache Spark 2.4.2, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [1]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://www-us.apache.org/dist/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz
# !tar xf spark-2.4.2-bin-hadoop2.7.tgz
# !pip -q install findspark

# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "./spark-2.4.2-bin-hadoop2.7"

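#### Imports and Spark submit arguments
Import the libraries used below and set `PYSPARK_SUBMIT_ARGS` so that PySpark launches with the Kafka connector package (`spark-sql-kafka-0-10_2.12:2.4.2`). The locations where Spark and Java are installed (`JAVA_HOME`, `SPARK_HOME`) are set in the commented-out Colab cell above.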

In [1]:
from pyspark.sql.functions import udf, get_json_object
from pyspark.sql import SparkSession
from textblob import TextBlob
from pyspark.sql.types import IntegerType, FloatType
from pyspark.streaming import StreamingContext
import time
import os
# PySpark reads PYSPARK_SUBMIT_ARGS when it launches its JVM gateway, so this
# must run before the SparkSession is built; it adds the Kafka source package.
os.environ.update(
    PYSPARK_SUBMIT_ARGS="--master local --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.2 pyspark-shell"
)

In [2]:
# One session for the whole notebook; getOrCreate() hands back the existing
# session if this cell is re-run.
spark = (
    SparkSession.builder
    .appName("StructuredNetworkWordCount")
    .getOrCreate()
)
spark.sparkContext.setLogLevel('FATAL')  # suppress log output below FATAL

In [3]:
# Kafka connection settings; override with environment variables instead of
# editing the cell. Defaults are the broker/topic this notebook was run against.
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "35.243.144.79:9092")
SOURCE_TOPIC = os.environ.get("KAFKA_SOURCE_TOPIC", "tweepyv1")

# Streaming source over the topic, starting from the newest offsets. With
# failOnDataLoss=false the query keeps running when data may have been lost
# (e.g. offsets out of range) instead of failing.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
  .option("subscribe", SOURCE_TOPIC) \
  .option("startingOffsets", "latest") \
  .option("failOnDataLoss", "false") \
  .load()

In [4]:
# Expose the raw Kafka stream to spark.sql() under the name "raw".
df.createOrReplaceTempView(name="raw")

In [5]:
# Kafka's `value` column is binary; decode it to a UTF-8 (JSON) string and keep
# the record timestamp alongside it.
decode_sql = """select decode(value, 'utf-8') as value, timestamp 
                  from raw"""
df = spark.sql(decode_sql)

In [6]:
def polarity(x):
    """Return the mean TextBlob polarity of the sentences in tweet text ``x``.

    Returns None (SQL NULL) when ``x`` is None/empty or yields no sentences.
    Without this guard, TextBlob(None) raises TypeError (get_json_object gives
    NULL when a record has no `text`) and an empty sentence list divides by
    zero; an exception inside the UDF fails the task and the streaming query.
    """
    if not x:
        return None
    scores = [sentence.sentiment.polarity for sentence in TextBlob(x).sentences]
    if not scores:
        return None
    return sum(scores) / len(scores)

# Wrap as a Spark UDF producing a FloatType column (NULL where None is returned).
sent = udf(polarity, FloatType())

In [7]:
# Per tweet: its timestamp, the first hashtag, and a sentiment score; tweets
# without a hashtag are dropped. Registered as the "datas" view for SQL below.
first_hashtag = get_json_object('value', '$.entities.hashtags[0].text')
tweet_sentiment = sent(get_json_object('value', '$.text'))
df = df.select('timestamp',
               first_hashtag.alias("hashtag"),
               tweet_sentiment.alias("sentiment"))
df = df.filter(df.hashtag.isNotNull())
df.createOrReplaceTempView("datas")

In [8]:
# Tweet count and mean sentiment per hashtag over 10-minute windows sliding
# every minute. No DISTINCT: hashtag and the window are the grouping keys and
# both are selected, so every output row is already unique.
df = spark.sql("""
select hashtag, count(*) as count, avg(sentiment) as sentiment, window(timestamp, "600 seconds", "60 seconds") as key
from datas
group by hashtag, window(timestamp, "600 seconds", "60 seconds")
""")

In [9]:
# Materialise the running aggregate into an in-memory table named "test",
# readable with spark.table("test"). Complete mode re-emits the whole result
# each trigger and the memory sink keeps it in driver memory, so this suits
# small exploratory runs only. ("truncate" is a console-sink option and is
# not set here.)
query = df.writeStream \
    .outputMode("complete") \
    .queryName("test") \
    .format("memory") \
    .start()

In [10]:
# Show the streaming query's current status: status message, whether data is
# available, and whether a trigger is active.
query.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [13]:
# Snapshot the memory-sink table "test" into a local pandas DataFrame.
res = spark.read.table("test").toPandas()

In [14]:
# Top 10 (hashtag, window) rows by tweet count; avoids rendering the full snapshot.
res.sort_values(by='count', ascending=False).head(10)

Unnamed: 0,hashtag,count,sentiment,key
612,nothingisordinary,3,0.100000,"(2019-05-03 05:04:00, 2019-05-03 05:14:00)"
4,nothingisordinary,3,0.100000,"(2019-05-03 05:08:00, 2019-05-03 05:18:00)"
1114,nothingisordinary,3,0.100000,"(2019-05-03 05:05:00, 2019-05-03 05:15:00)"
824,nothingisordinary,3,0.100000,"(2019-05-03 05:10:00, 2019-05-03 05:20:00)"
1492,nothingisordinary,3,0.100000,"(2019-05-03 05:09:00, 2019-05-03 05:19:00)"
1377,nothingisordinary,3,0.100000,"(2019-05-03 05:07:00, 2019-05-03 05:17:00)"
99,nothingisordinary,3,0.100000,"(2019-05-03 05:11:00, 2019-05-03 05:21:00)"
803,nothingisordinary,3,0.100000,"(2019-05-03 05:12:00, 2019-05-03 05:22:00)"
1077,nothingisordinary,3,0.100000,"(2019-05-03 05:06:00, 2019-05-03 05:16:00)"
1129,LTBU,2,0.000000,"(2019-05-03 05:05:00, 2019-05-03 05:15:00)"


In [15]:
# Stop the streaming query started above.
query.stop()

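# Run the pipeline as a standalone Spark job
The next cell writes the same pipeline to `app.py`; the cell after it launches that script with `spark-submit`, which starts its own Spark session.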

In [5]:
%%writefile ./app.py
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from textblob import TextBlob

# Kafka connection settings; override via environment variables instead of
# editing the script. Defaults are the broker/topic this job was written for.
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "35.243.144.79:9092")
SOURCE_TOPIC = os.environ.get("KAFKA_SOURCE_TOPIC", "tweepyv1")


spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
spark.sparkContext.setLogLevel('FATAL')

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
  .option("subscribe", SOURCE_TOPIC) \
  .option("startingOffsets", "latest") \
  .option("failOnDataLoss", "false") \
  .load()


# --- UDF ---------------------------------------------------------------------
def polarity(x):
    """Return the mean TextBlob polarity of the sentences in tweet text ``x``.

    Returns None (SQL NULL) when ``x`` is None/empty or yields no sentences,
    instead of raising (TypeError / ZeroDivisionError) inside the UDF and
    failing the streaming query.
    """
    if not x:
        return None
    scores = [sentence.sentiment.polarity for sentence in TextBlob(x).sentences]
    if not scores:
        return None
    # Builtin sum: do not star-import pyspark.sql.functions in this script,
    # it exports a Column-based `sum` that would shadow the builtin here.
    return sum(scores) / len(scores)


# Register the UDF under a SQL name so spark.sql() queries can call polarity(...).
spark.udf.register("polarity", polarity, FloatType())

print("Streaming: ", df.isStreaming)
print("Start receiving ... ...")
df.createOrReplaceTempView("raw")

# --- decode ------------------------------------------------------------------
# Kafka's `value` column is binary; decode it to a UTF-8 (JSON) string.
df = spark.sql("""select decode(value, 'utf-8') as value, timestamp 
                  from raw""")
# Session-scoped temp views throughout: a global temp view is registered under
# the `global_temp` database and would not resolve by bare name below.
df.createOrReplaceTempView("decode_data")


# --- add sentiment -----------------------------------------------------------
dfsentiment = spark.sql("""
select polarity(get_json_object(value, '$.text')) as sentiment, value, timestamp
from decode_data
""")
dfsentiment.createOrReplaceTempView("addsentiment")


# --- client application (disabled) -------------------------------------------
# dfclient = spark.sql("""select get_json_object(value, '$.source') as key, count(*) as value
#                         from decode_data
#                         group by get_json_object(value, '$.source')
#                         order by value DESC""")

# --- get hashtag -------------------------------------------------------------
# length(...) > 2 also drops tweets with no hashtag, since length(NULL) is NULL.
dfhashtag = spark.sql("""
select get_json_object(value, '$.entities.hashtags[0].text') as hashtag, timestamp, sentiment
from addsentiment 
where length(get_json_object(value, '$.entities.hashtags[0].text')) > 2
""")
dfhashtag.createOrReplaceTempView("tag")


# --- compute window ----------------------------------------------------------
# value = "<hashtag> <count> <avg sentiment>" per hashtag and window; key = now().
# NOTE(review): there is no watermark, so in update mode the state for old
# windows is never dropped; consider dfhashtag.withWatermark("timestamp", ...)
# before registering the "tag" view.
# dffast = spark.sql("""
# select concat_ws(' ', hashtag, string(count(*)), avg(sentiment)) as value, now() as key
# from tag
# group by window(timestamp, "10 seconds", "5 seconds"), hashtag
# """)

dfslow = spark.sql("""
select concat_ws(' ', hashtag, string(count(*)), avg(sentiment)) as value, now() as key
from tag
group by window(timestamp, "600 seconds", "60 seconds"), hashtag
""")


# --- write to console --------------------------------------------------------
query1 = dfslow \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "False") \
    .option("checkpointLocation", "./logslow") \
    .start()

# query2 = dffast \
#     .writeStream \
#     .outputMode("update") \
#     .format("console") \
#     .option("truncate", "False") \
#     .option("checkpointLocation", "./logfast") \
#     .start()

# query3 = dfclient \
#     .writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .option("truncate", "False") \
#     .option("checkpointLocation", "./logclient") \
#     .start()

# --- write to Kafka (disabled) -----------------------------------------------
# Each running query needs its own checkpointLocation: do not run a Kafka sink
# for dfslow on "./logslow" while the console query1 above uses it.
# query1 = dffast \
#   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
#   .writeStream \
#   .format("kafka") \
#   .outputMode("update") \
#   .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
#   .option("topic", "topk1") \
#   .option("checkpointLocation", "./logfast") \
#   .start()

# query2 = dfslow \
#   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
#   .writeStream \
#   .format("kafka") \
#   .outputMode("update") \
#   .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
#   .option("topic", "topk1") \
#   .option("checkpointLocation", "./logslow") \
#   .start()

# query3 = dfclient \
#   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
#   .writeStream \
#   .format("kafka") \
#   .outputMode("complete") \
#   .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
#   .option("topic", "source") \
#   .option("checkpointLocation", "./logclient") \
#   .start()


# Block the driver until the console query stops or fails. With several
# queries enabled, spark.streams.awaitAnyTermination() is the usual alternative.
query1.awaitTermination()
# query2.awaitTermination()
# query3.awaitTermination()

Overwriting ./app.py


In [6]:
# Launch app.py as a standalone job; --packages adds the same Kafka connector
# coordinates as PYSPARK_SUBMIT_ARGS. The cell blocks while app.py awaits its query.
!./spark-2.4.2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.2 app.py

Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark-2.4.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ef1a3b3f-8f6c-4add-8eaa-401ad719eb6f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;2.4.2 in central
	found org.apache.kafka#kafka-clients;2.0.0 in central
	found org.lz4#lz4-java;1.4.0 in central
	found org.xerial.snappy#snappy-java;1.1.7.3 in central
	found org.slf4j#slf4j-api;1.7.16 in central
	found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 361ms :: artifacts dl 8ms
	:: modules in use:
	org.apache.kafka#kafka-clients;2.0.0 from central in [default]
	org.apache.spark#spark-sql-kafka-0-10_2.12;2.4.2 from central in [default]
	org.lz4#lz4-java;1.4