Migrate Google example trino_to_gcs to new design AIP-47 #25420

Merged · 1 commit · Aug 2, 2022
@@ -49,7 +49,7 @@ All parameters are described in the reference documentation - :class:`~airflow.p
 
 An example operator call might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_trino_to_gcs_basic]
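The example file referenced above is not shown in this diff. As a hedged sketch of what such a basic `TrinoToGCSOperator` call looks like (the SQL, bucket, and file names below are illustrative placeholders, not taken from the linked example):

```python
from airflow.providers.google.cloud.transfers.trino_to_gcs import TrinoToGCSOperator

# Minimal sketch: export a Trino query result to GCS as JSON (the default format).
trino_to_gcs_basic = TrinoToGCSOperator(
    task_id="trino_to_gcs_basic",
    sql="SELECT * FROM memory.default.test_multiple_types",
    bucket="my-example-bucket",       # placeholder bucket name
    filename="trino_export/{}.json",  # {} is replaced with the file number if the dump is split
)
```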
@@ -67,7 +67,7 @@ You can specify these options by the ``export_format`` parameter.
 
 If you want a CSV file to be created, your operator call might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_trino_to_gcs_csv]
@@ -81,7 +81,7 @@ will be dumped from the database and upload to the bucket.
 
 If you want to create a schema file, then an example operator call might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_trino_to_gcs_multiple_types]
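The file written via ``schema_filename`` is a JSON description of the exported columns in BigQuery field-descriptor form. A hedged illustration of the expected shape (the field names and types are invented for the sketch and depend on the queried Trino table):

```python
import json

# Illustrative BigQuery schema, as a list of field descriptors.
schema = [
    {"name": "id", "type": "INT64", "mode": "NULLABLE"},
    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
]

# The operator uploads content of this shape to GCS under the schema_filename key.
schema_json = json.dumps(schema)
print(schema_json)
```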
@@ -102,7 +102,7 @@ maximum allowed file size for a single object.
 
 If you want to create 10 MB files, your code might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_read_data_from_gcs_many_chunks]
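As a rough back-of-the-envelope illustration of how an ``approx_max_file_size_bytes`` limit of 10 MB splits a dump into multiple objects (the 35 MB total below is made up for the sketch):

```python
import math

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB cap per object, as in the example


def expected_chunk_count(total_bytes: int, max_bytes: int = MAX_FILE_SIZE) -> int:
    """Approximate number of files a dump of total_bytes is split into."""
    return max(1, math.ceil(total_bytes / max_bytes))


# A 35 MB dump exceeds the 10 MB cap, so it is split into 4 files.
print(expected_chunk_count(35 * 1024 * 1024))
```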
@@ -123,7 +123,7 @@ For example, if you want to create an external table that allows you to create q
 read data directly from GCS, then you can use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator`.
 Using this operator looks like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_create_external_table_multiple_types]
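A hedged sketch of wiring the exported files into a BigQuery external table (all bucket, dataset, and object names are placeholders; the parameter style shown here is the classic keyword form of the operator, not necessarily what the linked example uses):

```python
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
)

# Sketch: expose the JSON dump and its schema file as an external table,
# so queries read directly from GCS without loading into BigQuery storage.
create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    bucket="my-example-bucket",                               # placeholder
    source_objects=["trino_export/*.json"],                   # placeholder
    destination_project_dataset_table="my_dataset.my_table",  # placeholder
    schema_object="trino_export/schema.json",                 # placeholder
    source_format="NEWLINE_DELIMITED_JSON",
)
```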
158 changes: 0 additions & 158 deletions tests/providers/google/cloud/transfers/test_trino_to_gcs_system.py

This file was deleted.

@@ -30,10 +30,14 @@
     BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.transfers.trino_to_gcs import TrinoToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_trino_to_gcs"
 
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", 'example-project')
-GCS_BUCKET = os.environ.get("GCP_TRINO_TO_GCS_BUCKET_NAME", "INVALID BUCKET NAME")
-DATASET_NAME = os.environ.get("GCP_TRINO_TO_GCS_DATASET_NAME", "test_trino_to_gcs_dataset")
+GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 
 SOURCE_MULTIPLE_TYPES = "memory.default.test_multiple_types"
 SOURCE_CUSTOMER_TABLE = "tpch.sf1.customer"
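The AIP-47 naming convention introduced above can be sketched outside Airflow; the fallback value below is a stand-in, since in real system tests `SYSTEM_TESTS_ENV_ID` comes from the environment:

```python
import os

# "demo" is an illustrative fallback, not part of the real system-test setup.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo")
DAG_ID = "example_trino_to_gcs"

# Resources are named after the DAG and environment, so parallel test
# environments never collide on bucket or dataset names.
GCS_BUCKET = f"bucket_{DAG_ID}_{ENV_ID}"
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"

print(GCS_BUCKET)
print(DATASET_NAME)
```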
@@ -47,17 +51,19 @@ def safe_name(s: str) -> str:
 
 
 with models.DAG(
-    dag_id="example_trino_to_gcs",
+    dag_id=DAG_ID,
     schedule_interval='@once',  # Override to match your needs
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "gcs"],
 ) as dag:
 
     create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)
 
     delete_dataset = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
     # [START howto_operator_trino_to_gcs_basic]
@@ -179,15 +185,29 @@ def safe_name(s: str) -> str:
     )
     # [END howto_operator_trino_to_gcs_csv]
 
-    create_dataset >> trino_to_gcs_basic
-    create_dataset >> trino_to_gcs_multiple_types
-    create_dataset >> trino_to_gcs_many_chunks
-    create_dataset >> trino_to_gcs_csv
-
-    trino_to_gcs_multiple_types >> create_external_table_multiple_types >> read_data_from_gcs_multiple_types
-    trino_to_gcs_many_chunks >> create_external_table_many_chunks >> read_data_from_gcs_many_chunks
-
-    trino_to_gcs_basic >> delete_dataset
-    trino_to_gcs_csv >> delete_dataset
-    read_data_from_gcs_multiple_types >> delete_dataset
-    read_data_from_gcs_many_chunks >> delete_dataset
+    (
+        # TEST SETUP
+        create_dataset
+        # TEST BODY
+        >> trino_to_gcs_basic
+        >> trino_to_gcs_multiple_types
+        >> trino_to_gcs_many_chunks
+        >> trino_to_gcs_csv
+        >> create_external_table_multiple_types
+        >> create_external_table_many_chunks
+        >> read_data_from_gcs_multiple_types
+        >> read_data_from_gcs_many_chunks
+        # TEST TEARDOWN
+        >> delete_dataset
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
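The single `>>` chain the PR introduces relies on operator dependency chaining. A toy model of the mechanism (not Airflow code; the class below exists only for this sketch) makes the semantics concrete: `a >> b` records `b` as downstream of `a` and returns `b`, which is what lets a whole setup/body/teardown pipeline be written as one chain.

```python
class Task:
    """Toy stand-in for an Airflow operator, modelling only `>>` chaining."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # so chains like a >> b >> c build a linear dependency path.
        self.downstream.append(other)
        return other


setup = Task("create_dataset")
body = Task("trino_to_gcs_basic")
teardown = Task("delete_dataset")

setup >> body >> teardown

print([t.task_id for t in setup.downstream])  # ['trino_to_gcs_basic']
```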