I ran this notebook on OpenShift, in a container image that has Spark pre-installed. Ammonite can download Spark packages, but I wanted to avoid duplicating the jar files, so I added this code to put the preinstalled Spark jars on the kernel's classpath. If you want or need to download Spark using Ammonite's `$ivy` magic import, you can comment this cell out.

In [1]:
// put the spark install from the base notebook image onto Ammonite's classpath
java.nio.file.Files.list(java.nio.file.Paths.get("/opt/spark/jars")).toArray.map(_.toString).foreach { fname =>
  val path = java.nio.file.FileSystems.getDefault().getPath(fname)
  val x = ammonite.ops.Path(path)
  interp.load.cp(x)
}
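Below is a slightly tidier variant of the cell above, offered as a sketch. It picks up only `.jar` files, so any other entries in the directory are skipped, and it closes the directory stream that `Files.list` opens. `SparkJars` and `jarsIn` are hypothetical names. The `interp.load.cp` call in the trailing comment is the same Ammonite call the cell above uses.

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object SparkJars {
  // List the .jar files directly under `dir`, sorted by path.
  // Files.list holds an open directory handle, so close it when done.
  def jarsIn(dir: Path): Vector[Path] = {
    val stream = Files.list(dir)
    try stream.iterator().asScala.filter(_.toString.endsWith(".jar")).toVector.sortBy(_.toString)
    finally stream.close()
  }
}

// In the notebook this could replace the loop above (assumes Ammonite's `interp`):
// SparkJars.jarsIn(java.nio.file.Paths.get("/opt/spark/jars"))
//   .foreach(p => interp.load.cp(ammonite.ops.Path(p)))
```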

Library support for running Spark in Ammonite, and for Kafka streaming:

In [2]:
// Load the ammonite-spark package to get AmmoniteSparkSession
import $ivy.`sh.almond::ammonite-spark:0.1.1`
// Load Spark's kafka streaming package
import $ivy.`org.apache.spark::spark-sql-kafka-0-10:2.2.0`

import $ivy.$
// Load Spark's kafka streaming package
import $ivy.$

This library provides UDAFs for sketching data distributions with t-digests. I am loading a custom library revision that contains my prototype aggregator for top-k (a.k.a. heavy hitters, a.k.a. most frequent items):

In [3]:
//import $ivy.`org.isarnproject::isarn-sketches-spark:0.3.1-sp2.2-py2.7`
import $ivy.`org.isarnproject::isarn-sketches-spark:0.3.1-topk-1-sp2.2-py2.7`

import $ivy.$

Start a Spark session using the `ammonite-spark` shim library. I'm running Spark in local mode with two worker threads (`local[2]`) directly in the container, but you can also connect to an external cluster:

In [None]:
import org.apache.spark.sql._
val spark = {
    AmmoniteSparkSession.builder()
      .master("local[2]")
      .getOrCreate()
  }

All of my imports in one cell: mostly Spark DataFrame and Structured Streaming definitions, plus some sketching definitions
from `isarn-sketches-spark`.

In [5]:
import spark.sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.streaming.Trigger
import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._

[32mimport [39m[36mspark.sqlContext.implicits._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql.types._
[39m
[32mimport [39m[36morg.apache.spark.sql.expressions._
[39m
[32mimport [39m[36morg.apache.spark.sql.streaming.Trigger
[39m
[32mimport [39m[36morg.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._[39m

This cell raises the threshold of Spark's console log appender to WARN, so that only warnings and errors show up in the output cells,
which makes the notebook output much more readable.

In [6]:
val appender = org.apache.log4j.Logger.getRootLogger().getAppender("console").asInstanceOf[org.apache.log4j.ConsoleAppender]
appender.setThreshold(org.apache.log4j.Level.WARN)

[36mappender[39m: [32morg[39m.[32mapache[39m.[32mlog4j[39m.[32mConsoleAppender[39m = org.apache.log4j.ConsoleAppender@23b1156

Define some UDFs and a UDAF:

* A UDF that counts the words in a field of text
* A UDAF that sketches the CDF of numeric data using a t-digest
* UDFs that extract percentiles (p50, p90, p99) from a CDF sketch

In [7]:
spark.udf.register("wordcount", (text: String)=>text.split(" ").filter(_.length > 0).length)
val sketchCDF = tdigestUDAF[Double].delta(0.2).maxDiscrete(25)
spark.udf.register("p50", (c:Any)=>c.asInstanceOf[TDigestSQL].tdigest.cdfInverse(0.5))
spark.udf.register("p90", (c:Any)=>c.asInstanceOf[TDigestSQL].tdigest.cdfInverse(0.9))
spark.udf.register("p99", (c:Any)=>c.asInstanceOf[TDigestSQL].tdigest.cdfInverse(0.99))

[36mres6_0[39m: [32mUserDefinedFunction[39m = [33mUserDefinedFunction[39m(
  <function1>,
  IntegerType,
  [33mSome[39m([33mList[39m(StringType))
)
[36msketchCDF[39m: [32mTDigestUDAF[39m[[32mDouble[39m] = [33mTDigestUDAF[39m([32m0.2[39m, [32m25[39m)
[36mres6_2[39m: [32mUserDefinedFunction[39m = [33mUserDefinedFunction[39m(<function1>, DoubleType, [32mNone[39m)
[36mres6_3[39m: [32mUserDefinedFunction[39m = [33mUserDefinedFunction[39m(<function1>, DoubleType, [32mNone[39m)
[36mres6_4[39m: [32mUserDefinedFunction[39m = [33mUserDefinedFunction[39m(<function1>, DoubleType, [32mNone[39m)

This cell attaches to a Kafka stream, parses the JSON payload of each Kafka message, and returns a streaming DataFrame called
`records` that has the fields:

* `user_id` (string user id)
* `text` (text of a social media post)
* `wordcount` (the number of words in `text`)

In [8]:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "social-firehose")
  .load()
val values = df.select(($"value").cast(StringType))
val structure = StructType(Seq("text","user_id","update_id").map{f=>StructField(f, StringType, true)})
val records = values.select(from_json($"value", structure).alias("json"))
    .select($"json.user_id", $"json.text")
    .select($"user_id",
            $"text",
            callUDF("wordcount", $"text").alias("wordcount"))

[36mdf[39m: [32mDataFrame[39m = [key: binary, value: binary ... 5 more fields]
[36mvalues[39m: [32mDataFrame[39m = [value: string]
[36mstructure[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"text"[39m, StringType, true, {}),
  [33mStructField[39m([32m"user_id"[39m, StringType, true, {}),
  [33mStructField[39m([32m"update_id"[39m, StringType, true, {})
)
[36mrecords[39m: [32mDataFrame[39m = [user_id: string, text: string ... 1 more field]

This cell demonstrates a streaming query that computes the average word count per user id, sorted from highest to lowest:

In [9]:
val t = records.groupBy($"user_id")
    .agg(avg($"wordcount").alias("avg"))
    .orderBy($"avg".desc)
val query = t.writeStream
  .trigger(Trigger.ProcessingTime("15 seconds"))
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination(50 * 1000)
Thread.sleep(3 * 1000)
query.stop()

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+
|user_id|avg|
+-------+---+
+-------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+----+
|   user_id| avg|
+----------+----+
|2064885917|26.0|
|5014661643|22.0|
|1623497379|21.0|
|2743183655|19.0|
|8488850176|16.0|
|1676389725|16.0|
|8348322486|15.0|
|2556837131|14.0|
|5157754624|10.0|
|1955219266| 9.0|
|0975976068| 9.0|
|3467819502| 4.0|
+----------+----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+----+
|   user_id| avg|
+----------+----+
|8925539058|31.0|
|2064885917|26.0|
|4472301601|25.0|
|5014661643|22.0|
|1623497379|21.0|
|2743183655|19.0|
|2875279841|18.0|
|0153559785|16.0|
|8488850176|16.0|
|1676389725|16.0|
|8348322486|15.0|
|3161332818|15.0|
|2556837131|14.0|
|7358241084|14.0|
|7902498897|11.0|
|3932624939|10.0|
|5157754624|10.0|
|19

[36mt[39m: [32mDataset[39m[[32mRow[39m] = [user_id: string, avg: double]
[36mquery[39m: [32mstreaming[39m.[32mStreamingQuery[39m = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@30d3740c
[36mres8_2[39m: [32mBoolean[39m = false

This cell demonstrates the use of a custom UDAF to aggregate a sketch of the distribution of streaming numeric
data. The aggregation is grouped by a sliding time window (30 seconds wide, advancing every 10 seconds).
Here we sketch the distribution of word counts, then call the percentile UDFs to extract the median (p50) and 90th percentile (p90) from each sketch.

In [10]:
val t = records.withColumn("time", current_timestamp()).groupBy(window($"time", "30 seconds", "10 seconds"))
    .agg(sketchCDF($"wordcount").alias("CDF"))
    .select(callUDF("p50", $"CDF").alias("p50"),
            callUDF("p90", $"CDF").alias("p90"))
val query = t.writeStream
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination(65 * 1000)
Thread.sleep(3 * 1000)
query.stop()

-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+
|p50|p90|
+---+---+
+---+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------+----+
|               p50| p90|
+------------------+----+
|15.333333333333334|33.0|
|15.333333333333334|33.0|
|15.333333333333334|33.0|
+------------------+----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------+------------------+
|               p50|               p90|
+------------------+------------------+
|15.333333333333334|              33.0|
|15.333333333333334|              33.0|
|              18.0|30.200000000000003|
|              18.0|              30.6|
|              18.0|30.200000000000003|
+------------------+------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------+-------

[36mt[39m: [32mDataFrame[39m = [p50: double, p90: double]
[36mquery[39m: [32mstreaming[39m.[32mStreamingQuery[39m = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6498f72e
[36mres9_2[39m: [32mBoolean[39m = false

In this cell I define a utility function that makes it more convenient to group strongly-typed `Dataset` streams by a fixed-width (tumbling) time window.

In [11]:
object windowing {
    import java.sql.Timestamp
    import java.time.Instant
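    // Returns a function mapping a row to the (lower, upper) bounds of the
    // tumbling window containing its timestamp. `f` extracts the timestamp;
    // `width` is the window width in seconds.
    def windowBy[R](f:R=>Timestamp, width: Int) = {
        val w = width.toLong * 1000L  // window width in milliseconds
        (row: R) => {
            val tsCur = f(row)
            val msCur = tsCur.getTime()
            val msLB = (msCur / w) * w  // round down to the window's lower bound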
            val instLB = Instant.ofEpochMilli(msLB)
            val instUB = Instant.ofEpochMilli(msLB+w)
            (Timestamp.from(instLB), Timestamp.from(instUB))
        }
    }
}

defined [32mobject[39m [36mwindowing[39m

In this cell, a streaming query extracts hashtags from the raw social media text.
Here, the data is treated as a strongly-typed streaming `Dataset`.
The data is grouped into 60-second windows with `windowBy`, and a heavy-hitter aggregator is applied to identify the most
frequent hashtags in each window.

In [12]:
val windowBy60 = windowing.windowBy[(java.sql.Timestamp, String)](_._1, 60)
val tka = new org.isarnproject.sketches.udaf.TopKAggregator[(java.sql.Timestamp, String)](_._2).toColumn
val t = records
  .select(current_timestamp().alias("time"),
          explode(split($"text", " ")).alias("word"))
  .as[(java.sql.Timestamp, String)]
  .filter(_._2.startsWith("#"))  // safe on empty tokens produced by repeated spaces
  .groupByKey(windowBy60).agg(tka)
  .map { case (tw, tk) => (tw._2, tk.toVector.toString)}
val query = t.writeStream
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .option("truncate", false)
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination(125 * 1000)
Thread.sleep(3 * 1000)
query.stop()

-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+
|_1 |_2 |
+---+---+
+---+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+---------------------------------------------------------+
|_1                 |_2                                                       |
+-------------------+---------------------------------------------------------+
|2018-09-28 23:52:00|Vector((#_,3), (#two,3), (#Elinor,2), (#first,2), (#3,2))|
+-------------------+---------------------------------------------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+------------------------------------------------------------+
|_1                 |_2                                                          |
+-------------------+------------------------------------------------------------+
|2018-09-28 23:52:

[36mwindowBy60[39m: ([32mjava[39m.[32msql[39m.[32mTimestamp[39m, [32mString[39m) => ([32mjava[39m.[32msql[39m.[32mTimestamp[39m, [32mjava[39m.[32msql[39m.[32mTimestamp[39m) = <function1>
[36mtka[39m: [32mTypedColumn[39m[([32mjava[39m.[32msql[39m.[32mTimestamp[39m, [32mString[39m), [32mArray[39m[([32mString[39m, [32mInt[39m)]] = topkaggregator()
[36mt[39m: [32mDataset[39m[([32mjava[39m.[32msql[39m.[32mTimestamp[39m, [32mString[39m)] = [_1: timestamp, _2: string]
[36mquery[39m: [32mstreaming[39m.[32mStreamingQuery[39m = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6bcf6952
[36mres11_4[39m: [32mBoolean[39m = false