In [1]:
import sys
# Make the generated protobuf modules importable. The path is relative to the
# notebook's working directory. The membership guard keeps the cell idempotent:
# re-running it does not pile up duplicate entries on sys.path.
PROTO_DIR = "../tsumugi/proto/"
if PROTO_DIR not in sys.path:
    sys.path.append(PROTO_DIR)

In [2]:
from pyspark.sql import SparkSession
import pandas as pd

In [3]:
# Open (or reuse) a session against the Spark Connect endpoint on localhost:15002.
spark = (
    SparkSession.builder
    .remote("sc://localhost:15002")
    .getOrCreate()
)

In [4]:
# Three-row sample frame: string column a, int columns b and c.
# The last row leaves c as None, so column c contains one NULL.
test_data = spark.createDataFrame(
    [["foo", 1, 4], ["bar", 2, 6], ["baz", 3, None]],
    schema="struct<a:string, b:int, c:int>",
)

In [5]:
# Print the sample frame as a text table to eyeball the rows (note the NULL in column c).
test_data.show()

+---+---+----+
|  a|  b|   c|
+---+---+----+
|foo|  1|   4|
|bar|  2|   6|
|baz|  3|NULL|
+---+---+----+



In [6]:
from tsumugi.verification import VerificationSuite
from tsumugi.analyzers import Size, Minimum, Completeness, CustomSql, ConstraintBuilder
from tsumugi.checks import CheckBuilder

In [7]:
# First check: dataset size plus primary-key behaviour of column b.
basic_check = (
    CheckBuilder()
    .with_description("Basic checks")
    .has_size(expected_size=3.0, name="Size(*)")
    .is_primary_key(column="b", name="col(b) should be PK-like")
    .build()
)

# Hand-built constraint: the Minimum analyzer on column b, compared for equality with 0.0.
min_b_constraint = (
    ConstraintBuilder()
    .for_analyzer(Minimum(column="b"))
    .should_be_eq_to(0.0)
    .build()
)

# Second check: completeness of column c plus the custom constraint above.
additional_check = (
    CheckBuilder()
    .with_description("Additional checks")
    .is_complete(column="c", name="col(c) should be complete")
    .with_constraint(min_b_constraint)
    .build()
)

# Assemble the suite; checks are attached in the same order as they are declared.
suite = (
    VerificationSuite.on_data(test_data)  # add DataFrame
    .with_row_level_results()  # mark that row-level results are required
    .add_check(basic_check)
    .add_check(additional_check)
)

In [8]:
# Execute the suite through the Spark Connect session. The returned object is
# read below via its check_results, checks, metrics and row_level_results attributes.
result = suite.run_with_spark_session(spark)

In [9]:
from dataclasses import asdict

In [10]:
# One row per constraint outcome: each dataclass entry is turned into a plain dict first.
pd.DataFrame.from_records(list(map(asdict, result.check_results)))

Unnamed: 0,level,check_description,constraint_message,metric_name,metric_instance,metric_entity,metric_value,status,constraint
0,Warning,Basic checks,Value: 3 does not meet the constraint requirem...,Size,*,Dataset,3.0,Failure,SizeConstraint(Size(None))
1,Warning,Basic checks,,Uniqueness,b,Column,1.0,Success,"UniquenessConstraint(Uniqueness(Buffer(b),None..."
2,Warning,Basic checks,,Completeness,b,Column,1.0,Success,"CompletenessConstraint(Completeness(b,None,Som..."
3,Warning,Additional checks,Value: 0.6666666666666666 does not meet the co...,Completeness,c,Column,0.666667,Failure,"CompletenessConstraint(Completeness(c,None,Som..."
4,Warning,Additional checks,Value: 1.0 does not meet the constraint requir...,Minimum,b,Column,1.0,Failure,"MinimumConstraint(Minimum(b,None,Some(Analyzer..."


In [11]:
# Check-level view: each dataclass entry in result.checks becomes one table row.
pd.DataFrame.from_records(list(map(asdict, result.checks)))

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Basic checks,Warning,Warning,SizeConstraint(Size(None)),Failure,Value: 3 does not meet the constraint requirem...
1,Basic checks,Warning,Warning,"UniquenessConstraint(Uniqueness(Buffer(b),None...",Success,
2,Basic checks,Warning,Warning,"CompletenessConstraint(Completeness(b,None,Som...",Success,
3,Additional checks,Warning,Warning,"CompletenessConstraint(Completeness(c,None,Som...",Failure,Value: 0.6666666666666666 does not meet the co...
4,Additional checks,Warning,Warning,"MinimumConstraint(Minimum(b,None,Some(Analyzer...",Failure,Value: 1.0 does not meet the constraint requir...


In [12]:
# Computed metrics: each dataclass entry in result.metrics becomes one table row.
pd.DataFrame.from_records(list(map(asdict, result.metrics)))

Unnamed: 0,entity,instance,name,value
0,Column,c,Completeness,0.666667
1,Column,b,Uniqueness,1.0
2,Dataset,*,Size,3.0
3,Column,b,Completeness,1.0
4,Column,b,Minimum,1.0


In [14]:
# Convert the row-level results to pandas for display. In the captured output the
# frame holds the input columns plus one boolean column per check description.
result.row_level_results.toPandas()

Unnamed: 0,a,b,c,Basic checks,Additional checks
0,foo,1,4.0,True,False
1,bar,2,6.0,True,False
2,baz,3,,True,False
