In [1]:
import os

In [2]:
# Export SPARK_VERSION before pydeequ is imported in the next cell.
# NOTE(review): presumably pydeequ reads this to pick the matching Deequ build — confirm.
os.environ["SPARK_VERSION"] = "3.0.1"

# Set up a PySpark session

In [3]:
from pyspark.sql import SparkSession, Row
import pydeequ

In [5]:
from glob import glob 

In [8]:
# Collect every jar in the local Deequ jar directory.
# The path is a raw string: a Windows path in a normal literal contains
# backslash sequences (\w, \s, \j, \d, \*) that are invalid escapes, which
# Python deprecates and warns about. The resulting pattern is unchanged.
jars = glob(r"D:\work\sparkapps\jars\deequ_jars\*.jar")

In [9]:
# Preview the comma-separated jar list; the same expression is passed to
# the "spark.jars" setting when the session is built.
",".join(jars)

'D:\\work\\sparkapps\\jars\\deequ_jars\\com.amazon.deequ_deequ-1.2.2-spark-3.0.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\com.chuusai_shapeless_2.12-2.3.2.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\com.github.fommil.netlib_core-1.1.2.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\com.github.rwl_jtransforms-2.4.0.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\junit_junit-4.8.2.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\net.sf.opencsv_opencsv-2.3.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\org.apache.commons_commons-math3-3.2.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\org.scala-lang_scala-reflect-2.12.1.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\org.scalanlp_breeze-macros_2.12-0.13.2.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\org.scalanlp_breeze_2.12-0.13.2.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\org.slf4j_slf4j-api-1.7.5.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\org.spire-math_spire-macros_2.12-0.13.0.jar,D:\\work\\sparkapps\\jars\\deequ_jars\\org.spire-math_spire_2.12-0.13.0.jar,D:\\work\\s

In [10]:
# Create (or reuse) the Spark session with the locally collected jars
# supplied through "spark.jars".
# Alternative the author left disabled — take Deequ from Maven coordinates instead:
#     .config("spark.jars.packages", pydeequ.deequ_maven_coord)
#     .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
spark = (
    SparkSession.builder
    .config("spark.jars", ",".join(jars))
    .getOrCreate()
)

In [11]:
# Echo the session object as the cell result to confirm it exists.
spark

In [12]:
# Small three-row sample frame used by every section below.
# Column "c" deliberately carries one missing value (None) in the last row.
df = (
    spark.sparkContext
    .parallelize([
        Row(a="foo", b=1, c=5),
        Row(a="bar", b=2, c=6),
        Row(a="baz", b=3, c=None),
    ])
    .toDF()
)

In [13]:
# Print the sample rows as a text table.
df.show()

+---+---+----+
|  a|  b|   c|
+---+---+----+
|foo|  1|   5|
|bar|  2|   6|
|baz|  3|null|
+---+---+----+



In [14]:
# Print the inferred column names, types and nullability.
df.printSchema()

root
 |-- a: string (nullable = true)
 |-- b: long (nullable = true)
 |-- c: long (nullable = true)



# Analyzers

In [15]:
from pydeequ.analyzers import *

In [16]:
# Run two analyzers over the sample frame: the dataset size and the
# completeness of column "b".
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("b"))
    .run()
)

In [17]:
# Convert the successfully computed metrics into a Spark DataFrame for display.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

In [18]:
# Print the metric table (one row per analyzer).
analysisResult_df.show()

+-------+--------+------------+-----+
| entity|instance|        name|value|
+-------+--------+------------+-----+
|Dataset|       *|        Size|  3.0|
| Column|       b|Completeness|  1.0|
+-------+--------+------------+-----+



# Profile

In [19]:
from pydeequ.profiles import *

In [20]:
# Profile every column of the sample frame with the default profiler settings.
result = (
    ColumnProfilerRunner(spark)
    .onData(df)
    .run()
)

In [21]:
# Print each column's profile. Only the profile objects are needed, so the
# loop iterates the mapping's values instead of unpacking an unused column name.
for profile in result.profiles.values():
    print(profile)

StandardProfiles for column: a: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 0,
        "String": 3
    },
    "histogram": [
        [
            "baz",
            1,
            0.3333333333333333
        ],
        [
            "foo",
            1,
            0.3333333333333333
        ],
        [
            "bar",
            1,
            0.3333333333333333
        ]
    ]
}
NumericProfiles for column: b: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 3,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "1",
            1,
            0.3333333333333333
        ],
        [
            "2",
            1,
            0.3333333333333333
        ],
        [
            "3",
            1,
           

# Constraint Suggestions

In [22]:
from pydeequ.suggestions import *

In [23]:
# Ask Deequ to suggest constraints for the sample frame using the DEFAULT rule set.
suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())
    .run()
)

In [24]:
# Show the constraint suggestions. The result is a JSON-like Python dict whose
# 'constraint_suggestions' key holds one entry per suggested constraint.
suggestionResult

{'constraint_suggestions': [{'constraint_name': 'CompletenessConstraint(Completeness(b,None))',
   'column_name': 'b',
   'current_value': 'Completeness: 1.0',
   'description': "'b' is not null",
   'suggesting_rule': 'CompleteIfCompleteRule()',
   'rule_description': 'If a column is complete in the sample, we suggest a NOT NULL constraint',
   'code_for_constraint': '.isComplete("b")'},
  {'constraint_name': "ComplianceConstraint(Compliance('b' has no negative values,b >= 0,None))",
   'column_name': 'b',
   'current_value': 'Minimum: 1.0',
   'description': "'b' has no negative values",
   'suggesting_rule': 'NonNegativeNumbersRule()',
   'rule_description': 'If we see only non-negative numbers in a column, we suggest a corresponding constraint',
   'code_for_constraint': '.isNonNegative("b")'},
  {'constraint_name': 'UniquenessConstraint(Uniqueness(List(b),None))',
   'column_name': 'b',
   'current_value': 'ApproxDistinctness: 1.0',
   'description': "'b' is unique",
   'suggestin

# Constraint Verification

In [25]:
from pydeequ.checks import *
from pydeequ.verification import *

In [26]:
# Create a Warning-level check named "Review Check"; constraints are attached
# to it in the verification cell that follows.
check = Check(spark, CheckLevel.Warning, "Review Check")

In [27]:
# Attach the constraints to the check and run the verification on the sample frame.
# NOTE(review): for the sample data the minimum of "b" is 1 and "c" contains a
# null, so hasMin(... == 0) and isComplete("c") are not expected to hold —
# presumably intentional to demonstrate failing constraints; confirm.
checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check
        .hasSize(lambda size: size >= 3)
        .hasMin("b", lambda minimum: minimum == 0)
        .isComplete("c")
        .isUnique("a")
        .isContainedIn("a", ["foo", "bar", "baz"])
        .isNonNegative("b")
    )
    .run()
)

Python Callback server started!


In [28]:
# Convert the verification outcome into a Spark DataFrame for display.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)

In [29]:
# Print the per-constraint results without truncating long cell values.
# NOTE(review): the recorded output for this cell has a header but no rows —
# verify that the verification results are actually being returned.
checkResult_df.show(truncate=False)

+------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check       |check_level|check_status|constraint                                                                                                 |constraint_status|constraint_message                                                 |
+------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
+------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+



# Repository

In [30]:
from pydeequ.repository import *
from pydeequ.analyzers import *

In [31]:
# Tags stored alongside each saved metric (they appear as columns when loaded).
key_tags = {"tag": "pydeequ hello world"}

# File-backed metrics repository.
# NOTE(review): helper_metrics_file presumably resolves a location for
# "metrics.json" — confirm where the file ends up.
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, "metrics.json")
repository = FileSystemMetricsRepository(spark, metrics_file)

# Key identifying this run: the current time in milliseconds plus the tags.
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

In [32]:
# Compute the approximate distinct count of "b" and save (or append) the
# metric to the repository under resultKey.
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(ApproxCountDistinct("b"))
    .useRepository(repository)
    .saveOrAppendResult(resultKey)
    .run()
)

In [33]:
# Read back from the repository: results dated before "now", restricted to
# the ApproxCountDistinct("b") analyzer, as a DataFrame.
result_metrep_df = (
    repository.load()
    .before(ResultKey.current_milli_time())
    .forAnalyzers([ApproxCountDistinct("b")])
    .getSuccessMetricsAsDataFrame()
)


In [34]:
# Print the metrics loaded from the repository, including their date and tag columns.
result_metrep_df.show()

+------+--------+-------------------+-----+-------------+-------------------+
|entity|instance|               name|value| dataset_date|                tag|
+------+--------+-------------------+-----+-------------+-------------------+
|Column|       b|ApproxCountDistinct|  3.0|1636830217163|pydeequ hello world|
+------+--------+-------------------+-----+-------------+-------------------+



# Wrapping up

In [None]:
# Shut down the Python callback server that was started during verification
# (see the "Python Callback server started!" output), then stop Spark.
# try/finally guarantees the session is stopped even if the callback-server
# shutdown raises, so the Spark session is not left running.
try:
    spark.sparkContext._gateway.shutdown_callback_server()
finally:
    spark.stop()
