Skip to content

Commit

Permalink
Fix Async GCSObjectsWithPrefixExistenceSensor xcom push (#37634)
Browse files Browse the repository at this point in the history
Fix GCSObjectsWithPrefixExistenceSensor found prefix in fist
poke only in async mode of sensor then it does not push matches
in xcom. This PR fix it.
  • Loading branch information
pankajastro committed Feb 22, 2024
1 parent a667941 commit ca4c559
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/providers/google/cloud/sensors/gcs.py
Expand Up @@ -363,6 +363,8 @@ def execute(self, context: Context):
),
method_name="execute_complete",
)
else:
return self._matches

def execute_complete(self, context: dict[str, Any], event: dict[str, str | list[str]]) -> str | list[str]:
"""Return immediately and rely on trigger to throw a success event. Callback for the trigger."""
Expand Down
14 changes: 14 additions & 0 deletions tests/providers/google/cloud/sensors/test_gcs.py
Expand Up @@ -429,6 +429,20 @@ def test_gcs_object_prefix_existence_sensor_finish_before_deferred(self, mock_de
task.execute(mock.MagicMock())
assert not mock_defer.called

@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
def test_xcom_value_when_poke_success(self, mock_hook):
mock_hook.return_value.list.return_value = ["test.txt"]
task = GCSObjectsWithPrefixExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
prefix=TEST_PREFIX,
google_cloud_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
deferrable=True,
)
responses = task.execute(None)
assert responses == ["test.txt"]


class TestGCSObjectsWithPrefixExistenceAsyncSensor:
OPERATOR = GCSObjectsWithPrefixExistenceSensor(
Expand Down

0 comments on commit ca4c559

Please sign in to comment.