
<!-- 
This file was auto-generated from markdown using notedown.
Instead of modifying the ipynb modify the markdown source. 
-->

<h1 class="tocheading">Spark Streaming</h1>
<div id="toc"></div>

Spark Streaming
===============

Why Spark Streaming
-------------------

Q: What problem does Spark Streaming solve?

- Spark, like MapReduce, is designed to process data in batch jobs.

- Nightly batch jobs process large amounts of data and generate insights.

- What if we want to react immediately instead of waiting 24 hours?

- Spark Streaming solves this problem.

- It lets you process data as it arrives, in near real time.

Spark Streaming Applications
----------------------------

Q: What is an example scenario?

- Suppose you have an intrusion detection system.

- You process log files to determine if the system is under attack.

- With nightly batch processing, an intrusion alert can take up to 24 hours.

- Spark Streaming can detect an intrusion in minutes or seconds.

Micro-Batch Concept
-------------------

Q: How does Spark Streaming work?

- Events are grouped into micro-batched RDDs.

- Each RDD contains the events that arrived during the last batch
  interval (typically a few seconds).

- The incoming event stream is thus turned into a stream of RDDs.

- These micro-batched RDDs can then be processed, for example joined
  with existing data to raise alerts.
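- To make the micro-batch idea concrete, here is a plain Python 3 sketch (no Spark involved). It groups timestamped events into fixed-length batches, the way Spark Streaming cuts an input stream into one RDD per batch interval.
- Spark assigns events to batches by when they arrive, so the timestamps here stand in for arrival times.
- The `micro_batches` helper and the sample events are hypothetical illustrations, not Spark APIs.

```python
from collections import OrderedDict

def micro_batches(events, batch_seconds):
    """Group (timestamp, payload) events into consecutive batches.

    Batch k covers [k * batch_seconds, (k + 1) * batch_seconds).
    Returns an OrderedDict of batch start time -> list of payloads,
    including empty batches so every interval is represented.
    """
    if batch_seconds <= 0:
        raise ValueError("batch_seconds must be positive")
    batches = OrderedDict()
    if not events:
        return batches
    last_ts = max(ts for ts, _ in events)
    for k in range(int(last_ts // batch_seconds) + 1):
        batches[k * batch_seconds] = []
    for ts, payload in sorted(events, key=lambda e: e[0]):
        start = int(ts // batch_seconds) * batch_seconds
        batches[start].append(payload)
    return batches

events = [(0.2, "login"), (0.9, "login"), (1.4, "failed"), (3.1, "failed")]
for start, payloads in micro_batches(events, 1).items():
    print(start, payloads)
# 0 ['login', 'login']
# 1 ['failed']
# 2 []
# 3 ['failed']
```

Each list plays the role of one RDD in the stream. An interval with no events still yields an (empty) batch.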

Spark Streaming RDDs
--------------------

Q: How does Spark Streaming integrate with Spark?

- Spark Streaming converts incoming events into micro-batched RDDs.

- These are then processed by the regular Spark APIs.

<img src="images/streaming-arch.png">

<img src="images/streaming-flow-micro-batches.png">

Spark Stack
-----------

Q: How does Spark Streaming fit into the rest of Spark?

- Spark Streaming is a subsystem of Spark.

- Spark Streaming enables handling real-time events.

<img src="images/spark-stack.png">

Spark Streaming Big Picture
---------------------------

- Spark Streaming can consume events from multiple sources.

- These are processed and written out to HDFS, databases, and other
  systems.

<img src="images/streaming-input-output-components.png">


DStream Concept
---------------

- A DStream is a stream of RDDs.

- Think of a DStream as an infinite sequence of RDDs.

<img src="images/streaming-dstream-as-rdds.png">

- The incoming events are batched together into RDDs.

<img src="images/streaming-dstream-time-i.png">


Spark Streaming vs Storm
------------------------

Q: How does Spark Streaming compare with Storm?

- Storm is another system for real-time processing of events.

- Here is a comparison of Storm and Spark Streaming.

Comparison           |Winner     |Spark Streaming      |Storm
----------           |------     |---------------      |-----
Processing Model     |  -        |Micro-batches        |Record-at-a-time
Latency              |Storm      |Few seconds          |Sub-second
Fault tolerance      |Spark      |Exactly once         |At least once (may be duplicates)
Batch integration    |Spark      |Spark                |Requires different framework
API                  |Spark      |Simpler              |Complex
In production since  |Storm      |2013                 |2011


Pop Quiz
--------

<details><summary>
Q: What happens to an event that is half in batch `time=1` and half in
batch `time=2`? Which batch does it go to?
</summary>
1. It goes to batch `time=2`.<br>
2. Incomplete events are meaningless.<br>
3. RDDs are formed from fully-formed events.
</details>

Spark Streaming Code
--------------------

Q: How can I write a Spark Streaming app?

- Here is an example of a Spark Streaming app.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local SparkContext with 2 threads (one is taken by the socket
# receiver) and a StreamingContext with a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create DStream to listen to hostname:port
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count words in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print first 10 elements of each RDD in DStream 
wordCounts.pprint()

# Start computation
ssc.start()

# Wait for the streaming computation to terminate
ssc.awaitTermination()
```


In [1]:
# %load "/Users/Alexander/DSCI6007-student/week6/6.4/network_wordcount2.py"
from __future__ import print_function
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # Connect to the host and port given on the command line
    hostname, port = sys.argv[1], int(sys.argv[2])
    lines = ssc.socketTextStream(hostname, port)
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()



Network Source
--------------

Q: How can I create data to feed into this stream?

- `socketTextStream` connects to port 9999 on localhost as a client, so
  something must be listening on that port and writing lines to it.

- Netcat (`nc`) can play that role.

```sh
nc -lk 9999
```

- You can type words into the `nc` session, or write a script that pipes
  data into it periodically.
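- If `nc` is not available, a small Python 3 script can play the same role. It listens on a port, waits for the streaming app's receiver to connect, and then sends one line at a time.
- This is a hypothetical helper built on the standard `socket` module, not part of Spark.
- Unlike `nc -lk`, it serves a single connection and then stops.

```python
import socket
import time

def make_server(host="localhost", port=9999):
    """Open a listening TCP socket, similar to `nc -l 9999`."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow restarting the script without waiting for the port to free up.
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server

def feed_lines(server, lines, delay=1.0):
    """Wait for one client (e.g. socketTextStream), then send lines to it."""
    conn, _ = server.accept()
    try:
        for line in lines:
            # socketTextStream splits the incoming text into lines on "\n".
            conn.sendall((line + "\n").encode("utf-8"))
            time.sleep(delay)
    finally:
        conn.close()
```

For example, run `feed_lines(make_server(), ["hello world"] * 60)` in one terminal, then start the streaming app in another. The app receives one line per second for a minute.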


Notes
-----

- The `StreamingContext` is stored in `ssc`.

- `ssc.socketTextStream` creates a `DStream`.

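- DStream transformations like `flatMap`, `map`, and `reduceByKey`
  create new DStreams.

- DStream output operations like `pprint` play the role of RDD actions.

- Unlike RDD actions, however, calling a DStream output operation does
  not execute anything immediately. It only registers work that runs
  once `ssc.start()` is called and data arrives.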

Pop Quiz
--------

<details><summary>
Q: When you execute `pprint` on a DStream will anything be printed?
</summary>
1. Nothing is printed.<br>
2. The printing happens when we call `ssc.start()` and when data flows in.
</details>

RDDs vs DStreams
----------------

Q: How are DStreams different from RDDs?

- DStream transformations and output operations define an assembly line.

- Nothing happens until data comes in.

- When data comes in, DStream output operations trigger the execution
  of DStream transformations.

<img src="images/donuts.jpg">
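- The assembly-line idea can be mimicked in plain Python 3 with the toy `LazyStream` class below.
- `LazyStream`, `flat_map` and `foreach_batch` are hypothetical names, not Spark's API.
- Transformations only record steps, and output operations only register callbacks. Nothing runs until `start()` pushes batches through the line.

```python
class LazyStream(object):
    """Toy stand-in for a DStream: a recipe applied to each incoming batch."""

    def __init__(self, steps=None, outputs=None):
        # Transformations recorded so far, applied in order to each batch.
        self._steps = steps if steps is not None else []
        # Output operations registered anywhere in this pipeline (shared list).
        self._outputs = outputs if outputs is not None else []

    def map(self, f):
        step = lambda batch: [f(x) for x in batch]
        return LazyStream(self._steps + [step], self._outputs)

    def flat_map(self, f):
        step = lambda batch: [y for x in batch for y in f(x)]
        return LazyStream(self._steps + [step], self._outputs)

    def foreach_batch(self, action):
        # Output operation: only records what to do; nothing runs yet.
        self._outputs.append((list(self._steps), action))

    def start(self, batches):
        # Data flows through the recorded steps only when batches arrive.
        for batch in batches:
            for steps, action in self._outputs:
                data = batch
                for step in steps:
                    data = step(data)
                action(data)


seen = []
lines = LazyStream()
words = lines.flat_map(lambda line: line.split())
words.map(lambda w: w.upper()).foreach_batch(seen.append)
print(seen)  # [] : defining the pipeline computed nothing
lines.start([["a b"], ["c"]])
print(seen)  # [['A', 'B'], ['C']]
```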

Transformations and Output Operations
=====================================

DStream Transformations
-----------------------

Q: How are DStream transformations different from RDD transformations?

- DStream transformations define what will happen to RDDs when they
  arrive.
  
- DStream transformations produce new DStreams that will contain 
  transformed RDDs.

- Nothing happens until data arrives.

<img src="images/streaming-dstream-ops.png">

Transforming DStreams
---------------------

Transformation                                 |For Each Incoming RDD
--------------                                 |---------------------
`ds.map(lambda line: line.upper())`            |Uppercase `line` 
`ds.flatMap(lambda line: line.split())`        |Split `line` into words
`ds.filter(lambda line: line.strip() != '')`   |Exclude `line` if it is empty
`ds.repartition(10)`                           |Repartition RDD into 10 partitions
`ds.reduceByKey(lambda v1,v2: v1+v2)`          |For each key sum values 
`ds.groupByKey()`                              |For each key group values into iterable
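- Each row of the table applies an ordinary RDD operation to every incoming RDD.
- As a rough plain Python 3 analogy, the sketch below transforms one batch, with a list standing in for one RDD.
- `reduce_by_key` and `group_by_key` are hypothetical helpers.
- `repartition` has no meaningful list equivalent and is omitted.

```python
from collections import defaultdict

def reduce_by_key(pairs, func):
    """Combine the values of each key with func, like reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

def group_by_key(pairs):
    """Collect the values of each key into a list, like groupByKey."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# One incoming batch of lines (a stand-in for one RDD in the DStream).
batch = ["to be", "", "or not to be"]

upper = [line.upper() for line in batch]                    # map
words = [w for line in batch for w in line.split()]         # flatMap
non_empty = [line for line in batch if line.strip() != ""]  # filter
pairs = [(w, 1) for w in words]
counts = reduce_by_key(pairs, lambda v1, v2: v1 + v2)       # reduceByKey
grouped = group_by_key(pairs)                               # groupByKey

print(counts)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

In Spark Streaming the same per-batch work happens again, independently, for every RDD that arrives.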

Generic Transformations
-----------------------

Q: How can I apply an arbitrary transformation on the incoming RDDs?

- DStreams support some, but not all, of the RDD transformations.

- For example, `sortByKey()` is not supported on DStreams.

- Instead, DStreams provide `transform()`.

- `transform()` lets you apply any RDD-to-RDD function to every RDD in
  the DStream.

- For example, these two have the same effect:

```python
ds.transform(lambda rdd: rdd.flatMap(lambda line: line.split()))
```

```python
ds.flatMap(lambda line: line.split())
```
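- Conceptually, `transform()` applies a function to each RDD in turn.
- Here is a plain Python 3 sketch of that idea. A list of lists stands in for the stream of RDDs, and `transform_stream` is a hypothetical helper, not a Spark API.

```python
def transform_stream(batches, rdd_func):
    """Apply an arbitrary batch-to-batch function to every batch."""
    for batch in batches:
        yield rdd_func(batch)

stream = [[("b", 2), ("a", 1)], [("c", 3), ("a", 4)]]

# Analogue of ds.transform(lambda rdd: rdd.sortByKey())
sorted_batches = list(
    transform_stream(stream, lambda batch: sorted(batch, key=lambda kv: kv[0])))
print(sorted_batches)  # [[('a', 1), ('b', 2)], [('a', 4), ('c', 3)]]
```

Note that the sort happens within each batch only. Nothing orders records across batches.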

Pop Quiz
--------

<details><summary>
Q: How can you write `sortByKey()` for DStreams?
</summary>
```python
ds.transform(lambda rdd: rdd.sortByKey())
```
</details>

Pop Quiz
--------

Consider this code:

```python
ds.transform(lambda rdd: rdd.flatMap(lambda line: line.split()))
```

<details><summary>
Q: Where does `lambda line: ...` execute? 
</summary>
On the executors.
</details>


<details><summary>
Q: Where does `lambda rdd: ...` execute? 
</summary>
On the driver.
</details>


DStream Output Operations
-------------------------

Expression                                     |Meaning
----------                                     |-------
`ds.foreachRDD(lambda rdd: func(rdd.first()))` |Call `func()` on `first()` of each incoming RDD
`ds.pprint(num=10)`                            |Print first 10 elements of each incoming RDD
`ds.saveAsTextFiles('foo',suffix=None)`        |Save each incoming RDD as text files under `foo-<batch time in ms>`

Notes
-----

- These output operations only execute when RDDs start arriving.

- `foreachRDD` is a generic output operation.

- `foreachRDD` lets you define arbitrary output operations on incoming RDDs.

Pop Quiz
--------

<details><summary>
Q: Print the number of elements in each incoming RDD.
</summary>
```python
# Enable print as a function
from __future__ import print_function

# Define the output operation
ds.foreachRDD(lambda rdd: print(rdd.count()))
```
</details>

<details><summary>
Q: Where will the lambda inside the `foreachRDD` execute?
</summary>
1. It will execute on the driver.<br>
2. This is because RDDs are defined on the driver, not on the executors.<br>
</details>


Testing Streaming Apps Using QueueStream
----------------------------------------

Q: Manually testing apps using `nc` is tedious. Is there an easier,
more automatable way to do this?

- *Queue streams* enable you to create preprogrammed streams perfect
  for automated testing and test-driven development.
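- A queue stream behaves like a scripted input. This plain Python 3 sketch assumes the default one-RDD-at-a-time behavior (`oneAtATime=True`): each batch interval takes the next RDD from the queue, and once the queue is empty the batches are empty.
- The sketch applies this to five RDDs like those in `test_queue_stream.py`.
- `simulate_queue_stream` is a hypothetical helper.

```python
from collections import Counter, deque

def simulate_queue_stream(rdd_queue, num_batches, one_at_a_time=True):
    """Yield the records of each micro-batch produced from a queue of 'RDDs'.

    With one_at_a_time=True each batch takes the next queued RDD; with
    False the first batch takes everything queued. In this sketch, once
    the queue is empty the remaining batches are empty.
    """
    queue = deque(rdd_queue)
    for _ in range(num_batches):
        if one_at_a_time:
            yield list(queue.popleft()) if queue else []
        else:
            merged = [record for rdd in queue for record in rdd]
            queue.clear()
            yield merged

# Five "RDDs" of 0..9 repeated ten times, as in test_queue_stream.py.
event_rdd_queue = [list(range(10)) * 10 for _ in range(5)]

for i, batch in enumerate(simulate_queue_stream(event_rdd_queue, num_batches=6)):
    # .map(lambda event: (event, 1)).reduceByKey(add) counts events per batch.
    counts = Counter(batch)
    print(i, sorted(counts.items()))
```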

Counting Event Types
--------------------

Q: Count how many events of each type are in the incoming stream in
each micro-batch.

- Here is the code.

In [2]:
%%file test_queue_stream.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from pprint import pprint

import time
import random

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1) # batchDuration is 1 second

print 'Initializing event_rdd_queue'
event_rdd_queue = []
for i in xrange(5):
    # range(10) * 10 is the list 0, 1, ..., 9 repeated ten times
    events = range(10) * 10
    event_rdd = ssc.sparkContext.parallelize(events)
    event_rdd_queue.append(event_rdd)
pprint(event_rdd_queue)

print 'Building DStream pipeline'
ds = ssc\
    .queueStream(event_rdd_queue) \
    .map(lambda event: (event, 1)) \
    .reduceByKey(lambda v1,v2: v1+v2)
ds.pprint()

print 'Starting ssc'
ssc.start()
time.sleep(6)

print 'Stopping ssc'
ssc.stop(stopSparkContext=True, stopGraceFully=True)

Writing test_queue_stream.py


- Let's run this and see what happens.

In [6]:
%%sh
$SPARK_HOME/bin/spark-submit test_queue_stream.py

Process is terminated.


Aggregating RDDs
================

Merging DStreams
----------------

Transformation      |Effect
--------------      |------
`ds1.union(ds2)`    |Combine RDD in `ds1` with RDD in same batch in `ds2`
`ds1.join(ds2)`     |Join RDD in `ds1` with RDD in same batch in `ds2`

Note
----

- For `union` or `join`, the two DStreams must have the same slide
  duration (for DStreams that are not windowed, this is the batch
  duration).

- The batches are matched up based on timestamps.
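- Because batches are matched by timestamp, `union` and `join` act like per-time operations on two aligned sequences of batches.
- In this plain Python 3 sketch, a dict that maps batch time to a list of records stands in for each DStream.
- `union_streams` and `join_streams` are hypothetical helpers.

```python
def union_streams(s1, s2):
    """For each batch time, concatenate the records of both streams."""
    times = sorted(set(s1) | set(s2))
    return {t: s1.get(t, []) + s2.get(t, []) for t in times}

def join_streams(s1, s2):
    """For each batch time, inner-join (key, value) records on their key."""
    joined = {}
    for t in sorted(set(s1) | set(s2)):
        right = {}
        for k, v in s2.get(t, []):
            right.setdefault(k, []).append(v)
        joined[t] = [(k, (v1, v2))
                     for k, v1 in s1.get(t, []) for v2 in right.get(k, [])]
    return joined

clicks = {1: [("u1", "home")], 2: [("u2", "cart")]}
views = {1: [("u1", 3)], 2: [("u1", 5)]}
print(union_streams(clicks, views))
print(join_streams(clicks, views))  # {1: [('u1', ('home', 3))], 2: []}
```

At time 2 nothing joins: `u1` appears in `views` but not in `clicks`, and records from time 1 are not carried over.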


Windowing Operations
--------------------

Q: How can I process multiple RDDs within a window of time?

```python
ds2 = ds1.window(windowDuration=30, slideDuration=10)
```

- Batches RDDs into 30-second windows 

- Produces new window every 10 seconds

<img src="images/streaming-dstream-window.png">
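- The window arithmetic can be checked with a small plain Python 3 simulation, under these assumptions:
  - batch `i` (numbered from 1) arrives at time `i * batch_duration`;
  - a window ending at time `end` contains the batches with `end - window_duration < time <= end`;
  - windows end at multiples of `slide_duration`.
- `windowed_counts` is a hypothetical helper.
- With a 4-second window sliding every 2 seconds over five batches of `xrange(1000)`, it gives 200, 400 and 300 per key for the first three windows. These match the counts printed by `test_window.py` in this section.

```python
from collections import Counter

def windowed_counts(batches, batch_duration=1, window_duration=4,
                    slide_duration=2, num_windows=4):
    """Count keys over sliding windows of micro-batches.

    batches[i - 1] holds the keys of batch i, which arrives at time
    i * batch_duration. Windows end at slide_duration, 2 * slide_duration, ...
    """
    if window_duration % batch_duration or slide_duration % batch_duration:
        raise ValueError("window and slide durations must be multiples "
                         "of the batch duration")
    results = []
    for w in range(1, num_windows + 1):
        end = w * slide_duration
        counts = Counter()
        for i, keys in enumerate(batches, start=1):
            if end - window_duration < i * batch_duration <= end:
                counts.update(keys)
        results.append(counts)
    return results

# Five batches of x % 10 for x in range(1000): 100 of each key per batch.
batches = [[x % 10 for x in range(1000)] for _ in range(5)]
print([counts[0] for counts in windowed_counts(batches)])  # [200, 400, 300, 100]
```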

Windowing Operations
--------------------

Q: Count how often each remainder `x % 10` occurs within a 4-second
window that slides every 2 seconds.

In [5]:
%%file test_window.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from pprint import pprint

import time

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)

print 'Initializing rdd_queue'
rdd_queue = []
for i in xrange(5): 
    rdd_data = xrange(1000)
    rdd = ssc.sparkContext.parallelize(rdd_data)
    rdd_queue.append(rdd)
pprint(rdd_queue)

print 'Creating queue stream'
ds = ssc\
    .queueStream(rdd_queue)\
    .map(lambda x: (x % 10, 1))\
    .window(windowDuration=4,slideDuration=2)\
    .reduceByKey(lambda v1,v2:v1+v2)
ds.pprint()

print 'Starting ssc'
ssc.start()
time.sleep(20)

print 'Stopping ssc'
ssc.stop(stopSparkContext=True, stopGraceFully=True)

Writing test_window.py


- Let's run this and see what happens.

In [5]:
%%sh
$SPARK_HOME/bin/spark-submit test_window.py

Initializing ssc
Initializing rdd_queue
[PythonRDD[5] at RDD at PythonRDD.scala:43,
 PythonRDD[6] at RDD at PythonRDD.scala:43,
 PythonRDD[7] at RDD at PythonRDD.scala:43,
 PythonRDD[8] at RDD at PythonRDD.scala:43,
 PythonRDD[9] at RDD at PythonRDD.scala:43]
Creating queue stream
Starting ssc
-------------------------------------------
Time: 2015-10-04 18:06:49
-------------------------------------------
(0, 200)
(8, 200)
(4, 200)
(1, 200)
(5, 200)
(9, 200)
(2, 200)
(6, 200)
(3, 200)
(7, 200)
()
-------------------------------------------
Time: 2015-10-04 18:06:51
-------------------------------------------
(0, 400)
(8, 400)
(4, 400)
(1, 400)
(5, 400)
(9, 400)
(2, 400)
(6, 400)
(3, 400)
(7, 400)
()
-------------------------------------------
Time: 2015-10-04 18:06:53
-------------------------------------------
(0, 300)
(8, 300)
(4, 300)
(1, 300)
(5, 300)
(9, 300)
(2, 300)
(6, 300)
(3, 300)
(7, 300)
()
-------------------------------------------
Time: 2015-10-04 18:06:55
--------------


Windowing Operations With Inverse
---------------------------------

Q: How can I avoid re-aggregating the values that consecutive,
overlapping windows have in common?

```python
windows_word_counts = pair_ds.reduceByKeyAndWindow(
    func=lambda x, y: x + y,
    invFunc=lambda x, y: x - y, 
    windowDuration=30,
    slideDuration=10)
```

- Creates window of length `windowDuration` (30 seconds)

- Moves window every `slideDuration` (10 seconds)

- Merges incoming values using `func`

- Eliminates outgoing values using `invFunc`

- `windowDuration` and `slideDuration` are in seconds

- These must be multiples of the `batchDuration` of the DStream

- This requires that *checkpointing* is enabled on the StreamingContext.

<img src="images/streaming-windowed-stream.png">

<img src="images/streaming-windowed-stream-with-inv.png">
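- Why the inverse function helps: when the window slides, only the batches entering and leaving it change. The new total can therefore be derived from the previous one instead of re-adding every batch.
- This is also why the previous window's result must be kept as state.
- Below is a plain Python 3 sketch for a single key. It assumes `window` and `slide` are counted in batches.
- `incremental_window_sums` is a hypothetical helper, checked against a naive recomputation.

```python
def naive_window_sums(values, window, slide):
    """Recompute each window sum from scratch: O(window) work per window."""
    return [sum(values[max(0, end - window):end])
            for end in range(slide, len(values) + 1, slide)]

def incremental_window_sums(values, window, slide,
                            func=lambda a, b: a + b,
                            inv_func=lambda a, b: a - b):
    """Update each window sum from the previous one using func and inv_func."""
    sums = []
    current = 0
    prev_end = 0
    for end in range(slide, len(values) + 1, slide):
        # Batches that entered the window since the last slide.
        for v in values[prev_end:end]:
            current = func(current, v)
        # Batches that left the window since the last slide.
        for v in values[max(0, prev_end - window):max(0, end - window)]:
            current = inv_func(current, v)
        sums.append(current)
        prev_end = end
    return sums

per_batch_counts = [3, 1, 4, 1, 5, 9, 2, 6]
print(incremental_window_sums(per_batch_counts, window=4, slide=2))  # [4, 9, 19, 22]
```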

Streaming Durations
-------------------

Q: What are the different durations in a DStream and which one should
I use?

Type               |Meaning
----               |-------
Batch Duration     |How many seconds until next incoming RDD
Slide Duration     |How many seconds until next window RDD
Window Duration    |How many seconds to include in window RDD

Duration Impact
---------------

Q: What is the impact of increasing these durations?

Type                 |Increase                                   |Effect 
----                 |--------                                   |------ 
Batch Duration       |Larger but less frequent incoming RDDs     |Less Processing 
Slide Duration       |Less frequent window RDDs                  |Less Processing
Window Duration      |Larger window RDDs                         |More Processing

Duration Summary
----------------

- Batch and window duration control RDD size

- Batch and slide duration control RDD frequency

- Larger RDDs have more context and produce better insights.

- Larger RDDs might require more processing.

- Bundling frequent small RDDs into infrequent larger ones can reduce processing.
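- A small worked calculation makes these trade-offs concrete.
- The hypothetical `window_stats` helper below (plain Python 3) derives three numbers from the durations:
  - how many batches each window RDD spans;
  - how many window RDDs are produced per minute;
  - how many windows each batch falls into.
- It also rejects durations that are not multiples of the batch duration.

```python
def window_stats(batch_duration, window_duration, slide_duration):
    """Return basic counts implied by a batch/window/slide configuration."""
    for name, d in (("window", window_duration), ("slide", slide_duration)):
        if d % batch_duration != 0:
            raise ValueError(
                "%s duration must be a multiple of the batch duration" % name)
    return {
        "batches_per_window": window_duration // batch_duration,
        "windows_per_minute": 60.0 / slide_duration,
        # When window is a multiple of slide, each batch lands in
        # window / slide overlapping windows.
        "windows_per_batch": window_duration / float(slide_duration),
    }

print(window_stats(batch_duration=1, window_duration=30, slide_duration=10))
# {'batches_per_window': 30, 'windows_per_minute': 6.0, 'windows_per_batch': 3.0}
```

With these settings every batch is processed as part of three windows. This is the repeated work that `reduceByKeyAndWindow` with an inverse function is designed to cut down.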

State DStreams
--------------

Q: How can I aggregate a value over the lifetime of a streaming
application?

- You can do this with the `updateStateByKey` transform.

```python
# add new values with previous running count to get new count
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  

runningCounts = pairs.updateStateByKey(updateFunction)
```

- This takes a DStream made up of key-value RDDs

- For each incoming RDD and each key, it combines the new values with
  the state accumulated so far for that key.

- Like the windowing transformations, this requires that checkpointing
  be enabled on the StreamingContext.
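- To see what the update function receives, here is a plain Python 3 simulation of `updateStateByKey` semantics. `update_state_by_key` is a hypothetical helper.
- It follows the behavior described in the Spark Streaming guide:
  - the function is called for every key that has new values or existing state;
  - a key with no new values in a batch receives an empty list;
  - returning `None` drops the key.

```python
def update_function(new_values, running_count):
    # Same logic as updateFunction above.
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)

def update_state_by_key(batches, update_func):
    """Yield the full state dict after each batch of (key, value) pairs."""
    state = {}
    for batch in batches:
        new_values = {}
        for key, value in batch:
            new_values.setdefault(key, []).append(value)
        next_state = {}
        for key in set(state) | set(new_values):
            updated = update_func(new_values.get(key, []), state.get(key))
            if updated is not None:  # returning None removes the key
                next_state[key] = updated
        state = next_state
        yield dict(state)

batches = [[("a", 1), ("b", 1)], [("a", 1)], []]
for snapshot in update_state_by_key(batches, update_function):
    print(sorted(snapshot.items()))
# [('a', 1), ('b', 1)]
# [('a', 2), ('b', 1)]
# [('a', 2), ('b', 1)]
```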

Testing Streaming Apps Using TextFileStream
-------------------------------------------

Q: QueueStream does not support checkpointing, so it cannot be used
with operations that require checkpointing. How can code that uses
`updateStateByKey` be tested?

- We can use TextFileStream instead.

- Let's define a function `xrange_write`, which we will use for the
  following examples.

- It writes the numbers 0, 1, 2, ... to the directory `input`.

- Every second it writes a new file containing 5 numbers, one per line.

In [1]:
%%file text_file_util.py
import itertools
import os
import time
import uuid

# Every batch_duration seconds write a new file with batch_size numbers,
# forever. Start at 0 and keep incrementing. (For testing.)

def xrange_write(
        batch_size = 5,
        batch_dir = 'input',
        batch_duration = 1):
    # Create the monitored directory (honoring batch_dir)
    if not os.path.isdir(batch_dir):
        os.makedirs(batch_dir)

    # Repeat forever
    for i in itertools.count():
        # Generate data: batch i holds batch_size*i ... batch_size*(i+1) - 1
        start = batch_size * i
        end = batch_size * (i + 1)
        batch_data = xrange(start, end)

        # Write to a hidden temp file first: textFileStream skips files whose
        # names start with '.', so it never reads a half-written file
        unique_file_name = str(uuid.uuid4())
        tmp_path = os.path.join(batch_dir, '.' + unique_file_name)
        file_path = os.path.join(batch_dir, unique_file_name)
        with open(tmp_path, 'w') as batch_file:
            for element in batch_data:
                batch_file.write(str(element) + "\n")

        # Move the finished file into place in one step
        os.rename(tmp_path, file_path)

        # Give streaming app time to catch up
        time.sleep(batch_duration)

Writing text_file_util.py


Counting Events
---------------

Q: How can I count a certain type of event in incoming data?

- You can use state DStreams.

- This code reduces each incoming number modulo 10.

- Then it keeps a running count of how many times each remainder
  between 0 and 9 has been seen.

In [3]:
%%file test_count.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from text_file_util import xrange_write

from pprint import pprint

# add new values with previous running count to get new count
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)
ssc.checkpoint('ckpt')

ds = ssc.textFileStream('input') \
    .map(lambda x: int(x) % 10) \
    .map(lambda x: (x,1)) \
    .updateStateByKey(updateFunction)

ds.pprint()
ds.count().pprint()

print 'Starting ssc'
ssc.start()

# Write data to textFileStream
xrange_write()

Writing test_count.py


- Let's run this and see what happens.

In [4]:
%%sh
$SPARK_HOME/bin/spark-submit test_count.py

Process is terminated.


- The program will run forever. To terminate hit `Ctrl-C`.

Pop Quiz
--------

<details><summary>
Q: How can you calculate a running average using a state DStream?
</summary>
1. In the above example, keep a `(sum, count)` pair as the state for
each key instead of a single running count.<br>
2. In the `updateStateByKey` function, add the new values to `sum` and
the number of new values to `count`.<br>
3. Use `map` to calculate `sum/count`, which is the average.<br>
</details>
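- The quiz answer as a plain Python 3 sketch: the state for each key is a `(sum, count)` pair, the update function folds each batch's values into it, and a final step divides.
- `update_avg_state` and `to_average` are hypothetical names.
- In Spark, the first would be passed to `updateStateByKey` and the second used in a `map` over the resulting state.

```python
def update_avg_state(new_values, state):
    """State is (sum, count); fold the new values of one batch into it."""
    total, count = state if state is not None else (0, 0)
    return (total + sum(new_values), count + len(new_values))

def to_average(state):
    """Turn a (sum, count) state into an average (None if nothing seen)."""
    total, count = state
    return float(total) / count if count else None

state = None
for batch_values in ([4, 6], [], [5]):  # values for one key, batch by batch
    state = update_avg_state(batch_values, state)
    print(state, to_average(state))
# (10, 2) 5.0
# (10, 2) 5.0
# (15, 3) 5.0
```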

<!--
Pop Quiz
--------

<details><summary>
Q: How can you calculate a running standard deviation using a state DStream?
</summary>
1. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm<br>  <-- Wrong!
</details>
-->



Join 
----

Q: How can I detect if an incoming credit card transaction is from a
canceled card?

- You can join DStreams against a batch RDD.

- Store the historical data in the batch RDD.

- Join it with the incoming DStream RDDs to determine the next action.

- Note: Create the batch RDD with `ssc.sparkContext`, so that it belongs
  to the same SparkContext as the streaming app.

```python
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
```
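- Here is the same pattern in plain Python 3. A dict stands in for the batch RDD and lists stand in for the incoming RDDs.
- The names are hypothetical. In Spark the join happens inside `transform`, as in the snippet above.
- As with an inner `join`, transactions whose customer is missing from the batch data are dropped.

```python
def join_with_static(batches, static):
    """Inner-join each incoming batch of (key, value) pairs with a static dict."""
    for batch in batches:
        yield [(k, (v, static[k])) for k, v in batch if k in static]

# (customer_id -> is_good_customer), alternating like the data in test_join.py.
customers = {cid: cid % 2 == 0 for cid in range(10)}
orders = [[(0, "order-a"), (1, "order-b")], [(2, "order-c"), (42, "order-d")]]

for joined in join_with_static(orders, customers):
    good = [(k, v) for k, v in joined if v[1]]  # keep good customers only
    print(good)
# [(0, ('order-a', True))]
# [(2, ('order-c', True))]
```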

Detecting Bad Customers
-----------------------
Q: Create a streaming app that can join the incoming orders with our
previous knowledge of whether this customer is good or bad.

- Create the streaming app.

In [5]:
%%file test_join.py
# Import modules.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from pprint import pprint

import time

# Create the StreamingContext.

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)


# For testing create prepopulated QueueStream of streaming customer orders. 

print 'Initializing queue of customer transactions'
transaction_rdd_queue = []
for i in xrange(5): 
    transactions = [(customer_id, None) for customer_id in xrange(10)]
    transaction_rdd = ssc.sparkContext.parallelize(transactions)
    transaction_rdd_queue.append(transaction_rdd)
pprint(transaction_rdd_queue)

# Batch RDD of whether customers are good or bad. 

print 'Initializing bad customer rdd from batch sources'
# (customer_id, is_good_customer)
customers = [
        (0,True),
        (1,False),
        (2,True),
        (3,False),
        (4,True),
        (5,False),
        (6,True),
        (7,False),
        (8,True),
        (9,False) ]
customer_rdd = ssc.sparkContext.parallelize(customers)

# Join the streaming RDD and batch RDDs to filter out bad customers.
print 'Creating queue stream'
ds = ssc\
    .queueStream(transaction_rdd_queue)\
    .transform(lambda rdd: rdd.join(customer_rdd))\
    .filter(lambda (customer_id, (customer_data, is_good_customer)): is_good_customer)

ds.pprint()

ssc.start()
time.sleep(6)
ssc.stop()

Writing test_join.py


- Let's run this and see what happens.

In [9]:
%%sh
$SPARK_HOME/bin/spark-submit test_join.py

Initializing ssc
Initializing queue of customer transactions
[ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:396,
 ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:396,
 ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:396,
 ParallelCollectionRDD[3] at parallelize at PythonRDD.scala:396,
 ParallelCollectionRDD[4] at parallelize at PythonRDD.scala:396]
Initializing bad customer rdd from batch sources
Creating queue stream
-------------------------------------------
Time: 2015-10-03 19:02:41
-------------------------------------------
(0, (None, True))
(8, (None, True))
(2, (None, True))
(4, (None, True))
(6, (None, True))
()
-------------------------------------------
Time: 2015-10-03 19:02:42
-------------------------------------------
(0, (None, True))
(8, (None, True))
(2, (None, True))
(4, (None, True))
(6, (None, True))
()
-------------------------------------------
Time: 2015-10-03 19:02:43
-------------------------------------------
(0, (None, True


Pop Quiz
--------

<details><summary>
Q: If you are joining with a large batch RDD how can you minimize the
shuffling of the records?
</summary>
1. Use `partitionBy` on the incoming RDDs as well as on the batch
RDD, and `cache()` the partitioned batch RDD so it is not
repartitioned for every micro-batch.<br>
2. This will ensure that records are partitioned by their keys.<br>
3. This can make a real difference in the performance of your Big Data
streaming app.<br>
</details>

Cluster View
------------

<img src="images/streaming-daemons.png">

Checkpointing
-------------

Q: How can I protect my streaming app against failure?

- Streaming apps run for much longer than batch apps.

- They can run for days and weeks.

- So fault-tolerance is important for them.

- To enable recovery from failure you must enable checkpointing.

- If a checkpointed application crashes, you restart it and it
  recovers its state (for example, `updateStateByKey` running counts)
  from the most recent checkpoint.
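- The get-or-create idea behind `StreamingContext.getOrCreate` can be illustrated without Spark:
  - on startup, restore state from the checkpoint if one exists, otherwise start fresh;
  - after each batch, save the state.
- This plain Python 3 sketch uses a JSON file. The file location and the helper names are hypothetical.
- Spark's real checkpoints also store the DStream operations and any incomplete batches.

```python
import json
import os
import tempfile

def get_or_create_state(checkpoint_path):
    """Restore running counts from the checkpoint file, or start empty."""
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            return json.load(f)
    return {}

def checkpoint_state(state, checkpoint_path):
    """Write state via a temp file so a crash never leaves a partial checkpoint."""
    tmp_path = checkpoint_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, checkpoint_path)

def process_batch(state, batch):
    """Update running counts with one batch of keys."""
    for key in batch:
        state[key] = state.get(key, 0) + 1
    return state

ckpt = os.path.join(tempfile.mkdtemp(), "counts.json")
state = get_or_create_state(ckpt)
for batch in (["a", "b"], ["a"]):
    state = process_batch(state, batch)
    checkpoint_state(state, ckpt)

# Simulate a crash: the in-memory state is lost...
del state
# ...but a restart picks up from the last checkpoint.
state = get_or_create_state(ckpt)
print(state)  # {'a': 2, 'b': 1}
```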

In [10]:
%%file test_checkpointing.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from text_file_util import xrange_write
    
from pprint import pprint
    
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  
    
checkpointDir = 'ckpt'
    
def functionToCreateContext():
    ssc = StreamingContext(SparkContext(), batchDuration=2)
    
    # Add new values with previous running count to get new count
    ds = ssc.textFileStream('input') \
        .map(lambda x: int(x) % 10) \
        .map(lambda x: (x,1)) \
        .updateStateByKey(updateFunction)
    ds.pprint()
    ds.count().pprint()
    
    # Set up checkpoint
    ssc.checkpoint(checkpointDir)
    return ssc
    
print 'Initializing ssc'
ssc = StreamingContext.getOrCreate(
    checkpointDir, functionToCreateContext)
    
print 'Starting ssc'
ssc.start()
    
# Write data to textFileStream
xrange_write()

Writing test_checkpointing.py


- Let's run this and see what happens.

In [None]:
%%sh
$SPARK_HOME/bin/spark-submit test_checkpointing.py

- The program will run forever. To terminate hit `Ctrl-C`.