Skip to content

Commit

Permalink
Fix output property missing for airflow version < 2.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Dec 7, 2022
1 parent 6db1977 commit d2b5a18
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions python-sdk/src/astro/sql/operators/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,38 @@ def resolve_tables_from_tasks( # noqa: C901
for task in tasks:
try:
if isinstance(task, OPERATOR_CLASSES_WITH_TABLE_OUTPUT):
t = task.output.resolve(context)
if isinstance(t, BaseTable):
res.append(t)
try:
# works on airflow version >= 2.4.0
t = task.output.resolve(context)
if isinstance(t, BaseTable):
res.append(t)
except AttributeError:
# works on airflow version < 2.4.0
from airflow.models.xcom_arg import XComArg

task_output = XComArg(operator=self)
for t in task_output.resolve(context):
if isinstance(t, BaseTable):
res.append(t)
elif (
MappedOperator
and isinstance(task, MappedOperator)
and issubclass(task.operator_class, OPERATOR_CLASSES_WITH_TABLE_OUTPUT)
):
for t in task.output.resolve(context):
if isinstance(t, BaseTable):
res.append(t)
try:
# works on airflow version >= 2.4.0
for t in task.output.resolve(context):
if isinstance(t, BaseTable):
res.append(t)
except AttributeError:
# works on airflow version < 2.4.0
from airflow.models.xcom_arg import XComArg

task_output = XComArg(operator=self)
for t in task_output.resolve(context):
if isinstance(t, BaseTable):
res.append(t)

except AirflowException: # pragma: no cover
self.log.info(
"xcom output for %s not found. Will not clean up this task",
Expand Down

0 comments on commit d2b5a18

Please sign in to comment.