### Crypto Streaming

If you set up the upstream part of our streaming pipeline, you should have near real-time trading data for several cryptocurrencies flowing into separate Kafka topics. In this notebook, we will read the trading value of our cryptocurrencies (in USD) and do some fun stuff with them!

#### Getting Started (Imports & Setting Variables)

First of all, to connect to Kafka from PySpark, we need the Kafka integration package. It is not bundled with Spark, but luckily we can request it from within the notebook by setting PYSPARK_SUBMIT_ARGS before the Spark session is created. More details: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 pyspark-shell'
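
The package coordinate above encodes two versions that have to match your installation: the Scala version Spark was built with (2.11 here) and the Spark version itself (2.4.4 here). Below is a small sketch of a helper that assembles the string so both versions live in one place. The function name kafka_submit_args is hypothetical and not part of PySpark.

```python
def kafka_submit_args(scala_version, spark_version):
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the Kafka source package."""
    package = "org.apache.spark:spark-sql-kafka-0-10_{0}:{1}".format(
        scala_version, spark_version)
    return "--packages {0} pyspark-shell".format(package)


# Produces the same value as the cell above; it must be placed in
# os.environ['PYSPARK_SUBMIT_ARGS'] before the SparkSession is created.
submit_args = kafka_submit_args("2.11", "2.4.4")
print(submit_args)
```

If you run a different Spark build, only the two arguments should need to change.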

In [60]:
# Spark and Structured Streaming related imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, window, max, min, avg
from pyspark.sql.functions import split
from pyspark.sql.types import StringType, FloatType, StructType, StructField
from pyspark.sql.functions import from_json, col, to_timestamp

In [3]:
# get a spark session
spark = SparkSession.builder.appName("CryptoStreaming").getOrCreate()

#### Start reading a stream
Spark's Structured Streaming means we can stream the data straight into a dataframe! To do that, we first use readStream to read a topic from Kafka, like below.

In [4]:
# read stream and subscribe to bitcoin topic
df = spark.readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", "10.128.0.16:19092") \
          .option("startingOffsets", "earliest") \
          .option("subscribe", "BTC") \
          .load()
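
Since the pipeline sends each cryptocurrency to its own topic, you may want to read several topics in one stream. The subscribe option accepts a comma-separated list of topic names. The sketch below only builds the option dictionary. The helper name kafka_read_options and the second topic "ETH" are assumptions made for illustration.

```python
def kafka_read_options(bootstrap_servers, topics, starting_offsets="earliest"):
    """Return the options for a Kafka source that covers several topics."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        # 'subscribe' takes a comma-separated list of topic names
        "subscribe": ",".join(topics),
        "startingOffsets": starting_offsets,
    }


# "ETH" is a hypothetical second topic, used only for illustration
opts = kafka_read_options("10.128.0.16:19092", ["BTC", "ETH"])
print(opts["subscribe"])

# With a live session the dictionary could be passed along like this:
# df = spark.readStream.format("kafka").options(**opts).load()
```

Each row read from Kafka carries a topic column, so rows from different topics can still be told apart after loading.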

Keep in mind that when we read from Kafka, we get not only the value but also Kafka metadata such as the topic, partition, and offset. You can take a look at these columns by using printSchema.

In [5]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



You can also take a look at the raw content of the data received from Kafka. To do that, we first start a streaming query that writes the stream into a named in-memory table. Selecting from that table with spark.sql then gives a snapshot of whatever has arrived so far, as a regular dataframe for follow-up SQL operations. Other sinks can write the stream to disk instead.

In [6]:
# start() returns a StreamingQuery handle, not a dataframe
raw_query = df \
            .writeStream \
            .queryName("rawdata")\
            .format("memory")\
            .start()

In [7]:
raw = spark.sql("select * from rawdata")
raw.show()

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



#### Structuring The Value & Parsing JSON to Dataframe
We can use selectExpr to cast the binary value column to a string, and then use the from_json function to parse the JSON data.

In [8]:
# select only the value column
raw_value_df = df.selectExpr("CAST(value AS STRING)")

In [9]:
# write stream to memory
raw_value_query = raw_value_df.writeStream \
                              .queryName("raw_value")\
                              .format("memory")\
                              .start()

In [10]:
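# use a select statement to take a snapshot of the in-memory table
# (a separate name keeps the StreamingQuery handle from the previous cell intact)
raw_value_snapshot = spark.sql("select * from raw_value")
# print up to 20 rows; False disables truncation so we can see the full value
raw_value_snapshot.show(20, False)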

+-----+
|value|
+-----+
+-----+



In [11]:
# we need to define the schema for parsing json value
schema = StructType([StructField("timestamp", StringType(), True),
                     StructField("usd_value", StringType(), True)])

In [12]:
# parse json value and get bitcoin dataframe
json_value_df = raw_value_df.selectExpr("cast (value as STRING) json_data")\
                            .select(from_json("json_data", schema).alias("bitcoin"))\
                            .select("bitcoin.*")
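
To make the parsing step concrete, here is a rough plain-Python sketch of what casting the value to a string and applying the schema amounts to for a single message. It is an illustration only, not how Spark implements from_json, and Spark's exact handling of malformed input depends on the version and parser settings. The sample payload is an assumption; the real message layout depends on the upstream producer.

```python
import json

FIELDS = ("timestamp", "usd_value")  # mirrors the two fields of the schema above


def parse_value(raw_bytes):
    """Decode one Kafka value and keep only the schema fields.

    Returns None for input that is not a JSON object.
    """
    try:
        record = json.loads(raw_bytes.decode("utf-8"))
    except ValueError:  # covers both decoding and JSON errors
        return None
    if not isinstance(record, dict):
        return None
    # Fields missing from the message become None
    return {name: record.get(name) for name in FIELDS}


# Hypothetical payload, assumed for illustration
sample = b'{"timestamp": "17-10-2019 19:24:08", "usd_value": "8014.91"}'
print(parse_value(sample))
print(parse_value(b"not json"))
```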

Although we could have declared the final types directly in the schema, it is often good practice not to. By parsing both fields as strings first and converting them to the right types afterwards, we make our code a little more robust.

In [13]:
# convert to timestamp and double
json_value_df = json_value_df.withColumn('timestamp',to_timestamp(json_value_df.timestamp, 'dd-MM-yyyy HH:mm:ss'))\
                       .withColumn('usd_value', json_value_df.usd_value.cast("double"))
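
The conversion above can be pictured with a plain-Python counterpart for a single record. This is a sketch under the assumption that unparseable values should turn into missing values instead of raising, which is how the Spark 2.4 conversions used here typically behave (they yield null). The function name to_typed is hypothetical.

```python
from datetime import datetime


def to_typed(record):
    """Convert the string fields to (datetime, float); bad values become None."""
    try:
        # '%d-%m-%Y %H:%M:%S' is the strptime counterpart of 'dd-MM-yyyy HH:mm:ss'
        ts = datetime.strptime(record["timestamp"], "%d-%m-%Y %H:%M:%S")
    except (KeyError, TypeError, ValueError):
        ts = None
    try:
        usd = float(record["usd_value"])
    except (KeyError, TypeError, ValueError):
        usd = None
    return ts, usd


print(to_typed({"timestamp": "17-10-2019 19:24:08", "usd_value": "8014.91"}))
print(to_typed({"timestamp": "2019-10-17", "usd_value": "n/a"}))
```

Because each field is converted on its own, one malformed field does not prevent the other from being used.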

In [20]:
# print out the schema
json_value_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- usd_value: double (nullable = true)



In [14]:
# write to memory, take a snapshot, and show off our well-structured dataframe
bitcoin_query = json_value_df.writeStream.format("memory").queryName("bitcoin_value").start()

In [17]:
bitcoin_df = spark.sql("select * from bitcoin_value")
bitcoin_df.show()

+-------------------+---------+
|          timestamp|usd_value|
+-------------------+---------+
|2019-10-17 19:24:08|  8014.91|
|2019-10-17 19:24:44|  8014.91|
|2019-10-17 19:25:21|  8014.91|
|2019-10-17 19:26:04|  8018.71|
|2019-10-17 19:26:45|  8018.71|
|2019-10-17 19:27:26|  8018.71|
|2019-10-17 19:28:08|  8017.44|
|2019-10-17 19:28:50|  8018.16|
|2019-10-17 19:29:31|  8018.16|
|2019-10-17 19:30:12|  8018.07|
|2019-10-17 19:30:54|  8016.51|
|2019-10-17 19:31:35|  8016.03|
|2019-10-17 19:32:16|  8017.77|
|2019-10-17 19:32:58|  8015.45|
|2019-10-17 19:33:39|  8015.53|
|2019-10-17 19:34:20|  8016.71|
|2019-10-17 19:35:02|  8016.55|
|2019-10-17 19:35:43|  8016.09|
|2019-10-17 19:36:24|  8014.42|
|2019-10-17 19:37:06|  8015.75|
+-------------------+---------+
only showing top 20 rows



#### How much did the Bitcoin price fluctuate in the last ten minutes?

Now that we have our dataframe in the right format, let's write some interesting queries. We will start off by answering a simple question: how much did the value of Bitcoin (in USD) fluctuate in the last ten minutes?

In [21]:
from datetime import datetime, timedelta

In [22]:
ten_minutes_ago_dt = (datetime.now() - timedelta(minutes=10))
ten_mins_bitcoin_df = bitcoin_df.filter(bitcoin_df.timestamp > ten_minutes_ago_dt)

In [23]:
ten_min_count = ten_mins_bitcoin_df.count()
ten_min_max = ten_mins_bitcoin_df.agg({"usd_value": "max"}).collect()[0][0]
ten_min_min = ten_mins_bitcoin_df.agg({"usd_value": "min"}).collect()[0][0]

In [24]:
print('In the last ten minutes, we received {0} updates and the price fluctuated {1:.2f} USD'.format(ten_min_count, ten_min_max - ten_min_min))

In the last ten minutes, we received 14 updates and the price fluctuated 28.04 USD
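
The same filter-then-aggregate logic can be sketched in plain Python on a handful of rows, which also shows one edge case: if no update arrived in the last ten minutes, max and min are missing and the subtraction in the cell above would fail. The function name fluctuation and the fixed value of now are assumptions for illustration. The sketch sorts the values instead of calling the built-in max and min, because the notebook imports max and min from pyspark.sql.functions, which shadows the built-ins.

```python
from datetime import datetime, timedelta


def fluctuation(rows, now, minutes=10):
    """Return (count, max - min) for rows newer than `minutes` before `now`."""
    cutoff = now - timedelta(minutes=minutes)
    recent = sorted(value for ts, value in rows if ts > cutoff)
    if not recent:
        # No updates in the interval: there is no spread to report
        return 0, None
    return len(recent), recent[-1] - recent[0]


# Sample rows copied from the bitcoin_value snapshot shown earlier
rows = [
    (datetime(2019, 10, 17, 19, 24, 8), 8014.91),
    (datetime(2019, 10, 17, 19, 26, 4), 8018.71),
    (datetime(2019, 10, 17, 19, 28, 8), 8017.44),
    (datetime(2019, 10, 17, 19, 37, 6), 8015.75),
]
count, spread = fluctuation(rows, now=datetime(2019, 10, 17, 19, 37, 30))
print(count, round(spread, 2))
```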


In [25]:
ten_mins_bitcoin_df.agg({"usd_value": "max"}).collect()

[Row(max(usd_value)=7878.63)]

#### Window Functions
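Window functions are one of the most useful tools in Spark's arsenal. In the general SQL sense, a window function computes a value for each row over a group of related rows. With streaming data, we use the window function from pyspark.sql.functions inside a groupBy to bucket rows into fixed time intervals and aggregate each bucket. In our case, for example, if we want to know the max or min value per 5-minute interval, we can group our streaming data by a 5-minute window to get the answer.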

Some good further reading options:
* https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html
* https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
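
To build intuition for time-window grouping, here is a plain-Python sketch of fixed (tumbling) 5-minute windows. It assumes windows are aligned to the Unix epoch, with the start inclusive and the end exclusive, and it ignores time zones. The helper names window_start and windowed_stats are hypothetical. Values are sorted instead of passed to the built-in max and min, since the notebook shadows those names with the pyspark.sql.functions versions.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, width):
    """Floor a timestamp to the start of its fixed-width window."""
    return EPOCH + ((ts - EPOCH) // width) * width


def windowed_stats(rows, width=timedelta(minutes=5)):
    """Group (timestamp, value) rows into windows and aggregate each one."""
    buckets = defaultdict(list)
    for ts, value in rows:
        buckets[window_start(ts, width)].append(value)
    stats = {}
    for start, values in sorted(buckets.items()):
        ordered = sorted(values)
        stats[(start, start + width)] = {
            "count": len(ordered),
            "max": ordered[-1],
            "min": ordered[0],
            "avg": sum(ordered) / len(ordered),
        }
    return stats


# Sample rows copied from the bitcoin_value snapshot shown earlier
rows = [
    (datetime(2019, 10, 17, 19, 24, 8), 8014.91),
    (datetime(2019, 10, 17, 19, 25, 21), 8014.91),
    (datetime(2019, 10, 17, 19, 26, 4), 8018.71),
    (datetime(2019, 10, 17, 19, 30, 12), 8018.07),
]
for (start, end), s in windowed_stats(rows).items():
    print(start, end, s["count"], s["max"], s["min"])
```

The four sample rows fall into three windows, with the two rows between 19:25 and 19:30 sharing one bucket.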

In [50]:
# count the number of records received per 5-minute window
bitcoin_window_df = json_value_df.groupBy(window(json_value_df.timestamp, '5 minutes')).count()

In [51]:
query = bitcoin_window_df \
        .writeStream \
        .format("memory") \
        .queryName("window_count") \
        .outputMode("complete") \
        .start()

In [54]:
bitcoin_window_df = spark.sql("select * from window_count")
bitcoin_window_df.show(20,False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2019-10-17 19:45:00, 2019-10-17 19:50:00]|7    |
|[2019-10-18 02:20:00, 2019-10-18 02:25:00]|7    |
|[2019-10-18 04:05:00, 2019-10-18 04:10:00]|7    |
|[2019-10-17 21:30:00, 2019-10-17 21:35:00]|7    |
|[2019-10-18 03:20:00, 2019-10-18 03:25:00]|7    |
|[2019-10-18 02:55:00, 2019-10-18 03:00:00]|8    |
|[2019-10-18 00:20:00, 2019-10-18 00:25:00]|8    |
|[2019-10-17 23:30:00, 2019-10-17 23:35:00]|7    |
|[2019-10-18 00:35:00, 2019-10-18 00:40:00]|7    |
|[2019-10-18 01:30:00, 2019-10-18 01:35:00]|7    |
|[2019-10-18 04:00:00, 2019-10-18 04:05:00]|7    |
|[2019-10-17 20:30:00, 2019-10-17 20:35:00]|7    |
|[2019-10-18 03:00:00, 2019-10-18 03:05:00]|7    |
|[2019-10-18 03:50:00, 2019-10-18 03:55:00]|8    |
|[2019-10-17 23:55:00, 2019-10-18 00:00:00]|7    |
|[2019-10-18 00:50:00, 2019-10-18 00:55:00]|7    |
|[2019-10-18 05:00:00, 2019-10-

In [55]:
# get max per 5 minutes window
bitcoin_window_df = json_value_df.groupBy(window(json_value_df.timestamp, '5 minutes')).max()

In [56]:
query = bitcoin_window_df \
        .writeStream \
        .format("memory") \
        .queryName("window_max") \
        .outputMode("complete") \
        .start()

In [59]:
bitcoin_window_df = spark.sql("select * from window_max")
bitcoin_window_df.show(20,False)

+------------------------------------------+--------------+
|window                                    |max(usd_value)|
+------------------------------------------+--------------+
|[2019-10-17 19:45:00, 2019-10-17 19:50:00]|8010.24       |
|[2019-10-18 02:20:00, 2019-10-18 02:25:00]|8047.74       |
|[2019-10-18 04:05:00, 2019-10-18 04:10:00]|8024.53       |
|[2019-10-17 21:30:00, 2019-10-17 21:35:00]|8016.45       |
|[2019-10-18 03:20:00, 2019-10-18 03:25:00]|8035.64       |
|[2019-10-18 02:55:00, 2019-10-18 03:00:00]|8048.16       |
|[2019-10-18 00:20:00, 2019-10-18 00:25:00]|8029.64       |
|[2019-10-17 23:30:00, 2019-10-17 23:35:00]|8033.02       |
|[2019-10-18 00:35:00, 2019-10-18 00:40:00]|8045.99       |
|[2019-10-18 01:30:00, 2019-10-18 01:35:00]|8059.49       |
|[2019-10-18 04:00:00, 2019-10-18 04:05:00]|8031.42       |
|[2019-10-17 20:30:00, 2019-10-17 20:35:00]|8012.93       |
|[2019-10-18 03:00:00, 2019-10-18 03:05:00]|8048.16       |
|[2019-10-18 03:50:00, 2019-10-18 03:55:

In [66]:
# get max, min, and average per 5-minute window
bitcoin_window_df = json_value_df.groupBy(window(json_value_df.timestamp, '5 minutes'))\
                        .agg(max('usd_value').alias('max'), min('usd_value').alias('min'), avg('usd_value').alias('avg'))

In [67]:
query = bitcoin_window_df \
        .writeStream \
        .format("memory") \
        .queryName("window_aggs") \
        .outputMode("complete") \
        .start()

In [70]:
bitcoin_window_df = spark.sql("select * from window_aggs")
bitcoin_window_df.show(20,False)

+------------------------------------------+-------+-------+------------------+
|window                                    |max    |min    |avg               |
+------------------------------------------+-------+-------+------------------+
|[2019-10-17 19:45:00, 2019-10-17 19:50:00]|8010.24|8009.75|8010.0085714285715|
|[2019-10-18 02:20:00, 2019-10-18 02:25:00]|8047.74|8042.5 |8046.20857142857  |
|[2019-10-18 04:05:00, 2019-10-18 04:10:00]|8024.53|8017.61|8021.59142857143  |
|[2019-10-17 21:30:00, 2019-10-17 21:35:00]|8016.45|8015.15|8015.694285714286 |
|[2019-10-18 06:25:00, 2019-10-18 06:30:00]|7852.71|7842.62|7846.925          |
|[2019-10-18 03:20:00, 2019-10-18 03:25:00]|8035.64|8035.25|8035.557142857143 |
|[2019-10-18 02:55:00, 2019-10-18 03:00:00]|8048.16|8044.38|8046.2525000000005|
|[2019-10-18 00:20:00, 2019-10-18 00:25:00]|8029.64|8018.77|8024.7325         |
|[2019-10-17 23:30:00, 2019-10-17 23:35:00]|8033.02|8028.27|8030.597142857144 |
|[2019-10-18 00:35:00, 2019-10-18 00:40:

Important concepts to read further on:
* Checkpointing
* Watermarking
* Stream joins