# Processing Data Streams with Spark

This chapter introduces the **Spark Structured Streaming API** - a convenient way to process streaming data without having to reason about the details of streaming.

## What is Streaming Data?

All the examples we have seen so far dealt with bounded data sets. In contrast, a **data stream** is an _unbounded sequence of data arriving continuously_. The following table contrasts stream processing with the _batch processing_ we have used so far.


| | Batch processing | Stream processing |
|---|---|---|
| **Data scope** | Queries or processing over all or most of the data in the data set. | Queries or processing over data within a rolling time window, or on just the most recent data record. |
| **Data size** | Large batches of data. | Individual records or micro-batches consisting of a few records. |
| **Performance** | Latencies of minutes to hours. | Requires latencies on the order of seconds or milliseconds. |
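
To make the "data scope" row concrete, here is a minimal plain-Python sketch (no Spark involved). The names `batch_mean` and `rolling_mean` and the sample `events` are invented for illustration, and the sketch assumes that records arrive ordered by timestamp.

```python
from collections import deque
from typing import Iterable, Iterator, Tuple

Event = Tuple[float, float]  # (timestamp in seconds, measured value)


def batch_mean(events: Iterable[Event]) -> float:
    """Batch style: one result computed over the whole bounded data set."""
    values = [value for _, value in events]
    return sum(values) / len(values)


def rolling_mean(events: Iterable[Event], window_seconds: float) -> Iterator[float]:
    """Stream style: one result per arriving record, over a rolling time window."""
    window = deque()
    total = 0.0
    for timestamp, value in events:
        window.append((timestamp, value))
        total += value
        # drop records that have fallen out of the time window
        while window[0][0] <= timestamp - window_seconds:
            _, old_value = window.popleft()
            total -= old_value
        yield total / len(window)


# hypothetical sample data: three early records and one late record
events = [(0.0, 10.0), (1.0, 20.0), (2.0, 30.0), (10.0, 40.0)]
print(batch_mean(events))
print(list(rolling_mean(events, window_seconds=5.0)))
```

The batch function needs the complete input before it can answer, while the rolling function works on any iterator, including one that never ends.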


## Spark Structured Streaming

[**Spark Structured Streaming**](https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html) is a scalable stream processing engine built on top of [**📓 Spark SQL**](spark-structured-data.ipynb). Its main advantage is that stream processing applications can be written in a familiar, declarative way, without having to reason about the minutiae of streaming. Internally, Structured Streaming treats a data stream as a series of small batch jobs, each of which is processed efficiently by the Spark SQL engine.

![](graphics/third-party/spark-structured-streaming.png)
_Source: https://databricks.com_
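
The idea of treating a stream as a series of small batches can be sketched in plain Python. This is a toy model under stated assumptions, not a description of Spark's internals: the helpers `micro_batches` and `run_word_count` are invented for illustration, and batches are cut by record count here, whereas Spark decides batch boundaries through its own trigger settings.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, List


def micro_batches(lines: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Cut a (possibly endless) sequence of lines into small batches."""
    batch: List[str] = []
    for line in lines:
        batch.append(line)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # emit the incomplete last batch of a finite input
        yield batch


def run_word_count(lines: Iterable[str], batch_size: int) -> List[Dict[str, int]]:
    """Process one batch at a time while carrying the counts across batches."""
    counts: Counter = Counter()  # state that survives from batch to batch
    results = []
    for batch in micro_batches(lines, batch_size):
        for line in batch:
            counts.update(line.split(" "))
        # snapshot of the full result after this batch
        results.append(dict(counts))
    return results


for snapshot in run_word_count(["a b", "b c", "a"], batch_size=2):
    print(snapshot)
```

Each batch is an ordinary bounded computation; only the carried-over `counts` state makes the sequence of batches behave like one continuous query.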

## Minimal Example: Streaming Word Count

In this first example, we are going to build another word count application - with an important difference: The text input is not read from a file. It is sent through the network, arriving at a _TCP socket_ - a network endpoint defined by an IP address and a port. 

1. We start by writing the stream processing job as a script for PySpark.

In [None]:
%%file scripts/streaming-word-count.py

import pyspark
# functions for word count
from pyspark.sql.functions import explode, split, col


# create a Spark SQL session
spark = pyspark.sql.SparkSession \
    .builder \
    .appName("StreamingWordCount") \
    .getOrCreate()

# avoid lengthy log messages in output
spark.sparkContext.setLogLevel("ERROR")

# define streaming data source
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# define data transformation and output
streamingWordCount = lines \
    .select(    
       explode(
           split(lines.value, " ")
       ).alias("word")
    )\
    .groupBy("word") \
    .count() \
    .sort(col("count").desc()) \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()  

streamingWordCount.awaitTermination()

Before we discuss the code in detail, let's run it and demonstrate the behavior.

2. Before starting the Spark streaming job, we need to open the network connection over which the streaming input will be sent. We supply input through this connection with the `netcat` command-line utility.
      ```
      > nc -lk 9999
      ```
3. We start the streaming data processing job in the familiar way using `spark-submit`.

In [None]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

In [None]:
!spark-submit scripts/streaming-word-count.py
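
Where `netcat` is not available, a small Python script could play the same role of listening on the port and sending text lines to the streaming job. The following is a minimal sketch, not part of the original setup: the function names `open_server` and `send_lines` are invented here, and unlike `nc -lk` the script serves a single client and then closes.

```python
import socket
import time
from typing import Iterable


def open_server(host: str = "localhost", port: int = 9999) -> socket.socket:
    """Open a listening TCP socket, similar in spirit to `nc -l`."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # allow quick restarts on the same port
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def send_lines(server: socket.socket, lines: Iterable[str],
               delay_seconds: float = 1.0) -> None:
    """Wait for one client, send each line with a trailing newline, then close."""
    with server:
        connection, _address = server.accept()
        with connection:
            for line in lines:
                connection.sendall((line + "\n").encode("utf-8"))
                # pause so that lines arrive spread out over time
                time.sleep(delay_seconds)
```

To try it, one could run `send_lines(open_server(), ["hello world", "hello spark"])` in a separate Python process before starting the `spark-submit` job, since the listening side has to exist before the job connects.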

The following is a more detailed look at the code blocks that make up the stream processing job.

1. This call defines and connects to a data stream source via a _TCP socket_ - defined by a hostname and a port.


In [None]:
if False: # do not run this for now
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

2. This block contains the word count data processing step using familiar `pyspark.sql.DataFrame` operations:

In [None]:
if False: # do not run this for now
    streamingWordCount = lines \
        .select(    
           explode(
               split(lines.value, " ")
           ).alias("word")
        )\
        .groupBy("word") \
        .count() \
        .sort(col("count").desc()) 
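
As a reading aid, the same chain of steps can be mirrored in plain Python on a bounded list of lines. This is only an analogy with an invented helper name (`word_count`): Spark runs these steps on a DataFrame, and it makes no promise about the order of words with equal counts, whereas this sketch keeps them in order of first appearance.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def word_count(lines: Iterable[str]) -> List[Tuple[str, int]]:
    # split(lines.value, " ") + explode(...): one entry per word
    words = [word for line in lines for word in line.split(" ")]
    # groupBy("word").count(): number of occurrences per distinct word
    counts = Counter(words)
    # sort(col("count").desc()): most frequent words first
    return sorted(counts.items(), key=lambda item: item[1], reverse=True)


print(word_count(["to be or not to be", "to do"]))
```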

3. Finally, we direct streaming output (generated whenever new incoming data arrives) to the terminal and start the processing:

In [None]:
if False: # do not run this for now
    streamingWordCount \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
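
The `"complete"` output mode writes the entire result table on every trigger, whereas the `"update"` mode writes only the rows that changed since the previous trigger. The plain-Python sketch below illustrates that difference on two consecutive snapshots of the word counts; the function names and the sample dictionaries are invented for illustration and are not Spark APIs.

```python
from typing import Dict


def complete_output(previous: Dict[str, int], current: Dict[str, int]) -> Dict[str, int]:
    """Complete mode: emit the whole result table, changed or not."""
    return dict(current)


def update_output(previous: Dict[str, int], current: Dict[str, int]) -> Dict[str, int]:
    """Update mode: emit only rows that are new or whose value changed."""
    return {word: count for word, count in current.items()
            if previous.get(word) != count}


# hypothetical word counts after two consecutive triggers
before = {"a": 1, "b": 2}
after = {"a": 2, "b": 2, "c": 1}
print(complete_output(before, after))
print(update_output(before, after))
```

The query in this chapter sorts its aggregated result, and Spark supports sorting a streaming result only after an aggregation and in complete mode, which is why that mode is used here.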



## Open-Ended Project: Twitter Stream Analytics

**Use the Twitter API to read and analyze a stream of tweets associated with a specific hashtag in real time**

Prerequisites:
- a Twitter developer account to generate API keys

Useful Python libraries:
- `getpass`
- [`tweepy`](https://www.tweepy.org)

## References
- [What is Streaming Data?](https://aws.amazon.com/streaming-data/)
- [Structured Streaming Programming Guide](https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html)
- [Streaming + Scikit-Learn](https://towardsdatascience.com/streaming-scikit-learn-with-pyspark-c4806116a453)
