# 10: SparkStreaming

This example shows the original implementation of streaming in Spark, the _Spark Streaming_ capability that is based on the `RDD` API. We construct a simple "word count" server that watches a directory for new files and reads them as they arrive. The corresponding program version of this example, [SparkStreaming10.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/SparkStreaming10.scala), supports this input source and a second option, input from a socket. See [Tutorial.markdown](https://github.com/deanwampler/spark-scala-tutorial/blob/master/Tutorial.markdown) for details.

The newer streaming module is called _Structured Streaming_. It is based on the `Dataset` API, for better performance and convenience. It also supports much lower-latency processing. Examples of this API are TBD here, but see the [Apache Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) for more information.

Watching a directory for new files supports a workflow where some process outputs new files to a "staging" directory where this job will do subsequent processing.

Note that Spark Streaming does not use the `_SUCCESS` marker file we mentioned in an earlier notebook for batch processing, in part because that mechanism can only be used once *all* files have been written to the directory. A directory that keeps receiving files never reaches that state, so Spark can't know when the writing of any one file has actually completed. This means you should only use this ingestion mechanism with files that "appear instantly" in the directory, i.e., through renaming from another location in the file system.
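
One way to make a file "appear instantly" is to write it somewhere else first and then move it into the watched directory. The following is a minimal sketch of that idea using only `java.nio.file`. The helper name `publishAtomically`, the directory names, and the file name are hypothetical, chosen for illustration. It assumes the staging and watched directories are on the same file system, since atomic moves are generally not available across file systems.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper: write `content` to a staging directory first, then
// move the finished file into the watched directory in a single step.
def publishAtomically(stagingDir: Path, watchedDir: Path, name: String, content: String): Path = {
  Files.createDirectories(stagingDir)
  Files.createDirectories(watchedDir)
  val staged = stagingDir.resolve(name)
  Files.write(staged, content.getBytes(StandardCharsets.UTF_8))
  // With ATOMIC_MOVE, the move throws AtomicMoveNotSupportedException when the
  // file system can't do it atomically, instead of falling back to a copy.
  Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
}

// Usage with throwaway directories. Both live under the same temporary parent,
// so the move stays within one file system.
val demoRoot    = Files.createTempDirectory("streaming-demo")
val demoStaging = demoRoot.resolve("staging")
val demoWatched = demoRoot.resolve("watched")
val published   = publishAtomically(demoStaging, demoWatched, "email-0001.txt", "hello streaming world")
```

A reader of the watched directory either sees the complete file or does not see it at all, which is the property the paragraph above asks for.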

For this example, a temporary directory is created and a second process or thread writes the user-specified data files (default: the Enron emails) to it, one file every second. As in `SparkStreaming10`, we do *Word Count* on the data as it arrives. If the files were served in a loop, the data would eventually repeat, so for convenience we stop after 200 iterations (the number of email files).

In [3]:
import java.io.File

In [4]:
val dataSource = new File("../data/enron-spam-ham")
val watchedDirectory = new File("tmp/streaming-input")
val outputPathRoot = new File("streaming-output/")
outputPathRoot.mkdirs()
val outputPath = new File(outputPathRoot, "wc-streaming")
val iterations = 200            // Terminate after N iterations
val sleepIntervalMillis = 1000  // How long to wait between writes of files to the directory
val batchSeconds = 2            // Size of batch intervals

dataSource = ../data/enron-spam-ham
watchedDirectory = tmp/streaming-input
outputPathRoot = streaming-output
outputPath = streaming-output/wc-streaming
iterations = 200
sleepIntervalMillis = 1000
batchSeconds = 2



A function to delete a file, or a directory and all of its contents:

In [5]:
def rmrf(root: File): Unit = {
  if (root.isFile) root.delete()
  else if (root.exists) {
    root.listFiles.foreach(rmrf)
    root.delete()
  }
}

rmrf: (root: java.io.File)Unit


Use it to remove the watched directory, if one exists from a previous run. Then recreate it.

In [6]:
rmrf(watchedDirectory)

In [7]:
watchedDirectory.mkdirs()

true

We need a second process or dedicated thread to write new files to the watched directory. To support this, we insert here a stripped-down version of [util.streaming.DataDirectoryServer.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/util/streaming/DataDirectoryServer.scala) from the application version of the tutorial. It runs its logic in a separate thread, serving data to this notebook by periodically writing a new file to the watched directory, as discussed above.

In [8]:
case class DataServerError(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause)

defined class DataServerError


In [9]:
import java.nio.file.{Files, FileSystems, Path}
import java.nio.file.attribute.BasicFileAttributes
import java.util.function.BiPredicate
import scala.util.control.NonFatal
import scala.collection.JavaConverters._

def makePath(pathString: String): Path = FileSystems.getDefault().getPath(pathString)
def makePath(file: java.io.File): Path = makePath(file.getAbsolutePath)
def makePath(parent: Path, name: String): Path = FileSystems.getDefault().getPath(parent.toString, name)

case class DataDirectoryServer(destinationDirectoryPath: Path, sourceRootPath: Path) extends Runnable {

  def run: Unit = try {
    val sources = getSourcePaths(sourceRootPath)
    if (sources.size == 0) throw DataServerError(s"No sources for path $sourceRootPath!")

    sources.zipWithIndex.foreach { case (source, index) =>
      val destination = makePath(destinationDirectoryPath, source.getFileName.toString)
      println(s"\nIteration ${index+1}: destination: ${destination}")
      Files.copy(source, destination)
      Thread.sleep(sleepIntervalMillis)
    }
  } catch {
    case NonFatal(ex) => throw DataServerError("Data serving failed!", ex)
  }

  /**
   * Get the paths for the source files.
   */
  protected def getSourcePaths(sourcePath: Path): Seq[Path] =
    Files.find(sourcePath, 5,
      new BiPredicate[Path, BasicFileAttributes]() {
        def test(path: Path, attribs: BasicFileAttributes): Boolean = attribs.isRegularFile
      }).iterator.asScala.toSeq
}

defined class DataDirectoryServer


makePath: (pathString: String)java.nio.file.Path <and> (file: java.io.File)java.nio.file.Path <and> (parent: java.nio.file.Path, name: String)java.nio.file.Path


Here is the Spark code for processing the stream. Start by creating the `StreamingContext`.

In [10]:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{
  StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(batchSeconds))

sc = org.apache.spark.SparkContext@31da4833
ssc = org.apache.spark.streaming.StreamingContext@79b3680f


org.apache.spark.streaming.StreamingContext@79b3680f

Define a listener for the end of the stream.

> **Note:** We have to repeat import statements because of scoping idiosyncrasies in the way cells are converted to Scala.

In [11]:
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{
  StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}

class EndOfStreamListener(sc: StreamingContext) extends StreamingListener {
  override def onReceiverError(error: StreamingListenerReceiverError):Unit = {
    println(s"Receiver Error: $error. Stopping...")
    sc.stop()
  }
  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped):Unit = {
    println(s"Receiver Stopped: $stopped. Stopping...")
  }
}

defined class EndOfStreamListener


In [12]:
ssc.addStreamingListener(new EndOfStreamListener(ssc))

Now add the logic to process the data.

We do _Word Count_, splitting on non-alphabetic characters.

In [13]:
val lines = ssc.textFileStream(watchedDirectory.getAbsolutePath)
val words = lines.flatMap(line => line.split("""[^\p{IsAlphabetic}]+"""))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

lines = org.apache.spark.streaming.dstream.MappedDStream@75d6a390
words = org.apache.spark.streaming.dstream.FlatMappedDStream@3879ca1b
pairs = org.apache.spark.streaming.dstream.MappedDStream@299e0184
wordCounts = org.apache.spark.streaming.dstream.ShuffledDStream@47565b39


org.apache.spark.streaming.dstream.ShuffledDStream@47565b39
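
To see what the delimiter pattern does without a running stream, here is a small sketch that applies the same regular expression to ordinary strings. The sample sentences and the names `delimiters`, `sampleWords`, `leadingEmpty`, and `sampleCounts` are made up for illustration. One detail worth noticing is that a line starting with a delimiter yields an empty first "word", so the collection version below filters those out, which the streaming cell above does not do.

```scala
// Same delimiter pattern as the streaming cell: one or more non-alphabetic characters.
val delimiters = """[^\p{IsAlphabetic}]+"""

// Punctuation, whitespace, and digits all act as separators.
val sampleWords = "Hello, world! It's 2024".split(delimiters).toSeq

// A line that begins with a delimiter produces an empty leading element.
val leadingEmpty = "...and so on".split(delimiters).toSeq

// Word Count on a plain collection, dropping empty strings first.
val sampleCounts: Map[String, Int] =
  Seq("to be, or not to be", "...that is the question")
    .flatMap(_.split(delimiters))
    .filter(_.nonEmpty)
    .groupBy(identity)
    .map { case (word, occurrences) => (word, occurrences.size) }
```

The `groupBy` and `map` steps play the role that `map(word => (word, 1))` and `reduceByKey(_ + _)` play in the `DStream` version.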

Calling `print` will cause some useful diagnostic output to be printed during every mini-batch:

```text
-------------------------------------------
Time: 1413724627000 ms
-------------------------------------------
(limitless,2)
(grand,2)
(someone,4)
(priority,2)
(goals,1)
(ll,5)
(agree,1)
(offer,2)
(yahoo,3)
(ebook,3)
...
```

The timestamp will increment by 2000 ms each time, because we're running with 2-second batch intervals (or whatever you set `batchSeconds` to above). This particular output comes from the `print` method we call in the next cell, which is a useful debugging tool for seeing the first 10 values in the current batch `RDD`.

In [14]:
wordCounts.print()  // print a few counts...

Calling `saveAsTextFiles` will cause new directories to be written under the `outputPathRoot` directory, one new directory per mini-batch. They have names like `streaming-output/wc-streaming-1413724628000.out`, with a timestamp appended to our output path prefix, `streaming-output/wc-streaming`, followed by the extension we add, `out`. Each of these will contain the usual `_SUCCESS` and `part-0000N` files, with one `part` file for each partition of the batch `RDD` (by default, one for each core that the job is given).

In [15]:
// Generates a separate subdirectory for each interval!!
wordCounts.saveAsTextFiles(outputPath.getAbsolutePath, "out")
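
Because each mini-batch gets its own directory, it can be handy to list those directories in batch order. The following is a minimal sketch that relies only on the naming scheme described above, `<prefix>-<timestamp>.<suffix>`. The helpers `batchTime` and `batchDirectories` are hypothetical names introduced here and are not part of Spark.

```scala
import java.io.File
import scala.util.Try

// Hypothetical helper: extract the batch time from a directory name of the form
// "<prefix>-<millis>.<suffix>". Returns None for names that don't match.
def batchTime(name: String, prefix: String, suffix: String): Option[Long] = {
  val start = s"$prefix-"
  val end   = s".$suffix"
  if (name.startsWith(start) && name.endsWith(end) && name.length > start.length + end.length) {
    val middle = name.substring(start.length, name.length - end.length)
    if (middle.forall(_.isDigit)) Try(middle.toLong).toOption else None
  } else None
}

// Hypothetical helper: all batch directories directly under `root`, oldest first.
def batchDirectories(root: File, prefix: String, suffix: String): Seq[(Long, File)] =
  Option(root.listFiles).map(_.toSeq).getOrElse(Seq.empty)  // listFiles returns null if root is missing
    .filter(_.isDirectory)
    .flatMap(dir => batchTime(dir.getName, prefix, suffix).map(time => (time, dir)))
    .sortBy(_._1)
```

With the values defined earlier in this notebook, the call would be `batchDirectories(outputPathRoot, "wc-streaming", "out")`, evaluated after the stream has written at least one batch.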

Now start the background thread:

In [16]:
val directoryServerThread = new Thread(new DataDirectoryServer(makePath(watchedDirectory), makePath(dataSource)))
directoryServerThread.start()


Iteration 1: destination: /home/jovyan/notebooks/tmp/streaming-input/0003.2004-08-01.BG.spam.txt


directoryServerThread = Thread[Thread-22,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]


Thread[Thread-22,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]


Iteration 2: destination: /home/jovyan/notebooks/tmp/streaming-input/0018.2004-08-03.BG.spam.txt

Iteration 3: destination: /home/jovyan/notebooks/tmp/streaming-input/0031.2001-08-03.SA_and_HP.spam.txt

Iteration 4: destination: /home/jovyan/notebooks/tmp/streaming-input/0016.2001-07-06.SA_and_HP.spam.txt

Iteration 5: destination: /home/jovyan/notebooks/tmp/streaming-input/0024.2001-08-01.SA_and_HP.spam.txt

Iteration 6: destination: /home/jovyan/notebooks/tmp/streaming-input/0009.2003-12-18.GP.spam.txt

Iteration 7: destination: /home/jovyan/notebooks/tmp/streaming-input/0018.2001-07-13.SA_and_HP.spam.txt

Iteration 8: destination: /home/jovyan/notebooks/tmp/streaming-input/0024.2003-12-21.GP.spam.txt

Iteration 9: destination: /home/jovyan/notebooks/tmp/streaming-input/0029.2004-08-03.BG.spam.txt

Iteration 10: destination: /home/jovyan/notebooks/tmp/streaming-input/0003.2003-12-18.GP.spam.txt

Iteration 11: destination: /home/jovyan/notebooks/tmp/streaming-input/0037.2001-08-05.SA

Start the streaming process and wait forever. To have it exit after a certain number of milliseconds instead, call `awaitTerminationOrTimeout` with the number of milliseconds as its argument.

We'll wrap this in a separate thread so we can retain some control for stopping everything.

In [17]:
val streamRunnable = new Runnable {
  def run(): Unit = {
    ssc.start()
    ssc.awaitTermination()
  }
}
val streamThread = new Thread(streamRunnable)
streamThread.start()

streamRunnable = $anon$1@17c9de6c
streamThread = Thread[Thread-23,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]


Thread[Thread-23,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]

-------------------------------------------
Time: 1527527578000 ms
-------------------------------------------
(amnis,2)
(crappie,1)
(someone,8)
(goals,2)
(ll,11)
(agree,2)
(xeni,2)
(forex,2)
(greater,1)
(order,8)
...

-------------------------------------------
Time: 1527527580000 ms
-------------------------------------------
(sent,1)
(blood,1)
(this,2)
(near,1)
(bone,1)
(healing,1)
(angrily,1)
(upon,1)
(dish,1)
(self,1)
...

-------------------------------------------
Time: 1527527582000 ms
-------------------------------------------
(coverage,1)
(amnis,1)
(play,1)
(profit,1)
(stock,3)
(this,4)
(starting,1)
(is,2)
(its,1)
(otcbb,1)
...

-------------------------------------------
Time: 1527527584000 ms
-------------------------------------------
(tantalum,1)
(under,2)
(health,1)
(its,10)
(arrange,2)
(ivernia,2)
(ore,1)
(forecasts,2)
(have,4)
(line,1)
...

-------------------------------------------
Time: 1527527586000 ms
-------------------------------------------
(someone,3)
(ll,7)

Evaluate the next cell to stop the serving thread and streaming process. (If the cell evaluation hangs, stop or reset the kernel to kill it.)

In [18]:
directoryServerThread.stop()
ssc.stop(stopSparkContext = true)
streamThread.stop()
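
`Thread.stop` is deprecated in Java, so it is worth knowing the cooperative alternative. A loop that spends most of its time in `Thread.sleep`, like the serving loop in this notebook, can be ended with `Thread.interrupt`. The sketch below shows the pattern with a stand-in worker. The class `PeriodicWorker` and the timings are hypothetical and only illustrate the technique; this is not the `DataDirectoryServer` itself.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a serving loop: do one unit of "work", then sleep.
class PeriodicWorker(sleepMillis: Long) extends Runnable {
  val iterations = new AtomicInteger(0)

  def run(): Unit =
    try {
      while (!Thread.currentThread.isInterrupted) {
        iterations.incrementAndGet()
        Thread.sleep(sleepMillis)  // throws InterruptedException if interrupted while sleeping
      }
    } catch {
      case _: InterruptedException => ()  // treat the interrupt as a request to finish
    }
}

val worker = new PeriodicWorker(50)
val workerThread = new Thread(worker)
workerThread.start()

Thread.sleep(200)         // let the worker run for a little while
workerThread.interrupt()  // ask it to finish
workerThread.join(2000)   // wait (up to 2 seconds) for it to exit
```

Note that `scala.util.control.NonFatal` does not match `InterruptedException`, so a `case NonFatal(ex)` handler will not swallow the interrupt.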



When finished with it, clean up the watched directory...

In [19]:
rmrf(watchedDirectory)

... and the streaming output directory

In [20]:
rmrf(outputPathRoot)