In [2]:
!pip install great_expectations pandas pandasql pyspark

Collecting great_expectations
  Using cached great_expectations-1.3.7-py3-none-any.whl.metadata (8.5 kB)
Collecting pandas
  Using cached pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
Collecting pandasql
  Using cached pandasql-0.7.3-py3-none-any.whl
Collecting pyspark
  Using cached pyspark-3.5.4-py2.py3-none-any.whl
Collecting altair<5.0.0,>=4.2.1 (from great_expectations)
  Using cached altair-4.2.2-py3-none-any.whl.metadata (13 kB)
Collecting marshmallow<4.0.0,>=3.7.1 (from great_expectations)
  Using cached marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting posthog<4,>3 (from great_expectations)
  Downloading posthog-3.15.1-py2.py3-none-any.whl.metadata (2.9 kB)
Collecting pydantic>=1.10.7 (from great_expectations)
  Using cached pydantic-2.10.6-py3-none-any.whl.metadata (30 kB)
Collecting pyparsing>=2.4 (from great_expectations)
  Using cached pyparsing-3.2.1-py3-none-any.whl.metadata (5.0 kB)
Collecting scipy>=1.6.0 (from 

In [3]:
# Import necessary libraries
import pandas as pd
import great_expectations as gx
import pandasql as psql
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [4]:
# NOT WORKING: a SQL-based UnexpectedRowsExpectation on a pandas DataFrame.
# Kept deliberately as a negative example. validate() does not raise here: the
# returned result reports "success": false, and its exception_info traceback
# contains KeyError: 'PandasExecutionEngine', raised while GX looks up a metric
# provider for this expectation on the pandas execution engine.

# Column-oriented source data (column name -> values).
data_transfers = dict(
    transfer_balance_id=[1, 2, 3, 4],
    amount=[100, 200, 150, 500],
    transfer_date=["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04"],
)
data_transfer_balance = dict(
    transfer_balance_id=[1, 2, 3, 4],
    total_amount=[100, 200, 150, 500],
)

df_transfers, df_transfer_balance = (
    pd.DataFrame(data_transfers),
    pd.DataFrame(data_transfer_balance),
)

context = gx.get_context()
data_source = context.data_sources.add_pandas("example")

# One dataframe asset per table, each read as a single whole-dataframe batch.
transfers_data_asset = data_source.add_dataframe_asset(name="transfers")
transfer_balance_data_asset = data_source.add_dataframe_asset(name="transfer_balance")

batch_transfers = (
    transfers_data_asset
    .add_batch_definition_whole_dataframe("transfers")
    .get_batch(batch_parameters={"dataframe": df_transfers})
)
batch_transfer_balance = (
    transfer_balance_data_asset
    .add_batch_definition_whole_dataframe("transfer_balance")
    .get_batch(batch_parameters={"dataframe": df_transfer_balance})
)

# Any row returned by this query is treated as unexpected.
query = """
    SELECT
        *
    FROM
        {batch}
    WHERE
        amount < 200
    """

query_expectation = gx.expectations.UnexpectedRowsExpectation(unexpected_rows_query=query)

batch_transfers.validate(query_expectation)

Calculating Metrics: 0it [00:00, ?it/s]


{
  "success": false,
  "expectation_config": {
    "type": "unexpected_rows_expectation",
    "kwargs": {
      "unexpected_rows_query": "\n    SELECT\n        *\n    FROM\n        {batch}\n    WHERE\n        amount < 200",
      "batch_id": "example-transfers"
    },
    "meta": {}
  },
  "result": {},
  "meta": {},
  "exception_info": {
    "exception_traceback": "Traceback (most recent call last):\n  File \"/opt/conda/lib/python3.11/site-packages/great_expectations/expectations/registry.py\", line 315, in get_metric_provider\n    return metric_definition[\"providers\"][type(execution_engine).__name__]\n           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nKeyError: 'PandasExecutionEngine'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/opt/conda/lib/python3.11/site-packages/great_expectations/validator/validator.py\", line 714, in _generate_metric_dependency_subgraphs_for_each_expectation_

In [5]:
# WORKING: pandas DataFrames joined in pandas, then checked with a column-pair
# expectation (no SQL query involved).
data_transfers = {
    'transfer_balance_id': [1, 2, 3, 4],
    'amount': [100, 200, 150, 500],
    'transfer_date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04']
}

data_transfer_balance = {
    'transfer_balance_id': [1, 2, 3, 4],
    'total_amount': [100, 200, 150, 500]
}

df_transfers = pd.DataFrame(data_transfers)
df_transfer_balance = pd.DataFrame(data_transfer_balance)

# Left join keeps every transfer; a transfer with no balance row gets a missing
# total_amount. validate="many_to_one" makes pandas raise MergeError if a
# transfer_balance_id occurs more than once in df_transfer_balance, which would
# otherwise silently duplicate transfer rows and distort the expectation's counts.
merged_df = pd.merge(
    df_transfers,
    df_transfer_balance,
    on='transfer_balance_id',
    how='left',
    validate='many_to_one',
)

context = gx.get_context()

data_source = context.data_sources.add_pandas("example")
merge_data_asset = data_source.add_dataframe_asset(name="merge")
merge_batch = merge_data_asset.add_batch_definition_whole_dataframe("merge").get_batch(batch_parameters={"dataframe": merged_df})

# Referenced through the public gx.expectations namespace, like the other
# expectations in this notebook, rather than the gx.expectations.core module path.
expectation = gx.expectations.ExpectColumnPairValuesToBeEqual(
    column_A="amount",
    column_B="total_amount"
)

merge_batch.validate(expectation)

Calculating Metrics: 100%|██████████| 9/9 [00:00<00:00, 258.53it/s] 


{
  "success": true,
  "expectation_config": {
    "type": "expect_column_pair_values_to_be_equal",
    "kwargs": {
      "batch_id": "example-merge",
      "column_A": "amount",
      "column_B": "total_amount"
    },
    "meta": {}
  },
  "result": {
    "element_count": 4,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_counts": [],
    "partial_unexpected_index_list": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [6]:
# WORKING, with warnings: SQL without {batch}; the join is done in Spark and exposed
# as a Spark temp view that the query references by name.
# Because the query reads the transfers_table view rather than the batch it is
# validated against, GX logs "unexpected_rows_query should contain the {batch}
# parameter", and the reported rows come from the view (they include total_amount,
# a column batch_transfers does not have).

spark = (
    SparkSession.builder
    .appName("DataFrames Example")
    .getOrCreate()
)

# Rows: (transfer_balance_id, amount, transfer_date)
data_transfers = [
    (1, 100, '2023-01-01'),
    (2, 200, '2023-01-02'),
    (3, 150, '2023-01-03'),
    (4, 500, '2023-01-04'),
]

# Rows: (transfer_balance_id, total_amount)
data_transfer_balance = [
    (1, 100),
    (2, 200),
    (3, 150),
    (4, 500),
]

df_transfers = spark.createDataFrame(
    data_transfers, ["transfer_balance_id", "amount", "transfer_date"]
)
df_transfer_balance = spark.createDataFrame(
    data_transfer_balance, ["transfer_balance_id", "total_amount"]
)

context = gx.get_context()
data_source = context.data_sources.add_spark("example")

transfers_data_asset = data_source.add_dataframe_asset(name="transfers")
transfer_balance_data_asset = data_source.add_dataframe_asset(name="transfer_balance")

batch_transfers = (
    transfers_data_asset
    .add_batch_definition_whole_dataframe("transfers")
    .get_batch(batch_parameters={"dataframe": df_transfers})
)
batch_transfer_balance = (
    transfer_balance_data_asset
    .add_batch_definition_whole_dataframe("transfer_balance")
    .get_batch(batch_parameters={"dataframe": df_transfer_balance})
)

# Join in Spark and register the result under a name the SQL below can use.
merged_df = df_transfers.join(df_transfer_balance, on="transfer_balance_id", how="left")
merged_df.createOrReplaceTempView("transfers_table")

query = """
    SELECT *
    FROM transfers_table
    WHERE amount < 200
"""

# Preview in plain Spark the rows the expectation is expected to flag.
query_result = spark.sql(query)
query_result.show()

query_expectation = gx.expectations.UnexpectedRowsExpectation(unexpected_rows_query=query)

batch_transfers.validate(query_expectation)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/24 15:32:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------------------+------+-------------+------------+
|transfer_balance_id|amount|transfer_date|total_amount|
+-------------------+------+-------------+------------+
|                  1|   100|   2023-01-01|         100|
|                  3|   150|   2023-01-03|         150|
+-------------------+------+-------------+------------+

unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.


Calculating Metrics: 100%|██████████| 2/2 [00:01<00:00,  1.10it/s]              

unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.





{
  "success": false,
  "expectation_config": {
    "type": "unexpected_rows_expectation",
    "kwargs": {
      "batch_id": "example-transfers",
      "unexpected_rows_query": "\n    SELECT *\n    FROM transfers_table\n    WHERE amount < 200"
    },
    "meta": {}
  },
  "result": {
    "observed_value": 2,
    "details": {
      "unexpected_rows": [
        {
          "transfer_balance_id": 1,
          "amount": 100,
          "transfer_date": "2023-01-01",
          "total_amount": 100
        },
        {
          "transfer_balance_id": 3,
          "amount": 150,
          "transfer_date": "2023-01-03",
          "total_amount": 150
        }
      ]
    }
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [7]:
# WORKING: SQL with {batch}, no Spark join and no Spark temp view.
# Because the query references {batch}, it runs against the batch being validated.
# GX logs no "{batch} parameter" warning, and the reported rows carry only the
# transfers columns.

spark = (
    SparkSession.builder
    .appName("DataFrames Example")
    .getOrCreate()
)

# Rows: (transfer_balance_id, amount, transfer_date)
data_transfers = [
    (1, 100, '2023-01-01'),
    (2, 200, '2023-01-02'),
    (3, 150, '2023-01-03'),
    (4, 500, '2023-01-04'),
]

# Rows: (transfer_balance_id, total_amount)
data_transfer_balance = [
    (1, 100),
    (2, 200),
    (3, 150),
    (4, 500),
]

df_transfers = spark.createDataFrame(
    data_transfers, ["transfer_balance_id", "amount", "transfer_date"]
)
df_transfer_balance = spark.createDataFrame(
    data_transfer_balance, ["transfer_balance_id", "total_amount"]
)

context = gx.get_context()
data_source = context.data_sources.add_spark("example")

transfers_data_asset = data_source.add_dataframe_asset(name="transfers")
transfer_balance_data_asset = data_source.add_dataframe_asset(name="transfer_balance")

batch_transfers = (
    transfers_data_asset
    .add_batch_definition_whole_dataframe("transfers")
    .get_batch(batch_parameters={"dataframe": df_transfers})
)
batch_transfer_balance = (
    transfer_balance_data_asset
    .add_batch_definition_whole_dataframe("transfer_balance")
    .get_batch(batch_parameters={"dataframe": df_transfer_balance})
)

# {batch} is substituted by GX with the batch this expectation is validated on.
query = """
    SELECT *
    FROM {batch}
    WHERE amount < 200
"""

query_expectation = gx.expectations.UnexpectedRowsExpectation(unexpected_rows_query=query)

batch_transfers.validate(query_expectation)

Calculating Metrics: 100%|██████████| 2/2 [00:00<00:00,  2.65it/s]


{
  "success": false,
  "expectation_config": {
    "type": "unexpected_rows_expectation",
    "kwargs": {
      "batch_id": "example-transfers",
      "unexpected_rows_query": "\n    SELECT *\n    FROM {batch}\n    WHERE amount < 200"
    },
    "meta": {}
  },
  "result": {
    "observed_value": 2,
    "details": {
      "unexpected_rows": [
        {
          "transfer_balance_id": 1,
          "amount": 100,
          "transfer_date": "2023-01-01"
        },
        {
          "transfer_balance_id": 3,
          "amount": 150,
          "transfer_date": "2023-01-03"
        }
      ]
    }
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [9]:
# WORKING, with warnings: Spark DataFrames plus a query without {batch} that
# anti-joins two Spark temp views (transfers with no matching balance row).
# The query reads the views by name rather than the validated batch, so GX logs
# the "{batch} parameter" warning.

spark = (
    SparkSession.builder
    .appName("DataFrames Example")
    .getOrCreate()
)

# Rows: (transfer_balance_id, amount, transfer_date)
data_transfers = [
    (1, 100, '2023-01-01'),
    (2, 200, '2023-01-02'),
    (3, 150, '2023-01-03'),
    (4, 500, '2023-01-04'),
]

# Rows: (transfer_balance_id, total_amount)
# Balance id 4 is intentionally absent so the anti-join below has exactly one
# orphan transfer (id 4) to report.
data_transfer_balance = [
    (1, 100),
    (2, 200),
    (3, 150),
]

df_transfers = spark.createDataFrame(
    data_transfers, ["transfer_balance_id", "amount", "transfer_date"]
)
df_transfer_balance = spark.createDataFrame(
    data_transfer_balance, ["transfer_balance_id", "total_amount"]
)

context = gx.get_context()
data_source = context.data_sources.add_spark("example")

transfers_data_asset = data_source.add_dataframe_asset(name="transfers")
transfer_balance_data_asset = data_source.add_dataframe_asset(name="transfer_balance")

batch_transfers = (
    transfers_data_asset
    .add_batch_definition_whole_dataframe("transfers")
    .get_batch(batch_parameters={"dataframe": df_transfers})
)
batch_transfer_balance = (
    transfer_balance_data_asset
    .add_batch_definition_whole_dataframe("transfer_balance")
    .get_batch(batch_parameters={"dataframe": df_transfer_balance})
)

# Expose both tables under the names used in the SQL below.
df_transfers.createOrReplaceTempView("transfers")
df_transfer_balance.createOrReplaceTempView("transfer_balance")

query = """
    SELECT t.transfer_balance_id
    FROM transfers t
    LEFT JOIN transfer_balance b ON t.transfer_balance_id = b.transfer_balance_id
    WHERE b.transfer_balance_id IS NULL
"""

# Preview in plain Spark the orphan transfers the expectation is expected to flag.
query_result = spark.sql(query)
query_result.show()

query_expectation = gx.expectations.UnexpectedRowsExpectation(unexpected_rows_query=query)

batch_transfers.validate(query_expectation)



                                                                                

+-------------------+
|transfer_balance_id|
+-------------------+
|                  4|
+-------------------+

unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.


Calculating Metrics: 100%|██████████| 2/2 [00:01<00:00,  1.45it/s]              

unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.
unexpected_rows_query should contain the {batch} parameter. Otherwise data outside the configured batch will be queried.





{
  "success": false,
  "expectation_config": {
    "type": "unexpected_rows_expectation",
    "kwargs": {
      "batch_id": "example-transfers",
      "unexpected_rows_query": "\n    SELECT t.transfer_balance_id\n    FROM transfers t\n    LEFT JOIN transfer_balance b ON t.transfer_balance_id = b.transfer_balance_id\n    WHERE b.transfer_balance_id IS NULL"
    },
    "meta": {}
  },
  "result": {
    "observed_value": 1,
    "details": {
      "unexpected_rows": [
        {
          "transfer_balance_id": 4
        }
      ]
    }
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [None]:
# WORKING: Spark DataFrames plus a query that uses {batch} for the validated table
# (transfers) and joins it to a Spark temp view for the lookup table (transfer_balance).
# Only transfer_balance has to be registered as a view: the transfers side is
# supplied by GX through {batch}. Every transfer_balance_id in df_transfers also
# appears in df_transfer_balance, so the anti-join should return no rows.
spark = SparkSession.builder.appName("DataFrames Example").getOrCreate()

# Rows: (transfer_balance_id, amount, transfer_date)
data_transfers = [
    (1, 100, '2023-01-01'),
    (2, 200, '2023-01-02'),
    (3, 150, '2023-01-03'),
    (4, 500, '2023-01-04')
]

# Rows: (transfer_balance_id, total_amount)
data_transfer_balance = [
    (1, 100),
    (2, 200),
    (3, 150),
    (4, 500)
]

df_transfers = spark.createDataFrame(data_transfers, ["transfer_balance_id", "amount", "transfer_date"])
df_transfer_balance = spark.createDataFrame(data_transfer_balance, ["transfer_balance_id", "total_amount"])

context = gx.get_context()

data_source = context.data_sources.add_spark("example")
transfers_data_asset = data_source.add_dataframe_asset(name="transfers")

batch_transfers = transfers_data_asset.add_batch_definition_whole_dataframe("transfers").get_batch(batch_parameters={"dataframe": df_transfers})

# The lookup table is referenced by name in the SQL, so it needs a Spark temp view.
df_transfer_balance.createOrReplaceTempView("transfer_balance")

query = """
    SELECT t.transfer_balance_id
    FROM {batch} t
    LEFT JOIN transfer_balance b ON t.transfer_balance_id = b.transfer_balance_id
    WHERE b.transfer_balance_id IS NULL
"""

query_expectation = gx.expectations.UnexpectedRowsExpectation(
    unexpected_rows_query=query
)

batch_transfers.validate(query_expectation, result_format="COMPLETE")
