In [2]:
import os

import great_expectations as gx

In [3]:
# This notebook uses the 0.18.x fluent APIs (context.sources, add_or_update_spark,
# add_or_update_expectation_suite, get_validator), which were reworked in GX 1.0.
# Fail fast on another version instead of erroring halfway through the notebook.
assert gx.__version__.startswith("0.18."), (
    f"This notebook targets Great Expectations 0.18.x, found {gx.__version__}"
)
gx.__version__

'0.18.22'

In [4]:
from pyspark.sql import SparkSession

In [10]:
# Connection settings for the standalone Spark cluster used by this notebook.
SPARK_MASTER_URL = "spark://spark-master:7077"
SPARK_APP_NAME = "ge-test"

# getOrCreate() hands back the already-running session if this kernel has one.
spark = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName(SPARK_APP_NAME)
    .getOrCreate()
)

In [11]:
# Raw (Bronze) flights data: CSV files whose first line is a header row.
# No schema is supplied, so Spark reads every column as a string.
FLIGHTS_BRONZE_PATH = "s3a://bronze/flights/"

my_df_vuelos = spark.read.csv(FLIGHTS_BRONZE_PATH, header=True)

                                                                                

In [12]:
# The CSV is read with only the header option (no inferSchema / explicit schema),
# so every column comes back as a nullable string; `delay` and `distance` would
# need a cast before any numeric-range expectation.
my_df_vuelos.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [13]:
# Peek at the first 5 rows of the raw data.
# NOTE(review): `date` values look like MMddHHmm strings (e.g. 01011245) and
# `delay` can be negative (e.g. -8) — confirm the encoding against the source
# spec before adding format/range expectations.
my_df_vuelos.show(5)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 5 rows


In [14]:
# Root directory of the file-based GX project that stores the suite and data docs.
# Overridable via the GX_PROJECT_ROOT environment variable so the same code can
# run outside this container (e.g. from an Airflow task); it defaults to the
# notebook's original location, so behavior is unchanged when the variable is unset.
GX_PROJECT_ROOT = os.environ.get("GX_PROJECT_ROOT", "/opt/notebooks/gx_test_project")
context = gx.get_context(project_root_dir=GX_PROJECT_ROOT)

In [15]:
# Display the data context configuration: stores, data-docs site and the
# (still empty) datasource sections.
context

{
  "anonymous_usage_statistics": {
    "explicit_id": true,
    "usage_statistics_url": "https://stats.greatexpectations.io/great_expectations/v1/usage_statistics",
    "data_context_id": "eb4bae23-86b4-4707-ad61-684f58650456",
    "explicit_url": false,
    "enabled": true
  },
  "checkpoint_store_name": "checkpoint_store",
  "config_variables_file_path": "uncommitted/config_variables.yml",
  "config_version": 3.0,
  "data_docs_sites": {
    "local_site": {
      "class_name": "SiteBuilder",
      "show_how_to_buttons": true,
      "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": "uncommitted/data_docs/local_site/"
      },
      "site_index_builder": {
        "class_name": "DefaultSiteIndexBuilder"
      }
    }
  },
  "datasources": {},
  "evaluation_parameter_store_name": "evaluation_parameter_store",
  "expectations_store_name": "expectations_store",
  "fluent_datasources": {},
  "include_rendered_content": {
    "expectation_suit

In [16]:
# Register (or refresh) a fluent Spark datasource named "spark" on the context;
# it is the parent for the in-memory DataFrame asset created below.
source = context.sources.add_or_update_spark(name="spark")

In [17]:
# Inspect the datasource: at this point no assets are registered (assets=[]).
source

SparkDatasource(type='spark', name='spark', id=None, assets=[], spark_config=None, force_reuse_spark_context=True, persist=True)

## Crear un “asset” desde el DataFrame y construir el batch

In [18]:
# add_dataframe_asset raises if an asset with this name is already registered on
# the datasource (e.g. when this cell is re-run in the same kernel), so reuse the
# existing asset in that case and only create it when it is missing.
ASSET_NAME = "vuelos_df"
try:
    asset = source.get_asset(ASSET_NAME)
except LookupError:
    asset = source.add_dataframe_asset(name=ASSET_NAME)

# The Spark DataFrame is bound per batch request, not stored on the asset.
batch_request = asset.build_batch_request(dataframe=my_df_vuelos)

In [19]:
# Inspect the batch request: it identifies the datasource ('spark') and the
# asset ('vuelos_df'), with no extra options.
batch_request

BatchRequest(datasource_name='spark', data_asset_name='vuelos_df', options={})

## Crear una suite y un validator

In [20]:
# Name under which the Bronze-layer expectation suite is stored in the GX project.
suite_name = "bronze_vuelos_suite"

In [21]:
# Create the suite in the project's expectations store.
# NOTE(review): called with only a name, this presumably replaces any previously
# saved suite of the same name with an empty one, so the suite is rebuilt from the
# expect_* calls in this notebook on every run — confirm this is intended.
context.add_or_update_expectation_suite(expectation_suite_name=suite_name)

{
  "expectation_suite_name": "bronze_vuelos_suite",
  "ge_cloud_id": null,
  "expectations": [],
  "data_asset_type": null,
  "meta": {
    "great_expectations_version": "0.18.22"
  }
}

In [22]:
# Bind the Spark batch to the suite: each expect_* call on the validator runs
# against the DataFrame and is recorded in `suite_name`.
validator = context.get_validator(
    expectation_suite_name=suite_name,
    batch_request=batch_request,
)

In [23]:
# Validator combining the flights batch and the bronze_vuelos_suite suite.
validator

<great_expectations.validator.validator.Validator at 0x7f21887cb7f0>

In [24]:
# Bronze rule: the raw flights load must not be empty (at least one row, no upper bound).
validator.expect_table_row_count_to_be_between(min_value=1)

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

                                                                                

{
  "success": true,
  "result": {
    "observed_value": 1391578
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [25]:
# Persist the suite for reuse (e.g. from Airflow). By default GX 0.18 discards
# expectations that failed on the current batch, which would silently drop a rule
# just because today's data violates it; keep every rule that was defined.
validator.save_expectation_suite(discard_failed_expectations=False)

In [26]:
# Stop the Spark session created at the top of the notebook. Run this last:
# the DataFrame and the validator above both depend on that session.
spark.stop()

En este punto:
- Hemos creado la expectation suite Bronze `bronze_vuelos_suite` (por ahora con una única regla: la tabla debe tener al menos una fila)
- Las reglas se ejecutan sobre Spark
- La suite queda persistida y reutilizable
- El mismo código se podrá ejecutar desde Airflow

Siguiente paso:
- Ejecutar esta validación desde un script .py orquestado por Airflow