In [1]:
# Import libraries
from typing import Any
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from whylogs.api.pyspark.experimental import collect_column_profile_views
from whylogs.api.pyspark.experimental import collect_dataset_profile_view
from whylogs.core.metrics.condition_count_metric import Condition
from whylogs.core.relations import Predicate
from whylogs.core.schema import DeclarativeSchema
from whylogs.core.resolvers import STANDARD_RESOLVER
from whylogs.core.specialized_resolvers import ConditionCountMetricSpec
from whylogs.core.constraints.factories import condition_meets
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import no_missing_values
from whylogs.core.constraints.factories import greater_than_number
from whylogs.viz import NotebookProfileVisualizer
import pandas as pd
import datetime

In [9]:
# Start (or reuse) the Spark session for this notebook and switch on the
# Arrow setting for PySpark.
spark = SparkSession.builder.appName('whylogs').getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Read the patient CSV, treating the first row as a header and letting Spark
# infer the column types, then print the resulting schema.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("patient_data.csv")
)
df.printSchema()


root
 |-- patient_id: integer (nullable = true)
 |-- patient_name: string (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- visit_date: string (nullable = true)



In [4]:
def check_date_format(date_value: Any) -> bool:
    """Report whether ``date_value`` is a string in ``%Y-%m-%d`` form.

    Args:
        date_value: The value to check. Any type is accepted; only strings
            that ``datetime.datetime.strptime`` can parse with ``%Y-%m-%d``
            are considered valid.

    Returns:
        True if the value parses as a date in the expected format, otherwise
        False. Non-string values (for example ``None`` or NaN standing in for
        a missing entry) return False instead of raising.
    """
    date_format = '%Y-%m-%d'
    try:
        datetime.datetime.strptime(date_value, date_format)
    except (TypeError, ValueError):
        # ValueError: a string that does not match the format or is not a
        # real calendar date. TypeError: the value is not a string at all.
        return False
    return True

# Named condition for the visit_date column: the predicate delegates to
# check_date_format, and the key is the name the constraint refers to later.
visit_date_condition = {
    "is_date_format": Condition(
        Predicate().is_(check_date_format)
    ),
}

In [6]:
# Build a schema on top of the standard resolver, then attach the
# condition-count metric for the visit_date column to it.
schema = DeclarativeSchema(STANDARD_RESOLVER)
schema.add_resolver_spec(
    column_name="visit_date",
    metrics=[ConditionCountMetricSpec(visit_date_condition)],
)

In [10]:
# Profile the Spark DataFrame with the custom schema. The resulting profile
# view carries the standard metrics as well as the visit_date condition.
df_profile_view_v2 = collect_dataset_profile_view(
    input_df=df,
    schema=schema,
)

In [13]:
# Register the data-quality constraints against the profile view:
#   - patient_id must have no missing values
#   - visit_date must satisfy the is_date_format condition
#   - weight must be greater than 0
builder = ConstraintsBuilder(dataset_profile_view=df_profile_view_v2)
for constraint in (
    no_missing_values(column_name="patient_id"),
    condition_meets(column_name="visit_date", condition_name="is_date_format"),
    greater_than_number(column_name="weight", number=0),
):
    builder.add_constraint(constraint)

# Build the constraints and show the pass/fail report as the cell output.
constraints = builder.build()
constraints.generate_constraints_report()

[ReportResult(name='patient_id has no missing values', passed=1, failed=0, summary=None),
 ReportResult(name='visit_date meets condition is_date_format', passed=0, failed=1, summary=None),
 ReportResult(name='weight greater than number 0', passed=0, failed=1, summary=None)]

In [14]:
# Render the constraints report inline with the Notebook Profile Visualizer.
visualization = NotebookProfileVisualizer()
visualization.constraints_report(
    constraints,
    cell_height=300,
)

In [15]:
# Validate the visit_date column: convert it with the yyyy-MM-dd pattern,
# label each row by whether the converted value is null, and count per label.
(
    df
    .withColumn("check_visit_date", F.to_date(F.col("visit_date"), "yyyy-MM-dd"))
    .withColumn(
        "null_check",
        F.when(F.col("check_visit_date").isNull(), "null").otherwise("not null"),
    )
    .groupBy("null_check")
    .count()
    .show(truncate=False)
)

+----------+-----+
|null_check|count|
+----------+-----+
|not null  |98977|
|null      |1023 |
+----------+-----+



In [18]:
# Validate the weight column: count rows per distinct weight, sort ascending
# by weight, and show only the first row of that ordering.
(
    df
    .select("weight")
    .groupBy("weight")
    .count()
    .orderBy(F.col("weight"))
    .limit(1)
    .show(truncate=False)
)

+------+-----+
|weight|count|
+------+-----+
|0     |2039 |
+------+-----+

