In [1]:
# Logging setup
import logging
defaultLogger = logging.getLogger('default')
defaultLogger.setLevel(logging.DEBUG)

# Logging examples
# defaultLogger.debug("default debug message")
# defaultLogger.info("default info message")
# defaultLogger.warning("default warning message")
# defaultLogger.error("default error message")
# defaultLogger.critical("default critical message")

In [2]:
# Read data
from pyspark import sql

spark = (
    sql.SparkSession.builder
        .appName("Data Contract Test")
        .getOrCreate()

)

path = 'examples/spark/data/demographics.csv' #'abfss://covid@staadpstaginglakefrey.dfs.core.windows.net/raw/demographics.csv'
schema = spark.read \
            .format("csv") \
            .option("header", True) \
            .option("inferSchema", True) \
            .load(path) \
            .limit(10) \
            .schema
df = spark.read \
            .format("csv") \
            .option("header", True) \
            .schema(schema) \
            .load(path) 

defaultLogger.debug(f"Schema: {df.schema}")

In [8]:
from datacontract.data_contract import DataContract

dataset_name = "demographics"

# Create a view
df.createOrReplaceTempView(dataset_name)

# Specify data contract
data_contract_str = f"""
dataContractSpecification: 0.9.2
id: adp:datacontract:covid:demographics
info:
  title: Demographics
  version: 1.0.0
  description: Information related to the population demographics for each region.    
  owner: Google
  contact:
    name: Freya
servers:
  production:
    type: databricks
    host: {path}
    catalog: none
    schema: none
models:
  {dataset_name}: # corresponds to a table
    type: table
    fields: 
      location_key:
        type: string
        description: Unique string identifying the region
        required: true
        unique: true
      population:
        type: integer
        description: Total count of humans
        required: true
        minimum: 0
      population_male:
        type: integer
        description: Total count of males
        required: true
        minimum: 0
      population_female:
        type: integer
        description: Total count of females
        required: true
        minimum: 0
      human_development_index:
        type: double
        description: Composite index of life expectancy, education, and per capita income indicators
        required: true
        minimum: 0
        maximum: 1
"""

data_contract = DataContract(
  data_contract_str=data_contract_str, 
  spark=spark)

In [9]:
# Test data contract
run = data_contract.test()
defaultLogger.info(f"Run result: {run.result}")

INFO:default:Run result: failed


In [10]:
from rich import box
from rich import print
from rich.table import Table

def _handle_result(run):
    if run.result == "passed":
        print(
            f"ðŸŸ¢ data contract is valid. Run {len(run.checks)} checks. Took {(run.timestampEnd - run.timestampStart).total_seconds()} seconds.")
    else:
        print(f"ðŸ”´ data contract is invalid. Run {len(run.checks)} checks. Took {(run.timestampEnd - run.timestampStart).total_seconds()} seconds.")
    _print_table(run)

def with_markup(result):
    if result == "passed":
        return "[green]passed[/green]"
    if result == "warning":
        return "[yellow]warning[/yellow]"
    if result == "failed":
        return "[red]failed[/red]"
    if result == "error":
        return "[red]error[/red]"
    return result

def to_field(run, check):
    models = [c.model for c in run.checks]
    if len(set(models)) > 1:
        if check.field is None:
            return check.model
        return check.model + "." + check.field
    else:
        return check.field

def _print_table(run):
    table = Table(box=box.ROUNDED)
    table.add_column("Result", no_wrap=True)
    table.add_column("Check", max_width=100)
    table.add_column("Field", max_width=32)
    table.add_column("Details", max_width=50)
    for check in run.checks:
        table.add_row(with_markup(check.result), check.name, to_field(run, check), check.reason)
    print(table)

defaultLogger.info(_handle_result(run))
defaultLogger.debug(run)

INFO:default:None


In [6]:
# See failed checks
failed_checks = [check for check in run.checks if check.result == 'failed']
defaultLogger.info(f"Failed checks: {failed_checks}")

INFO:default:Failed checks: [Check(type='schema', name='Check that required field population has no null values', result='failed', engine='soda-core', reason='', model='demographics', field='population', details=None), Check(type='schema', name='Check that required field population_male has no null values', result='failed', engine='soda-core', reason='', model='demographics', field='population_male', details=None), Check(type='schema', name='Check that required field population_female has no null values', result='failed', engine='soda-core', reason='', model='demographics', field='population_female', details=None), Check(type='schema', name='Check that required field human_development_index has no null values', result='failed', engine='soda-core', reason='', model='demographics', field='human_development_index', details=None)]


In [7]:
# Fail if failed checks
if not run.has_passed():
    raise Exception("Data contract validation failed")

Exception: Data contract validation failed

In [None]:
# Proof that Human Development Index is 70.91 on one row
#max_hdi = df.agg({"human_development_index": "max"}).collect()[0][0]
#print(max_hdi)
df_max_hdi = df.where(df.human_development_index > 1.0)
df_max_hdi.show()

+------------+----------+---------------+-----------------+----------------+----------------+-----------------------+--------------------+------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------------+
|location_key|population|population_male|population_female|population_rural|population_urban|population_largest_city|population_clustered|population_density|human_development_index|population_age_00_09|population_age_10_19|population_age_20_29|population_age_30_39|population_age_40_49|population_age_50_59|population_age_60_69|population_age_70_79|population_age_80_and_older|
+------------+----------+---------------+-----------------+----------------+----------------+-----------------------+--------------------+------------------+-----------------------+--------------------+--------------------+--------------------+