forked from awslabs/deequ
-
Notifications
You must be signed in to change notification settings - Fork 0
/
AnomalyBasedConstraint.scala
134 lines (116 loc) · 5.95 KB
/
AnomalyBasedConstraint.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/
package com.amazon.deequ.constraints
import com.amazon.deequ.analyzers.{Analyzer, State}
import com.amazon.deequ.anomalydetection.AnomalyDetectionAssertionResult
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.DataFrame
import scala.util.{Failure, Success}
/**
 * Case class for anomaly based constraints that provides a unified way to access
 * AnalyzerContext and metrics stored in it.
 * TODO this differs from AnalysisBasedConstraint only in that it uses an assertion function that
 * returns an AnomalyDetectionAssertionResult with anomaly detection metadata as well as the
 * assertion boolean. Figure out if it's better to use some inheritance/composition from a common
 * trait/abstract class.
 *
 * Runs the analysis and gets the value of the metric returned by the analysis,
 * picks the value that will be used in the assertion function with the value picker,
 * and runs the assertion.
 *
 * @param analyzer Analyzer to be run on the data frame
 * @param assertion Assertion function that returns an AnomalyDetectionAssertionResult with
 *                  anomaly detection metadata as well as the assertion boolean
 * @param valuePicker Optional function to pick the interested part of the metric value that the
 *                    assertion will be running on. Absence of such function means the metric
 *                    value would be used in the assertion as it is.
 * @param hint A hint to provide additional context why a constraint could have failed
 * @tparam S : State type of the Analyzer
 * @tparam M : Type of the metric value generated by the Analyzer
 * @tparam V : Type of the value being used in assertion function
 *
 */
private[deequ] case class AnomalyBasedConstraint[S <: State[S], M, V](
    analyzer: Analyzer[S, Metric[M]],
    private[deequ] val assertion: V => AnomalyDetectionAssertionResult,
    private[deequ] val valuePicker: Option[M => V] = None,
    private[deequ] val hint: Option[String] = None)
  extends Constraint {

  /**
   * Runs the analyzer on `data` and evaluates this constraint against the resulting metric.
   *
   * @param data DataFrame to compute the metric on
   * @return the outcome of evaluating this constraint against the freshly computed metric
   */
  private[deequ] def calculateAndEvaluate(data: DataFrame): ConstraintResult = {
    val metric = analyzer.calculate(data)
    evaluate(Map(analyzer -> metric))
  }

  /**
   * Evaluates this constraint against previously computed analysis results.
   *
   * @param analysisResults metrics keyed by the analyzer that produced them
   * @return a failed result carrying [[AnomalyBasedConstraint.MissingAnalysis]] when there is no
   *         entry for this constraint's analyzer; otherwise the result of running the value
   *         picker and the anomaly assertion on that metric
   */
  override def evaluate(
      analysisResults: Map[Analyzer[_, Metric[_]], Metric[_]])
    : ConstraintResult = {
    val metric = analysisResults.get(analyzer).map(_.asInstanceOf[Metric[M]])

    metric.map(pickValueAndAssert).getOrElse(
      // Analysis is missing
      ConstraintResult(this, ConstraintStatus.Failure,
        message = Some(AnomalyBasedConstraint.MissingAnalysis), metric = metric)
    )
  }

  /**
   * Picks the value to assert on from `metric` and runs the anomaly assertion on it.
   *
   * A metric whose value is a `Failure` (the analysis threw) yields a failed result that carries
   * a description of that exception. Exceptions thrown by the value picker or by the assertion
   * are reported as failed results instead of being propagated.
   */
  private[this] def pickValueAndAssert(metric: Metric[M]): ConstraintResult = {
    metric.value match {
      // Analysis done successfully and result metric is there
      case Success(metricValue) =>
        try {
          val assertOn = runPickerOnMetric(metricValue)
          val anomalyAssertionResult = runAssertion(assertOn)

          if (anomalyAssertionResult.hasNoAnomaly) {
            ConstraintResult(this, ConstraintStatus.Success, metric = Some(metric),
              anomalyDetectionMetadata = Some(anomalyAssertionResult.anomalyDetectionMetadata))
          } else {
            val baseMessage = s"Value: $assertOn does not meet the constraint requirement," +
              s" check the anomaly detection metadata!"
            // The optional hint is appended after a single space.
            val errorMessage = hint.fold(baseMessage)(h => s"$baseMessage $h")
            ConstraintResult(this, ConstraintStatus.Failure, Some(errorMessage), Some(metric),
              anomalyDetectionMetadata = Some(anomalyAssertionResult.anomalyDetectionMetadata))
          }
        } catch {
          case AnomalyBasedConstraint.ConstraintAssertionException(msg) =>
            ConstraintResult(this, ConstraintStatus.Failure,
              message = Some(s"${AnomalyBasedConstraint.AssertionException}: $msg!"),
              metric = Some(metric))
          case AnomalyBasedConstraint.ValuePickerException(msg) =>
            ConstraintResult(this, ConstraintStatus.Failure,
              message = Some(s"${AnomalyBasedConstraint.ProblematicMetricPicker}: $msg!"),
              metric = Some(metric))
        }
      // An exception occurred during analysis
      case Failure(e) => ConstraintResult(this,
        ConstraintStatus.Failure, message = Some(describeThrowable(e)), metric = Some(metric))
    }
  }

  /**
   * Returns the message of `e`, or `e.toString` when `getMessage` is null.
   * This keeps result messages from becoming `Some(null)` or ending in "null!".
   */
  private[this] def describeThrowable(e: Throwable): String =
    Option(e.getMessage).getOrElse(e.toString)

  /**
   * Applies the value picker, or uses the metric value as-is when no picker is configured.
   *
   * @throws AnomalyBasedConstraint.ValuePickerException if the picker throws
   */
  private def runPickerOnMetric(metricValue: M): V =
    try {
      // `asInstanceOf[V]` is an unchecked cast (V is erased), so a type mismatch presumably
      // surfaces later, inside the assertion, rather than here.
      valuePicker.map(function => function(metricValue)).getOrElse(metricValue.asInstanceOf[V])
    } catch {
      case e: Exception =>
        throw AnomalyBasedConstraint.ValuePickerException(describeThrowable(e))
    }

  /**
   * Runs the user-supplied anomaly assertion on `assertOn`.
   *
   * @throws AnomalyBasedConstraint.ConstraintAssertionException if the assertion throws
   */
  private def runAssertion(assertOn: V): AnomalyDetectionAssertionResult =
    try {
      assertion(assertOn)
    } catch {
      case e: Exception =>
        throw AnomalyBasedConstraint.ConstraintAssertionException(describeThrowable(e))
    }

  // 'assertion' and 'valuePicker' are lambdas we have to represent them like '<function1>'
  override def toString: String =
    s"AnomalyBasedConstraint($analyzer,<function1>,${valuePicker.map(_ => "<function1>")},$hint)"
}
/**
 * Message constants and internal exception types used by the `AnomalyBasedConstraint` class.
 */
private[deequ] object AnomalyBasedConstraint {

  /** Failure message used when no metric exists for the constraint's analyzer. */
  val MissingAnalysis: String = "Missing Analysis, can't run the constraint!"

  /** Message prefix used when the value picker throws. */
  val ProblematicMetricPicker: String = "Can't retrieve the value to assert on"

  /** Message prefix used when the assertion function throws. */
  val AssertionException: String = "Can't execute the assertion"

  // Internal wrappers that tag which user-supplied function (picker vs. assertion) failed,
  // so the constraint can pattern-match on them and pick the matching message prefix.
  private final case class ValuePickerException(message: String)
    extends RuntimeException(message)

  private final case class ConstraintAssertionException(message: String)
    extends RuntimeException(message)
}