Commit: Example for anomaly detection (#55)

sscdotopen committed Oct 1, 2018
1 parent 4ddd083 commit 0893c68
Showing 5 changed files with 178 additions and 3 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -116,15 +116,15 @@ The test found that our assumptions are violated! Only 4 out of 5 (80%) of the v

We are in the process of adding [more examples](src/main/scala/com/amazon/deequ/examples/), especially for the advanced features listed below. So far, we showcase the following functionality:

* [Storing computed metrics of the data in a MetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/metrics_repository_example.md) and retrieving them again
* [Persistence and querying of computed metrics of the data with a MetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/metrics_repository_example.md)
* [Data profiling](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/data_profiling_example.md) of large data sets
* [Anomaly detection](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/anomaly_detection_example.md) on data quality metrics over time
* [Incremental metrics computation on growing data and metric updates on partitioned data](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/algebraic_states_example.md) (advanced)

## Advanced features

Our library contains much more than what we showed in the basic example. We will add examples for the following advanced features soon:

* [Anomaly detection](src/main/scala/com/amazon/deequ/anomalydetection)
* Automatic [suggestion of constraints](https://github.com/awslabs/deequ/tree/master/src/main/scala/com/amazon/deequ/suggestions)

## Citation
src/main/scala/com/amazon/deequ/examples/AnomalyDetectionExample.scala
@@ -0,0 +1,94 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.examples

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RateOfChangeStrategy
import com.amazon.deequ.examples.ExampleUtils.{itemsAsDataframe, withSpark}
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
import com.amazon.deequ.checks.CheckStatus._

private[examples] object AnomalyDetectionExample extends App {

withSpark { session =>

/* In this simple example, we assume that we compute metrics on a dataset every day and we want
to ensure that they don't change drastically. For the sake of simplicity, we just look at the
size of the data. */

/* Anomaly detection operates on metrics stored in a metrics repository, so let's create one */
val metricsRepository = new InMemoryMetricsRepository()

/* This is the key under which we store the metrics for yesterday's dataset
(24 * 60 * 60 * 1000 milliseconds = one day ago) */
val yesterdaysKey = ResultKey(System.currentTimeMillis() - 24 * 60 * 60 * 1000)

/* Yesterday, the data had only two rows */
val yesterdaysDataset = itemsAsDataframe(session,
Item(1, "Thingy A", "awesome thing.", "high", 0),
Item(2, "Thingy B", "available at http://thingb.com", null, 0))

/* We test for anomalies in the size of the data: it should not increase by more than 2x. Note
that we store the resulting metrics in our repository */
VerificationSuite()
.onData(yesterdaysDataset)
.useRepository(metricsRepository)
.saveOrAppendResult(yesterdaysKey)
.addAnomalyCheck(
RateOfChangeStrategy(maxRateIncrease = Some(2.0)),
Size()
)
.run()

/* Today's data has five rows, so the data size more than doubled and our anomaly check should
catch this */
val todaysDataset = itemsAsDataframe(session,
Item(1, "Thingy A", "awesome thing.", "high", 0),
Item(2, "Thingy B", "available at http://thingb.com", null, 0),
Item(3, null, null, "low", 5),
Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
Item(5, "Thingy E", null, "high", 12))

/* The key for today's result */
val todaysKey = ResultKey(System.currentTimeMillis())

/* Repeat the anomaly check for today's data */
val verificationResult = VerificationSuite()
.onData(todaysDataset)
.useRepository(metricsRepository)
.saveOrAppendResult(todaysKey)
.addAnomalyCheck(
RateOfChangeStrategy(maxRateIncrease = Some(2.0)),
Size()
)
.run()

/* Did we find an anomaly? */
if (verificationResult.status != Success) {
println("Anomaly detected in the Size() metric!")

/* Let's have a look at the actual metrics. */
metricsRepository
.load()
.forAnalyzers(Seq(Size()))
.getSuccessMetricsAsDataFrame(session)
.show()
}
}

}
@@ -25,7 +25,7 @@ private[examples] object DataProfilingExample extends App {

withSpark { session =>

// We profile raw data, mostly in string format (e.g., from a csv file)
/* We profile raw data, mostly in string format (e.g., from a csv file) */
val rows = session.sparkContext.parallelize(Seq(
RawData("thingA", "13.0", "IN_TRANSIT", "true"),
RawData("thingA", "5", "DELAYED", "false"),
src/main/scala/com/amazon/deequ/examples/anomaly_detection_example.md
@@ -0,0 +1,80 @@
# Anomaly detection

Very often, it is hard to exactly define what constraints we want to evaluate on our data. However, we often have a better understanding of how much change we expect in certain metrics of our data. Therefore, **deequ** supports anomaly detection for data quality metrics. The idea is that we regularly store the metrics of our data in a [MetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/metrics_repository_example.md). Once we do that, we can run anomaly checks that compare the current value of the metric to its values in the past and allow us to detect anomalous changes.

In this simple example, we assume that we compute the size of a dataset every day and want to ensure that it does not change drastically: the number of rows on a given day should not be more than double what we saw on the day before.
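Before turning to the deequ API, the rule itself can be sketched in a few lines of plain Scala. Note that `isAnomalous` is a hypothetical helper written only for this illustration; it is not part of deequ:

```scala
// Hypothetical helper illustrating the rate-of-change rule used below:
// an increase counts as anomalous when the new value exceeds
// maxRateIncrease times the previous value.
def isAnomalous(yesterdaysValue: Double, todaysValue: Double,
    maxRateIncrease: Double): Boolean =
  todaysValue > maxRateIncrease * yesterdaysValue

isAnomalous(2.0, 5.0, maxRateIncrease = 2.0) // true: 5 > 2 * 2
isAnomalous(2.0, 4.0, maxRateIncrease = 2.0) // false: exactly doubling is still allowed
```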

Anomaly detection operates on metrics stored in a metrics repository, so let's create one.
```scala
val metricsRepository = new InMemoryMetricsRepository()
```

This is our fictitious data from yesterday, which has only two rows.
```scala
val yesterdaysDataset = itemsAsDataframe(session,
Item(1, "Thingy A", "awesome thing.", "high", 0),
Item(2, "Thingy B", "available at http://thingb.com", null, 0))
```

We test for anomalies in the size of the data, and want to enforce that it should not increase by more than 2x. We define a check for this by using the [RateOfChangeStrategy](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/anomalydetection/RateOfChangeStrategy.scala) for detecting anomalies. Note that we store the resulting metrics in our repository via `useRepository` and `saveOrAppendResult` under a result key `yesterdaysKey` with yesterday's timestamp.
```scala
val yesterdaysKey = ResultKey(System.currentTimeMillis() - 24 * 60 * 60 * 1000)

VerificationSuite()
.onData(yesterdaysDataset)
.useRepository(metricsRepository)
.saveOrAppendResult(yesterdaysKey)
.addAnomalyCheck(
RateOfChangeStrategy(maxRateIncrease = Some(2.0)),
Size())
.run()
```

The fictitious data of today has five rows, so the data size more than doubled and our anomaly check should catch this.
```scala
val todaysDataset = itemsAsDataframe(session,
Item(1, "Thingy A", "awesome thing.", "high", 0),
Item(2, "Thingy B", "available at http://thingb.com", null, 0),
Item(3, null, null, "low", 5),
Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
Item(5, "Thingy E", null, "high", 12))
```
We repeat the anomaly check using our metrics repository.
```scala
val todaysKey = ResultKey(System.currentTimeMillis())

val verificationResult = VerificationSuite()
.onData(todaysDataset)
.useRepository(metricsRepository)
.saveOrAppendResult(todaysKey)
.addAnomalyCheck(
RateOfChangeStrategy(maxRateIncrease = Some(2.0)),
Size())
.run()
```

We can now have a look at the `status` of the result of the verification to see if our check caught an anomaly (it should have). We print the contents of our metrics repository in that case.
```scala
if (verificationResult.status != Success) {
println("Anomaly detected in the Size() metric!")

metricsRepository
.load()
.forAnalyzers(Seq(Size()))
.getSuccessMetricsAsDataFrame(session)
.show()
}
```

We see that the following metrics are stored in the repository, which show us the reason for the anomaly: the data size increased from 2 to 5!
```
+-------+--------+----+-----+-------------+
| entity|instance|name|value| dataset_date|
+-------+--------+----+-----+-------------+
|Dataset| *|Size| 2.0|1538384009558|
|Dataset| *|Size| 5.0|1538385453983|
+-------+--------+----+-----+-------------+
```
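The `dataset_date` column holds the epoch-millisecond timestamp we passed into each `ResultKey`. As a small aside (plain JDK code, unrelated to the deequ API), such a value can be turned back into a readable instant with `java.time`:

```scala
import java.time.Instant

// Convert a stored epoch-millisecond timestamp back into a readable UTC instant.
val storedTimestamp = 1538384009558L
val instant = Instant.ofEpochMilli(storedTimestamp)
println(instant) // prints 2018-10-01T08:53:29.558Z
```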

An [executable version of this example](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/AnomalyDetectionExample.scala) is available as part of our code base. We also provide more [anomaly detection strategies](https://github.com/awslabs/deequ/tree/master/src/main/scala/com/amazon/deequ/anomalydetection).
@@ -27,6 +27,7 @@ class ExamplesTest extends WordSpec {
MetricsRepositoryExample.main(Array.empty)
UpdateMetricsOnPartitionedDataExample.main(Array.empty)
DataProfilingExample.main(Array.empty)
AnomalyDetectionExample.main(Array.empty)
}
}
