# Install deequ

In [1]:
import $ivy.`com.amazon.deequ:deequ:2.0.1-spark-3.2`

[32mimport [39m[36m$ivy.$                                       [39m

## spark setup

In [2]:
import org.apache.log4j.Logger
import org.apache.log4j.Level

// Turn off all log output from the "org" and "akka" logger hierarchies.
Seq("org", "akka").foreach { loggerName =>
  Logger.getLogger(loggerName).setLevel(Level.OFF)
}

[32mimport [39m[36morg.apache.log4j.Logger
[39m
[32mimport [39m[36morg.apache.log4j.Level

[39m

In [3]:
import org.apache.spark.sql.SparkSession
// Local Spark session for this notebook, with the Spark web UI switched off.
val spark = SparkSession
  .builder()
  .appName("test")
  .master("local")
  .config("spark.ui.enabled", "false")
  .getOrCreate()

// Checkpoint files go to the JVM's temporary directory.
spark.sparkContext.setCheckpointDir(System.getProperty("java.io.tmpdir"))
import spark.implicits._

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36morg.apache.spark.sql.SparkSession
[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@563fc0da
[32mimport [39m[36mspark.implicits._[39m

In [4]:
import org.apache.spark.sql.DataFrame
import almond.interpreter.api.DisplayData

/** Renders a DataFrame as an HTML table in the notebook output.
  *
  * All rows are collected to the driver before rendering, so pass a small or
  * already-limited DataFrame. Null values are rendered as the text "null".
  *
  * @param df the DataFrame to display
  */
def show(df: DataFrame): Unit = {
  // Escape markup-significant characters so a value cannot break the table.
  // '&' must be replaced first, otherwise the other entities get double-escaped.
  def escapeHtml(text: String): String =
    text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

  val header = df.columns
    .map(columnName => s"<td><b>${escapeHtml(columnName)}</b></td>")
    .mkString("")

  val tableRows = df.collect()
    .map { row =>
      val cells = row.toSeq.map(value => s"<td>${escapeHtml(s"$value")}</td>").mkString("")
      s"<tr>$cells</tr>"
    }
    .mkString("")

  val htmlTable = s"<html><table><tr>$header</tr>$tableRows</table></html>"

  display(DisplayData(Map("text/html" -> htmlTable)))
}

[32mimport [39m[36morg.apache.spark.sql.DataFrame
[39m
[32mimport [39m[36malmond.interpreter.api.DisplayData

[39m
defined [32mfunction[39m [36mshow[39m

## read data


Both hour.csv and day.csv have the following fields, except hr which is not available in day.csv

- instant: record index
- dteday : date
- season : season (1:winter, 2:spring, 3:summer, 4:fall)
- yr : year (0: 2011, 1:2012)
- mnth : month ( 1 to 12)
- hr : hour (0 to 23)
- holiday : whether day is holiday or not (extracted from [Web Link])
- weekday : day of the week
- workingday : 1 if the day is neither a weekend nor a holiday, otherwise 0.
- weathersit :
    - 1: Clear, Few clouds, Partly cloudy, Partly cloudy
    - 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
    - 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
    - 4: Heavy Rain + Ice Pellets + Thunderstorm + Mist, Snow + Fog
- temp : Normalized temperature in Celsius. The values are derived via (t-t_min)/(t_max-t_min), t_min=-8, t_max=+39 (only in hourly scale)
- atemp: Normalized feeling temperature in Celsius. The values are derived via (t-t_min)/(t_max-t_min), t_min=-16, t_max=+50 (only in hourly scale)
- hum: Normalized humidity. The values are divided by 100 (max)
- windspeed: Normalized wind speed. The values are divided by 67 (max)
- casual: count of casual users
- registered: count of registered users
- cnt: count of total rental bikes including both casual and registered

In [5]:
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
// Load the daily data set; the first CSV line holds the column names and
// column types are inferred from the data.
val df: DataFrame = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("data/day.csv")

df.printSchema()
show(df.limit(30))

root
 |-- instant: integer (nullable = true)
 |-- dteday: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- yr: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- cnt: integer (nullable = true)



0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
instant,dteday,season,yr,mnth,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
,,,,,,,,,,,,,,,
1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
,,,,,,,,,,,,,,,
2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
,,,,,,,,,,,,,,,
3,2011-01-03,1,0,1,0,1,1,1,0.196364,0.189405,0.437273,0.248309,120,1229,1349
,,,,,,,,,,,,,,,
4,2011-01-04,1,0,1,0,2,1,1,0.2,0.212122,0.590435,0.160296,108,1454,1562
,,,,,,,,,,,,,,,


[32mimport [39m[36mcom.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
[39m
[36mdf[39m: [32mDataFrame[39m = [instant: int, dteday: string ... 14 more fields]

# automatic suggestion of constraints

In [6]:
import org.apache.spark.sql.functions.col
import com.amazon.deequ.suggestions.rules.UniqueIfApproximatelyUniqueRule

// Ask deequ to suggest constraints from the rows with yr = 0 and mnth = 1,
// using the default rule set plus the approximate-uniqueness rule.
val suggestionResult = {
  val firstJanuary = df.filter(col("yr") === 0 && col("mnth") === 1)

  ConstraintSuggestionRunner()
    .onData(firstJanuary)
    .addConstraintRules(Rules.DEFAULT)
    .addConstraintRule(UniqueIfApproximatelyUniqueRule())
    .useTrainTestSplitWithTestsetRatio(0.5, Some(0))
    .run()
}


[32mimport [39m[36morg.apache.spark.sql.functions.col
[39m
[32mimport [39m[36mcom.amazon.deequ.suggestions.rules.UniqueIfApproximatelyUniqueRule

[39m
[36msuggestionResult[39m: [32mcom[39m.[32mamazon[39m.[32mdeequ[39m.[32msuggestions[39m.[32mConstraintSuggestionResult[39m = [33mConstraintSuggestionResult[39m(
  [33mMap[39m(
    [32m"workingday"[39m -> [33mNumericColumnProfile[39m(
      [32m"workingday"[39m,
      [32m1.0[39m,
      [32m2L[39m,
      Integral,
      false,
      [33mMap[39m(),
      [33mSome[39m(
        [33mDistribution[39m(
          [33mMap[39m(
            [32m"1"[39m -> [33mDistributionValue[39m([32m12L[39m, [32m0.75[39m),
            [32m"0"[39m -> [33mDistributionValue[39m([32m4L[39m, [32m0.25[39m)
          ),
          [32m2L[39m
        )
      ),
      [32mNone[39m,
      [33mSome[39m([32m0.75[39m),
      [33mSome[39m([32m1.0[39m),
      [33mSome[39m([32m0.0[39m),
      [33mSome[39m(

In [7]:
// For each column: print every suggestion with its description first, then
// repeat only the generated code lines so they are easy to copy.
for ((column, suggestions) <- suggestionResult.constraintSuggestions) {
  for (suggestion <- suggestions) {
    val described = s"Constraint suggestion for '$column':\t${suggestion.description}\n"
    val generated = s"The corresponding scala code is ${suggestion.codeForConstraint}\n"
    println(described + generated)
  }
  suggestions.map(_.codeForConstraint).foreach(println)
}

Constraint suggestion for 'workingday':	'workingday' is not null
The corresponding scala code is .isComplete("workingday")

Constraint suggestion for 'workingday':	'workingday' has value range '1', '0'
The corresponding scala code is .isContainedIn("workingday", Array("1", "0"))

Constraint suggestion for 'workingday':	'workingday' has no negative values
The corresponding scala code is .isNonNegative("workingday")

.isComplete("workingday")
.isContainedIn("workingday", Array("1", "0"))
.isNonNegative("workingday")
Constraint suggestion for 'windspeed':	'windspeed' is not null
The corresponding scala code is .isComplete("windspeed")

Constraint suggestion for 'windspeed':	'windspeed' has no negative values
The corresponding scala code is .isNonNegative("windspeed")

Constraint suggestion for 'windspeed':	'windspeed' is unique
The corresponding scala code is .isUnique("windspeed")

.isComplete("windspeed")
.isNonNegative("windspeed")
.isUnique("windspeed")
Constraint suggestion for 'regi

In [8]:
// Print only the generated constraint code, one line per suggestion.
suggestionResult.constraintSuggestions.values.flatten
  .map(_.codeForConstraint)
  .foreach(println)

.isComplete("workingday")
.isContainedIn("workingday", Array("1", "0"))
.isNonNegative("workingday")
.isComplete("windspeed")
.isNonNegative("windspeed")
.isUnique("windspeed")
.isComplete("registered")
.isNonNegative("registered")
.isUnique("registered")
.isComplete("atemp")
.isNonNegative("atemp")
.isUnique("atemp")
.isComplete("weathersit")
.isContainedIn("weathersit", Array("1", "2"))
.isNonNegative("weathersit")
.isComplete("hum")
.isNonNegative("hum")
.isUnique("hum")
.isComplete("season")
.isContainedIn("season", Array("1"))
.isNonNegative("season")
.isComplete("casual")
.isNonNegative("casual")
.isUnique("casual")
.isComplete("instant")
.isNonNegative("instant")
.isUnique("instant")
.isComplete("temp")
.isNonNegative("temp")
.isUnique("temp")
.isComplete("holiday")
.isNonNegative("holiday")
.isComplete("dteday")
.isUnique("dteday")
.isComplete("weekday")
.isContainedIn("weekday", Array("2", "1", "4", "0", "6"), _ >= 0.81, Some("It should be above 0.81!"))
.isNonNegative("weekda

# Verification suite

In [9]:
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.{ConstrainableDataTypes, ConstraintStatus}
import org.apache.spark.sql.DataFrame


 // Run the constraints below against the rows with yr = 1 and mnth = 2.
 // Constraints are grouped per column and kept in their original order.
 val verificationResult = {
   val secondFebruary = df.filter($"yr" === 1 && $"mnth" === 2)

   val bikeSharingCheck = Check(CheckLevel.Error, "unit testing Bike sharing day data")
     // workingday
     .isComplete("workingday")
     .isContainedIn("workingday", Array("1", "0"))
     .isNonNegative("workingday")
     // windspeed
     .isComplete("windspeed")
     .isNonNegative("windspeed")
     .isUnique("windspeed")
     // registered
     .isComplete("registered")
     .isNonNegative("registered")
     .isUnique("registered")
     // atemp
     .isComplete("atemp")
     .isNonNegative("atemp")
     .isUnique("atemp")
     // weathersit
     .isComplete("weathersit")
     .isContainedIn("weathersit", Array("1", "2", "3"))
     .isContainedIn("weathersit", Array("1", "2"), _ >= 0.94, Some("It should be above 0.94!"))
     .isNonNegative("weathersit")
     // hum
     .isComplete("hum")
     .isNonNegative("hum")
     // season
     .isComplete("season")
     .isContainedIn("season", Array("4", "1", "2", "3"))
     .isNonNegative("season")
     // casual
     .isComplete("casual")
     .isNonNegative("casual")
     .isUnique("casual")
     // instant
     .isComplete("instant")
     .isNonNegative("instant")
     .isUnique("instant")
     // temp
     .isComplete("temp")
     .isNonNegative("temp")
     // holiday
     .isComplete("holiday")
     .isContainedIn("holiday", Array("0", "1"))
     .isContainedIn("holiday", Array("0"), _ >= 0.93, Some("It should be above 0.93!"))
     .isNonNegative("holiday")
     // dteday
     .isComplete("dteday")
     .isUnique("dteday")
     // weekday
     .isComplete("weekday")
     .isContainedIn("weekday", Array("3", "2", "0", "1", "6", "5", "4"))
     .isNonNegative("weekday")
     // cnt
     .isComplete("cnt")
     .isNonNegative("cnt")
     .isUnique("cnt")
     // mnth
     .isComplete("mnth")
     .isContainedIn("mnth", Array("1", "10", "11", "3", "12", "5", "7", "2", "9", "6", "4", "8"))
     .isContainedIn("mnth", Array("1", "10", "11", "3", "12", "5", "7", "2", "9", "6", "4"), _ >= 0.9, Some("It should be above 0.9!"))
     .isNonNegative("mnth")
     // yr
     .isComplete("yr")
     .isContainedIn("yr", Array("0", "1"))
     .isNonNegative("yr")

   VerificationSuite()
     .onData(secondFebruary)
     .addCheck(bikeSharingCheck)
     .run()
 }

[32mimport [39m[36mcom.amazon.deequ.{VerificationResult, VerificationSuite}
[39m
[32mimport [39m[36mcom.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
[39m
[32mimport [39m[36mcom.amazon.deequ.constraints.{ConstrainableDataTypes, ConstraintStatus}
[39m
[32mimport [39m[36morg.apache.spark.sql.DataFrame


 [39m
[36mverificationResult[39m: [32mVerificationResult[39m = [33mVerificationResult[39m(
  Error,
  [33mMap[39m(
    [33mCheck[39m(
      Error,
      [32m"unit testing Bike sharing day data"[39m,
      [33mList[39m(
        CompletenessConstraint(Completeness(workingday,None)),
        ComplianceConstraint(Compliance(workingday contained in 1,0,`workingday` IS NULL OR `workingday` IN ('1','0'),None)),
        ComplianceConstraint(Compliance(workingday is non-negative,COALESCE(CAST(workingday AS DECIMAL(20,10)), 0.0) >= 0,None)),
        CompletenessConstraint(Completeness(windspeed,None)),
        ComplianceConstraint(Compliance(windspeed is non-neg

In [10]:
// Report the overall outcome; on failure, list every constraint whose status
// is not Success together with its message.
if (verificationResult.status == CheckStatus.Success) {
  println("The data passed the test, everything is fine!")
} else {
  println("We found errors in the data:\n")

  val failedConstraints = verificationResult.checkResults.values
    .flatMap(_.constraintResults)
    .filter(_.status != ConstraintStatus.Success)

  failedConstraints.foreach { result =>
    // message is an Option: use a placeholder instead of throwing when it is empty
    val message = result.message.getOrElse("no message available")
    println(s"${result.constraint}: $message")
  }
}

We found errors in the data:

UniquenessConstraint(Uniqueness(List(atemp),None)): Value: 0.9310344827586207 does not meet the constraint requirement!
UniquenessConstraint(Uniqueness(List(casual),None)): Value: 0.9310344827586207 does not meet the constraint requirement!


In [11]:
// Turn the verification outcome into a DataFrame and display it sorted by
// constraint status.
val dfResults: DataFrame =
  VerificationResult.checkResultsAsDataFrame(spark, verificationResult)
show(dfResults.sort("constraint_status"))

0,1,2,3,4,5
check,check_level,check_status,constraint,constraint_status,constraint_message
,,,,,
unit testing Bike sharing day data,Error,Error,"UniquenessConstraint(Uniqueness(List(atemp),None))",Failure,Value: 0.9310344827586207 does not meet the constraint requirement!
,,,,,
unit testing Bike sharing day data,Error,Error,"UniquenessConstraint(Uniqueness(List(casual),None))",Failure,Value: 0.9310344827586207 does not meet the constraint requirement!
,,,,,
unit testing Bike sharing day data,Error,Error,"CompletenessConstraint(Completeness(workingday,None))",Success,
,,,,,
unit testing Bike sharing day data,Error,Error,"ComplianceConstraint(Compliance(workingday contained in 1,0,`workingday` IS NULL OR `workingday` IN ('1','0'),None))",Success,
,,,,,


[36mdfResults[39m: [32mDataFrame[39m = [check: string, check_level: string ... 4 more fields]