Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion awswrangler/data_quality/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
"""AWS Glue Data Quality package."""

# NOTE(review): the diff residue kept the old single-line import alongside the
# new multi-line one; only the post-change import block belongs in the file.
from awswrangler.data_quality._create import (
    create_recommendation_ruleset,
    create_ruleset,
    evaluate_ruleset,
    update_ruleset,
)
from awswrangler.data_quality._get import get_ruleset

# Public API of the data_quality subpackage, kept alphabetically sorted.
__all__ = [
    "create_recommendation_ruleset",
    "create_ruleset",
    "evaluate_ruleset",
    "get_ruleset",
    "update_ruleset",
]
160 changes: 133 additions & 27 deletions awswrangler/data_quality/_create.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
"""AWS Glue Data Quality Create module."""

import logging
import pprint
import uuid
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union, cast

import boto3
import pandas as pd

from awswrangler import _utils, exceptions
from awswrangler._config import apply_configs
from awswrangler.data_quality._utils import (
_create_datasource,
_get_data_quality_results,
_rules_to_df,
_start_ruleset_evaluation_run,
_wait_ruleset_evaluation_run,
_wait_ruleset_run,
)

_logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,9 +61,9 @@ def create_ruleset(
description : str
Ruleset description.
client_token : str, optional
Random id used for idempotency. Will be automatically generated if not provided.
Random id used for idempotency. Is automatically generated if not provided.
boto3_session : boto3.Session, optional
Ruleset description.
Boto3 Session. If none, the default boto3 session is used.

Examples
--------
Expand Down Expand Up @@ -93,7 +96,7 @@ def create_ruleset(
>>> df_rules=df_rules,
>>>)
"""
if df_rules is not None and dqdl_rules:
if (df_rules is not None and dqdl_rules) or (df_rules is None and not dqdl_rules):
raise exceptions.InvalidArgumentCombination("You must pass either ruleset `df_rules` or `dqdl_rules`.")

client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session)
Expand All @@ -110,8 +113,8 @@ def create_ruleset(
},
ClientToken=client_token if client_token else uuid.uuid4().hex,
)
except client_glue.exceptions.AlreadyExistsException:
raise exceptions.AlreadyExists(f"Ruleset {name} already exists.")
except client_glue.exceptions.AlreadyExistsException as not_found:
raise exceptions.AlreadyExists(f"Ruleset {name} already exists.") from not_found


@apply_configs
Expand Down Expand Up @@ -139,9 +142,9 @@ def update_ruleset(
description : str
Ruleset description.
client_token : str, optional
Random id used for idempotency. Will be automatically generated if not provided.
Random id used for idempotency. Is automatically generated if not provided.
boto3_session : boto3.Session, optional
Ruleset description.
Boto3 Session. If none, the default boto3 session is used.

Examples
--------
Expand All @@ -151,7 +154,7 @@ def update_ruleset(
>>> dqdl_rules="Rules = [ RowCount between 1 and 3 ]",
>>>)
"""
if df_rules is not None and dqdl_rules:
if (df_rules is not None and dqdl_rules) or (df_rules is None and not dqdl_rules):
raise exceptions.InvalidArgumentCombination("You must pass either ruleset `df_rules` or `dqdl_rules`.")

client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session)
Expand All @@ -165,8 +168,99 @@ def update_ruleset(
Ruleset=dqdl_rules,
ClientToken=client_token if client_token else uuid.uuid4().hex,
)
except client_glue.exceptions.EntityNotFoundException:
raise exceptions.ResourceDoesNotExist(f"Ruleset {name} does not exist.")
except client_glue.exceptions.EntityNotFoundException as not_found:
raise exceptions.ResourceDoesNotExist(f"Ruleset {name} does not exist.") from not_found


@apply_configs
def create_recommendation_ruleset(
    database: str,
    table: str,
    iam_role_arn: str,
    name: Optional[str] = None,
    catalog_id: Optional[str] = None,
    connection_name: Optional[str] = None,
    additional_options: Optional[Dict[str, Any]] = None,
    number_of_workers: int = 5,
    timeout: int = 2880,
    client_token: Optional[str] = None,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """Start a Glue Data Quality rule recommendation run and return the recommended rules.

    Parameters
    ----------
    database : str
        Glue database name.
    table : str
        Glue table name.
    iam_role_arn : str
        IAM Role ARN.
    name : str, optional
        Ruleset name.
    catalog_id : str, optional
        Glue Catalog id.
    connection_name : str, optional
        Glue connection name.
    additional_options : dict, optional
        Additional options for the table. Supported keys:
        `pushDownPredicate`: to filter on partitions without having to list and read all the files in your dataset.
        `catalogPartitionPredicate`: to use server-side partition pruning using partition indexes in the
        Glue Data Catalog.
    number_of_workers: int, optional
        The number of G.1X workers to be used in the run. The default is 5.
    timeout: int, optional
        The timeout for a run in minutes. The default is 2880 (48 hours).
    client_token : str, optional
        Random id used for idempotency. Is automatically generated if not provided.
    boto3_session : boto3.Session, optional
        Boto3 Session. If none, the default boto3 session is used.

    Returns
    -------
    pd.DataFrame
        Data frame with recommended ruleset details.

    Examples
    --------
    >>> import awswrangler as wr

    >>> df_recommended_ruleset = wr.data_quality.create_recommendation_ruleset(
    >>>     database="database",
    >>>     table="table",
    >>>     iam_role_arn="arn:...",
    >>>)
    """
    glue_client: boto3.client = _utils.client(service_name="glue", session=boto3_session)

    # Assemble the StartDataQualityRuleRecommendationRun request payload.
    run_args: Dict[str, Any] = {
        "DataSource": _create_datasource(
            database=database,
            table=table,
            catalog_id=catalog_id,
            connection_name=connection_name,
            additional_options=additional_options,
        ),
        "Role": iam_role_arn,
        "NumberOfWorkers": number_of_workers,
        "Timeout": timeout,
        # A fresh token is generated when the caller did not supply one,
        # so every invocation is treated as a distinct request.
        "ClientToken": client_token if client_token else uuid.uuid4().hex,
    }
    # The generated ruleset name is optional on the Glue API side.
    if name:
        run_args["CreatedRulesetName"] = name
    _logger.debug("args: \n%s", pprint.pformat(run_args))

    start_response: Dict[str, Any] = glue_client.start_data_quality_rule_recommendation_run(**run_args)
    run_id: str = cast(str, start_response["RunId"])
    _logger.debug("run_id: %s", run_id)

    # Block until the recommendation run finishes, then extract the DQDL text.
    run_description = _wait_ruleset_run(
        run_id=run_id,
        run_type="recommendation",
        boto3_session=boto3_session,
    )
    recommended_rules: str = cast(str, run_description["RecommendedRuleset"])
    return _rules_to_df(rules=recommended_rules)


@apply_configs
Expand All @@ -178,7 +272,7 @@ def evaluate_ruleset(
database: Optional[str] = None,
table: Optional[str] = None,
catalog_id: Optional[str] = None,
connection: Optional[str] = None,
connection_name: Optional[str] = None,
additional_options: Optional[Dict[str, str]] = None,
additional_run_options: Optional[Dict[str, str]] = None,
client_token: Optional[str] = None,
Expand All @@ -188,35 +282,40 @@ def evaluate_ruleset(

Parameters
----------
name : str
Ruleset name.
name : str or list[str]
Ruleset name or list of names.
iam_role_arn : str
IAM Role.
IAM Role ARN.
number_of_workers: int, optional
The number of G.1X workers to be used in the run. The default is 5.
timeout: int, optional
The timeout for a run in minutes. The default is 2880 (48 hours).
database : str, optional
Glue database name. Database associated with the ruleset will be used if not provided.
table : str, optinal
table : str, optional
Glue table name. Table associated with the ruleset will be used if not provided.
catalog_id : str, optional
Glue Catalog id.
connection : str, optional
Glue connection.
additional_options : Dict[str, str], optional
connection_name : str, optional
Glue connection name.
additional_options : dict, optional
Additional options for the table. Supported keys:
`pushDownPredicate`: to filter on partitions without having to list and read all the files in your dataset.
`catalogPartitionPredicate`: to use server-side partition pruning using partition indexes in the
Glue Data Catalog.
`catalogPartitionPredicate`: to use server-side partition pruning using partition indexes in the
Glue Data Catalog.
additional_run_options : Dict[str, str], optional
Additional run options. Supported keys:
`CloudWatchMetricsEnabled`: whether to enable CloudWatch metrics.
`ResultsS3Prefix`: prefix for Amazon S3 to store results.
client_token : str, optional
Random id used for idempotency. Will be automatically generated if not provided.
boto3_session : boto3.Session, optional
Ruleset description.
Boto3 Session. If none, the default boto3 session is used.

Returns
-------
pd.DataFrame
Data frame with ruleset evaluation results.

Examples
--------
Expand All @@ -231,8 +330,8 @@ def evaluate_ruleset(
>>> table="table",
>>> dqdl_rules="Rules = [ RowCount between 1 and 3 ]",
>>>)
>>> wr.data_quality.evaluate_ruleset(
>>> name="ruleset",
>>> df_ruleset_results = wr.data_quality.evaluate_ruleset(
>>> name=["ruleset1", "rulseset2"],
>>> iam_role_arn=glue_data_quality_role,
>>> )
"""
Expand All @@ -244,12 +343,19 @@ def evaluate_ruleset(
database=database,
table=table,
catalog_id=catalog_id,
connection=connection,
connection_name=connection_name,
additional_options=additional_options,
additional_run_options=additional_run_options,
client_token=client_token if client_token else uuid.uuid4().hex,
boto3_session=boto3_session,
)
_logger.debug("run_id: %s", run_id)
result_ids: List[str] = _wait_ruleset_evaluation_run(run_id=run_id, boto3_session=boto3_session)
result_ids: List[str] = cast(
List[str],
_wait_ruleset_run(
run_id=run_id,
run_type="evaluation",
boto3_session=boto3_session,
)["ResultIds"],
)
return _get_data_quality_results(result_ids=result_ids, boto3_session=boto3_session)
36 changes: 36 additions & 0 deletions awswrangler/data_quality/_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""AWS Glue Data Quality Get Module."""

from typing import Optional, cast

import boto3
import pandas as pd

from awswrangler.data_quality._utils import _get_ruleset, _rules_to_df


def get_ruleset(
    name: str,
    boto3_session: Optional[boto3.Session] = None,
) -> pd.DataFrame:
    """Fetch a Data Quality ruleset by name and return its rules as a data frame.

    Parameters
    ----------
    name : str
        Ruleset name.
    boto3_session : boto3.Session, optional
        Boto3 Session. If none, the default boto3 session is used.

    Returns
    -------
    pd.DataFrame
        Data frame with ruleset details.

    Examples
    --------
    >>> import awswrangler as wr

    >>> df_ruleset = wr.data_quality.get_ruleset(name="my_ruleset")
    """
    # Retrieve the raw DQDL ruleset text, then parse it into tabular form.
    ruleset_response = _get_ruleset(ruleset_name=name, boto3_session=boto3_session)
    dqdl_rules: str = cast(str, ruleset_response["Ruleset"])
    return _rules_to_df(rules=dqdl_rules)
Loading