Skip to content

Commit

Permalink
Clean-up of Google Cloud example DAGs - batch 2 (#19527)
Browse files: browse the repository at this point in the history
- Use static start_date
- Use catchup=False
- Tidy up the chaining of tasks in some cases
- Remove unnecessary specification of default connection IDs
  • Loading branch information
o-nikolas committed Nov 15, 2021
1 parent 4c495ca commit 6ef44b6
Show file tree
Hide file tree
Showing 25 changed files with 164 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
'retry_delay': timedelta(minutes=5),
},
schedule_interval='@once',
start_date=datetime(2018, 11, 1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_azure_fileshare_to_gcs_basic]
Expand All @@ -46,8 +47,6 @@
share_name=AZURE_SHARE_NAME,
dest_gcs=DEST_GCS_BUCKET,
directory_name=AZURE_DIRECTORY_NAME,
azure_fileshare_conn_id='azure_fileshare_default',
gcp_conn_id='google_cloud_default',
replace=False,
gzip=True,
google_impersonation_chain=None,
Expand Down
16 changes: 12 additions & 4 deletions airflow/providers/google/cloud/example_dags/example_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
"""

import os
from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.compute import (
ComputeEngineSetMachineTypeOperator,
ComputeEngineStartInstanceOperator,
ComputeEngineStopInstanceOperator,
)
from airflow.utils.dates import days_ago

# [START howto_operator_gce_args_common]
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
Expand All @@ -52,7 +53,8 @@
with models.DAG(
'example_gcp_compute',
schedule_interval='@once', # Override to match your needs
start_date=days_ago(1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_gce_start]
Expand Down Expand Up @@ -96,5 +98,11 @@
)
# [END howto_operator_gce_set_machine_type_no_project_id]

gce_instance_start >> gce_instance_start2 >> gce_instance_stop >> gce_instance_stop2
gce_instance_stop2 >> gce_set_machine_type >> gce_set_machine_type2
chain(
gce_instance_start,
gce_instance_start2,
gce_instance_stop,
gce_instance_stop2,
gce_set_machine_type,
gce_set_machine_type2,
)
14 changes: 10 additions & 4 deletions airflow/providers/google/cloud/example_dags/example_compute_igm.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@
"""

import os
from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.compute import (
ComputeEngineCopyInstanceTemplateOperator,
ComputeEngineInstanceGroupUpdateManagerTemplateOperator,
)
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
Expand Down Expand Up @@ -92,7 +93,8 @@
with models.DAG(
'example_gcp_compute_igm',
schedule_interval='@once', # Override to match your needs
start_date=days_ago(1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_gce_igm_copy_template]
Expand Down Expand Up @@ -133,5 +135,9 @@
)
# [END howto_operator_gce_igm_update_template_no_project_id]

gce_instance_template_copy >> gce_instance_template_copy2 >> gce_instance_group_manager_update_template
gce_instance_group_manager_update_template >> gce_instance_group_manager_update_template2
chain(
gce_instance_template_copy,
gce_instance_template_copy2,
gce_instance_group_manager_update_template,
gce_instance_group_manager_update_template2,
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
# under the License.

import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.utils import dates

# [START howto_operator_gce_args_common]
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
Expand All @@ -30,8 +30,9 @@

with models.DAG(
'example_compute_ssh',
default_args=dict(start_date=dates.days_ago(1)),
schedule_interval='@once', # Override to match your needs
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# # [START howto_execute_command_on_remote1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Example Airflow DAG that interacts with Google Data Catalog service
"""
import os
from datetime import datetime

from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField

Expand Down Expand Up @@ -49,7 +50,6 @@
CloudDataCatalogUpdateTagTemplateFieldOperator,
CloudDataCatalogUpdateTagTemplateOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
Expand All @@ -61,7 +61,12 @@
FIELD_NAME_2 = "second"
FIELD_NAME_3 = "first-rename"

with models.DAG("example_gcp_datacatalog", schedule_interval='@once', start_date=days_ago(1)) as dag:
with models.DAG(
"example_gcp_datacatalog",
schedule_interval='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
# Create
# [START howto_operator_gcp_datacatalog_create_entry_group]
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
Expand Down
16 changes: 11 additions & 5 deletions airflow/providers/google/cloud/example_dags/example_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Example Airflow DAG for Google Cloud Dataflow service
"""
import os
from datetime import datetime
from typing import Callable, Dict, List
from urllib.parse import urlparse

Expand All @@ -41,7 +42,8 @@
DataflowJobStatusSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.utils.dates import days_ago

START_DATE = datetime(2021, 1, 1)

GCS_TMP = os.environ.get('GCP_DATAFLOW_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
GCS_STAGING = os.environ.get('GCP_DATAFLOW_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
Expand All @@ -63,7 +65,8 @@
with models.DAG(
"example_gcp_dataflow_native_java",
schedule_interval='@once', # Override to match your needs
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag_native_java:

Expand Down Expand Up @@ -110,7 +113,8 @@
with models.DAG(
"example_gcp_dataflow_native_python",
default_args=default_args,
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_native_python:
Expand Down Expand Up @@ -145,7 +149,8 @@
with models.DAG(
"example_gcp_dataflow_native_python_async",
default_args=default_args,
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_native_python_async:
Expand Down Expand Up @@ -246,7 +251,8 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
with models.DAG(
"example_gcp_dataflow_template",
default_args=default_args,
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_template:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
Example Airflow DAG for Google Cloud Dataflow service
"""
import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")

Expand All @@ -45,7 +45,8 @@

with models.DAG(
dag_id="example_gcp_dataflow_flex_template_java",
start_date=days_ago(1),
start_date=datetime(2021, 1, 1),
catchup=False,
schedule_interval='@once', # Override to match your needs
) as dag_flex_template:
# [START howto_operator_start_template_job]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
Example Airflow DAG for Google Cloud Dataflow service
"""
import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")

Expand All @@ -36,7 +36,8 @@

with models.DAG(
dag_id="example_gcp_dataflow_sql",
start_date=days_ago(1),
start_date=datetime(2021, 1, 1),
catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_sql:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Example Airflow DAG that shows how to use DataFusion.
"""
import os
from datetime import datetime

from airflow import models
from airflow.operators.bash import BashOperator
Expand All @@ -35,7 +36,6 @@
CloudDataFusionUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
from airflow.utils import dates
from airflow.utils.state import State

# [START howto_data_fusion_env_variables]
Expand Down Expand Up @@ -153,7 +153,8 @@
with models.DAG(
"example_data_fusion",
schedule_interval='@once', # Override to match your needs
start_date=dates.days_ago(1),
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
# [START howto_cloud_data_fusion_create_instance_operator]
create_instance = CloudDataFusionCreateInstanceOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
Example Airflow DAG that shows how to use Google Dataprep.
"""
import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataprep import (
DataprepGetJobGroupOperator,
DataprepGetJobsForJobGroupOperator,
DataprepRunJobGroupOperator,
)
from airflow.utils import dates

DATAPREP_JOB_ID = int(os.environ.get('DATAPREP_JOB_ID', 12345677))
DATAPREP_JOB_RECIPE_ID = int(os.environ.get('DATAPREP_JOB_RECIPE_ID', 12345677))
Expand Down Expand Up @@ -53,7 +53,8 @@
with models.DAG(
"example_dataprep",
schedule_interval='@once',
start_date=dates.days_ago(1), # Override to match your needs
start_date=datetime(2021, 1, 1), # Override to match your needs
catchup=False,
) as dag:
# [START how_to_dataprep_run_job_group_operator]
run_job_group = DataprepRunJobGroupOperator(task_id="run_job_group", body_request=DATA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"""

import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
Expand All @@ -32,7 +33,6 @@
DataprocUpdateClusterOperator,
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
Expand Down Expand Up @@ -151,7 +151,12 @@
}


with models.DAG("example_gcp_dataproc", schedule_interval='@once', start_date=days_ago(1)) as dag:
with models.DAG(
"example_gcp_dataproc",
schedule_interval='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator]
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
Expand Down
10 changes: 7 additions & 3 deletions airflow/providers/google/cloud/example_dags/example_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""

import os
from datetime import datetime
from typing import Any, Dict

from airflow import models
Expand All @@ -35,15 +36,17 @@
CloudDatastoreRollbackOperator,
CloudDatastoreRunQueryOperator,
)
from airflow.utils import dates

START_DATE = datetime(2021, 1, 1)

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")

with models.DAG(
"example_gcp_datastore",
schedule_interval='@once', # Override to match your needs
start_date=dates.days_ago(1),
start_date=START_DATE,
catchup=False,
tags=["example"],
) as dag:
# [START how_to_export_task]
Expand Down Expand Up @@ -82,8 +85,9 @@

with models.DAG(
"example_gcp_datastore_operations",
start_date=dates.days_ago(1),
schedule_interval='@once', # Override to match your needs
start_date=START_DATE,
catchup=False,
tags=["example"],
) as dag2:
# [START how_to_allocate_ids]
Expand Down
Loading

0 comments on commit 6ef44b6

Please sign in to comment.