# Case 1 Part 2: Anomaly Detection

This notebook covers the second part of an analytics use case for a real-time alerting system:

`Hourly consumption for a household is higher than 1 standard deviation of that household's historical mean consumption for that hour.`

This part reads the data streams from both the `readings_prepared` and `alert_1_stats` topics and uses them to detect anomalies: readings that exceed the historical mean by more than 1 standard deviation. The detected anomalies are then stored in a persistent data store, which in this case is Google BigQuery.
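
Stripped of the streaming machinery, the rule is a one-sided threshold check. A minimal sketch in plain Scala, where `isAnomaly` is a hypothetical helper that does not appear in the notebook:

```scala
// Hypothetical helper, not part of the notebook: the alert rule as a plain predicate.
// A reading is flagged only when it is ABOVE mean + 1 standard deviation.
def isAnomaly(readingValue: Float, mean: Float, stdDev: Float): Boolean =
    readingValue > mean + stdDev

// Values taken from a row of the anomaly output shown later in this notebook:
// 138.496 is well above 26.55637 + 53.75334 (about 80.31), so the reading is flagged.
val flagged = isAnomaly(138.496f, 26.55637f, 53.75334f)
```

The streaming job applies the same comparison as a `filter` after the join.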

BigQuery was chosen because it suits the follow-up analytical queries we might want to run later, such as BI reporting or more advanced analysis. It is also serverless: we only need to define the datasets and tables.


## Setup

Import all the required libraries and set the stream configuration variables.

In [1]:
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql._

Intitializing Scala interpreter ...

Spark Web UI available at http://spark-alert-1-detect-m:8088/proxy/application_1583148924695_0001
SparkContext available as 'sc' (version = 2.4.5, master = yarn, app id = application_1583148924695_0001)
SparkSession available as 'spark'


import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql._


In [2]:
val kafkaBootstrapServer = "kafka-m:9092"
val kafkaReadingsTopic = "readings_prepared"
val kafkaStatsTopic = "alert_1_stats"
val kafkaDedupWatermarkTime = "1 minute"
val joinWatermarkTime = "1 minute"
val bigQueryTargetTable = "smartplugs.alert_1_anomaly"
val bigQueryTempBucket = "pandora-sde-case/alert_1"
val outputTriggerTime = "1 minute"

kafkaBootstrapServer: String = kafka-m:9092
kafkaReadingsTopic: String = readings_prepared
kafkaStatsTopic: String = alert_1_stats
kafkaDedupWatermarkTime: String = 1 minute
joinWatermarkTime: String = 1 minute
bigQueryTargetTable: String = smartplugs.alert_1_anomaly
bigQueryTempBucket: String = pandora-sde-case
outputTriggerTime: String = 1 minute


## Define The Required Schema

In [3]:
// This will be used to give the source `readings_prepared` stream data a schema
val readingsSchema = StructType(Seq(
    StructField("message_id", StringType, false),
    StructField("reading_ts", TimestampType, false),
    StructField("reading_value", FloatType, false),
    StructField("reading_type", IntegerType, false),
    StructField("plug_id", IntegerType, false),
    StructField("household_id", IntegerType, false),
    StructField("house_id", IntegerType, false)
))

val statsSchema = StructType(Seq(
    StructField("house_id", IntegerType, false),
    StructField("hour", IntegerType, false),
    StructField("mean", FloatType, false),
    StructField("m2", FloatType, false),
    StructField("variance", FloatType, false),
    StructField("std_dev", FloatType, false),
    StructField("count", LongType, false),
    StructField("last_ts", TimestampType, false)
))

readingsSchema: org.apache.spark.sql.types.StructType = StructType(StructField(message_id,StringType,false), StructField(reading_ts,TimestampType,false), StructField(reading_value,FloatType,false), StructField(reading_type,IntegerType,false), StructField(plug_id,IntegerType,false), StructField(household_id,IntegerType,false), StructField(house_id,IntegerType,false))
statsSchema: org.apache.spark.sql.types.StructType = StructType(StructField(house_id,IntegerType,false), StructField(hour,IntegerType,false), StructField(mean,FloatType,false), StructField(m2,FloatType,false), StructField(variance,FloatType,false), StructField(std_dev,FloatType,false), StructField(count,LongType,false), StructField(last_ts,TimestampType,false))


### Read and Parse The Input Data Streams

There are 2 input streams this time: the `readings_prepared` and `alert_1_stats` topics. They will be joined to detect anomalies.

In [None]:
// Drop duplicates that arrive within the watermark. The watermark bound is necessary so that
// Spark does not keep ALL records in the state
val readings = spark
    .readStream 
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBootstrapServer)
    .option("subscribe", kafkaReadingsTopic)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json($"value", readingsSchema).as("data"))
    .select($"data.*")
    .withWatermark("reading_ts", kafkaDedupWatermarkTime) 
    .dropDuplicates()
    .filter($"reading_type" === 1) // Only take the "current load" measurement

Intitializing Scala interpreter ...

In [5]:
// We don't need to deduplicate the stats, but we drop the ones with mean=0
val stats = spark
    .readStream 
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBootstrapServer)
    .option("subscribe", kafkaStatsTopic)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json($"value", statsSchema).as("data"))
    .select($"data.house_id", $"data.hour", $"data.mean", $"data.std_dev", $"data.last_ts")
    .filter($"mean" > 0.0)


stats: org.apache.spark.sql.DataFrame = [house_id: int, hour: int ... 6 more fields]


#### Peek at The Input Data Streams

##### Readings

In [6]:
val readingsQuery = readings.writeStream.format("memory").queryName("readings").start()
Thread.sleep(10000)
readingsQuery.status

readingsQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7e4fdee9
res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Getting offsets from KafkaV2[Subscribe[readings_prepared]]",
  "isDataAvailable" : false,
  "isTriggerActive" : true
}


In [24]:

spark.sql("select * from readings").show()

+----------+-------------------+-------------+------------+-------+------------+--------+
|message_id|         reading_ts|reading_value|reading_type|plug_id|household_id|house_id|
+----------+-------------------+-------------+------------+-------+------------+--------+
|  21267584|2013-09-01 01:35:00|       35.099|           0|      1|           0|       1|
|  21371713|2013-09-01 01:36:00|       11.757|           0|      2|           0|       0|
|  22405161|2013-09-01 01:46:20|        0.355|           0|      1|           0|       4|
|  22471510|2013-09-01 01:47:00|          0.0|           1|      0|           0|       4|
|  22636827|2013-09-01 01:48:40|          0.0|           1|      1|           0|       3|
|  23903171|2013-09-01 02:01:20|        0.161|           0|      0|           0|       9|
|  24134854|2013-09-01 02:03:40|        9.853|           1|      2|           0|       0|
|  24302832|2013-09-01 02:05:20|         2.25|           0|      1|           0|       7|
|  2443508

In [8]:
// readingsQuery.stop()
readingsQuery.lastProgress

res2: org.apache.spark.sql.streaming.StreamingQueryProgress =
{
  "id" : "9a99c6a0-e13f-4654-824c-d8fd89919a13",
  "runId" : "9b858307-236e-4829-8378-b927c61ba527",
  "name" : "readings",
  "timestamp" : "2020-03-02T12:48:06.186Z",
  "batchId" : 0,
  "numInputRows" : 33,
  "processedRowsPerSecond" : 0.9723613648417703,
  "durationMs" : {
    "addBatch" : 30895,
    "getBatch" : 11,
    "getEndOffset" : 0,
    "queryPlanning" : 868,
    "setOffsetRange" : 1839,
    "triggerExecution" : 33936,
    "walCommit" : 75
  },
  "eventTime" : {
    "avg" : "2013-09-01T02:30:00.606Z",
    "max" : "2013-09-01T03:18:40.000Z",
    "min" : "2013-09-01T01:35:00.000Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 33,
    "numRowsUpdated" : 33,
    "memo...

In [9]:
readingsQuery.status

res3: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Processing new data",
  "isDataAvailable" : true,
  "isTriggerActive" : true
}


##### Stats

In [10]:
val statsQuery = stats.writeStream.format("memory").queryName("stats").start()
statsQuery.status

statsQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@57537f72
res4: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Getting offsets from KafkaV2[Subscribe[alert_1_stats]]",
  "isDataAvailable" : false,
  "isTriggerActive" : true
}


In [25]:
// Thread.sleep(10000)
spark.sql("select * from stats").show()

+--------+----+-----------+---------+-----------+----------+-----+-------------------+
|house_id|hour|       mean|       m2|   variance|   std_dev|count|            last_ts|
+--------+----+-----------+---------+-----------+----------+-----+-------------------+
|       6|   1|0.040391304|1.0778269|0.023951707|0.15476339|   46|2013-09-01 01:39:40|
|       3|   1|0.042192988|1.8278245|0.032639723|0.18066467|   57|2013-09-01 01:51:20|
|       7|   4|  2.4282303| 8838.108|  58.530518|  7.650524|  152|2013-09-01 04:04:00|
|       2|   2|        0.0|      0.0|        0.0|       0.0|  180|2013-09-01 02:48:40|
|       9|   4|  2.1643322|11563.692|  60.227566|  7.760642|  193|2013-09-01 04:36:40|
|       8|   3|  11.005864|2108.1162|  10.593549| 3.2547731|  200|2013-09-01 03:44:20|
|       2|   3|        0.0|      0.0|        0.0|       0.0|  172|2013-09-01 03:13:20|
|       7|   1|        0.0|      0.0|        0.0|       0.0|   97|2013-09-01 01:47:00|
|       1|   2|  2.9568827| 9.634522| 0.095

In [26]:
// statsQuery.stop()
statsQuery.lastProgress

res17: org.apache.spark.sql.streaming.StreamingQueryProgress =
{
  "id" : "122ae385-ca91-44de-89a9-9825212998a1",
  "runId" : "366dcf43-8124-4bb6-8038-459d7d185705",
  "name" : "stats",
  "timestamp" : "2020-03-02T12:51:12.133Z",
  "batchId" : 7,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 83.33333333333333,
  "processedRowsPerSecond" : 0.05623031938821413,
  "durationMs" : {
    "addBatch" : 17107,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 50,
    "setOffsetRange" : 1,
    "triggerExecution" : 17784,
    "walCommit" : 99
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[alert_1_stats]]",
    "startOffset" : {
      "alert_1_stats" : {
        "0" : 110
      }
    },
    "endOffset" : {
      "alert_1_stats" : {
        "...

In [13]:
statsQuery.status

res7: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Processing new data",
  "isDataAvailable" : true,
  "isTriggerActive" : true
}


## Join Both Data Streams


We can only compare the readings with past statistics. The `reading_ts` value of the `readings_prepared` stream needs to be later than the `last_ts` value of the `alert_1_stats` stream. We also need to ensure we are comparing against the stats for the right `house_id` and `hour`.

`However, as the stream runs, the size of streaming state will keep growing indefinitely as all past input must be saved as any new input can match with any input from the past. To avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and therefore can be cleared from the state. In other words, you will have to do the following additional steps in the join.`
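
In the Spark Structured Streaming guide, the steps that follow this passage are (1) define watermark delays on both inputs and (2) define an event-time constraint across the two inputs, such as a time-range join condition. The join in this notebook sets watermarks and a lower bound (`reading_ts > last_ts`). The sketch below shows what adding an upper bound might look like. It reuses `readings`, `stats` and `joinWatermarkTime` from the cells above, and the 25-hour bound is an illustrative assumption: the stats are kept per hour of day, so the bound has to be long enough to still match stats last updated on the previous day.

```scala
import spark.implicits._
import org.apache.spark.sql.functions.{expr, hour}

// Sketch only: `readings`, `stats` and `joinWatermarkTime` come from the cells above.
// The 25-hour upper bound is an illustrative assumption, not a value used by this notebook.
val boundedJoin = readings
    .withColumnRenamed("house_id", "readings_house_id")
    .withWatermark("reading_ts", joinWatermarkTime)          // step 1: watermark on the left input
    .join(
        stats.withWatermark("last_ts", joinWatermarkTime),   // step 1: watermark on the right input
        $"readings_house_id" === $"house_id" &&
        hour($"reading_ts") === $"hour" &&
        $"reading_ts" > $"last_ts" &&
        // step 2: time-range constraint, so stats rows older than the bound can leave the state
        $"reading_ts" <= expr("last_ts + interval 25 hours"),
        "inner"
    )
```

A tighter bound keeps less state but also discards matches against older stats, so the value is a trade-off and not a fixed recommendation.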

After joining, we need to ensure that we only compare the readings with their latest past statistics.

The `alert_1_stats` stream is assumed to arrive more slowly than the `readings_prepared` stream, since it needs to wait for at least 2 records before it can start calculating, and its default write trigger fires every 30 seconds.
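
The 2-record minimum follows from how a sample variance is defined. The stats rows peeked at earlier are consistent with `variance = m2 / (count - 1)` (for example, 1.0778269 / 45 ≈ 0.0239517), which is undefined for a single record. The sketch below uses Welford's online update to illustrate this. It assumes that the stats job maintains `mean` and `m2` this way, which this notebook does not show, and `RunningStats` is a hypothetical name.

```scala
// Hypothetical running-statistics state; field names mirror the alert_1_stats columns.
case class RunningStats(count: Long = 0L, mean: Double = 0.0, m2: Double = 0.0) {
    // Welford's online update: fold one new value into the running mean and M2
    def update(x: Double): RunningStats = {
        val n = count + 1
        val delta = x - mean
        val newMean = mean + delta / n
        RunningStats(n, newMean, m2 + delta * (x - newMean))
    }

    // Sample variance divides by (count - 1), so it is undefined for fewer than 2 records
    def variance: Option[Double] = if (count < 2) None else Some(m2 / (count - 1))

    def stdDev: Option[Double] = variance.map(math.sqrt)
}

val afterOne = RunningStats().update(3.0) // variance is None: only 1 record seen
val afterTwo = afterOne.update(5.0)       // variance is Some(2.0): mean 4.0, m2 2.0
```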


The `alert_1_stats` stream may also contain multiple rows with different `last_ts` and stats values for the same `house_id` and `hour`. We're going to solve that with an aggregation after joining: take the max `last_ts` that is still earlier than the `reading_ts` value.
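
The selection logic can be sketched in plain Scala, independent of Spark. The join cell in this notebook does not show this step, so the code below only illustrates the intent. `Reading`, `Stat` and `latestStat` are hypothetical names, and apart from the 06:22:00 row the sample stats are made up.

```scala
import java.sql.Timestamp

// Hypothetical, simplified row types for illustration only
case class Reading(messageId: String, houseId: Int, readingTs: Timestamp, value: Float)
case class Stat(houseId: Int, hour: Int, mean: Float, stdDev: Float, lastTs: Timestamp)

// For one reading, keep only the most recent stats row that precedes it
// for the same house and hour of day.
def latestStat(reading: Reading, stats: Seq[Stat]): Option[Stat] = {
    val readingHour = reading.readingTs.toLocalDateTime.getHour
    val candidates = stats.filter { s =>
        s.houseId == reading.houseId &&
        s.hour == readingHour &&
        s.lastTs.before(reading.readingTs)
    }
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.lastTs.getTime))
}

val sampleStats = Seq(
    Stat(1, 6, 20.0f, 40.0f, Timestamp.valueOf("2013-09-01 06:10:00")),         // made up, older
    Stat(1, 6, 26.55637f, 53.75334f, Timestamp.valueOf("2013-09-01 06:22:00")), // from the output
    Stat(1, 6, 30.0f, 50.0f, Timestamp.valueOf("2013-09-01 06:58:00"))          // made up, too late
)
val sampleReading = Reading("53392037", 1, Timestamp.valueOf("2013-09-01 06:56:40"), 138.496f)

// Picks the 06:22:00 row: it is the latest one that is still earlier than the reading
val chosen = latestStat(sampleReading, sampleStats)
```

In Spark, the same selection would more likely be applied per micro-batch, for example by ranking the joined rows by `last_ts` within each `message_id`. That variant is not shown here.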




In [14]:
// Anomaly detection: join each reading with the stats (mean and std_dev) for its house and hour,
// then flag the readings whose value exceeds mean + std_dev
val anomaly = readings
    .withColumnRenamed("house_id", "readings_house_id")
    .withWatermark("reading_ts", joinWatermarkTime)
    .join(
        stats.withWatermark("last_ts", joinWatermarkTime),
        // Join conditions
        $"reading_ts" > $"last_ts" &&
        hour($"reading_ts") === $"hour" &&
        $"readings_house_id" === $"house_id",
        // Join type
        "inner"
    )
    .filter($"reading_value" > $"mean" + $"std_dev")
    .drop("readings_house_id")

anomaly: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [message_id: string, reading_ts: timestamp ... 13 more fields]


In [19]:
val peekQuery = anomaly
    .writeStream
    .format("memory")
    .queryName("anomaly")
    .start()

peekQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@529e235


In [51]:
spark.sql("select * from anomaly").show()

+----------+-------------------+-------------+------------+-------+------------+-----------------+--------+----+--------+---------+---------+--------+-----+-------------------+
|message_id|         reading_ts|reading_value|reading_type|plug_id|household_id|readings_house_id|house_id|hour|    mean|       m2| variance| std_dev|count|            last_ts|
+----------+-------------------+-------------+------------+-------+------------+-----------------+--------+----+--------+---------+---------+--------+-----+-------------------+
|  53392037|2013-09-01 06:56:40|      138.496|           1|      1|           0|                1|       1|   6|26.55637|225374.89|2889.4216|53.75334|   79|2013-09-01 06:22:00|
|  53025749|2013-09-01 06:53:00|      139.305|           1|      1|           0|                1|       1|   6|26.55637|225374.89|2889.4216|53.75334|   79|2013-09-01 06:22:00|
|  56113430|2013-09-01 07:24:00|        0.788|           0|      0|           0|                4|       4|   7|   

In [28]:
peekQuery.lastProgress

res19: org.apache.spark.sql.streaming.StreamingQueryProgress =
{
  "id" : "cd2b8764-9438-4402-b5a8-6ec321691974",
  "runId" : "b2bcd94b-6fa9-466b-bd94-072e6d9ffa89",
  "name" : "anomaly",
  "timestamp" : "2020-03-02T12:51:19.851Z",
  "batchId" : 2,
  "numInputRows" : 13150,
  "inputRowsPerSecond" : 211.21444289179075,
  "processedRowsPerSecond" : 233.37533497790477,
  "durationMs" : {
    "addBatch" : 55768,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 381,
    "setOffsetRange" : 3,
    "triggerExecution" : 56347,
    "walCommit" : 66
  },
  "eventTime" : {
    "avg" : "2013-09-01T12:33:51.803Z",
    "max" : "2013-09-01T14:47:20.000Z",
    "min" : "2013-09-01T10:20:40.000Z",
    "watermark" : "2013-09-01T09:16:20.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal...

In [21]:
peekQuery.status

res12: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Processing new data",
  "isDataAvailable" : true,
  "isTriggerActive" : true
}


In [39]:
// peekQuery.stop()

## Write to BigQuery

Write to BigQuery once a minute. BigQuery inserts work better with mini-batched data, and this interval also gives `alert_1_stats` enough time to accumulate. The [BigQuery connector](https://github.com/GoogleCloudDataproc/spark-bigquery-connector) is installed during cluster setup and loaded automatically when the spark-shell session is initiated.

In [54]:
val ingestQuery = anomaly
    .writeStream
    .trigger(Trigger.ProcessingTime(outputTriggerTime))
    .foreachBatch{ 
        (batchDF: DataFrame, batchId: Long) =>
            batchDF.write.format("bigquery")
                .option("table", bigQueryTargetTable)
                .option("temporaryGcsBucket", bigQueryTempBucket)
                .mode(SaveMode.Append)
                .save()
    }.start()

ingestQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@379de019


In [None]:
ingestQuery.lastProgress.stateOperators

In [56]:
ingestQuery.status

res42: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Getting offsets from KafkaV2[Subscribe[alert_1_stats]]",
  "isDataAvailable" : false,
  "isTriggerActive" : true
}


In [53]:
ingestQuery.stop()