Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposing Helpful Anomaly Detection Metadata from Anomaly Strategies (i.e. Anomaly Thresholds) #525

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.Calendar

import com.amazon.deequ.analyzers.{Analyzer, State}
import com.amazon.deequ.checks.Check
import com.amazon.deequ.constraints.{AnalysisBasedConstraint, Constraint, ConstraintDecorator}
import com.amazon.deequ.constraints.{AnalysisBasedConstraint, AnomalyBasedConstraint, Constraint, ConstraintDecorator}
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Expand Down Expand Up @@ -187,9 +187,13 @@ private[deequ] class Applicability(session: SparkSession) {
case (name, nc: ConstraintDecorator) => name -> nc.inner
case (name, c: Constraint) => name -> c
}
.collect { case (name, constraint: AnalysisBasedConstraint[_, _, _]) =>
val metric = constraint.analyzer.calculate(data).value
name -> metric
.collect {
case (name, constraint: AnalysisBasedConstraint[_, _, _]) =>
val metric = constraint.analyzer.calculate(data).value
name -> metric
case (name, constraint: AnomalyBasedConstraint[_, _, _]) =>
val metric = constraint.analyzer.calculate(data).value
name -> metric
}

val constraintApplicabilities = check.constraints.zip(namedMetrics).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ trait AnomalyDetectionStrategy {
*/
def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, Anomaly)]
searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, AnomalyDetectionDataPoint)]
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
def isNewPointAnomalous(
historicalDataPoints: Seq[DataPoint[Double]],
newPoint: DataPoint[Double])
: DetectionResult = {
: AnomalyDetectionResult = {

require(historicalDataPoints.nonEmpty, "historicalDataPoints must not be empty!")

Expand All @@ -57,11 +57,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
val allDataPoints = sortedDataPoints :+ newPoint

// Run anomaly
val anomalies = detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue))
.anomalies

// Create a Detection result with all anomalies
DetectionResult(anomalies)
detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue))
}

/**
Expand All @@ -74,7 +70,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
def detectAnomaliesInHistory(
dataSeries: Seq[DataPoint[Double]],
searchInterval: (Long, Long) = (Long.MinValue, Long.MaxValue))
: DetectionResult = {
: AnomalyDetectionResult = {

def findIndexForBound(sortedTimestamps: Seq[Long], boundValue: Long): Int = {
sortedTimestamps.search(boundValue).insertionPoint
Expand All @@ -97,6 +93,6 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) {
val anomalies = strategy.detect(
sortedSeries.flatMap { _.metricValue }.toVector, (lowerBoundIndex, upperBoundIndex))

DetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) })
AnomalyDetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ trait BaseChangeStrategy
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int))
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {
val (start, end) = searchInterval

require(start <= end,
Expand All @@ -89,15 +89,24 @@ trait BaseChangeStrategy
val startPoint = Seq(start - order, 0).max
val data = diff(DenseVector(dataSeries.slice(startPoint, end): _*), order).data

data.zipWithIndex.filter { case (value, _) =>
(value < maxRateDecrease.getOrElse(Double.MinValue)
|| value > maxRateIncrease.getOrElse(Double.MaxValue))
}
.map { case (change, index) =>
(index + startPoint + order, Anomaly(Option(dataSeries(index + startPoint + order)), 1.0,
Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" +
s"${maxRateDecrease.getOrElse(Double.MinValue)}, " +
s"${maxRateIncrease.getOrElse(Double.MaxValue)}]. Order=$order")))
val lowerBound = maxRateDecrease.getOrElse(Double.MinValue)
val upperBound = maxRateIncrease.getOrElse(Double.MaxValue)


data.zipWithIndex.map {
case (change, index) =>
val outputSequenceIndex = index + startPoint + order
val value = dataSeries(outputSequenceIndex)
val (detail, isAnomaly) = if (change < lowerBound || change > upperBound) {
(Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" +
s"$lowerBound, " +
s"$upperBound]. Order=$order"), true)
}
else {
(None, false)
}
(outputSequenceIndex, AnomalyDetectionDataPoint(value, change,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ case class BatchNormalStrategy(
*/
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int)): Seq[(Int, Anomaly)] = {
searchInterval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = {

val (searchStart, searchEnd) = searchInterval

Expand Down Expand Up @@ -83,13 +83,15 @@ case class BatchNormalStrategy(

dataSeries.zipWithIndex
.slice(searchStart, searchEnd)
.filter { case (value, _) => value > upperBound || value < lowerBound }
.map { case (value, index) =>

val detail = Some(s"[BatchNormalStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound].")

(index, Anomaly(Option(value), 1.0, detail))
val (detail, isAnomaly) = if (value > upperBound || value < lowerBound) {
(Some(s"[BatchNormalStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound]."), true)
} else {
(None, false)
}
(index, AnomalyDetectionDataPoint(value, value,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
}
}
}
107 changes: 93 additions & 14 deletions src/main/scala/com/amazon/deequ/anomalydetection/DetectionResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,120 @@

package com.amazon.deequ.anomalydetection

class Anomaly(
val value: Option[Double],
val confidence: Double,
val detail: Option[String] = None) {

/**
* Anomaly Detection Data Point class (previously Anomaly)
*
* @param dataMetricValue The metric value that is the data point
* @param anomalyMetricValue The metric value that is being used in the anomaly calculation.
* This usually aligns with dataMetricValue but not always,
* like in a rate of change strategy where the rate of change is the anomaly metric
* which may not equal the actual data point value
* @param anomalyThreshold The thresholds used in the anomaly check, the anomalyMetricValue is
* compared to this threshold
* @param isAnomaly If the data point is an anomaly
* @param confidence TODO fill in more info about this
* @param detail Detailed error message
*/
class AnomalyDetectionDataPoint(
val dataMetricValue: Double,
val anomalyMetricValue: Double,
val anomalyThreshold: AnomalyThreshold,
val isAnomaly: Boolean,
val confidence: Double,
val detail: Option[String]) {

def canEqual(that: Any): Boolean = {
that.isInstanceOf[Anomaly]
that.isInstanceOf[AnomalyDetectionDataPoint]
}

/**
* Tests anomalies for equality. Ignores detailed explanation.
* Tests anomalyDetectionDataPoints for equality. Ignores detailed explanation.
*
* @param obj The object/ anomaly to compare against
* @return true, if and only if the value and confidence are the same
* @return true, if and only if the dataMetricValue, anomalyMetricValue, anomalyThreshold, isAnomaly
* and confidence are the same
*/
override def equals(obj: Any): Boolean = {
obj match {
case anomaly: Anomaly => anomaly.value == value && anomaly.confidence == confidence
case anomaly: AnomalyDetectionDataPoint => anomaly.dataMetricValue == dataMetricValue &&
anomaly.anomalyMetricValue == anomalyMetricValue &&
anomaly.anomalyThreshold == anomalyThreshold &&
anomaly.isAnomaly == isAnomaly &&
anomaly.confidence == confidence
case _ => false
}
}

override def hashCode: Int = {
val prime = 31
var result = 1
result = prime * result + (if (value == null) 0 else value.hashCode)
prime * result + confidence.hashCode
result = prime * result + dataMetricValue.hashCode()
result = prime * result + anomalyMetricValue.hashCode()
result = prime * result + anomalyThreshold.hashCode()
result = prime * result + isAnomaly.hashCode()
result = prime * result + confidence.hashCode()
result
}

}

object Anomaly {
def apply(value: Option[Double], confidence: Double, detail: Option[String] = None): Anomaly = {
new Anomaly(value, confidence, detail)
/**
 * Companion object for [[AnomalyDetectionDataPoint]] providing a convenience constructor.
 *
 * NOTE(review): `anomalyThreshold` and `isAnomaly` have defaults but precede the
 * required `confidence` parameter, so callers relying on those defaults must pass
 * `confidence` by name.
 */
object AnomalyDetectionDataPoint {
  /**
   * Creates an [[AnomalyDetectionDataPoint]].
   *
   * @param dataMetricValue    The metric value that is the data point
   * @param anomalyMetricValue The metric value used in the anomaly calculation
   * @param anomalyThreshold   Thresholds the anomalyMetricValue is compared against;
   *                           defaults to an unbounded AnomalyThreshold()
   * @param isAnomaly          Whether the data point is an anomaly (defaults to false)
   * @param confidence         Confidence of the detection (exact semantics not yet
   *                           documented upstream — see class-level TODO)
   * @param detail             Optional detailed message (e.g. why the point is anomalous)
   */
  def apply(dataMetricValue: Double, anomalyMetricValue: Double,
            anomalyThreshold: AnomalyThreshold = AnomalyThreshold(), isAnomaly: Boolean = false,
            confidence: Double, detail: Option[String] = None
           ): AnomalyDetectionDataPoint = {
    new AnomalyDetectionDataPoint(dataMetricValue, anomalyMetricValue, anomalyThreshold, isAnomaly, confidence, detail)
  }
}

case class DetectionResult(anomalies: Seq[(Long, Anomaly)] = Seq.empty)

/**
 * AnomalyThreshold class
 * Defines the thresholds for anomaly detection; defaults to inclusive bounds of
 * Double.MinValue and Double.MaxValue (i.e. effectively unbounded).
 * @param lowerBound The lower bound or threshold (inclusive by default)
 * @param upperBound The upper bound or threshold (inclusive by default)
 */
case class AnomalyThreshold(lowerBound: Bound = Bound(Double.MinValue), upperBound: Bound = Bound(Double.MaxValue))

/**
 * Bound class
 * Represents a single threshold/bound: a value plus a flag indicating whether
 * the bound itself is part of the allowed range.
 * @param value     The value of the bound as a Double
 * @param inclusive True if the bound is inclusive (the default), false if exclusive
 */
case class Bound(value: Double, inclusive: Boolean = true)



/**
 * AnomalyDetectionResult class
 * Returned from the detectAnomaliesInHistory function.
 * @param anomalyDetectionDataPointSequence The sequence of (timestamp, data point) pairs;
 *                                          each data point carries its own isAnomaly flag,
 *                                          so the sequence may include non-anomalous points
 */
case class AnomalyDetectionResult(anomalyDetectionDataPointSequence: Seq[(Long, AnomalyDetectionDataPoint)] = Seq.empty)

/**
 * AnomalyDetectionAssertionResult class
 * Returned by the anomaly detection assertion function.
 * @param hasNoAnomaly True if and only if NO anomaly was detected in the
 *                     data point being checked
 * @param anomalyDetectionMetadata Anomaly detection metadata containing anomaly details
 *                                 about the data point being checked
 */
case class AnomalyDetectionAssertionResult(hasNoAnomaly: Boolean, anomalyDetectionMetadata: AnomalyDetectionMetadata)


/**
 * AnomalyDetectionMetadata class
 * Contains anomaly detection metadata; currently an optional field
 * in the ConstraintResult class that is exposed to users.
 *
 * Currently, anomaly detection only runs on the "newest" data point (of the dataframe
 * being processed by the verification suite) and not on multiple data points, so this
 * metadata class holds a single anomalyDetectionDataPoint for now.
 * If the anomaly check is later extended to detect multiple points, the full
 * AnomalyDetectionResult — a sequence of (Long, AnomalyDetectionDataPoint) — can be
 * returned instead.
 * @param anomalyDetectionDataPoint Anomaly detection data point
 */
case class AnomalyDetectionMetadata(anomalyDetectionDataPoint: AnomalyDetectionDataPoint)

Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ case class OnlineNormalStrategy(
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int))
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {

val (searchStart, searchEnd) = searchInterval

Expand All @@ -139,7 +139,6 @@ case class OnlineNormalStrategy(
computeStatsAndAnomalies(dataSeries, searchInterval)
.zipWithIndex
.slice(searchStart, searchEnd)
.filter { case (result, _) => result.isAnomaly }
.map { case (calcRes, index) =>
val lowerBound =
calcRes.mean - lowerDeviationFactor.getOrElse(Double.MaxValue) * calcRes.stdDev
Expand All @@ -149,7 +148,11 @@ case class OnlineNormalStrategy(
val detail = Some(s"[OnlineNormalStrategy]: Value ${dataSeries(index)} is not in " +
s"bounds [$lowerBound, $upperBound].")

(index, Anomaly(Option(dataSeries(index)), 1.0, detail))
val value = dataSeries(index)

(index, AnomalyDetectionDataPoint(value, value,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)),
calcRes.isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,25 @@ case class SimpleThresholdStrategy(
*/
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int)): Seq[(Int, Anomaly)] = {
searchInterval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = {

val (searchStart, searchEnd) = searchInterval

require (searchStart <= searchEnd, "The start of the interval can't be larger than the end.")

dataSeries.zipWithIndex
.slice(searchStart, searchEnd)
.filter { case (value, _) => value < lowerBound || value > upperBound }
.map { case (value, index) =>

val detail = Some(s"[SimpleThresholdStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound]")
val (detail, isAnomaly) = if ( value < lowerBound || value > upperBound ) {
(Some(s"[SimpleThresholdStrategy]: Value $value is not in " +
s"bounds [$lowerBound, $upperBound]"), true)
} else {
(None, false)
}

(index, Anomaly(Option(value), 1.0, detail))
(index, AnomalyDetectionDataPoint(value, value,
AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.amazon.deequ.anomalydetection.seasonal

import breeze.linalg.DenseVector
import breeze.optimize.{ApproximateGradientFunction, DiffFunction, LBFGSB}
import com.amazon.deequ.anomalydetection.{Anomaly, AnomalyDetectionStrategy}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionDataPoint, AnomalyDetectionStrategy, AnomalyThreshold, Bound}

import collection.mutable.ListBuffer

Expand Down Expand Up @@ -178,17 +178,27 @@ class HoltWinters(
forecasts: Seq[Double],
startIndex: Int,
residualSD: Double)
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {

testSeries.zip(forecasts).zipWithIndex
.collect { case ((inputValue, forecastedValue), detectionIndex)
if math.abs(inputValue - forecastedValue) > 1.96 * residualSD =>

detectionIndex + startIndex -> Anomaly(
value = Some(inputValue),
confidence = 1.0,
detail = Some(s"Forecasted $forecastedValue for observed value $inputValue")
.collect { case ((inputValue, forecastedValue), detectionIndex) =>
val anomalyMetricValue = math.abs(inputValue - forecastedValue)
val upperBound = 1.96 * residualSD

val (detail, isAnomaly) = if (anomalyMetricValue > upperBound) {
(Some(s"Forecasted $forecastedValue for observed value $inputValue"), true)
} else {
(None, false)
}
detectionIndex + startIndex -> AnomalyDetectionDataPoint(
dataMetricValue = inputValue,
anomalyMetricValue = anomalyMetricValue,
anomalyThreshold = AnomalyThreshold(upperBound = Bound(upperBound)),
isAnomaly = isAnomaly,
confidence = 1.0,
detail = detail
)

}
}

Expand All @@ -202,7 +212,7 @@ class HoltWinters(
override def detect(
dataSeries: Vector[Double],
searchInterval: (Int, Int) = (0, Int.MaxValue))
: Seq[(Int, Anomaly)] = {
: Seq[(Int, AnomalyDetectionDataPoint)] = {

require(dataSeries.nonEmpty, "Provided data series is empty")

Expand Down
Loading