diff --git a/airflow/providers/google/cloud/links/workflows.py b/airflow/providers/google/cloud/links/workflows.py
index 1d944a8e8f34d..52e4b4c42d60e 100644
--- a/airflow/providers/google/cloud/links/workflows.py
+++ b/airflow/providers/google/cloud/links/workflows.py
@@ -26,7 +26,7 @@
 from airflow.models import BaseOperator
 from airflow.utils.context import Context
 
-WORKFLOWS_BASE_LINK = "workflows"
+WORKFLOWS_BASE_LINK = "/workflows"
 WORKFLOW_LINK = WORKFLOWS_BASE_LINK + "/workflow/{location_id}/{workflow_id}/executions?project={project_id}"
 WORKFLOWS_LINK = WORKFLOWS_BASE_LINK + "?project={project_id}"
 EXECUTION_LINK = (
diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py
index 0bfefe5df4583..505c926008311 100644
--- a/airflow/providers/google/cloud/operators/workflows.py
+++ b/airflow/providers/google/cloud/operators/workflows.py
@@ -52,7 +52,7 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
     Creates a new workflow.
 
     If a workflow with the specified name already exists in the specified
-    project and location, the long running operation will return
+    project and location, the long-running operation will return
     [ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error.
 
     .. seealso::
@@ -606,7 +606,8 @@ class WorkflowsListExecutionsOperator(GoogleCloudBaseOperator):
     :param workflow_id: Required. The ID of the workflow to be created.
     :param start_date_filter: If passed only executions older that this date will be returned.
-        By default operators return executions from last 60 minutes
+        By default, operators return executions from last 60 minutes.
+        Note that datetime object must specify a time zone, e.g. ``datetime.timezone.utc``.
     :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
     :param location: Required. The GCP region in which to handle the request.
     :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
@@ -670,7 +671,7 @@ def execute(self, context: Context):
         return [
             Execution.to_dict(e)
             for e in execution_iter
-            if e.start_time.ToDatetime(tzinfo=datetime.timezone.utc) > self.start_date_filter
+            if e.start_time > self.start_date_filter  # type: ignore
         ]
diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py
index 7ecd1627fe201..c0d854c91c759 100644
--- a/tests/providers/google/cloud/operators/test_workflows.py
+++ b/tests/providers/google/cloud/operators/test_workflows.py
@@ -334,12 +334,9 @@ class TestWorkflowExecutionsListExecutionsOperator:
     @mock.patch(BASE_PATH.format("Execution"))
     @mock.patch(BASE_PATH.format("WorkflowsHook"))
     def test_execute(self, mock_hook, mock_object):
-        timestamp = Timestamp()
-        timestamp.FromDatetime(
-            datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
-        )
+        start_date_filter = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
         execution_mock = mock.MagicMock()
-        execution_mock.start_time = timestamp
+        execution_mock.start_time = start_date_filter
         mock_hook.return_value.list_executions.return_value = [execution_mock]
 
         op = WorkflowsListExecutionsOperator(
diff --git a/tests/system/providers/google/cloud/workflows/example_workflows.py b/tests/system/providers/google/cloud/workflows/example_workflows.py
index 5dd84d95b179a..e1a0fd0f18b9e 100644
--- a/tests/system/providers/google/cloud/workflows/example_workflows.py
+++ b/tests/system/providers/google/cloud/workflows/example_workflows.py
@@ -38,31 +38,29 @@
 from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
-DAG_ID = "cloud_workflows"
+DAG_ID = "example_cloud_workflows"
 
 LOCATION = "us-central1"
-WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}"
+WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}".replace("_", "-")
 
 # [START how_to_define_workflow]
 WORKFLOW_CONTENT = """
-- getCurrentTime:
-    call: http.get
-    args:
-        url: https://us-central1-workflowsample.cloudfunctions.net/datetime
-    result: currentTime
+- getLanguage:
+    assign:
+        - inputLanguage: "English"
 - readWikipedia:
     call: http.get
     args:
-        url: https://en.wikipedia.org/w/api.php
+        url: https://www.wikipedia.org/
         query:
             action: opensearch
-            search: ${currentTime.body.dayOfTheWeek}
+            search: ${inputLanguage}
     result: wikiResult
 - returnResult:
-    return: ${wikiResult.body[1]}
+    return: ${wikiResult}
 """
 
 WORKFLOW = {
@@ -74,7 +72,7 @@
 EXECUTION = {"argument": ""}
 
-SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}"
+SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}".replace("_", "-")
 SLEEP_WORKFLOW_CONTENT = """
 - someSleep:
     call: sys.sleep
@@ -94,6 +92,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
+    tags=["example", "workflows"],
 ) as dag:
     # [START how_to_create_workflow]
     create_workflow = WorkflowsCreateWorkflowOperator(
@@ -131,10 +130,13 @@
 
     # [START how_to_delete_workflow]
     delete_workflow = WorkflowsDeleteWorkflowOperator(
-        task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
+        task_id="delete_workflow",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow_id=WORKFLOW_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END how_to_delete_workflow]
-    delete_workflow.trigger_rule = TriggerRule.ALL_DONE
 
     # [START how_to_create_execution]
     create_execution = WorkflowsCreateExecutionOperator(
@@ -223,11 +225,6 @@
     [cancel_execution, list_executions] >> delete_workflow
 
-    # Task dependencies created via `XComArgs`:
-    # create_execution >> wait_for_execution
-    # create_execution >> get_execution
-    # create_execution >> cancel_execution
-
     # ### Everything below this line is not part of example ###
     # ### Just for system tests purpose ###
     from tests.system.utils.watcher import watcher
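
Note (not part of the patch): with the filtering change above, executions are now compared directly via ``e.start_time > self.start_date_filter``, so the ``start_date_filter`` passed to ``WorkflowsListExecutionsOperator`` must be a timezone-aware ``datetime``, as the updated docstring states. A minimal usage sketch under that assumption; the task ID, project ID, and workflow ID below are placeholders, not values from this diff:

```python
import datetime

from airflow.providers.google.cloud.operators.workflows import WorkflowsListExecutionsOperator

# start_date_filter must carry a time zone (e.g. datetime.timezone.utc);
# a naive datetime would fail the comparison against Execution.start_time.
list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions",
    location="us-central1",
    project_id="my-project",    # hypothetical project ID
    workflow_id="my-workflow",  # hypothetical workflow ID
    start_date_filter=datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(minutes=60),
)
```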