##Streaming Data Quality using AWS Deequ

This notebook uses the `Deequ` package from AWS to run analysis on a streaming data source, and to derive key quality metrics about the data. Deequ is able to provide a variety of quantitative statistics and metrics about a dataset, and has utilities to generate, track, and interpret these metrics. See [this blog](https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/) for more detailed examples, or check the [GitHub repo](https://github.com/awslabs/deequ/).

For this notebook, we use structured streaming, combined with Delta tables and the Deequ package, to provide a live view of a dataset's "health".

<div style="text-align: center; line-height: 0; padding-top: 9px;"><img src="https://i.imgur.com/zrMP9oM.png" height="700" width = "700"/></div>

We'll use several Deequ metrics in our analysis; some of these are explained below. The full list can be found in the above links.
- `ApproxCountDistinct`: returns the approximate count of distinct values in a column
- `Distinctness`: returns the fraction of (distinct values / total values) in a column
- `Completeness`: returns the fraction of values that are non-null in a column
- `Compliance`: returns the fraction of values in a column that meet a given constraint

_Note: this notebook requires the Deequ package; add the package from Maven Central using com.amazon.deequ. For Slack notifications, [spark-slack](https://github.com/MrPowers/spark-slack) or a similar package is required._

Before we begin, we need to do some cleanup; we'll also need to download some data.

In [0]:
%fs
mkdirs /tmp/StreamingDataQuality/

In [0]:
%sh
# clear the delta checkpoint
rm -rf /dbfs/tmp/StreamingDataQuality/checkpoint

# download some generated stock tick data; this is a public Mockaroo endpoint- as such, we can't guarantee availability!
curl "https://api.mockaroo.com/api/2aedaa80?count=1000&key=8eb06b50" > /dbfs/tmp/StreamingDataQuality/stockTicks.json

In [0]:
# read the raw JSON, then repartition and write into a tmp parquet folder
spark.read.json("/tmp/StreamingDataQuality/stockTicks.json").repartition(100).write.mode("overwrite").parquet("/tmp/StreamingDataQuality/source/")

First we'll set up our delta tables and any necessary temporary views, as well as importing the packages to be used.

In [0]:
%scala
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.concat
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import com.amazon.deequ.analyzers._
import com.amazon.deequ.analyzers.runners.AnalysisRunner
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Analysis, ApproxCountDistinct, Completeness, Compliance, Distinctness, InMemoryStateProvider, Size}

val data_path = "/tmp/StreamingDataQuality/source/"
val checkpoint_path = "/tmp/StreamingDataQuality/checkpoint/"
val base_df = spark.read.parquet(data_path)
val empty_df = base_df.where("0 = 1")
val l1: Long = 0

spark.sql("DROP TABLE IF EXISTS trades_delta")
spark.sql("DROP TABLE IF EXISTS bad_records")
spark.sql("DROP TABLE IF EXISTS deequ_metrics")

base_df.createOrReplaceTempView("trades_historical")
empty_df.write.format("delta").saveAsTable("trades_delta")
empty_df.withColumn("batchID",lit(l1)).write.format("delta").saveAsTable("bad_records")
dbutils.fs.mkdirs(checkpoint_path)

First, we'll take a look at the suggested quality constraints that Deequ can automatically generate. Deequ will inspect the data you give it, and generate constraints that assume future data should look similar.

In [0]:
%scala
val suggestionResult = ConstraintSuggestionRunner()
  .onData(spark.sql("SELECT * FROM trades_historical"))
  .addConstraintRules(Rules.DEFAULT)
  .run()

suggestionResult.constraintSuggestions.foreach { case (column, suggestions) =>
  suggestions.foreach { suggestion =>
    println(s"Constraint suggestion for '$column':\t${suggestion.description}\n" +
      s"The corresponding scala code is ${suggestion.codeForConstraint}\n")
  }
}

Currently, Deequ leaves it to us to decide which of these constraints to actually use. We'll choose a few to run on our full dataset. We'll also set up a few other pieces provided by Deequ to hold our stateful metrics.

In [0]:
%scala
// create a stateStore to hold our stateful metrics
val stateStoreCurr = InMemoryStateProvider()
val stateStoreNext = InMemoryStateProvider()

// create the analyzer to run on the streaming data
val analysis = Analysis()
.addAnalyzer(Size())
.addAnalyzer(ApproxCountDistinct("symbol"))
.addAnalyzer(Distinctness("symbol"))
.addAnalyzer(Completeness("ipaddr"))
.addAnalyzer(Completeness("quantity"))
.addAnalyzer(Completeness("price"))
.addAnalyzer(Compliance("top quantity", "quantity >= 0"))

Now that everything is in place, we can run the stream to populate our delta table. 

Note that before running this cell, it is preferable to run the other streaming cells below first, so that they will consume all of the records from this producer.

In [0]:
%scala
// parse the schema for the source parquet
val schema = base_df.schema

// start the stream
spark.readStream
.schema(schema)
.format("parquet")
.option("maxFilesPerTrigger",1)
.load(data_path)
.writeStream.format("delta")
.option("failOnDataLoss", false)
.option("checkpointLocation", checkpoint_path)
.format("delta").table("trades_delta")

We now need to read the delta table we just created, so that we can apply the Deequ analysis to this data. To do this, we first read the previous delta table as a stream, and then use foreachBatch to do the following:
- Set up the stateStores
- Run our analysis on the current batch
- Run a unit validation on the current batch
- If unit verification fails, add the batch to the bad records table
- Update the metrics table with the current batch

This cell writes to two tables: bad_records (which contains records from any batch that fails validation) and deequ_metrics (which contains the latest aggregated metrics from all streaming records).

In [0]:
%scala
// read the delta table and analyze
spark.readStream
.format("delta")
.table("trades_delta")
.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  
  // reassign our current state to the previous next state
  val stateStoreCurr = stateStoreNext
  
  // run our analysis on the current batch, aggregate with saved state
  val metricsResult = AnalysisRunner.run(
    data = batchDF,
    analysis = analysis,
    aggregateWith = Some(stateStoreCurr),
    saveStatesWith = Some(stateStoreNext))
  
  // verify critical metrics for this microbatch i.e., trade quantity, ipaddr not null, etc.
  val verificationResult = VerificationSuite()
  .onData(batchDF)
  .addCheck(
    Check(CheckLevel.Error, "unitTest")
      .hasMax("quantity", _ <= 10000) // max is 10000
      .hasCompleteness("ipaddr", _ >= 0.95) // 95%+ non-null IPs
      .isNonNegative("quantity")) // should not contain negative values
    .run()
  
  // if verification fails, write batch to bad records table
  if (verificationResult.status != CheckStatus.Success) {
    batchDF.withColumn("batchID",lit(batchId))
    .write.format("delta").mode("append").saveAsTable("bad_records")
  }
  
  // get the current metrics as a dataframe
  val metric_results = successMetricsAsDataFrame(spark, metricsResult)
  .withColumn("ts", current_timestamp())
  
  // write the current results into the metrics table
  metric_results.write.format("delta").mode("Overwrite").saveAsTable("deequ_metrics")

}
.start()

Now, we can visualize the metrics. Note that because we are updating the table, we need to set `ignoreChanges` to `true`. This means each update of the metrics will be written as a duplicate entry; we can parse this out to only take the latest view, or we can use this to create a time series view of the data quality.

In [0]:
%scala
display(spark.readStream.format("delta")
        .option("ignoreChanges", "true")
        .table("deequ_metrics")
        .where($"name" === "Size" || $"name" === "ApproxCountDistinct"))

entity,instance,name,value,ts
Column,symbol,ApproxCountDistinct,778.0,2019-10-14T14:27:04.009+0000
Dataset,*,Size,798.0,2019-10-14T14:27:04.009+0000
Column,symbol,ApproxCountDistinct,2124.0,2019-10-14T14:27:26.832+0000
Dataset,*,Size,2798.0,2019-10-14T14:27:26.832+0000
Dataset,*,Size,4500.0,2019-10-14T14:27:43.096+0000
Column,symbol,ApproxCountDistinct,3054.0,2019-10-14T14:27:43.096+0000
Dataset,*,Size,5300.0,2019-10-14T14:28:00.349+0000
Column,symbol,ApproxCountDistinct,3355.0,2019-10-14T14:28:00.349+0000
Dataset,*,Size,6798.0,2019-10-14T14:28:17.592+0000
Column,symbol,ApproxCountDistinct,3965.0,2019-10-14T14:28:17.592+0000


In [0]:
%scala
display(spark.readStream.format("delta")
        .option("ignoreChanges", "true")
        .table("deequ_metrics")
        .where($"name" === "Completeness" || $"name" === "Distinctness"))

entity,instance,name,value,ts
Column,quantity,Completeness,0.9473684210526316,2019-10-14T14:27:04.009+0000
Column,price,Completeness,0.9799498746867168,2019-10-14T14:27:04.009+0000
Column,symbol,Distinctness,0.943609022556391,2019-10-14T14:27:04.009+0000
Column,ipaddr,Completeness,0.9473684210526316,2019-10-14T14:27:04.009+0000
Column,price,Completeness,0.9796283059328093,2019-10-14T14:27:26.832+0000
Column,ipaddr,Completeness,0.9610436025732666,2019-10-14T14:27:26.832+0000
Column,quantity,Completeness,0.9610436025732666,2019-10-14T14:27:26.832+0000
Column,symbol,Distinctness,0.8070050035739814,2019-10-14T14:27:26.832+0000
Column,price,Completeness,0.9793333333333332,2019-10-14T14:27:43.096+0000
Column,quantity,Completeness,0.9631111111111113,2019-10-14T14:27:43.096+0000


In [0]:
%scala
display(spark.readStream.format("delta")
        .option("ignoreChanges", "true")
        .table("deequ_metrics")
        .where($"name" === "Completeness" || $"name" === "Distinctness"))

entity,instance,name,value,ts
Column,ipaddr,Completeness,0.9473684210526316,2019-10-14T14:27:04.009+0000
Column,symbol,Distinctness,0.943609022556391,2019-10-14T14:27:04.009+0000
Column,price,Completeness,0.9799498746867168,2019-10-14T14:27:04.009+0000
Column,quantity,Completeness,0.9473684210526316,2019-10-14T14:27:04.009+0000
Column,symbol,Distinctness,0.8070050035739814,2019-10-14T14:27:26.832+0000
Column,quantity,Completeness,0.9610436025732666,2019-10-14T14:27:26.832+0000
Column,ipaddr,Completeness,0.9610436025732666,2019-10-14T14:27:26.832+0000
Column,price,Completeness,0.9796283059328093,2019-10-14T14:27:26.832+0000
Column,price,Completeness,0.9793333333333332,2019-10-14T14:27:43.096+0000
Column,symbol,Distinctness,0.7202222222222222,2019-10-14T14:27:43.096+0000


In [0]:
%scala
val batchCounts = spark.read.format("delta").table("bad_records")
.groupBy($"batchId").count().withColumnRenamed("batchId", "batchId2").withColumnRenamed("count", "total")

display(spark.read.format("delta").table("bad_records")
        .filter($"quantity" < 0 || $"quantity" > 10000 || $"ipaddr" === null)
        .groupBy($"batchId").count()
        .join(batchCounts, $"batchId2" === $"batchId", "inner")
        .withColumn("percent_bad", bround(lit(100)*$"count"/$"total",3))
        .drop("batchId2").orderBy(desc("percent_bad")))

batchId,count,total,percent_bad
3,6,800,0.75
2,6,1702,0.353
4,5,1498,0.334
1,6,2000,0.3
0,2,798,0.251
8,3,1300,0.231
5,4,2000,0.2
6,3,1701,0.176
7,2,1299,0.154


In [0]:
%scala
display(spark.readStream.format("delta").table("bad_records")
        .filter($"quantity" < 0 || $"quantity" > 10000 || $"ipaddr" === null))

timestamp,symbol,price,quantity,ordertype,ipaddr,buysell,batchID
2019-09-17T14:09:06.370+0000,PLT,26.691,-1.0,limit,25.248.90.194,buy,1
2019-09-17T14:09:06.395+0000,DGAS,10.88,-1.0,cmo,169.57.229.38,buy,1
2019-09-17T14:09:06.400+0000,TWLO,22.1793,-1.0,cmo,215.5.66.144,buy,1
2019-09-17T14:09:06.403+0000,HIMX,12.8466,-1.0,marketToLimit,144.109.213.132,sell,1
2019-09-17T14:09:25.672+0000,BOCH,12.881,-1.0,oco,218.55.44.89,sell,3
2019-09-17T14:09:25.682+0000,CXP,23.5872,-1.0,market,217.18.88.207,buy,3
2019-09-17T14:09:25.706+0000,HL,17.7628,-1.0,batch,236.38.102.150,sell,3
2019-09-17T14:09:25.707+0000,SEE,16.1892,-1.0,batch,48.174.126.90,sell,3
2019-09-17T14:09:49.856+0000,KIRK,24.9568,-1.0,cross,20.229.3.126,buy,7
2019-09-17T14:09:39.977+0000,SFR,28.7648,-1.0,limit,30.135.43.27,buy,5


In [0]:
%scala
val verificationResult: VerificationResult = { VerificationSuite()
  .onData(spark.sql("select * from trades_delta"))
  .addCheck(
    Check(CheckLevel.Error, "Review Check") 
      .hasMax("quantity", _ <= 10000) // max is 10000
      .hasCompleteness("quantity", _ >= 0.95) // should never be NULL
      .isUnique("ipaddr") // should not contain duplicates
      .hasCompleteness("ipaddr", _ >= 0.95)
      .isContainedIn("buysell", Array("buy","sell")) // contains only the listed values
      .isNonNegative("quantity")) // should not contain negative values
  .run()
}

// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
display(resultDataFrame)

check,check_level,check_status,constraint,constraint_status,constraint_message
Review Check,Error,Error,"MaximumConstraint(Maximum(quantity,None))",Success,
Review Check,Error,Error,"CompletenessConstraint(Completeness(quantity,None))",Success,
Review Check,Error,Error,UniquenessConstraint(Uniqueness(List(ipaddr))),Failure,Value: 0.9276539132030461 does not meet the constraint requirement!
Review Check,Error,Error,"CompletenessConstraint(Completeness(ipaddr,None))",Success,
Review Check,Error,Error,"ComplianceConstraint(Compliance(buysell contained in buy,sell,`buysell` IS NULL OR `buysell` IN ('buy','sell'),None))",Success,
Review Check,Error,Error,"ComplianceConstraint(Compliance(quantity is non-negative,COALESCE(quantity, 0.0) >= 0,None))",Failure,Value: 0.996559317368566 does not meet the constraint requirement!


In [0]:
%scala
display(resultDataFrame)

check,check_level,check_status,constraint,constraint_status,constraint_message
Review Check,Error,Error,"MaximumConstraint(Maximum(quantity,None))",Success,
Review Check,Error,Error,"CompletenessConstraint(Completeness(quantity,None))",Success,
Review Check,Error,Error,UniquenessConstraint(Uniqueness(List(ipaddr))),Failure,Value: 0.9276539132030461 does not meet the constraint requirement!
Review Check,Error,Error,"CompletenessConstraint(Completeness(ipaddr,None))",Success,
Review Check,Error,Error,"ComplianceConstraint(Compliance(buysell contained in buy,sell,`buysell` IS NULL OR `buysell` IN ('buy','sell'),None))",Success,
Review Check,Error,Error,"ComplianceConstraint(Compliance(quantity is non-negative,COALESCE(quantity, 0.0) >= 0,None))",Failure,Value: 0.996559317368566 does not meet the constraint requirement!


MLFlow also works well as a tracking tool for quality metrics- we can use Databricks' built-in MLFlow runs to directly log parameters and metrics against our notebook. Cells 21 and 22 can be run multiple times to capture snapshots across delta versions and timestamps.

In [0]:
%scala
import io.delta.tables._
// get the path of the deequ_metrics delta table
val fullPath = spark.read.table("deequ_metrics").select(input_file_name).take(1)(0)(0).toString
val regPattern = "^(.+)/([^/]+)$".r
val regPattern(tablePath, fileName) = fullPath

// pull the current delta history and create a temp table (to be read by python)
val deltaTable = DeltaTable.forPath(spark, tablePath)
val lastOperationDF = deltaTable.history(1)
lastOperationDF.createOrReplaceTempView("deltaVersion")

In [0]:
import mlflow
import time
from pyspark.sql.functions import col

# get the deequ metrics table as a dataframe
rows = spark.read.table("deequ_metrics").collect()

# get the delta table version
delta_df = spark.read.table("deltaVersion")
ts = delta_df.select("timestamp").take(1)[0][0]
deltaVersion = delta_df.select("version").take(1)[0][0]

# get number of bad records
num_bad_recs = (spark.read.table("bad_records")
.filter("quantity < 0" or "quantity > 10000" or "ipaddr == ''").count())

# start a new mlflow run
with mlflow.start_run():

  mlflow.log_param("timestamp", ts)
  mlflow.log_param("delta_version", deltaVersion)
  mlflow.log_metric("num_bad_records", num_bad_recs)

  for i in range(len(rows)):

    # build the key-value pairs for the metrics
    instance = rows[i][1]
    name = rows[i][2]
    key = instance.replace("*","all") + "_" + name
    val = rows[i][3]

    # log the metric
    mlflow.log_metric(key, val)

We can even send alerts based on our data quality checks. Use Slack, email, or the client of your choice!

In [0]:
%scala
import com.github.mrpowers.spark.slack.Notifier

val webhookUrl = "<my.slack.webhook>"
val notifier = new Notifier(webhookUrl)

val num_fail = resultDataFrame.filter($"constraint_status" === "Failure").count()
val error_string = s""":rotating_light: Looks like you've got some Data Quality errors! :rotating_light: 
There were ${num_fail} errors in the latest unit test."""

if(num_fail > 0){
  notifier.speak(error_string,"Slackbot","alert","user.name")
}

Finally, we'll clean up the assets we created.

In [0]:
%scala
spark.sql("DROP TABLE IF EXISTS trades_delta")
spark.sql("DROP TABLE IF EXISTS bad_records")
spark.sql("DROP TABLE IF EXISTS deequ_metrics")
dbutils.fs.rm(checkpoint_path, true)