## Spark Streaming
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.

In this guide, we are going to walk you through the programming model and the APIs, using the default micro-batch processing model throughout. First, let's start with a simple example of a Structured Streaming query: a windowed average over a stream of Bitcoin exchange rates read from Kinesis.

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, to_timestamp
from pyspark.sql.functions import window
from pyspark.sql.functions import avg

### Reading From Kinesis Stream

We will start our streaming application by reading the data from Kinesis. Kinesis is a message queue, similar to Kafka, provided by AWS. You can read from a Kinesis stream in the following way:

In [5]:
kinesisDF = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", "bitcoin-info") \
  .option("initialPosition", "earliest") \
  .option("region", "us-west-2") \
  .option("awsAccessKey", '') \
  .option("awsSecretKey", '') \
  .load()

It's a good idea to clear out old files first: a streaming application produces a lot of temporary files, and a free Databricks account limits how many you can have.

In [7]:
# the second argument (True) deletes the directory recursively
dbutils.fs.rm('dbfs:/SOME_CHECKPOINT_DIRECTORY/', True)
dbutils.fs.rm('dbfs:/tmp/', True)

We will define an explicit schema for the JSON payload. `from_json` needs a schema to parse the records, and declaring it up front avoids the cost and ambiguity of schema inference.

In [9]:
pythonSchema = StructType().add("timestamp", StringType()).add("euro_rate", FloatType()).add("usd_rate", FloatType()).add("gbp_rate", FloatType())

Now we will read from the stream into our streaming dataframe!

In [11]:
bitcoinDF = kinesisDF.selectExpr("cast (data as STRING) jsonData").select(from_json("jsonData", pythonSchema).alias("bitcoin")).select("bitcoin.*")

We will also convert the timestamp column to the timestamp type so we can compare it with Python datetime objects in our queries.

In [13]:
bitcoinDF = bitcoinDF.withColumn('timestamp', to_timestamp(bitcoinDF.timestamp, "yyyy-MM-dd'T'HH:mm:ss"))
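
To make the parsing steps concrete, here is a minimal plain-Python sketch of what happens to a single record: the JSON string is decoded and the `timestamp` string is converted to a datetime. The sample record is hypothetical (field names follow the schema above, values are made up), and `parse_record` is an illustrative helper, not part of Spark. Note that Python's `strptime` pattern `%Y-%m-%dT%H:%M:%S` corresponds to the Spark pattern `yyyy-MM-dd'T'HH:mm:ss` used above.

```python
import json
from datetime import datetime

# Hypothetical record: field names follow the schema above, values are made up.
sample = ('{"timestamp": "2021-03-01T12:07:30", "euro_rate": 40000.5, '
          '"usd_rate": 48000.0, "gbp_rate": 34500.25}')

def parse_record(raw):
    """Decode one JSON record and turn its timestamp string into a datetime."""
    record = json.loads(raw)
    record["timestamp"] = datetime.strptime(record["timestamp"], "%Y-%m-%dT%H:%M:%S")
    return record

parsed = parse_record(sample)
print(parsed["timestamp"], parsed["euro_rate"])
```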

### Querying!
Now you can use everything you learnt previously from Spark SQL! For example, you can group by an attribute and aggregate, filter, or select as you wish! <br/>
One concept we haven't introduced yet is windowing, which we will briefly look at now.

A window function buckets rows into one or more time windows based on a timestamp column. For that we will use the window function (pyspark.sql.functions.window) inside groupBy. <br/>
You can call it in the following way: __window(timeColumn, windowDuration, slideDuration=None, startTime=None)__. The window duration and slide duration are defined as follows:
* Window Duration: the length of each window, i.e. how far back in time the windowed transformation goes
* Slide Duration: how often a new window starts, i.e. how often the windowed transformation is computed. If it is omitted, the windows do not overlap.
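
The two durations can be illustrated with a small plain-Python sketch that lists the windows a single event falls into. It is only a model of the idea, not Spark's implementation: it assumes windows are aligned to 1970-01-01 00:00:00 (that is, `startTime` is left at its default), it ignores time zones, and `windows_for` is a hypothetical helper name.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def windows_for(ts, window, slide):
    """Return every [start, end) window that contains ts, earliest first."""
    # Latest window start at or before ts, aligned to the epoch.
    start = ts - (ts - EPOCH) % slide
    result = []
    # Step back one slide at a time while the window still covers ts.
    while start + window > ts:
        result.append((start, start + window))
        start -= slide
    return result[::-1]

event = datetime(2021, 3, 1, 12, 7, 30)
assigned = windows_for(event, timedelta(minutes=10), timedelta(minutes=5))
for start, end in assigned:
    print(start.time(), "->", end.time())
```

With a 10-minute window and a 5-minute slide, every event lands in two overlapping windows, so it contributes to two rows of the aggregated result.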

In [16]:
windowedCounts = bitcoinDF.groupBy(window(bitcoinDF.timestamp, "10 minutes", "5 minutes").alias('time_window')).agg(avg(bitcoinDF.euro_rate).alias('window_avg_euro_rate'))
display(windowedCounts)

In [17]:
# to query the stream results, write them to an in-memory table called bitcoin_window
query = windowedCounts.writeStream.format("memory").queryName("bitcoin_window").outputMode("complete").start()
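
In "complete" output mode, the whole result table is rewritten after every micro-batch rather than only the changed rows. The toy simulation below sketches that behaviour in plain Python under simplifying assumptions: `RunningAverage` is a hypothetical class, the keys stand in for window start times, and the values are made up. It is not how Spark stores its state.

```python
from collections import defaultdict

class RunningAverage:
    """Toy stand-in for a streaming average in 'complete' output mode."""

    def __init__(self):
        self._sums = defaultdict(float)
        self._counts = defaultdict(int)

    def process_batch(self, batch):
        """Fold one micro-batch of (key, value) pairs into the state and
        return the full, updated result table."""
        for key, value in batch:
            self._sums[key] += value
            self._counts[key] += 1
        return {key: self._sums[key] / self._counts[key] for key in self._sums}

agg = RunningAverage()
first = agg.process_batch([("12:00", 10.0), ("12:00", 20.0)])
second = agg.process_batch([("12:00", 30.0), ("12:10", 50.0)])
print(first)
print(second)
```

The second result still contains the `12:00` key, with its average updated by the new value, which is why the in-memory table always shows every window seen so far.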

In [18]:
%sql
-- once the stream is running you can query the in-memory table with SQL
select time_window.start, time_window.end, window_avg_euro_rate from bitcoin_window limit 5

In [19]:
# you can also load the current contents of the table into a DataFrame with a SQL query
df = spark.sql('select time_window.start, time_window.end, window_avg_euro_rate from bitcoin_window')

In [20]:
# we can query the live data for the average euro rate over the last hour
from datetime import datetime, timedelta
from pyspark.sql.functions import avg
one_hour_ago = datetime.now() - timedelta(minutes=60)
# keep only windows that started within the last hour, then average their rates
last_hour_rate = df.filter(df.start > one_hour_ago).select('window_avg_euro_rate').agg(avg('window_avg_euro_rate').alias('rate')).collect()
print(last_hour_rate[0].rate)

## Bitcoin Trading
We created a Bitcoin trading simulator so you can play around with Spark Streaming! Each group has a virtual wallet, which can be viewed at: "your amazon instance"
You can buy and sell bitcoin by using the API. To use the API, first configure the user_id and access_key for your group.

In [22]:
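user_id = '' # use user_id of your group
access_key = '' # use access_key of your group
api = '' # base URL of the trading API; the examples below append 'info/...', 'buy/...' or 'sell/...' to it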

The following API endpoints are available for you to use.
* __Info:__ returns information about your virtual wallet. Send the following request: __api+'info/?user_id='+user_id__
* __Sell:__ sells bitcoin. Define the amount you want to sell and call the API like: __api+'sell/?amount='+amount+'&user_id='+user_id+'&access_key='+access_key__
* __Buy:__ buys bitcoin. Define the amount you want to buy and call the API like: __api+'buy/?amount='+amount+'&user_id='+user_id+'&access_key='+access_key__
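
Concatenating the request URL by hand makes it easy to slip in a stray character. As a minimal sketch, the query string can instead be built with the standard library's `urlencode`, which also escapes special characters. `build_url` is a hypothetical helper, and the base URL, user id and access key below are placeholders, not real values.

```python
from urllib.parse import urlencode

def build_url(api, action, **params):
    """Build '<api><action>/?key=value&...' for the info, buy and sell calls."""
    return f"{api}{action}/?{urlencode(params)}"

# Placeholder values for illustration only.
example_api = "https://example.invalid/"
buy_url = build_url(example_api, "buy", amount=1, user_id="group1", access_key="secret")
info_url = build_url(example_api, "info", user_id="group1")
print(buy_url)
print(info_url)
```

The resulting string can be passed to `requests.get` exactly like the hand-built URLs in the examples.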

## Examples

We will use the Info API to get the current bitcoin balance for the test user in the following way:

In [25]:
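import json, requests
# send an HTTP GET request to the info endpoint
rsp = requests.get(api+'info/?user_id='+user_id).text
# parse the JSON response once; the wallet is the first element of the list
wallet = json.loads(rsp)[0]
bitcoin = wallet['bitcoin']
euro = wallet['euro']
# print the bitcoin and euro values
print(bitcoin, euro)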

## Buy
Let's buy a bitcoin!

In [27]:
# set amount to 1
amount = 1
amount = str(amount)
rsp = requests.get(api+'buy/?amount='+amount+'&user_id='+user_id+'&access_key='+access_key)
print(rsp.text, rsp.status_code)

In [28]:
rsp = requests.get( api+'info/?user_id='+user_id).text
bitcoin = json.loads(rsp)[0]['bitcoin']
euro = json.loads(rsp)[0]['euro']
print(bitcoin, euro)

## Sell
Let's sell a bitcoin!

In [30]:
# set amount to 1
amount = 1
amount = str(amount)
rsp = requests.get(api+'sell/?amount='+amount+'&user_id='+user_id+'&access_key='+access_key)
print(rsp.text, rsp.status_code)

In [31]:
rsp = requests.get( api+'info/?user_id='+user_id).text
bitcoin = json.loads(rsp)[0]['bitcoin']
euro = json.loads(rsp)[0]['euro']
print(bitcoin, euro)