Spark Streaming

* Objectives:
    * Knowing use cases for Spark Streaming
    * How to process data via streaming
    * Pre-existing methods for problems before Spark Streaming
    * Understanding existing streaming systems
    * Knowing how Spark Streaming works
    * Knowing Spark Streaming Programming Models: Discretized Stream and Window-based Transformations
    * Arbitrary Combinations of Batch and Streaming Computations
    * Advantages of Unified Stack (Batch and Streaming)
    * Advantages of Spark Streaming (Performance, Fault-tolerance, Input Sources, Output Sinks)

1) Use Cases For Spark Streaming
* Fraud detection in bank transactions
* Anomalies in sensor data
* Identifying cats in videos from tweets

2) Requirements for Processing, Storing, and Transforming Raw Data Streams
* Scales to hundreds of nodes
* Achieves low latency
* Recovers efficiently from failures
* Integrates with batch and interactive processing

3) Pre-existing methods before Spark Streaming
* Difficulties of Combined Batch and Streaming Frameworks
    * Build two stacks - one for batch and one for streaming
        * Often both process the same data
    * Existing frameworks cannot do both
            * Either stream processing of hundreds of MB/s with low latency
            * Or batch processing of TBs of data with high latency
    * Extremely painful to maintain two stacks
        * Different programming models
        * Doubles implementation effort
        * Doubles operational effort
* Creating Fault-tolerant Stream Processing
    * Traditional processing model:
    ![mutable_stream_processing](mutable_stream_processing.png)
        * Pipeline of nodes
        * Each node maintains mutable state
        * Each input record updates the state and new records are sent out
        * Mutable state is lost if node fails
        * Making stateful stream processing fault-tolerant is challenging
    * Existing Streaming Systems:
        * Apache Storm
        ![apache_storm](https://image.slidesharecdn.com/june29550yahoopeng-160711212806/95/resource-aware-scheduling-in-apache-storm-5-638.jpg?cb=1468272514)
            * Replays a record if it is not processed by a node
            * Processes each record **at least once**
            * May update mutable state twice!
            * Mutable state can be lost due to failure!
        * Storm Trident (high-level abstraction on top of Storm)
        ![apache_trident](https://image.slidesharecdn.com/0490ef01-8e4e-4c29-b5f0-62c12f69bb97-150325220330-conversion-gate01/95/apache-storm-concepts-25-638.jpg?cb=1427321235)
            * Processes each record **exactly once**
            * Uses a per-state transaction with an external database, which is slow
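
The difference between Storm's replay-based processing and Trident's exactly-once state updates can be modeled in plain Python. This is a simplified sketch, not Storm or Trident code. The names `AtLeastOnceCounter` and `TransactionalCounter` and the transaction ids are hypothetical. The sketch assumes, as Trident's transactional mode does, that a replayed batch keeps its transaction id and contents. Storing the last applied transaction id next to each value lets the replayed batch be skipped instead of counted twice.

```python
from collections import defaultdict

class AtLeastOnceCounter:
    """Mutable per-key counts: a replayed batch is simply applied again."""
    def __init__(self):
        self.counts = defaultdict(int)

    def apply(self, txid, words):
        for w in words:
            self.counts[w] += 1

class TransactionalCounter:
    """Stores the last applied transaction id next to each value and skips
    updates from a batch (txid) that already updated that key."""
    def __init__(self):
        self.state = {}  # word -> (count, last_txid)

    def apply(self, txid, words):
        batch_counts = defaultdict(int)
        for w in words:
            batch_counts[w] += 1
        for w, n in batch_counts.items():
            count, last_txid = self.state.get(w, (0, None))
            if last_txid == txid:
                continue  # replay of an already-applied batch: no-op
            self.state[w] = (count + n, txid)

    def count(self, word):
        return self.state.get(word, (0, None))[0]

# Batch 1 is processed, then replayed after a (simulated) failure, then batch 2 arrives.
batches = [(1, ["spark", "storm", "spark"]), (1, ["spark", "storm", "spark"]), (2, ["storm"])]

naive, tx = AtLeastOnceCounter(), TransactionalCounter()
for txid, words in batches:
    naive.apply(txid, words)
    tx.apply(txid, words)

print(dict(naive.counts))                  # {'spark': 4, 'storm': 3}
print({w: tx.count(w) for w in tx.state})  # {'spark': 2, 'storm': 2}
```

Each state update here is a read, compare, and write of one key. Against an external database, that becomes one transaction per state update, which is why this approach is slow.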

4) What is Spark Streaming?
![spark_streaming](spark_streaming.png)
* Receives data streams from input sources, processes them in a cluster, and pushes results out to databases/dashboards
* Scalable, fault-tolerant, second-scale latencies
* Spark Streaming Steps:
![spark_streaming_steps](spark_streaming_steps.png)
    1. Chop up data streams into batches of a few seconds
    2. Spark treats each batch of data as RDDs and processes them using RDD operations
    3. Processed results are pushed out in batches
* Spark Streaming Programming Model
    * **Discretized Stream (DStream)** - represents a stream of data, implemented as a sequence of RDDs
        * Create input DStreams from different sources
        * Apply parallel operations
        * Example: Get hashtags from Twitter
        ```scala
        val ssc = new StreamingContext(sparkContext, Seconds(1))
        val tweets = TwitterUtils.createStream(ssc, auth) 
        // tweets = Input DStream (stored in memory as RDDs)
        ```
        ![hashtags](hashtags.png)
        ```scala
        val hashTags = tweets.flatMap(status => getTags(status))
        // hashTags = transformed DStream
        // flatMap() = transforms each record of one DStream into zero or more records of a new DStream
        ```
        ![hashtags_transform](hashtags_transform.png)
        ```scala
        hashTags.saveAsTextFiles("hdfs://...")
        // pushes each batch to external storage as text files
        // (saveAsHadoopFiles is only available on key-value DStreams)
        ```
        ![hashtags_hdfs](hashtags_hdfs.png)
        ```scala
        hashTags.foreachRDD(hashTagRDD => { ... })
        // do whatever you want with the processed data
        ```
        ![hashtags_foreach](hashtags_foreach.png)
        * **Window-based Transformations**
        ```scala
        val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()
        // window() = sliding window operation
        // Minutes(1) = window length
        // Seconds(5) = sliding interval (both must be multiples of the batch interval)
        ```
        ![window_transformation](window_transformation.png)
    * **Arbitrary Stateful Computations** - specify function to generate new state based on previous state and new data
        * Example: Maintain per-user mood as state, and update it with their tweets
        ```scala
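        // tweetsByUser: DStream[(String, Status)]; Mood and computeMood are placeholders
        // updateStateByKey requires a checkpoint directory: ssc.checkpoint(...)
        def updateMood(newTweets: Seq[Status], lastMood: Option[Mood]): Option[Mood] =
          Some(computeMood(newTweets, lastMood))
        val moods = tweetsByUser.updateStateByKey(updateMood _)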
        ```
    * **Arbitrary Combinations of Batch and Streaming Computations** - intermix RDD and DStream operations
        * Example: Join incoming tweets with a spam HDFS file to filter out bad tweets
        ```scala
        tweets.transform(tweetsRDD => {
            tweetsRDD.join(spamFile).filter(...)
        })
        ```
        * Combine live data streams with historical data
            * Generate historical data models with Spark
            * Use data models to process live data stream
        * Combine streaming with MLlib, GraphX algorithms
            * Offline learning, online prediction
            * Online learning and prediction
        * Interactively query streaming data using SQL
        ```scala
        spark.sql("select * from table_from_streaming_data")
        ```
        * Advantages of a Unified Stack
            * Explore data interactively to identify problems
            * Use same code in Spark for processing large logs
            * Use similar code in Spark Streaming for realtime processing
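
The plain-Python model below makes the DStream semantics concrete. It assumes records arrive as `(arrival_time_in_seconds, value)` pairs and is not Spark code. The functions `discretize`, `windowed_count_by_value`, and `update_state_by_key` are hypothetical stand-ins for three things: the batching step, `window(...).countByValue()`, and `updateStateByKey`. Window length and slide are measured in whole batches. Real DStreams process each batch as a distributed RDD; here each batch is just a Python list.

```python
from collections import Counter

def discretize(records, batch_interval):
    """Group (arrival_time, value) records into consecutive batches of
    batch_interval seconds; each batch plays the role of one RDD."""
    if not records:
        return []
    last = max(t for t, _ in records)
    batches = [[] for _ in range(int(last // batch_interval) + 1)]
    for t, value in records:
        batches[int(t // batch_interval)].append(value)
    return batches

def windowed_count_by_value(batches, window_len, slide):
    """Like window(windowLength, slideInterval).countByValue(), with both
    lengths expressed as a number of batches."""
    results = []
    for end in range(slide, len(batches) + 1, slide):
        in_window = batches[max(0, end - window_len):end]
        results.append(Counter(v for batch in in_window for v in batch))
    return results

def update_state_by_key(batches, update_fn):
    """Like updateStateByKey: call update_fn(new_values, old_state) for every
    key that has new values or existing state; returning None drops the key."""
    state, history = {}, []
    for batch in batches:
        new_values = {}
        for key, value in batch:
            new_values.setdefault(key, []).append(value)
        for key in sorted(set(state) | set(new_values)):
            new_state = update_fn(new_values.get(key, []), state.get(key))
            if new_state is None:
                state.pop(key, None)
            else:
                state[key] = new_state
        history.append(dict(state))  # snapshot of the state after each batch
    return history

# Hashtags arriving over 4 seconds, chopped into 1-second batches.
tags = [(0.2, "#spark"), (0.7, "#flink"), (1.1, "#spark"), (2.5, "#spark"), (3.9, "#beam")]
batches = discretize(tags, 1.0)
counts = windowed_count_by_value(batches, window_len=2, slide=1)

# Running tweet count per user (hypothetical data), kept as state across batches.
tweets_by_user = discretize([(0.1, ("alice", 1)), (0.5, ("bob", 1)), (1.3, ("alice", 1))], 1.0)
running = update_state_by_key(tweets_by_user, lambda new, old: (old or 0) + sum(new))
print(running)  # [{'alice': 1, 'bob': 1}, {'alice': 2, 'bob': 1}]
```

With a 1-second batch, `window_len=2, slide=1` corresponds to `window(Seconds(2), Seconds(1))`. As in `updateStateByKey`, the update function is called for every key that already has state, even in batches with no new values for it.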

5) Performance / Fault-tolerance / Input & Output Sources
* **Performance** - can process 60M records/sec (6 GB/sec) on 100 nodes at sub-second latency
![streaming_performance](streaming_performance.png)
* **Fault-tolerance**
    * Batches of input data are replicated in memory for fault-tolerance
    * Data lost due to a worker failure can be recomputed from the replicated input data
    * All transformations are fault-tolerant and have **exactly-once** semantics
    ![fault_tolerant_streaming](fault_tolerant_streaming.png)
* **Input Sources**
    * Natively Supported: Kafka, Flume, Kinesis, Raw TCP sockets, HDFS, etc.
    * Easy to write a custom receiver (define what happens when the receiver is started and stopped)
    * Generate your own sequence of RDDs and push them in as a "stream" (e.g. with `ssc.queueStream`)
* **Output Sinks**
    * HDFS, S3 (Hadoop API compatible filesystems)
    * Cassandra (using Spark-Cassandra connector)
    * HBase
    * Push data anywhere directly (e.g. from `foreachRDD`)
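
Recomputation gives exactly-once results for two reasons. Each batch's output is a deterministic function of its input, and that input is replicated. So a batch lost with a failed worker can be recomputed, and the recomputation produces the same answer without anything being counted twice. Below is a plain-Python sketch under those assumptions. The worker names, the `replicas` dictionary, and `process_batch` are hypothetical stand-ins for Spark's in-memory replicas and RDD lineage.

```python
from collections import Counter

def process_batch(lines):
    """Deterministic transformation (word count): recomputing it from the
    same input always yields the same result."""
    return Counter(word for line in lines for word in line.split())

# Each input batch is replicated in memory on two workers.
replicas = {
    "worker-1": {0: ["a b", "b"], 1: ["c a"]},
    "worker-2": {0: ["a b", "b"], 1: ["c a"]},
}

# Results are computed on worker-1.
results = {batch_id: process_batch(lines) for batch_id, lines in replicas["worker-1"].items()}
before_failure = dict(results)

# worker-1 fails: its input copies and the results it computed are lost.
del replicas["worker-1"]
results.clear()

# Recompute every lost batch from the surviving replica of its input.
for batch_id, lines in replicas["worker-2"].items():
    results[batch_id] = process_batch(lines)

total = sum(results.values(), Counter())
print(results == before_failure)  # True: same answer as before the failure
print(dict(total))                # {'a': 2, 'b': 2, 'c': 1}
```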

6) Spark Streaming vs Structured Streaming
* (-) Prior to Spark 2.2, if you wanted to write a production streaming app, you had to code against Spark’s original streaming API.
* (-) The Spark Streaming API had no built-in handling of out-of-order (event-time) data, and end-to-end delivery guarantees (at-least-once, at-most-once, exactly-once) depended on the input sources and output operations rather than being provided by the API. In addition, the API was very close to the classic RDD model, meaning you operated on a single micro-batch at a time and any maintenance of state across batches had to be handled manually.
* (+) The Structured Streaming API allows for "standing queries" over streams; the engine creates and updates the query state (kept in a fault-tolerant state store) as each batch is processed.
* (+) The Structured Streaming API allows use of the familiar DataFrame API to define standing queries that refresh their state in a consistent way. (This is very similar to what Microsoft has been doing for years with StreamInsight.)

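```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum, window}
// Kafka source (needs spark-sql-kafka-0-10); broker, topic, table and CSV layout are placeholders
val kafkaStream = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "events").load()
  .selectExpr("split(CAST(value AS STRING), ',') AS f") // value assumed to be "org,time,amount"
  .selectExpr("f[0] AS organization", "CAST(f[1] AS TIMESTAMP) AS time", "CAST(f[2] AS DOUBLE) AS amount")
// No built-in streaming JDBC sink, so each micro-batch is written with the batch writer (Spark 2.4+)
def writeToPostgres(batch: DataFrame, batchId: Long): Unit =
  batch.select(col("organization"), col("window.start").as("window_start"), col("window.end").as("window_end"), col("total"))
    .write.mode("append").jdbc("jdbc:postgresql:…", "org_totals", new java.util.Properties())
// Process the stream and write to Postgres; append mode emits each window once the watermark passes it
kafkaStream.withWatermark("time", "10 minutes")
  .groupBy(col("organization"), window(col("time"), "5 minutes"))
  .agg(sum("amount").as("total"))
  .writeStream.outputMode("append").foreachBatch(writeToPostgres _).start()
```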
* (+) Spark Streaming API works better on less structured data, and performance on less structured tasks (enrichment, security, machine learning) is better.
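
A standing query should always produce the same result as running the equivalent batch query over all data received so far, while touching only the new data in each micro-batch. Below is a minimal plain-Python model of that invariant. The names and data are hypothetical, and this is not Spark's implementation.

```python
from collections import defaultdict

def batch_query(rows):
    """The batch version: total amount per organization over all rows."""
    totals = defaultdict(float)
    for org, amount in rows:
        totals[org] += amount
    return dict(totals)

class StandingQuery:
    """The same aggregation kept up to date incrementally, one micro-batch at a time."""
    def __init__(self):
        self.state = defaultdict(float)  # result table, kept between batches

    def on_batch(self, new_rows):
        changed = {}
        for org, amount in new_rows:
            self.state[org] += amount
            changed[org] = self.state[org]
        return changed  # only the rows updated by this batch

micro_batches = [
    [("acme", 10.0), ("initech", 5.0)],
    [("acme", 2.5)],
    [("globex", 1.0), ("initech", 4.0)],
]

query, seen = StandingQuery(), []
for batch in micro_batches:
    updated = query.on_batch(batch)
    seen.extend(batch)
    # Invariant: incremental result == batch query over everything received so far
    assert dict(query.state) == batch_query(seen)
    print(updated)
# {'acme': 10.0, 'initech': 5.0}
# {'acme': 12.5}
# {'globex': 1.0, 'initech': 9.0}
```

The dictionary returned by `on_batch` is what an Update-mode sink would receive, while `query.state` is the full result table.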

7) Competitors To Structured Streaming
* **Apache Flink**
    * (+) Has lower latency than Spark Structured Streaming
    * Spark Structured Streaming still uses micro-batches in the background. However, it supports event-time processing and fairly low latency (though not as low as Flink's), and it supports SQL and type-safe queries on streams in one API: there is no distinction, and every Dataset can be queried either with SQL or with type-safe operators. It has end-to-end exactly-once semantics.
    * Spark's Continuous Processing mode is in development and will give Spark ~1 ms latency, comparable to Flink's. However, it is still in progress. The API is already designed for non-batch jobs, so this is easier to do than it would have been with the older Spark Streaming API.
    * Spark currently relies on micro-batching, whereas Flink has pre-scheduled operators, so Flink's latency is lower. The Spark community is working on Continuous Processing mode, which (as far as I understand) will work similarly to receivers.
* **Apache Beam** - is an API that separates the building of a data processing pipeline from the actual engine on which it would run. It includes abstractions for specifying the data pipeline, the actual data stream (akin to Spark's RDDs), transformation functions, "runners" (the compute engine), and the sources and targets. It's one of a growing number of approaches for flattening the Lambda architecture, so you can combine real time and batch processing (and interactive as well) on the same code base and cluster.
    * (+) The biggest gap is with functions that depend on true stream processing (the ability to process one event at a time and set discrete time windows), where Spark Streaming's micro-batch capabilities either fall short or require workarounds. (Structured Streaming fixes this issue, however.)
    * Each of these compute engines (Google Cloud Dataflow, Spark, Flink, and Apex) wants to be your one-stop shop. That is where Beam becomes "coopetition" for Spark: it works with Spark, but in principle it also works with other engines.

8) **Structured Streaming** - fast, scalable, fault-tolerant, end-to-end exactly-once stream processing **without the user having to reason about streaming** (Production Ready in Spark 2.2)
* Overview:
    * a scalable and fault-tolerant stream processing engine built on the Spark SQL engine 
    * allows taking the same operations you perform in batch mode and performing them in a streaming fashion
        * Allows for simple switches (express your streaming computation the same way you would express a batch computation on static data)
    * Reduces latency and allows for incremental processing (the Spark SQL engine takes care of running the query incrementally and continuously, updating the final result as streaming data continues to arrive)
    * The system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs
* Programming Model:
    * The key idea in Structured Streaming is to treat a live data stream as a table that is being **continuously appended**
        * This leads to a new stream processing model that is very similar to a batch processing model. 
    * Spark runs it as an **incremental** query on the **unbounded** input table
    ![input_table](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png)
        * Consider the input data stream as the “Input Table”. 
        * Every data item that is arriving on the stream is like **a new row being appended** to the Input Table.
    * A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table.
    ![result_table](https://spark.apache.org/docs/latest/img/structured-streaming-model.png)
        * Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
    * **Output** - defined as what gets written out to the external storage. The output can be defined in one of several modes:
        * **Complete Mode** - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
        * **Append Mode** - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
        * **Update Mode** - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
    * Example: Words over time
    ![output](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)
        * If there is new data, Spark will run an "incremental" query that combines the previous running counts with the new data to compute updated counts
        * This model is significantly different from many other stream processing engines.
            * Many streaming systems require the user to **maintain running aggregations themselves**, thus having to reason about fault tolerance and data consistency (at-least-once, at-most-once, or exactly-once).
            * In this model, **Spark is responsible for updating the Result Table when there is new data**, thus relieving the users from reasoning about it.
    * Handling Event-time and Late Data
        * **Handling Event-time** - time embedded in the data itself
            * Example: get the number of events generated by IoT devices every minute
                * use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them
            * This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row.
            * Allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column
                * each time window is a group and each row can belong to multiple windows/groups
                * such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream
            * Example: Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes
            ![window_op_event_time](https://spark.apache.org/docs/latest/img/structured-streaming-window.png)
                * word counts are computed over 10-minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc.
                * Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10.
                * consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15
        * **Handling Late Data** - this model naturally handles data that has arrived later than expected based on its event-time
            * Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data
            * Since Spark 2.1, we have support for **watermarking** which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state
            * Example: Word Counts Over Time with Late Data
            ![window_op_late_data](https://spark.apache.org/docs/latest/img/structured-streaming-late-data.png)
                * a word generated at 12:04 (i.e. event time) could be received by the application at 12:11
                * The application should use the time 12:04 instead of 12:11 to update the older counts for the window 12:00 - 12:10
                * it’s necessary for the system to bound the amount of intermediate in-memory state it accumulates: the system needs to know when an old aggregate can be dropped from the in-memory state because the application is not going to receive late data for that aggregate any more
                * **watermarking** - lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly
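
The windowing and watermark rules above can be modeled in plain Python, with times in minutes since midnight. This is a simplified sketch, not Spark's implementation; `windows_for`, `run`, and the sample events are hypothetical. The model makes these assumptions:

* windows are 10 minutes long and slide every 5 minutes
* the lateness threshold is 10 minutes
* each window includes its start and excludes its end
* the watermark advances only after each batch
* a late event is dropped for a window only once that window's end is at or before the watermark

Real Spark only guarantees that data within the threshold is kept; data older than the watermark may or may not be dropped. In Spark itself, this roughly corresponds to `withWatermark("time", "10 minutes")` followed by `groupBy(window(col("time"), "10 minutes", "5 minutes"), col("word")).count()`.

```python
from collections import defaultdict

WINDOW, SLIDE, DELAY = 10, 5, 10  # minutes: window length, slide, lateness threshold

def to_min(hhmm):
    h, m = map(int, hhmm.split(":"))
    return h * 60 + m

def label(start):
    end = start + WINDOW
    return f"{start // 60}:{start % 60:02d}-{end // 60}:{end % 60:02d}"

def windows_for(t):
    """Start times of every [start, start + WINDOW) window containing event time t."""
    last_start = (t // SLIDE) * SLIDE
    return sorted(range(last_start, t - WINDOW, -SLIDE))

def run(batches):
    state = defaultdict(int)  # open windows: (start, word) -> count
    finalized = {}            # windows closed by the watermark (what Append mode would emit)
    dropped = []              # (event time, word, window) updates rejected as too late
    max_seen = watermark = None
    for batch in batches:
        for hhmm, word in batch:
            t = to_min(hhmm)
            for start in windows_for(t):
                if watermark is not None and start + WINDOW <= watermark:
                    dropped.append((hhmm, word, label(start)))
                else:
                    state[(start, word)] += 1  # late data still updates open windows
            max_seen = t if max_seen is None else max(max_seen, t)
        if max_seen is None:
            continue  # no data yet, so no watermark
        # The watermark advances after each batch: max event time seen minus the threshold.
        watermark = max_seen - DELAY
        for key in [k for k in state if k[0] + WINDOW <= watermark]:
            finalized[(label(key[0]), key[1])] = state.pop(key)
    return state, finalized, dropped, watermark

batches = [
    [("12:07", "owl")],
    [("12:08", "cat"), ("12:14", "dog")],
    [("12:22", "owl")],
    [("12:09", "cat"), ("12:24", "dog")],  # "12:09 cat" arrives late
]
state, finalized, dropped, watermark = run(batches)
print(windows_for(to_min("12:07")))  # [720, 725], i.e. 12:00-12:10 and 12:05-12:15
print(finalized)  # {('12:00-12:10', 'owl'): 1, ('12:00-12:10', 'cat'): 1}
print(dropped)    # [('12:09', 'cat', '12:00-12:10')]
print(state[(to_min("12:05"), "cat")])  # 2
```

The late `12:09` event is still counted in the open 12:05-12:15 window. It is dropped for 12:00-12:10, whose state was finalized once the watermark passed 12:10 (in Append mode, that is also when the window is emitted).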
                

9) Example For Structured Streaming
* Example: Retail Store Dataset (Pay attention to code changes)
    * Read as **static** data using DataFrame API
    ```scala
    %scala
    val staticDataFrame = spark.read.format("csv")
       .option("header", "true")
       .option("inferSchema", "true")
       .load("dbfs:/mnt/defg/retail-data/by-day/*.csv")
    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticSchema = staticDataFrame.schema
    ```
    ```python
    %python
    staticDataFrame = spark.read.format("csv") \
       .option("header", "true") \
       .option("inferSchema", "true") \
       .load("dbfs:/mnt/defg/retail-data/by-day/*.csv")
    staticDataFrame.createOrReplaceTempView("retail_data")
    staticSchema = staticDataFrame.schema
    ```
    * Aggregate the static time-series data to find the days on which each customer (`CustomerId`) spent the most: compute `total_cost` (`UnitPrice * Quantity`) per row, sum it per customer per day, and sort by that daily total (the Scala and Python versions keep the top five)
    ```scala
    %scala
    import org.apache.spark.sql.functions.{window, column, desc, col}
    staticDataFrame.selectExpr(
       "CustomerId",
       "(UnitPrice * Quantity) as total_cost",
       "InvoiceDate")
       .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
       .sum("total_cost")
       .orderBy(desc("sum(total_cost)"))
       .take(5)
    ```
    ```python
    %python
    from pyspark.sql.functions import window, column, desc, col
    staticDataFrame.selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost", 
        "InvoiceDate") \
        .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")) \
        .sum("total_cost") \
        .orderBy(desc("sum(total_cost)")) \
        .take(5)
    ```
    ```sql
    %sql
    SELECT
       sum(total_cost),
       CustomerId,
       to_date(InvoiceDate)
    FROM (SELECT
       CustomerId,
       (UnitPrice * Quantity) as total_cost,
       InvoiceDate
    FROM
       retail_data)
    GROUP BY
       CustomerId, to_date(InvoiceDate)
    ORDER BY
       sum(total_cost) DESC
    ```
    * Read as **streaming** data using DataFrame API
        * use `readStream` instead of `read`
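        * `maxFilesPerTrigger` - specifies the maximum number of new files read in each trigger (here, one file at a time)
            * In a production scenario, this would typically be omitted
        * The default number of shuffle partitions (`spark.sql.shuffle.partitions`) is 200, which suits clusters with many executors; in local mode with few executors, a smaller value (5 below) works better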
    ```scala
    %scala
    spark.conf.set("spark.sql.shuffle.partitions", "5") // fewer shuffle partitions for local mode with few executors
    val streamingDataFrame = spark.readStream
        .schema(staticSchema)
        .option("maxFilesPerTrigger", 1)
        .format("csv")
        .option("header", "true")
        .load("dbfs:/mnt/defg/retail-data/by-day/*.csv")
    streamingDataFrame.isStreaming // returns true 
    ```
    ```python
    %python
    streamingDataFrame = spark.readStream \
        .schema(staticSchema) \
        .option("maxFilesPerTrigger", 1) \
        .format("csv") \
        .option("header", "true") \
        .load("dbfs:/mnt/defg/retail-data/by-day/*.csv")
    streamingDataFrame.isStreaming # a property in PySpark; returns True
    ```
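
Once started, the streaming version of this aggregation behaves roughly like the plain-Python model below. With `maxFilesPerTrigger` set to 1, each trigger does three things:

* picks up one new by-day file
* updates the running per-customer, per-day totals
* in Complete output mode, re-emits the whole result table sorted by total (Spark only allows sorting a streaming DataFrame after an aggregation in Complete mode)

The file names and rows are hypothetical, and only the columns the query uses are included. The helpers `stream_files` and `run_complete_mode` are illustrative, not Spark APIs.

```python
import csv
import io
from collections import defaultdict

# Hypothetical by-day files with only the columns the query uses.
HEADER = "CustomerId,Quantity,UnitPrice,InvoiceDate\n"
files = {
    "2010-12-01.csv": HEADER + "17850,6,2.5,2010-12-01 08:26:00\n13047,8,1.25,2010-12-01 08:34:00\n",
    "2010-12-02.csv": HEADER + "17850,2,10.0,2010-12-02 09:00:00\n",
    "2010-12-03.csv": HEADER + "13047,5,5.0,2010-12-03 10:15:00\n",
}

def stream_files(file_names, max_files_per_trigger):
    """Hand out not-yet-processed files, at most max_files_per_trigger per trigger."""
    pending = sorted(file_names)
    for i in range(0, len(pending), max_files_per_trigger):
        yield pending[i:i + max_files_per_trigger]

def run_complete_mode(files, max_files_per_trigger=1):
    totals = defaultdict(float)  # (CustomerId, day) -> sum(total_cost), kept across triggers
    outputs = []
    for trigger_files in stream_files(files, max_files_per_trigger):
        for name in trigger_files:
            for row in csv.DictReader(io.StringIO(files[name])):
                day = row["InvoiceDate"][:10]  # stands in for a 1-day window on InvoiceDate
                totals[(row["CustomerId"], day)] += int(row["Quantity"]) * float(row["UnitPrice"])
        # Complete mode: emit the whole result table, sorted by total, after every trigger.
        outputs.append(sorted(((c, d, s) for (c, d), s in totals.items()), key=lambda r: -r[2]))
    return outputs

for i, table in enumerate(run_complete_mode(files), start=1):
    print(f"trigger {i}:", table)
```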