In [8]:
import great_expectations as ge
context = ge.data_context.DataContext()

In [9]:
from dotenv import load_dotenv
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
import os

load_dotenv()
s3_access_key = os.environ.get("S3_ACCESS_KEY")
s3_secret_key = os.environ.get("S3_SECRET_KEY")
CORES = 2 # Remember to use 2 cores for laptop work and 4 cores for local machine

- Common columns
- Different columns
- Compare data for common columns in both files
- Difference in the amount of items stored in each parquet file
- Count and group the occurrences of the value “trustRate” in each file
- Count and group the occurrences of the value “humanId” in each file
- Determine based on the contents of the compared files whether there is a major or critical difference in terms of data
- Any additional statistical information considered useful is appreciated (within the context of an identity resolution process)

1. Compare A to B in notebook with ge
2. See how to integrate with CI-CD
3. Final work

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Loyalty") \
    .config("spark.master", f"local[{CORES}]") \
    .config("spark.executor.cores", f"{CORES}") \
    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.1") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
    .getOrCreate()

In [11]:
#Fetch results A, B and C parquets data from S3 
df_results_a = spark.read.parquet("s3a://data-test-202302/adstra/resultsA.parquet")
df_results_b = spark.read.parquet("s3a://data-test-202302/adstra/resultsB.parquet")
df_results_c = spark.read.parquet("s3a://data-test-202302/adstra/resultsC.parquet")

In [12]:
df_results_a.show(5)

+--------+------+------+--------------------+--------+-------+-----+----------+----------+-------+--------------------+---------+
|   index| fname| lname|               email|socialid|country|state|    mobile|     phone|zipcode|             humanId|trustRate|
+--------+------+------+--------------------+--------+-------+-----+----------+----------+-------+--------------------+---------+
|r0000001| james|robles|    person1@mail.net|774121t9|     mx|   tj|9991200767|8881200767|  49162|ae7b5b63-da1c-493...|       80|
|r0000002|daniel| boyle|roddaniel445@comp...|998112a4|     ca|   on|9991200610|8881200610|  20883|89abf215-f658-4a0...|       50|
|r0000003|  xxxx| xxxxx|              xxxxxx|878445b2|     us|   fl|      null|      null|   null|                null|        0|
|r0000004| marie| velez|marie.rodarte123@...|798125x4|     ca|   ot|9991200114|8881200114|  61415|19ce37e6-2edf-41e...|       50|
|r0000005|MARTIN|TORRES|not.provided@abc.com|981210o2|     us|   ca|9991200276|8881200276|

In [None]:
df_results_b.show(5)

In [13]:
ge_results_a = SparkDFDataset(df_results_a)
ge_results_b = SparkDFDataset(df_results_b)
#df_results_c = SparkDFDataset(df_results_c)

In [11]:
context = ge.get_context()
context.create_expectation_suite("exploration")

In [14]:
ge_results_b.expect_column_values_to_not_be_null("lname")

In [15]:
# Compare columns with ge A - B

columns_a = df_results_a.columns
columns_b = df_results_b.columns

dataset = ge.dataset.PandasDataset({'column_name': columns_a})
dataset.expect_column_values_to_be_in_set('column_name', columns_b)

print(result)


{
  "success": false,
  "meta": {},
  "result": {
    "element_count": 12,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 2,
    "unexpected_percent": 16.666666666666664,
    "unexpected_percent_total": 16.666666666666664,
    "unexpected_percent_nonmissing": 16.666666666666664,
    "partial_unexpected_list": [
      "state",
      "zipcode"
    ]
  },
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_be_in_set",
    "kwargs": {
      "column": "column_name",
      "value_set": [
        "index",
        "fname",
        "lname",
        "email",
        "socialid",
        "country",
        "mobile",
        "phone",
        "gender",
        "humanId",
        "trustRate"
      ],
      "result_format": "BASIC"
    }
  }
}


In [21]:
ge_results_b.get_expectation_suite(discard_failed_expectations=False)

{
  "expectations": [
    {
      "meta": {},
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "lname"
      }
    }
  ],
  "meta": {
    "great_expectations_version": "0.16.10"
  },
  "data_asset_type": "Dataset",
  "ge_cloud_id": null,
  "expectation_suite_name": "default"
}

In [24]:
ge_results_b.save_expectation_suite("my_expectation_suite.json")

In [26]:
context.build_data_docs()

{'local_site': 'file:///app/great_expectations/uncommitted/data_docs/local_site/index.html'}

In [36]:
checkpoint = ge.checkpoint.SimpleCheckpoint(
    name="my_name_checkpoint",
    data_context=context,
    validator=ge_results_b
)
checkpoint_result = checkpoint.run()

TypeError: DataAsset.validate() got an unexpected keyword argument 'checkpoint_name'

In [31]:
result = ge_results_a.expect_column_pair_values_to_be_equal('index', ge_results_b, 'index')

TypeError: <great_expectations.dataset.sparkdf_dataset.SparkDFDataset object at 0x7fe0f1c5a1a0> is of type SparkDFDataset which cannot be serialized.

In [47]:
import json
#my_expectation_suite = json.load("my_expectation_suite.json")

with open('my_expectation_suite.json') as f:
    my_expectation_suite = json.load(f)

my_df = SparkDFDataset(df_results_b)
my_df.validate()


ConnectionRefusedError: [Errno 111] Connection refused

In [57]:
batch_kwargs = {'dataset': df_results_b, 'datasource': SparkDFDatasource}


batch = context.get_batch(ge_results_b, "my_suite")

BatchKwargsError: BatchKwargs must be a BatchKwargs object or dictionary.

In [51]:
for expectation_suite_id in context.list_expectation_suites():
    print(expectation_suite_id.expectation_suite_name)

my_suite


In [58]:
[datasource['name'] for datasource in context.list_datasources() if datasource['class_name'] == 'SparkDFDatasource']

[]