optimization and additional columns to detailed stats #96

Merged · 11 commits · May 29, 2024
59 changes: 34 additions & 25 deletions docs/getting-started/setup.md
@@ -48,7 +48,7 @@ create table if not exists `catalog`.`schema`.`{product}_rules` (
7. `action_if_failed` There are 3 different types of actions. These are 'ignore', 'drop', and 'fail'.
Ignore: The rule is run and the output is logged. No action is performed regardless of whether the rule has succeeded or failed. Applies for all 3 rule types.
Drop: The rows that fail the rule get dropped from the dataset. Applies for only row_dq rule type.
Fail: DAG fails if the rule fails. Applies for all 3 rule types.
Fail: job fails if the rule fails. Applies for all 3 rule types.
8. `tag` Tag name for the dq rule, for example: completeness, validity, uniqueness, etc.
9. `description` Long description for the rule
10. `enable_for_source_dq_validation` flag to run the agg rule
@@ -59,12 +59,11 @@ create table if not exists `catalog`.`schema`.`{product}_rules` (
15. `query_dq_delimiter` Delimiter used to segregate custom queries, e.g. $, @, etc. By default it is @. Users can override it with any other delimiter based on need; the same delimiter mentioned here has to be used in the custom query.
16. `enable_querydq_custom_output` flag to capture the custom query output in a separate table (see the sample rule row below)
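As a concrete illustration of how these columns combine, the sketch below inserts one `row_dq` rule with `spark.sql`. The column order follows the `RULES_DATA` tuples in `spark_expectations/examples/base_setup.py`; the table name `dq_spark_dev.dq_rules` is a placeholder, and the three columns between `enable_for_target_dq_validation` and `query_dq_delimiter` are assumed to be `is_active`, `enable_error_drop_alert` and `error_drop_threshold` (they are not shown in this excerpt).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder rules table; substitute your own `catalog`.`schema`.`{product}_rules`.
spark.sql(
    """
    insert into dq_spark_dev.dq_rules values
    ('your_product', 'dq_spark_dev.customer_order', 'row_dq', 'sales_greater_than_zero',
     'sales', 'sales > 0', 'drop', 'accuracy', 'sales value should be greater than zero',
     false,  -- enable_for_source_dq_validation
     true,   -- enable_for_target_dq_validation
     true,   -- is_active (assumed column name)
     false,  -- enable_error_drop_alert (assumed column name)
     0,      -- error_drop_threshold (assumed column name)
     null,   -- query_dq_delimiter
     null)   -- enable_querydq_custom_output
    """
)
```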

The rule_type, enable_for_source_dq_validation and enable_for_target_dq_validation columns define source_agg_dq, target_agg_dq, source_query_dq and target_query_dq. Please see the definitions below:
If rule_type is row_dq then row_dq is TRUE
If rule_type is agg_dq and enable_for_source_dq_validation is TRUE then source_agg_dq is TRUE
If rule_type is agg_dq and enable_for_target_dq_validation is TRUE then target_agg_dq is TRUE
If rule_type is query_dq and enable_for_source_dq_validation is TRUE then source_query_dq is TRUE
If rule_type is query_dq and enable_for_target_dq_validation is TRUE then target_query_dq is TRUE

The Spark Expectations process consists of three phases:
1. When enable_for_source_dq_validation is true, execute agg_dq and query_dq on the source DataFrame
2. If the first step is successful, proceed to run row_dq
3. When enable_for_target_dq_validation is true, execute agg_dq and query_dq on the DataFrame resulting from row_dq (see the sketch below for how these flags map to the phases)
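The removed mapping and the new three-phase description express the same dispatch logic. The function below is a minimal, illustrative sketch of it in plain Python — the name `dq_phases` and the return shape are invented here for clarity and are not part of the library:

```python
def dq_phases(rule_type: str,
              enable_for_source_dq_validation: bool,
              enable_for_target_dq_validation: bool) -> list[str]:
    """Return the validation phases a rule participates in (illustrative only)."""
    phases = []
    if rule_type == "row_dq":
        phases.append("row_dq")                   # phase 2: always runs on the input rows
    elif rule_type in ("agg_dq", "query_dq"):
        if enable_for_source_dq_validation:
            phases.append(f"source_{rule_type}")  # phase 1: runs on the source DataFrame
        if enable_for_target_dq_validation:
            phases.append(f"target_{rule_type}")  # phase 3: runs on the row_dq output
    return phases


# Example: an agg_dq rule enabled for both source and target validation
print(dq_phases("agg_dq", True, True))  # ['source_agg_dq', 'target_agg_dq']
```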

### Rule Type For Rules

@@ -166,15 +165,20 @@ source_dq_expected_outcome string, -- (11)!
source_dq_actual_row_count string, -- (12)!
source_dq_error_row_count string, -- (13)!
source_dq_row_count string, -- (14)!
target_expectations string, -- (15)!
target_dq_status string, -- (16)!
target_dq_actual_outcome string, -- (17)!
target_dq_expected_outcome string, -- (18)!
target_dq_actual_row_count string, -- (19)!
target_dq_error_row_count string, -- (20)!
target_dq_row_count string, -- (21)!
dq_date date, -- (22)!
dq_time string, -- (23)!
source_dq_start_time string, -- (15)!
source_dq_end_time string, -- (16)!
target_expectations string, -- (17)!
target_dq_status string, -- (18)!
target_dq_actual_outcome string, -- (19)!
target_dq_expected_outcome string, -- (20)!
target_dq_actual_row_count string, -- (21)!
target_dq_error_row_count string, -- (22)!
target_dq_row_count string, -- (23)!
target_dq_start_time string, -- (24)!
target_dq_end_time string, -- (25)!
dq_date date, -- (26)!
dq_time string, -- (27)!
dq_job_metadata_info string, -- (28)!
);
```

@@ -192,12 +196,17 @@ dq_time string, -- (23)!
12. `source_dq_actual_row_count` Number of rows of the source dq
13. `source_dq_error_row_count` Number of rows failed in the source dq
14. `source_dq_row_count` Number of rows of the source dq
15. `target_expectations` Actual Rule to be executed on the target dq
16. `target_dq_status` Status of the rule execution in the Target dq
17. `target_dq_actual_outcome` Actual outcome of the Target dq check
18. `target_dq_expected_outcome` Expected outcome of the Target dq check
19. `target_dq_actual_row_count` Number of rows of the target dq
20. `target_dq_error_row_count` Number of rows failed in the target dq
21. `target_dq_row_count` Number of rows of the target dq
22. `dq_date` Dq executed date
23. `dq_time` Dq executed timestamp
15. `source_dq_start_time` Source dq start timestamp
16. `source_dq_end_time` Source dq end timestamp
17. `target_expectations` Actual Rule to be executed on the target dq
18. `target_dq_status` Status of the rule execution in the Target dq
19. `target_dq_actual_outcome` Actual outcome of the Target dq check
20. `target_dq_expected_outcome` Expected outcome of the Target dq check
21. `target_dq_actual_row_count` Number of rows of the target dq
22. `target_dq_error_row_count` Number of rows failed in the target dq
23. `target_dq_row_count` Number of rows of the target dq
24. `target_dq_start_time` Target dq start timestamp
25. `target_dq_end_time` Target dq end timestamp
26. `dq_date` Dq executed date
27. `dq_time` Dq executed timestamp
28. `dq_job_metadata_info` Dq job metadata (see the query sketch below)
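The new timestamp and metadata columns can be inspected straight from the detailed stats table. A minimal sketch follows; the table name `catalog.schema.your_product_detailed_stats` is a placeholder for whatever detailed stats table you configured:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder name; point this at your configured detailed stats table.
detailed_stats = spark.table("catalog.schema.your_product_detailed_stats")

(
    detailed_stats.select(
        "source_dq_start_time",   # new in this change set
        "source_dq_end_time",
        "target_dq_start_time",
        "target_dq_end_time",
        "dq_job_metadata_info",   # stringified job metadata supplied via user config
        "dq_date",
        "dq_time",
    )
    .orderBy("dq_time", ascending=False)
    .show(truncate=False)
)
```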
1 change: 1 addition & 0 deletions prospector.yaml
@@ -16,6 +16,7 @@ max-line-length: 120

pylint:
disable:
- too-many-lines
- too-many-branches
- too-many-statements
- too-many-instance-attributes
1 change: 1 addition & 0 deletions spark_expectations/config/user_config.py
@@ -64,6 +64,7 @@ class Constants:
# declare const user config variables for agg query dq detailed stats
se_enable_agg_dq_detailed_result = "spark.expectations.agg.dq.detailed.stats"
se_enable_query_dq_detailed_result = "spark.expectations.query.dq.detailed.stats"
se_job_metadata = "spark.expectations.job.metadata"

querydq_output_custom_table_name = "spark.expectations.query.dq.custom.table_name"
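A short sketch of how the new constant is expected to be supplied from user code, mirroring the sample scripts later in this diff (the metadata keys and values shown are arbitrary):

```python
from spark_expectations.config.user_config import Constants as user_config

# Job metadata is passed as a stringified dict and surfaced in the
# dq_job_metadata_info column of the detailed stats table.
job_info = str({"job": "customer_order_load", "Region": "NA", "Snapshot": "2024-04-15"})

user_conf = {
    user_config.se_enable_agg_dq_detailed_result: True,
    user_config.se_enable_query_dq_detailed_result: True,
    user_config.se_job_metadata: job_info,
}
```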

24 changes: 24 additions & 0 deletions spark_expectations/core/context.py
@@ -75,6 +75,7 @@ def __post_init__(self) -> None:
self._final_agg_dq_result: Optional[List[Dict[str, str]]] = None
self._source_query_dq_result: Optional[List[Dict[str, str]]] = None
self._final_query_dq_result: Optional[List[Dict[str, str]]] = None
self._job_metadata: Optional[str] = None

self._source_agg_dq_detailed_stats: Optional[List[Tuple]] = None
self._source_query_dq_detailed_stats: Optional[List[Tuple]] = None
@@ -1911,3 +1912,26 @@ def get_dq_rules_params(self) -> dict:

"""
return self._dq_rules_params

def set_job_metadata(self, job_metadata: Optional[str] = None) -> None:
"""
This function is used to set the job_metadata

Returns:
None

"""
self._job_metadata = job_metadata

@property
def get_job_metadata(self) -> Optional[str]:
"""
This function is used to get the job metadata

Returns:
str: Returns the job metadata string, or None if it was not set

"""
if self._job_metadata is not None:
return str(self._job_metadata)
return None
34 changes: 28 additions & 6 deletions spark_expectations/core/expectations.py
@@ -46,7 +46,15 @@ class SparkExpectations:

def __post_init__(self) -> None:
if isinstance(self.rules_df, DataFrame):
self.spark: SparkSession = self.rules_df.sparkSession
try:
self.spark: Optional[SparkSession] = self.rules_df.sparkSession
except AttributeError:
self.spark = SparkSession.getActiveSession()

if self.spark is None:
raise SparkExpectationsMiscException(
"Spark session is not available, please initialize a spark session before calling SE"
)
else:
raise SparkExpectationsMiscException(
"Input rules_df is not of dataframe type"
@@ -112,7 +120,7 @@ def _except(func: Any) -> Any:
# variable used for enabling notification at different level

_default_notification_dict: Dict[
str, Union[str, int, bool, Dict[str, str]]
str, Union[str, int, bool, Dict[str, str], None]
] = {
user_config.se_notifications_on_start: False,
user_config.se_notifications_on_completion: False,
@@ -121,10 +129,13 @@ def _except(func: Any) -> Any:
user_config.se_notifications_on_error_drop_threshold: 100,
user_config.se_enable_agg_dq_detailed_result: False,
user_config.se_enable_query_dq_detailed_result: False,
user_config.se_job_metadata: None,
user_config.querydq_output_custom_table_name: f"{self.stats_table}_querydq_output",
}

_notification_dict: Dict[str, Union[str, int, bool, Dict[str, str]]] = (
_notification_dict: Dict[
str, Union[str, int, bool, Dict[str, str], None]
] = (
{**_default_notification_dict, **user_conf}
if user_conf
else _default_notification_dict
@@ -262,6 +273,8 @@ def _except(func: Any) -> Any:
else False
)

_job_metadata: Optional[str] = (
str(_notification_dict[user_config.se_job_metadata])
if _notification_dict.get(user_config.se_job_metadata) is not None
else None
)

notifications_on_error_drop_threshold = _notification_dict.get(
user_config.se_notifications_on_error_drop_threshold, 100
)
@@ -280,6 +293,7 @@ def _except(func: Any) -> Any:
self._context.set_dq_expectations(expectations)
self._context.set_rules_execution_settings_config(rules_execution_settings)
self._context.set_querydq_secondary_queries(dq_queries_dict)
self._context.set_job_metadata(_job_metadata)

@self._notification.send_notification_decorator
@self._statistics_decorator.collect_stats_decorator
@@ -292,6 +306,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
table_name: str = self._context.get_table_name

_input_count = _df.count()
_log.info("data frame input record count: %s", _input_count)
_output_count: int = 0
_error_count: int = 0
_source_dq_df: Optional[DataFrame] = None
@@ -333,21 +348,28 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
self._context.set_input_count(_input_count)
self._context.set_error_drop_threshold(_error_drop_threshold)

_log.info(
"Spark Expectations run id for this run: %s",
self._context.get_run_id,
)

if isinstance(_df, DataFrame):
_log.info("The function dataframe is created")
self._context.set_table_name(table_name)
if write_to_temp_table:
_log.info("Dropping to temp table started")
self.spark.sql(f"drop table if exists {table_name}_temp")
self.spark.sql(f"drop table if exists {table_name}_temp") # type: ignore
_log.info("Dropping to temp table completed")
_log.info("Writing to temp table started")
source_columns = _df.columns
self._writer.save_df_as_table(
_df,
f"{table_name}_temp",
self._context.get_target_and_error_table_writer_config,
)
_log.info("Read from temp table started")
_df = self.spark.sql(f"select * from {table_name}_temp")
_df = self.spark.sql(f"select * from {table_name}_temp") # type: ignore
_df = _df.select(source_columns)
_log.info("Read from temp table completed")

func_process = self._process.execute_dq_process(
@@ -544,7 +566,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
"error occurred while processing spark "
"expectations due to given dataframe is not type of dataframe"
)
self.spark.catalog.clearCache()
# self.spark.catalog.clearCache()

return _row_dq_df
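For context, a typical call site of the wrapper above looks roughly like the sample scripts in this repository. The decorator name `with_expectations` and the argument names below (`target_table`, `write_to_table`, `user_conf`) are taken from those samples and should be treated as assumptions if your installed version differs; `se`, `spark` and `user_conf` are assumed to be defined as in the examples:

```python
from pyspark.sql import DataFrame


# Assumed decorator signature; adjust argument names to your installed version.
@se.with_expectations(
    target_table="dq_spark_dev.customer_order",  # placeholder target table
    write_to_table=True,
    user_conf=user_conf,  # may carry se_job_metadata and the detailed-stats flags
)
def build_customer_order() -> DataFrame:
    # The returned DataFrame is what the wrapper counts, validates and writes.
    return spark.read.table("dq_spark_dev.customer_order_source")  # placeholder source
```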

6 changes: 3 additions & 3 deletions spark_expectations/examples/base_setup.py
@@ -25,7 +25,7 @@
"""


RULES_DATA = """
RULES_DATA = """

("your_product", "dq_spark_local.customer_order", "row_dq", "sales_greater_than_zero", "sales", "sales > 2000", "ignore", "accuracy", "sales value should be greater than zero", false, true, true, false, 0,null, null)
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 40", true, true, true, false, 0,null, null)
@@ -41,8 +41,8 @@
,("your_product", "dq_spark_local.customer_order", "query_dq", "order_count_validity_check", "*", "(select count(*) from order_source) > 10", "ignore", "validity", "row count threshold", true, true, true, false, 0, null, true)
,("your_product", "dq_spark_{env}.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from {table}) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0,null, true)
,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode) <= 3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)


"""


7 changes: 7 additions & 0 deletions spark_expectations/examples/sample_dq_bigquery.py
@@ -19,6 +19,12 @@
.option("createDisposition", "CREATE_IF_NEEDED")
.option("writeMethod", "direct")
)
dic_job_info = {
"job": "job_name",
"Region": "NA",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)

# if wanted to use indirect method use below setting and spark session
# writer = WrappedDataFrameWriter().mode("overwrite").format("bigquery").\
@@ -63,6 +69,7 @@
"env": "local",
"table": "product",
},
user_config.se_job_metadata: job_info,
}


7 changes: 7 additions & 0 deletions spark_expectations/examples/sample_dq_delta.py
@@ -14,6 +14,12 @@
writer = WrappedDataFrameWriter().mode("append").format("delta")

spark = set_up_delta()
dic_job_info = {
"job": "job_name",
"Region": "NA",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)

se: SparkExpectations = SparkExpectations(
product_id="your_product",
@@ -47,6 +53,7 @@
"env": "dev",
"table": "product",
},
user_config.se_job_metadata: job_info,
}


7 changes: 7 additions & 0 deletions spark_expectations/examples/sample_dq_iceberg.py
@@ -11,6 +11,12 @@
from spark_expectations.config.user_config import Constants as user_config

writer = WrappedDataFrameWriter().mode("append").format("iceberg")
dic_job_info = {
"job": "job_name",
"Region": "NA",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)

spark = set_up_iceberg()

@@ -48,6 +54,7 @@
"env": "local",
"table": "product",
},
user_config.se_job_metadata: job_info,
}

