Skip to content

Commit

Permalink
Fix Cloud Worflows system test (#33386)
Browse files Browse the repository at this point in the history
* Fix Cloud Worflows system test

---------

Co-authored-by: Maksim Moiseenkov <maksim_moiseenkov@epam.com>
  • Loading branch information
VladaZakharova and moiseenkov committed Aug 30, 2023
1 parent 5ca8987 commit eaf3471
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 29 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/links/workflows.py
Expand Up @@ -26,7 +26,7 @@
from airflow.models import BaseOperator
from airflow.utils.context import Context

WORKFLOWS_BASE_LINK = "workflows"
WORKFLOWS_BASE_LINK = "/workflows"
WORKFLOW_LINK = WORKFLOWS_BASE_LINK + "/workflow/{location_id}/{workflow_id}/executions?project={project_id}"
WORKFLOWS_LINK = WORKFLOWS_BASE_LINK + "?project={project_id}"
EXECUTION_LINK = (
Expand Down
7 changes: 4 additions & 3 deletions airflow/providers/google/cloud/operators/workflows.py
Expand Up @@ -52,7 +52,7 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
Creates a new workflow.
If a workflow with the specified name already exists in the specified
project and location, the long running operation will return
project and location, the long-running operation will return
[ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error.
.. seealso::
Expand Down Expand Up @@ -606,7 +606,8 @@ class WorkflowsListExecutionsOperator(GoogleCloudBaseOperator):
:param workflow_id: Required. The ID of the workflow to be created.
:param start_date_filter: If passed only executions older that this date will be returned.
By default operators return executions from last 60 minutes
By default, operators return executions from last 60 minutes.
Note that datetime object must specify a time zone, e.g. ``datetime.timezone.utc``.
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param location: Required. The GCP region in which to handle the request.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
Expand Down Expand Up @@ -670,7 +671,7 @@ def execute(self, context: Context):
return [
Execution.to_dict(e)
for e in execution_iter
if e.start_time.ToDatetime(tzinfo=datetime.timezone.utc) > self.start_date_filter
if e.start_time > self.start_date_filter # type: ignore
]


Expand Down
7 changes: 2 additions & 5 deletions tests/providers/google/cloud/operators/test_workflows.py
Expand Up @@ -334,12 +334,9 @@ class TestWorkflowExecutionsListExecutionsOperator:
@mock.patch(BASE_PATH.format("Execution"))
@mock.patch(BASE_PATH.format("WorkflowsHook"))
def test_execute(self, mock_hook, mock_object):
timestamp = Timestamp()
timestamp.FromDatetime(
datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
)
start_date_filter = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
execution_mock = mock.MagicMock()
execution_mock.start_time = timestamp
execution_mock.start_time = start_date_filter
mock_hook.return_value.list_executions.return_value = [execution_mock]

op = WorkflowsListExecutionsOperator(
Expand Down
37 changes: 17 additions & 20 deletions tests/system/providers/google/cloud/workflows/example_workflows.py
Expand Up @@ -38,31 +38,29 @@
from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")

DAG_ID = "cloud_workflows"
DAG_ID = "example_cloud_workflows"

LOCATION = "us-central1"
WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}"
WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}".replace("_", "-")

# [START how_to_define_workflow]
WORKFLOW_CONTENT = """
- getCurrentTime:
call: http.get
args:
url: https://us-central1-workflowsample.cloudfunctions.net/datetime
result: currentTime
- getLanguage:
assign:
- inputLanguage: "English"
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
url: https://www.wikipedia.org/
query:
action: opensearch
search: ${currentTime.body.dayOfTheWeek}
search: ${inputLanguage}
result: wikiResult
- returnResult:
return: ${wikiResult.body[1]}
return: ${wikiResult}
"""

WORKFLOW = {
Expand All @@ -74,7 +72,7 @@

EXECUTION = {"argument": ""}

SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}"
SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}".replace("_", "-")
SLEEP_WORKFLOW_CONTENT = """
- someSleep:
call: sys.sleep
Expand All @@ -94,6 +92,7 @@
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "workflows"],
) as dag:
# [START how_to_create_workflow]
create_workflow = WorkflowsCreateWorkflowOperator(
Expand Down Expand Up @@ -131,10 +130,13 @@

# [START how_to_delete_workflow]
delete_workflow = WorkflowsDeleteWorkflowOperator(
task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
task_id="delete_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
# [END how_to_delete_workflow]
delete_workflow.trigger_rule = TriggerRule.ALL_DONE

# [START how_to_create_execution]
create_execution = WorkflowsCreateExecutionOperator(
Expand Down Expand Up @@ -223,11 +225,6 @@

[cancel_execution, list_executions] >> delete_workflow

# Task dependencies created via `XComArgs`:
# create_execution >> wait_for_execution
# create_execution >> get_execution
# create_execution >> cancel_execution

# ### Everything below this line is not part of example ###
# ### Just for system tests purpose ###
from tests.system.utils.watcher import watcher
Expand Down

0 comments on commit eaf3471

Please sign in to comment.