In [1]:
%env SPARK_VERSION=3.2.1

env: SPARK_VERSION=3.2.1


In [2]:
from pyspark.sql import SparkSession, Row

spark = (
    SparkSession.builder \
    .appName("TRN Database Test") \
    .config("spark.driver.extraClassPath", "/home/jovyan/work/sqljdbc42.jar") \
    .config("spark.executor.extraClassPath", "/home/jovyan/work/sqljdbc42.jar") \
    .getOrCreate()
)

print("Spark version:", spark.version)


Spark version: 3.2.1


In [3]:
server = "192.168.0.26"
port = "1433"
database = "TRN"
username = "Tester"
password = "tester_tester"

jdbc_url = f"jdbc:sqlserver://{server}:{port};database={database};user={username};password={password}"

In [4]:
# Create temp view for "hr.Jobs" table
spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "hr.Jobs") \
    .load() \
    .createOrReplaceTempView("Jobs")

# Create temp view for "hr.Employees" table
spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "hr.Employees") \
    .load() \
    .createOrReplaceTempView("Employees")

# Create temp view for "hr.Dependents" table
spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "hr.Dependents") \
    .load() \
    .createOrReplaceTempView("Dependents")

In [5]:
from great_expectations.compatibility import pyspark

import great_expectations as gx
from great_expectations import DataContext
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.yaml_handler import YAMLHandler

from great_expectations.data_context.util import file_relative_path
from great_expectations.validator.validator import Validator

yaml = YAMLHandler()

In [6]:
from great_expectations.core.expectation_configuration import ExpectationConfiguration

context: DataContext = gx.get_context()

spark_session: pyspark.SparkSession = gx.core.util.get_or_create_spark_application()

# create (remove existing) and load Expectation Suite
expectation_suite_name="TRN Database testing"

context.add_or_update_expectation_suite(
    expectation_suite_name=expectation_suite_name
)

suite: ExpectationSuite = context.get_expectation_suite(
    expectation_suite_name=expectation_suite_name,
)

# Create expectations
config1 = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "job_id"},
    meta={"notes": "Check if 'job_id' column has non-null values"},
)

config2 = ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    kwargs={
        "auto": True,
        "column": "min_salary",
        "domain": "column",
        "max_value": 10000,
        "min_value": 3000,
        "mostly": 1.0,
        "strict_max": False,
        "strict_min": False,
    },
    meta={"notes": "Check if 'min_salary' is in defined range"},
)

config3 = ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_unique",
    kwargs={
        "column": "job_title"
    },
    meta={"notes": "Check if 'job_title' column has unique values"}
)


suite.add_expectation(expectation_configuration=config1)
suite.add_expectation(expectation_configuration=config2)
suite.add_expectation(expectation_configuration=config3)

{"kwargs": {"column": "job_title"}, "meta": {"notes": "Check if 'job_title' column has unique values"}, "expectation_type": "expect_column_values_to_be_unique"}

In [7]:
datasource_yaml = f"""
name: trn_spark_datasource
class_name: Datasource
module_name: great_expectations.datasource
execution_engine:
    module_name: great_expectations.execution_engine
    class_name: SparkDFExecutionEngine
data_connectors:
    trn_spark_df_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - trn_db_jobs
"""

context.add_datasource(**yaml.load(datasource_yaml))

<great_expectations.datasource.new_datasource.Datasource at 0x7f0ee4b24a00>

In [8]:
df: pyspark.DataFrame = spark_session.sql("select * from Jobs")

runtime_batch_request = RuntimeBatchRequest(
    datasource_name="trn_spark_datasource",
    data_connector_name="trn_spark_df_data_connector",
    data_asset_name="Jobs",
    runtime_parameters={"batch_data": df},
    batch_identifiers={
        "trn_db_jobs": "TRN Database - Jobs",
    },
)

In [9]:
# Constructing Validator by passing in RuntimeBatchRequest
jobs_validator: Validator = context.get_validator(
    batch_request=runtime_batch_request,
    expectation_suite=suite,  
)

validation_data = jobs_validator.validate()
print(validation_data)

Calculating Metrics:   0%|          | 0/21 [00:00<?, ?it/s]

{
  "evaluation_parameters": {},
  "meta": {
    "great_expectations_version": "0.16.16",
    "expectation_suite_name": "TRN Database testing",
    "run_id": {
      "run_name": null,
      "run_time": "2023-06-17T11:29:27.376858+00:00"
    },
    "batch_spec": {
      "data_asset_name": "Jobs",
      "batch_data": "SparkDataFrame"
    },
    "batch_markers": {
      "ge_load_time": "20230617T112927.286924Z"
    },
    "active_batch_definition": {
      "datasource_name": "trn_spark_datasource",
      "data_connector_name": "trn_spark_df_data_connector",
      "data_asset_name": "Jobs",
      "batch_identifiers": {
        "trn_db_jobs": "TRN Database - Jobs"
      }
    },
    "validation_time": "20230617T112927.376801Z",
    "checkpoint_name": null
  },
  "statistics": {
    "evaluated_expectations": 3,
    "successful_expectations": 2,
    "unsuccessful_expectations": 1,
    "success_percent": 66.66666666666666
  },
  "results": [
    {
      "expectation_config": {
        "kwargs"

In [10]:
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler

df_ge = gx.dataset.SparkDFDataset(df)
profile_expectations = BasicDatasetProfiler.profile(df_ge)

print(profile_expectations)

({
  "expectations": [
    {
      "kwargs": {
        "min_value": 0,
        "max_value": null
      },
      "meta": {
        "BasicDatasetProfiler": {
          "confidence": "very low"
        }
      },
      "expectation_type": "expect_table_row_count_to_be_between"
    },
    {
      "kwargs": {
        "column_list": null
      },
      "meta": {
        "BasicDatasetProfiler": {
          "confidence": "very low"
        }
      },
      "expectation_type": "expect_table_columns_to_match_ordered_list"
    },
    {
      "kwargs": {
        "column": "job_id",
        "type_list": [
          "BIGINT",
          "BYTEINT",
          "ByteType()",
          "INT",
          "INT64",
          "INTEGER",
          "Int16Dtype",
          "Int32Dtype",
          "Int64Dtype",
          "Int8Dtype",
          "IntegerType",
          "IntegerType()",
          "LongType",
          "LongType()",
          "SMALLINT",
          "ShortType()",
          "TINYINT",
          "UInt16

In [11]:
from great_expectations.checkpoint import SimpleCheckpoint

checkpoint_name = "trn_jobs_checkpoint"

context.add_or_update_checkpoint(
    name = checkpoint_name,
    config_version = 1,
    class_name = "SimpleCheckpoint",
    validations = [
        {
            "expectation_suite_name": expectation_suite_name,
        }
    ]
)

checkpoint_run_result = context.run_checkpoint(
    checkpoint_name=checkpoint_name,
    batch_request=runtime_batch_request,
    run_name="Checkpoint run"
)



Calculating Metrics: 0it [00:00, ?it/s]

In [21]:
# Generate report
import json

s = validation_data.statistics.get("success_percent")


print("Validation Report")
print("-------------------------------")

s = validation_data.meta.get("expectation_suite_name")
print(f"Suite Name: {s}")
s = validation_data.statistics.get("evaluated_expectations")
print(f"Evaluated Expectations: {s}")
s = validation_data.statistics.get("successful_expectations")
print(f"Successful Expectations: {s}")
s = validation_data.statistics.get("unsuccessful_expectations")
print(f"Unsuccessful Expectations: {s}")
s = validation_data.statistics.get("success_percent")
print(f"Success Percent: {s}\n")

print("Check Results:")
for res in validation_data.results:
    s = res.expectation_config.meta.get("notes")
    r = res.get("success")
    if r:
        status = "PASSED"
    else:
        status = "FAILED"
    print(f"- {s}; Status: {status}")
    if not r:
        s = res.result.get("element_count")
        print(f"\t- Element Count: {s}")
        s = res.result.get("unexpected_count")
        print(f"\t- Unexpected Count: {s}")
        s = res.result.get("unexpected_percent")
        print(f"\t- Unexpected Percent: {s}")
        s = res.result.get("partial_unexpected_list")
        print(f"\t- Unexpected List:")
        for r in s:
            print(f"\t\t{r}")
       



Validation Report
-------------------------------
Suite Name: TRN Database testing
Evaluated Expectations: 3
Successful Expectations: 2
Unsuccessful Expectations: 1
Success Percent: 66.66666666666666

Check Results:
- Check if 'job_id' column has non-null values; Status: PASSED
- Check if 'min_salary' is in defined range; Status: FAILED
	- Element Count: 19
	- Unexpected Count: 5
	- Unexpected Percent: 26.31578947368421
	- Unexpected List:
		20000.00
		15000.00
		2500.00
		2500.00
		2000.00
- Check if 'job_title' column has unique values; Status: PASSED
