Adding GreatExpectationsBigQueryOperator #11113
brian-lavery wants to merge 10 commits into apache:master
Conversation
    def __init__(self, gcp_project, expectations_file_name, gcs_bucket, gcs_expectations_prefix,
                 gcs_validations_prefix, gcs_datadocs_prefix, validation_type, validation_type_input,
                 bq_dataset_name, email_to, send_alert_email=True, include_datadocs_link_in_email=False,
                 fail_if_expectations_not_met=True,
                 bigquery_conn_id='bigquery_default',
                 *args, **kwargs):
Please apply `pre-commit run black` to this file; we started using the black formatter for the providers package. Also, please add type hints, and since only keyword arguments are allowed in operators, we should remove `*args` and use the following signature: `__init__(self, *, gcp_project, ..., **kwargs)`. This way Python will know that only keyword arguments are allowed (take a look at other operators).
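A minimal sketch of what that would look like, using the parameter names already in this PR (the type annotations here are illustrative):

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class GreatExpectationsBigQueryOperator(BaseOperator):

    @apply_defaults
    def __init__(
        self,
        *,  # keyword-only arguments, as in other provider operators
        gcp_project: str,
        expectations_file_name: str,
        gcs_bucket: str,
        gcs_expectations_prefix: str,
        gcs_validations_prefix: str,
        gcs_datadocs_prefix: str,
        validation_type: str,
        validation_type_input: str,
        bq_dataset_name: str,
        email_to: str,
        send_alert_email: bool = True,
        include_datadocs_link_in_email: bool = False,
        fail_if_expectations_not_met: bool = True,
        bigquery_conn_id: str = 'bigquery_default',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.gcp_project = gcp_project
        self.expectations_file_name = expectations_file_name
        # ... remaining attributes assigned the same way
```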
        *args, **kwargs):
    self.expectations_file_name = expectations_file_name
    if validation_type.upper() not in GreatExpectationsValidations:
        raise AirflowException("argument 'validation_type' must be one of %r." % great_expectations_valid_type)
    datasources={
        "bq_datasource": {
            "credentials": {
                "url": "bigquery://" + connection_json['extra__google_cloud_platform__project'] + "/"
                       + self.bq_dataset_name + "?credentials_path="
                       + connection_json['extra__google_cloud_platform__key_path']
            },
            "class_name": "SqlAlchemyDatasource",
            "module_name": "great_expectations.datasource",
            "data_asset_type": {
                "module_name": "great_expectations.dataset",
                "class_name": "SqlAlchemyDataset"
            }
        }
    },
    expectations_store_name="expectations_GCS_store",
    validations_store_name="validations_GCS_store",
    evaluation_parameter_store_name="evaluation_parameter_store",
    plugins_directory=None,
    validation_operators={
        "action_list_operator": {
            "class_name": "ActionListValidationOperator",
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
                {
                    "name": "store_evaluation_params",
                    "action": {"class_name": "StoreEvaluationParametersAction"},
                },
                {
                    "name": "update_data_docs",
                    "action": {"class_name": "UpdateDataDocsAction"},
                },
            ],
        }
    },
    stores={
        'expectations_GCS_store': {
            'class_name': 'ExpectationsStore',
            'store_backend': {
                'class_name': 'TupleGCSStoreBackend',
                'project': self.gcp_project,
                'bucket': self.gcs_bucket,
                'prefix': self.gcs_expectations_prefix
            }
        },
        'validations_GCS_store': {
            'class_name': 'ValidationsStore',
            'store_backend': {
                'class_name': 'TupleGCSStoreBackend',
                'project': self.gcp_project,
                'bucket': self.gcs_bucket,
                'prefix': self.gcs_validations_prefix
            }
        },
        "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
    },
    data_docs_sites={
        "GCS_site": {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleGCSStoreBackend",
                "project": self.gcp_project,
                "bucket": self.gcs_bucket,
                'prefix': self.gcs_datadocs_prefix
            },
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
            },
        }
    },
    config_variables_file_path=None,
    commented_map=None,
)
Do I correctly understand that these are configuration options for Great Expectations? If so, do they allow users to pass all possible configuration arguments? Currently we prefer (at least in GCP operators) interfaces that require users to pass dictionaries instead of individual keys.
Yes Tomek, I have already played around with GE for some time but never directly used the DataContextConfig.
This might be a good option instead of having to build a whole file structure through `great_expectations init` and having to manage those YAML files. I can see that working, but I would give the caller of this operator more control over sections like datasources, validation_operators, stores, and data_docs_sites.
I've already read @dlamblin's comment on adding more "abstraction layers" when we want to introduce GE to the project. I am all for it. I think this would make the integration with, for example, BigQuery a lot easier if we have such a base GE operator.
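One hedged way to picture that (the helper-function and argument names below are illustrative, not part of this PR): the operator could accept whole DataContextConfig sections as optional dictionaries and fall back to its GCS/BigQuery defaults only when they are not supplied.

```python
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig


def build_data_context(default_config: dict, datasources=None, stores=None,
                       validation_operators=None, data_docs_sites=None):
    """Let the caller override whole config sections; otherwise use the
    operator-built defaults passed in as ``default_config``."""
    config = DataContextConfig(
        datasources=datasources or default_config["datasources"],
        stores=stores or default_config["stores"],
        validation_operators=validation_operators or default_config["validation_operators"],
        data_docs_sites=data_docs_sites or default_config["data_docs_sites"],
        expectations_store_name="expectations_GCS_store",
        validations_store_name="validations_GCS_store",
        evaluation_parameter_store_name="evaluation_parameter_store",
        plugins_directory=None,
        config_variables_file_path=None,
    )
    return BaseDataContext(project_config=config)
```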
    full_name = name_start + ''.join(random.choices(string.ascii_uppercase +
                                                     string.digits, k=5))
How about using uuid? We can shorten it if necessary.
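For example, a sketch of the suggestion using only the standard library (variable names are illustrative):

```python
import uuid

# uuid4 gives a collision-resistant suffix; slice the hex digest if a shorter name is wanted
suffix = uuid.uuid4().hex[:8]
temp_table_name = f"temp_great_expectations_{suffix}"
```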
    def send_alert(self, data_docs_url):
        if self.include_datadocs_link_in_email:
            # From an Airflow variable set by the user, get the domain name of the service serving the data docs.
            domain = Variable.get("great_expectations_datadocs_domain")
This value should be configurable at the operator level.
        results = ' See the following GCS location for results:' + parsed.path
        email_content = '''
        <html>
        <head>
            <meta charset="utf-8">
        </head>
        <body style="background-color: #fafafa; font-family: Roboto, sans-serif=;">
            <div style="width: 600px; margin:0 auto;">
                <div style="background-color: white; border-top: 4px solid #22a667; border-left: 1px solid #eee; border-right: 1px solid #eee; border-radius: 6px 6px 0 0; height: 24px;"></div>
                <div style="background-color: white; border-left: 1px solid #eee; border-right: 1px solid #eee; padding: 0 24px; overflow: hidden;">
                    <div style="margin-left: 35px;">
                        Great Expectations Alert<br>
                        One or more data expectations were not met in the {0} file. {1}
                    </div>
        </body>
        </html>
        '''.format(self.expectations_file_name, results)
        send_email(self.email_to, 'expectations in ' + self.expectations_file_name + ' not met', email_content,
                   files=None, cc=None, bcc=None,
                   mime_subtype='mixed', mime_charset='us_ascii')
How about extracting this string to the top level as a constant? This should improve the readability of the operator logic.
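For instance, a small sketch of that refactor (identifier names here are illustrative; a constant along these lines lands later in this PR as `_EMAIL_CONTENT`):

```python
# Keeping the HTML template at module level keeps send_alert() short
_ALERT_EMAIL_TEMPLATE = (
    "<html><body>"
    "Great Expectations Alert<br>"
    "One or more data expectations were not met in the {0} file. {1}"
    "</body></html>"
)


def build_alert_body(expectations_file_name: str, results: str) -> str:
    return _ALERT_EMAIL_TEMPLATE.format(expectations_file_name, results)
```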
from google.api_core.exceptions import Conflict
from google.cloud.bigquery import TableReference

from great_expectations.data_context.types.base import DataContextConfig
The dependency will need to be added as an extra to setup.py.
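For context, an optional extra in a setuptools-based setup.py generally looks like the snippet below (illustrative only, not the actual layout of Airflow's setup.py, and the package spec is an assumption):

```python
from setuptools import setup

setup(
    name="example-package",  # placeholder
    extras_require={
        # installed via: pip install example-package[great_expectations]
        "great_expectations": ["great_expectations"],
    },
)
```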
I'm wondering how we should approach this. GE should not be a requirement for other Google operators; on the other hand, having a separate package for GE doesn't sound like a good way to go, unless we make this operator generic and provider neutral.
        self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, location=self.location)


class GreatExpectationsBigQueryOperator(BaseOperator):
I'm not much of a committer, so take this with a grain of salt as a user suggestion, but:
I'd like to see this broken into two classes, GreatExpectationsBaseOperator and GreatExpectationsBigQueryOperator.
The former could have a method _configure_project(…) composed of calls to _configure_datasources(…), _configure_stores(…), and _configure_docs(…), which would effectively throw a not-implemented exception. Its __init__ would not mention the GCP-specific params but would still cover the validations, email, and other parameters that are general. The latter could then implement these methods to build the config used in the base's execute method. Its __init__ could call super with kwargs. This would leave it open to users who want to run expectations on one of the other supported datasources to implement something that could. A sketch of this split follows below.
I'm a little unsure of how you could support, e.g., a user that wants to DQ check a MySQL db and have the stores and docs in GCP vs. checking a MySQL db with the stores and docs on AWS. I almost want to recommend mixins, but am unsure.
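A rough sketch of the split being proposed (the class and method names are the commenter's suggestions, and the returned dictionaries are placeholders, not a working configuration):

```python
from airflow.models import BaseOperator


class GreatExpectationsBaseOperator(BaseOperator):
    """Backend-agnostic parameters plus the overall validation flow."""

    def __init__(self, *, validation_type, email_to, send_alert_email=True, **kwargs):
        super().__init__(**kwargs)
        self.validation_type = validation_type
        self.email_to = email_to
        self.send_alert_email = send_alert_email

    # Backend-specific pieces raise until a subclass provides them
    def _configure_datasources(self):
        raise NotImplementedError

    def _configure_stores(self):
        raise NotImplementedError

    def _configure_docs(self):
        raise NotImplementedError

    def _configure_project(self):
        return {
            "datasources": self._configure_datasources(),
            "stores": self._configure_stores(),
            "data_docs_sites": self._configure_docs(),
        }


class GreatExpectationsBigQueryOperator(GreatExpectationsBaseOperator):
    """BigQuery datasource with GCS-backed stores and data docs."""

    def __init__(self, *, gcp_project, gcs_bucket, bq_dataset_name, **kwargs):
        super().__init__(**kwargs)
        self.gcp_project = gcp_project
        self.gcs_bucket = gcs_bucket
        self.bq_dataset_name = bq_dataset_name

    def _configure_datasources(self):
        return {"bq_datasource": {"class_name": "SqlAlchemyDatasource"}}

    def _configure_stores(self):
        return {"expectations_GCS_store": {"class_name": "ExpectationsStore"}}

    def _configure_docs(self):
        return {"GCS_site": {"class_name": "SiteBuilder"}}
```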
So this sounds like an answer to #11113 (comment). I would be in favor of having a generic GE operator. I'm not familiar with this tool, but from #11113 (comment) it looks like we should be able to create one.
Ok, so we'll work on creating a GreatExpectationsBaseOperator and extending it using the GreatExpectationsBigQueryOperator.
Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
The Build Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

@mrrobby could you jump in on this PR?
    :type fail_if_expectations_not_met: boolean
    """

    _EMAIL_CONTENT = '''
@turbaszek, as suggested, the email content was made a constant.
    '''

    @apply_defaults
    def __init__(self, *, gcp_project, expectations_file_name, gcs_bucket, gcs_expectations_prefix,
@turbaszek, as you can see, I have not yet formatted the file, but I will when I check in the changes with the tests.
from airflow.models import BaseOperator


class GreatExpectationsBaseOperator(BaseOperator):
@turbaszek , this is the new GreatExpectationsBaseOperator. It's really just an interface right now. That was the approach recommended by the GE team because they are planning to do a big refactor and it's not worth trying to move any generic functionality into the base operator until the refactor is done.
Then I would say there's no need for such an operator; it gives us nothing currently. We should add an operator or hook only if it brings some value, for example by creating a repository of reusable methods. In this case, it may only make future refactoring harder.
Just to chime in, one challenge we're looking at is that GE can basically work with any kind of backend, and we have a lot of folks who've built their own Airflow operators already. Even if the base operator isn't super helpful as part of this PR, I can see us extending it to work with other backends very quickly.
I think this would be my biggest question for @ryw and @mrrobby. I'm thinking the base operator could implement a generic load data context + load batch + validation flow similar to a draft PR we received: https://github.com/apache/airflow/pull/11113/files
> the base operator could implement a generic load data context + load batch + validation

I agree with that; the base operator should implement methods that can be reused in other, more specific operators. It should also implement some error handling, as well as the auth part if necessary.
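A hedged sketch of what such a generic flow might look like, assuming the Great Expectations 0.12-era API (the `build_data_context_config`, `build_batch_kwargs`, `expectation_suite_name`, and `fail_if_expectations_not_met` members are hypothetical hooks for subclasses, and the exact shape of the validation result may differ across GE versions):

```python
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from great_expectations.data_context import BaseDataContext


class GreatExpectationsBaseOperator(BaseOperator):
    def execute(self, context):
        # Subclasses supply the backend-specific DataContextConfig and batch_kwargs
        data_context = BaseDataContext(project_config=self.build_data_context_config())

        # Load the batch to validate (a table or a query result)
        batch = data_context.get_batch(
            batch_kwargs=self.build_batch_kwargs(),
            expectation_suite_name=self.expectation_suite_name,
        )

        # Run the configured validation operator; optionally fail the task
        results = data_context.run_validation_operator(
            "action_list_operator", assets_to_validate=[batch]
        )
        if not results.success and self.fail_if_expectations_not_met:
            raise AirflowException("One or more expectations were not met")
        return results
```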
        self.expectations_file_name = expectations_file_name
        if validation_type.upper() not in GreatExpectationsValidations:
            raise AirflowException(f"argument 'validation_type' must be one of {great_expectations_valid_type}")
@turbaszek , as suggested, changed this to use f-strings.
    @apply_defaults
    def __init__(self, *, gcp_project, expectations_file_name, gcs_bucket, gcs_expectations_prefix,
                 gcs_validations_prefix, gcs_datadocs_prefix, validation_type, validation_type_input,
                 bq_dataset_name, email_to, datadocs_domain='none', send_alert_email=True, datadocs_link_in_email=False,
@turbaszek , as suggested, made the datadocs domain a parameter instead of setting it from an Airflow variable.
    def get_temp_table_name(self):
        now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        name_start = "temp_great_expectations_" + now + '_'
        full_name = name_start + ''.join(str(uuid.uuid4().hex))
@turbaszek , as suggested, changed the temp table name to be generated based on a uuid.
            }
        },
        stores={
            'expectations_GCS_store': {
I feel like setting the expectations store and the validations store (or the whole data context, for that matter) should be abstracted away from here somehow, perhaps by creating a Great Expectations hook. That would allow us to, say, use a GCS bucket for these stores and create a Snowflake Great Expectations operator, perhaps.

datasource = ge_gcs_hook.create_bigquery_datasource(bigquery_conn_id, bq_dataset_name, other_params)
data_context = ge_gcs_hook.create_data_context(datasource, gcs_conn_id, gcs_bucket, gcs_validations_prefix, gcs_expectations_prefix)

so we can also have something like

# Base hook sets abstract methods on the data context
custom_datasource = ge_hook.create_custom_datasource(conn_id, dataset_name, class_name, module_name)

Something like that.
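To make that concrete, a rough sketch of what such a hook pair could look like (the class and method names are hypothetical, echoing the snippet above, and the returned datasource dict is only illustrative):

```python
from airflow.hooks.base_hook import BaseHook


class GreatExpectationsHook(BaseHook):
    """Backend-agnostic entry points for building GE data contexts."""

    def create_custom_datasource(self, conn_id, dataset_name, class_name, module_name):
        raise NotImplementedError

    def create_data_context(self, datasource, stores, data_docs_sites):
        raise NotImplementedError


class GreatExpectationsBigQueryGCSHook(GreatExpectationsHook):
    """BigQuery datasource plus GCS-backed expectation/validation stores."""

    def create_bigquery_datasource(self, bigquery_conn_id, bq_dataset_name):
        connection = self.get_connection(bigquery_conn_id)
        # Build the SqlAlchemy "bigquery://<project>/<dataset>" URL from the Airflow connection
        project = connection.extra_dejson.get("extra__google_cloud_platform__project")
        return {
            "bq_datasource": {
                "class_name": "SqlAlchemyDatasource",
                "credentials": {"url": f"bigquery://{project}/{bq_dataset_name}"},
            }
        }
```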
from airflow.providers.greatexpectations.operators.greatexpectations_base import GreatExpectationsBaseOperator
from great_expectations.data_context.types.base import DataContextConfig
from great_expectations.data_context import BaseDataContext
+1 for moving all great_expectations imports to its provider package.
    and where the validation files & data docs will be output (e.g. HTML docs showing if the data matches
    expectations).
    :type gcp_project: str
    :param expectations_file_name: The name of the JSON file containing the expectations for the data.
Just noticed this here: is there a specific reason for using the filename rather than the expectation suite name? I think that would be more in line with how we've been referring to expectation suites, and you should be able to load the expectation suite from the context by name.
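If the operator took a suite name instead, loading it could look roughly like this (a sketch assuming the DataContext API of that era; the function and its arguments are illustrative, with `data_context` being the context the operator already builds):

```python
def load_batch_by_suite_name(data_context, batch_kwargs, suite_name):
    # Refer to the suite by name rather than by a JSON file path
    suite = data_context.get_expectation_suite(expectation_suite_name=suite_name)
    return data_context.get_batch(batch_kwargs, suite)
```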
Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
… a GreatExpectationsHook and extended it with a GreatExpectationsBigQueryGCSHook to handle the Google specific connection details 2) moved the GreatExpectationsBigQueryOperator from under providers/google/cloud/operators to providers/greatexpectations/operators 3) made the email alerting function more generic and moved it under the GreatExpectationsBaseOperator. Again, there are no tests included yet because there is a lot of discussion about more refactoring still happening.
    a sql query that will return a single row. Each value on that
    first row is evaluated using python ``bool`` casting. If any of the
    values return ``False`` the check is failed and errors out.
Hey @brian-lavery, can you also revert the changes made to this file, please?
Hi all, I just wanted to give a quick update! After some conversations internally with the Great Expectations team, we figured it would make the most sense for us to submit the base operator that actually has quite solid functionality. Any more specific operators like this one here can then inherit from it. I discussed that with Brian and he's on board. Apologies for the back and forth, this has been going through a few revisions and we've finally settled on a path.

@turbaszek and @feluelle, given that @spbail and the GE team will be submitting a new base operator for GE, I'm going to close this pull request. I'll open a new one for the GreatExpectationsBigQueryOperator after Sam submits her pull request and extend her base operator to create the GreatExpectationsBigQueryOperator.
This is my second attempt to submit a request to add the GreatExpectationsBigQueryOperator to Airflow. There were some problems with my first attempt.
I'm probably bending protocol a bit because this is what I call a 'preliminary' pull request. Before going to the effort of writing the tests for the new operator and submitting a more refined pull request, I need feedback about the viability of this operator being accepted into Airflow and any other major flaws I need to address.
I'll start with some background. What is Great Expectations (GE)? It's an open-source Python project that automates data QA testing and data profiling. A user's data "expectations" can be laid out in a JSON file, and GE will check those expectations against a table or query result by firing off a series of SQL queries. Results of the checks are output as JSON and HTML files. I've created a short video to explain how GE and the operator work.
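To give a feel for the intended usage, here is a rough sketch of how the operator might be wired into a DAG. The import path, parameter values, and the `validation_type` value are placeholders reflecting this draft, not a released provider:

```python
from airflow import DAG
from airflow.utils.dates import days_ago

# Hypothetical import path for this draft operator; subject to change during review
from airflow.providers.google.cloud.operators.greatexpectations_bigquery import (
    GreatExpectationsBigQueryOperator,
)

with DAG(dag_id="ge_bigquery_checks", start_date=days_ago(1), schedule_interval="@daily") as dag:
    validate_orders = GreatExpectationsBigQueryOperator(
        task_id="validate_orders_table",
        gcp_project="my-gcp-project",              # placeholder project id
        gcs_bucket="my-ge-bucket",                 # bucket for expectations, validations, data docs
        gcs_expectations_prefix="expectations",
        gcs_validations_prefix="validations",
        gcs_datadocs_prefix="data_docs",
        expectations_file_name="orders_expectations.json",
        validation_type="table",                   # placeholder; must be a member of GreatExpectationsValidations
        validation_type_input="orders",
        bq_dataset_name="analytics",
        email_to="data-team@example.com",
        send_alert_email=True,
        fail_if_expectations_not_met=True,
    )
```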
I'm looking for any feedback I can get but I have three immediate questions:
Thanks,
Brian