
Add dataproc metastore operators #18945

Merged: 1 commit merged into apache:main on Nov 7, 2021

Conversation

@wojsamjan (Contributor) commented Oct 13, 2021

Add support for Google Dataproc Metastore. Includes operators, hooks, example dags, tests and docs.

Co-authored-by: Wojciech Januszek januszek@google.com
Co-authored-by: Lukasz Wyszomirski wyszomirski@google.com
Co-authored-by: Maksim Yermakou maksimy@google.com



@boring-cyborg boring-cyborg bot added area:providers kind:documentation provider:google Google (including GCP) related issues labels Oct 13, 2021
@lwyszomi lwyszomi force-pushed the dataproc-metastore-operators branch 2 times, most recently from 7347897 to c1982ee Compare October 13, 2021 14:00
# [END how_to_cloud_dataproc_metastore_create_metadata_import]


with models.DAG("example_gcp_dataproc_metastore", start_date=days_ago(1), schedule_interval="@once") as dag:
Contributor:

There is an ongoing effort to update the example DAGs in Airflow to transition away from days_ago(n) for start_date to a static value, improve default_args use, etc. when applicable. For new DAGs, let's follow suit for using a static start_date value as it is best practice.
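
A minimal sketch of the suggested pattern (the specific datetime value here is an arbitrary placeholder, not taken from this PR):

import datetime

from airflow import models

with models.DAG(
    "example_gcp_dataproc_metastore",
    start_date=datetime.datetime(2021, 1, 1),  # static value instead of days_ago(1)
    schedule_interval="@once",
) as dag:
    ...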

Comment on lines +105 to +110
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
Contributor:

It looks like project_id, region, and service_id are passed to all of the operators with the same values. This would be a great opportunity to add these to default_args and remove them from each of the individual operators.

You may want to update the operator docs to note that these args are passed via default_args of the DAG, so it's transparent why some args are missing from the operator instantiations.
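
As a hedged sketch of this suggestion (variable names come from the example DAG; the operator and its import path are the ones added in this PR):

from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreGetServiceOperator,
)

with models.DAG(
    "example_gcp_dataproc_metastore",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval="@once",
    default_args={"region": REGION, "project_id": PROJECT_ID, "service_id": SERVICE_ID},
) as dag:
    # region, project_id, and service_id no longer need to be repeated on each
    # operator; Airflow injects them from default_args.
    get_service_details = DataprocMetastoreGetServiceOperator(task_id="get_service")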

Member:

Adding function arguments is not a good idea in this case as it makes it difficult to understand the examples in the generated documentation. The guides are written in such a way that everyone can copy the fragment they need. If we use default_args, copying will be difficult.

@josh-fell (Contributor) commented Oct 13, 2021:

@mik-laj WDYT about this being a case where the documentation could have explicit code blocks rather than reference the example DAG? It seems like folks would look at the example DAG in its entirety, as a demonstration of a use case, while the documentation serves as the individual operator information.

I'm not saying we should never reference the example DAG in the operator docs, but there might be cases where deferring to a solid, well-written example DAG (especially one showcasing a nice feature that simplifies authoring) makes sense.

@mik-laj (Member) commented Oct 13, 2021:

@josh-fell The example DAG files in the airflow/providers/google directory are automatically tested so we are sure they are working correctly. If we use blocks of code, it will be difficult for us to provide a similar level of confidence and trust. See: https://github.com/apache/airflow/blob/c1982eecad75c68e52a76088eb90ce15b9326436/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py

Using default_args makes sense in the example DAGs in the core - airflow/example_dags. The examples in airflow/providers should first of all show how to use operators, not fancy features.
WDYT?

Contributor:

@mik-laj ❤️ the system tests - the Google provider is absolutely impressive - but adding the default_args shouldn't break the system test, right? The DAG should still execute successfully, unless I'm missing something in the automated testing. Or are some of the documentation/test cases automated as well, such that not having the full operator definition in the example DAG breaks the automation?

I'm thinking the variables that are imported for the system test in the DAG could still be defined where they are (so they remain accessible via import), with default_args={"region": REGION, "project_id": PROJECT_ID, "service_id": SERVICE_ID} added in the DAG constructor, plus the explicit code blocks in the operator docs.

Using default_args makes sense in the example DAGs in the core - airflow/example_dags. The examples in airflow/providers should first of all show how to use operators, not fancy features.

Yes, I agree. The provider example DAGs should certainly show how to use the operators, ideally in some useful context/use case, but should also demo a real-world way the DAG could be authored. I'll happily defer to whoever is maintaining the providers for this though. Just thought it was an opportunity for some legitimate use of default_args and to document it.

@mik-laj (Member) commented Oct 13, 2021:

This will work as long as the task definition is in a separate Python file. To add examples to the documentation, we use the exampleinclude directive, which allows us to keep the docs and example DAGs in sync:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_create_cluster_operator]

Alternatively, we can use the code-block directive, but this means that the example will not be tested and executed on CI. If examples are not run on CI, they very quickly become out-of-date and are more of a burden to maintain.
.. code-block:: python

    t1.set_downstream(t2)
    # This means that t2 will depend on t1
    # running successfully to run.
    # It is equivalent to:
    t2.set_upstream(t1)

    # The bit shift operator can also be
    # used to chain operations:
    t1 >> t2

    # And the upstream dependency with the
    # bit shift operator:
    t2 << t1

    # Chaining multiple dependencies becomes
    # concise with the bit shift operator:
    t1 >> t2 >> t3

    # A list of tasks can also be set as
    # dependencies. These operations
    # all have the same effect:
    t1.set_downstream([t2, t3])
    t1 >> [t2, t3]
    [t2, t3] << t1

For example:
In this PR, I deprecated the default value of the pod_name parameter in EKSCreateClusterOperator. This meant that if someone loaded the original example file, a DeprecationWarning would be displayed. But we also detect such problems automatically on CI, and we won't allow merging a PR that doesn't update the examples. This makes these examples much easier to maintain. We don't have to worry whether all the examples have been updated to show the best practices, because we detect all such problems automatically, and if something still needs to be changed, CI will inform everyone. All this without the involvement of any of the project maintainers.

If we used the code-block directive, we would have to find a way to detect similar problems automatically as well, or the examples would be out of date very quickly.

This is one example, but we have other automated checks that allow us to maintain the high quality of the documentation and examples.

Just thought it was an opportunity for some legitimate use of default_args and to document it.

I agree that this is a good use for default_args, but given the problems with using these examples in the documentation, we don't necessarily have to do this in all cases.

Contributor:

@mik-laj Thanks for the context and explanation!

@wojsamjan Please ignore this default_args comment given the details explained above.

Comment on lines 196 to 202
create_service >> update_service >> get_service_details

get_service_details >> backup_service >> list_backups >> restore_service >> delete_backup

delete_backup >> export_metadata >> import_metadata

import_metadata >> delete_service
Contributor:

WDYT about using airflow.models.baseoperator.chain() here? It may be more readable to do something like:

chain(
    create_service,
    update_service,
    get_service_details,
    backup_service,
    list_backups,
    restore_service,
    delete_backup,
    export_metadata,
    import_metadata,
    delete_service,
)

SERVICE_ID = os.environ.get("GCP_DATAPROC_METASTORE_SERVICE_ID", "dataproc-metastore-system-tests-service-1")
BACKUP_ID = os.environ.get("GCP_DATAPROC_METASTORE_BACKUP_ID", "dataproc-metastore-system-tests-backup-1")
REGION = os.environ.get("GCP_REGION", "<REGION>")
BUCKET = os.environ.get("GCP_DATAPROC_METASTORE_BUCKET", "dataproc-metastore-system-tests")
Member:

For security reasons, we should use bucket names that are invalid and impossible to create, e.g. ones that contain special characters.
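
For example, following the convention used elsewhere in the Google provider examples (the placeholder value below is an assumption, not a quote from this PR):

BUCKET = os.environ.get("GCP_DATAPROC_METASTORE_BUCKET", "INVALID BUCKET NAME")  # spaces make this name impossible to create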

        )
        return result

    def create_metadata_import(
Member:

Can you add the @GoogleBaseHook.fallback_to_default_project_id decorator?

@wojsamjan (Contributor, author) commented Oct 14, 2021:

Should I decorate the other hook methods for services and backups as well? @mik-laj

Member:

All public methods of the hook that accept a project ID parameter.
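
A minimal sketch of a decorated method (the signature is abridged from the hook in this PR; the body is elided):

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook


class DataprocMetastoreHook(GoogleBaseHook):
    @GoogleBaseHook.fallback_to_default_project_id
    def create_metadata_import(self, project_id: str, region: str, **kwargs):
        # When the caller does not pass project_id explicitly, the decorator
        # falls back to the default project ID from the GCP connection.
        ...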

Comment on lines 50 to 53
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
    sleep(time_to_wait)
    if operation.done():
        break

:param request: The request object. Request message for
    [DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
:type request: google.cloud.metastore_v1.types.DeleteServiceRequest
:param project_id: TODO: Fill description
Contributor:

Suggested change:

-    :param project_id: TODO: Fill description
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.


def wait_for_operation(self, timeout: float, operation: Operation):
    """Waits for long-lasting operation to complete."""
    error = operation.exception(timeout=timeout)
@lwyszomi (Contributor) commented Oct 19, 2021:

@wojsamjan It is not clear what happens here (from a logic perspective 😄). Maybe, to make the code more readable, we can use this sample:

try:
    return operation.result(timeout=timeout)
except Exception:
    error = operation.exception()
    raise AirflowException(error)
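
For context, a hedged sketch of the full method under this suggestion (the signature comes from the hunk above; the exact exception handling is an assumption about the intent):

from airflow.exceptions import AirflowException
from google.api_core.operation import Operation


class DataprocMetastoreHook:  # illustrative stand-in for the hook in this PR
    def wait_for_operation(self, timeout: float, operation: Operation):
        """Waits for a long-running operation to complete and returns its result."""
        try:
            return operation.result(timeout=timeout)
        except Exception:
            error = operation.exception(timeout=timeout)
            raise AirflowException(error)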

@mik-laj (Member) commented Oct 21, 2021:

These problems look related to this contribution:

==== Providers postgres: 13 failures ====

tests/providers/google/cloud/hooks/test_dataproc_metastore.py::TestDataprocMetastoreWithDefaultProjectIdHook::test_create_service: AssertionError: Expected call: create_service(metadata=None, request={'parent': 'projects/test-project-id/locations/test-region', 'service_id': 'test-service-id', 'service': {}, 'request_id': None}, retry=None, timeout=None)
Actual call: create_service(metadata=(), request={'parent': 'projects/test-project-id/locations/test-region', 'service_id': 'test-service-id', 'service': {'name': 'test-service'}, 'request_id': None}, retry=None, timeout=None)
tests/providers/google/cloud/hooks/test_dataproc_metastore.py::TestDataprocMetastoreWithDefaultProjectIdHook::test_update_service: google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started
tests/providers/google/cloud/hooks/test_dataproc_metastore.py::TestDataprocMetastoreWithoutDefaultProjectIdHook::test_create_service: AssertionError: Expected call: create_service(metadata=None, request={'parent': 'projects/test-project-id/locations/test-region', 'service_id': 'test-service-id', 'service': {'name': 'test-service'}, 'request_id': None}, retry=None, timeout=None)
Actual call: create_service(metadata=(), request={'parent': 'projects/test-project-id/locations/test-region', 'service_id': 'test-service-id', 'service': {'name': 'test-service'}, 'request_id': None}, retry=None, timeout=None)
tests/providers/google/cloud/hooks/test_dataproc_metastore.py::TestDataprocMetastoreWithoutDefaultProjectIdHook::test_update_service: google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreCreateBackupOperator::test_assert_valid_hook_call: TypeError: <MagicMock name='DataprocMetastoreHook().wait_for_operation()' id='140010347401960'> is not an instance of Backup
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreCreateMetadataImportOperator::test_assert_valid_hook_call: TypeError: <MagicMock name='DataprocMetastoreHook().wait_for_operation()' id='140010430133976'> is not an instance of MetadataImport
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreCreateServiceOperator::test_execute: TypeError: <MagicMock name='DataprocMetastoreHook().wait_for_operation()' id='140010332321160'> is not an instance of Service
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreDeleteBackupOperator::test_assert_valid_hook_call: AssertionError: Expected call: DataprocMetastoreHook(gcp_conn_id='test-gcp-conn-id')
Actual call: DataprocMetastoreHook(gcp_conn_id='test-gcp-conn-id', impersonation_chain=None)
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreExportMetadataOperator::test_assert_valid_hook_call: airflow.exceptions.AirflowException: Argument ['service_id'] is required
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreGetServiceOperator::test_execute: TypeError: <MagicMock name='DataprocMetastoreHook().get_service()' id='140010395853488'> is not an instance of Service
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreListBackupsOperator::test_assert_valid_hook_call: AssertionError: Expected call: DataprocMetastoreHook(gcp_conn_id='test-gcp-conn-id')
Actual call: DataprocMetastoreHook(gcp_conn_id='test-gcp-conn-id', impersonation_chain=None)
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreRestoreServiceOperator::test_assert_valid_hook_call: Failed: Timeout >60.0s
tests/providers/google/cloud/operators/test_dataproc_metastore.py::TestDataprocMetastoreUpdateServiceOperator::test_assert_valid_hook_call: AssertionError: Expected call: DataprocMetastoreHook(gcp_conn_id='test-gcp-conn-id')
Actual call: DataprocMetastoreHook(gcp_conn_id='test-gcp-conn-id', impersonation_chain=None)
Error: Process completed with exit code 1.

https://github.com/apache/airflow/pull/18945/checks?check_run_id=3953505880

Run codespell to check for common misspellings in files..................................Failed
- hook id: codespell
- exit code: 65

airflow/providers/google/cloud/operators/dataproc_metastore.py:46: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:60: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:168: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:182: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:281: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:385: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:393: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:549: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:664: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:743: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:834: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:849: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/operators/dataproc_metastore.py:974: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:76: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:90: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:142: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:156: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:214: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:266: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:274: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:322: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:378: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:431: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:480: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:558: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:573: alpha-numeric ==> alphanumeric
airflow/providers/google/cloud/hooks/dataproc_metastore.py:631: alpha-numeric ==> alphanumeric

https://github.com/apache/airflow/pull/18945/checks?check_run_id=3953505494

@lwyszomi lwyszomi force-pushed the dataproc-metastore-operators branch 2 times, most recently from 7cf85d9 to af8d839 Compare October 21, 2021 13:47
@lwyszomi (Contributor):

@mik-laj Hey, I fixed the style errors and failing tests; now only the Tests / Quarantined tests (pull_request) step fails, and I think it is not related.

@wojsamjan wojsamjan closed this Oct 26, 2021
@wojsamjan wojsamjan reopened this Oct 26, 2021
@lwyszomi lwyszomi force-pushed the dataproc-metastore-operators branch 3 times, most recently from fde5b2b to 74da77b Compare November 2, 2021 08:39
* Add generated files for Dataproc Metastore

* Create DataprocMetastore hooks

* Create CreateService, GetService and DeleteService operators

* Create CreateBackup, DeleteBackup, ListBackups and CreateMetadataImport operators

* Create ExportMetadata, RestoreService, UpdateService operators

* wait for operation

* Fix DataprocMetastoreUpdateServiceOperator and DataprocMetastoreCreateMetadataImportOperator

* Fixes for Operators

* Added workaround to check MetadataExport status

* Create docs, system tests. Prepare operators, hooks, dags

* Pass region to client method

Co-authored-by: Wojciech Januszek <januszek@google.com>
Co-authored-by: MaksYermak <maksimy@google.com>
Co-authored-by: Wyszomirski Lukasz <wyszomirski@google.com>
@lwyszomi (Contributor) commented Nov 3, 2021:

@mik-laj @josh-fell @SamWheating Finally, all checks passed. Is there a possibility to merge the new operators?

@potiuk potiuk merged commit 26ad55b into apache:main Nov 7, 2021
@potiuk potiuk added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Jan 22, 2022
@potiuk potiuk added this to the Airflow 2.2.4 milestone Jan 22, 2022
potiuk pushed a commit that referenced this pull request Jan 22, 2022
jedcunningham pushed a commit that referenced this pull request Jan 27, 2022