Skip to content

Commit

Permalink
Speedup determine installed airflow version in `ExternalPythonOperato…
Browse files Browse the repository at this point in the history
…r` (apache#37409)
  • Loading branch information
Taragolis authored and sunank200 committed Feb 21, 2024
1 parent 51cf64e commit 29a5467
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 6 deletions.
33 changes: 28 additions & 5 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,21 +889,44 @@ def _is_pendulum_installed_in_target_env(self) -> bool:
)
return False

@property
def _external_airflow_version_script(self):
    """
    Return a Python script that prints the installed Apache Airflow version.

    Importing ``airflow`` as a module might take a while; as a result,
    obtaining the version that way could take up to 1 second.
    ``importlib.metadata.version``, on the other hand, retrieves the package
    version much faster (below 100ms, including the overhead of a new subprocess).

    Possible side effect: the ``importlib_metadata`` backport might not be available
    on Python 3.8 and below, which indicates that the venv doesn't contain
    ``apache-airflow`` or that something is wrong with the environment.
    """
    # The script is handed to ``python -c`` of the target interpreter, so its own
    # block structure (the indented ``if``/``else`` bodies) must survive ``dedent``.
    return textwrap.dedent(
        """
        import sys
        if sys.version_info >= (3, 9):
            from importlib.metadata import version
        else:
            from importlib_metadata import version
        print(version("apache-airflow"))
        """
    )

def _get_airflow_version_from_target_env(self) -> str | None:
from airflow import __version__ as airflow_version

try:
result = subprocess.check_output(
[self.python, "-c", "from airflow import __version__; print(__version__)"],
[self.python, "-c", self._external_airflow_version_script],
text=True,
# Avoid Airflow logs polluting stdout.
env={**os.environ, "_AIRFLOW__AS_LIBRARY": "true"},
)
target_airflow_version = result.strip()
if target_airflow_version != airflow_version:
raise AirflowConfigException(
f"The version of Airflow installed for the {self.python}("
f"{target_airflow_version}) is different than the runtime Airflow version: "
f"The version of Airflow installed for the {self.python} "
f"({target_airflow_version}) is different than the runtime Airflow version: "
f"{airflow_version}. Make sure your environment has the same Airflow version "
f"installed as the Airflow runtime."
)
Expand Down
47 changes: 46 additions & 1 deletion tests/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1208,16 +1208,61 @@ def test_except_value_error(self, loads_mock):
def f():
return 1

task = PythonVirtualenvOperator(
task = ExternalPythonOperator(
python_callable=f,
task_id="task",
python=sys.executable,
dag=self.dag,
)

loads_mock.side_effect = DeserializingResultError
with pytest.raises(DeserializingResultError):
task._read_result(path=mock.Mock())

def test_airflow_version(self):
    """The version lookup returns a truthy value when pointed at the current interpreter."""

    def answer():
        return 42

    operator = ExternalPythonOperator(
        task_id="task",
        python=sys.executable,
        python_callable=answer,
        expect_airflow=True,
    )
    detected_version = operator._get_airflow_version_from_target_env()
    assert detected_version

def test_airflow_version_doesnt_match(self, caplog):
    """A script reporting a different version yields ``None`` and a logged mismatch message."""

    def answer():
        return 42

    operator = ExternalPythonOperator(
        task_id="task",
        python=sys.executable,
        python_callable=answer,
        expect_airflow=True,
    )

    # Start from an empty capture so only messages from the call below are inspected.
    caplog.set_level("WARNING")
    caplog.clear()

    # Replace the version script with one that prints a version of our choosing.
    fake_script = mock.patch.object(
        ExternalPythonOperator,
        "_external_airflow_version_script",
        new_callable=mock.PropertyMock,
        return_value="print('1.10.4')",
    )
    with fake_script:
        detected_version = operator._get_airflow_version_from_target_env()
        assert detected_version is None
        assert "(1.10.4) is different than the runtime Airflow version" in caplog.text

def test_airflow_version_script_error_handle(self, caplog):
    """A failing version script yields ``None`` and its error output shows up in the log."""

    def answer():
        return 42

    operator = ExternalPythonOperator(
        task_id="task",
        python=sys.executable,
        python_callable=answer,
        expect_airflow=True,
    )

    # Start from an empty capture so only messages from the call below are inspected.
    caplog.set_level("WARNING")
    caplog.clear()

    # Replace the version script with one that exits with a non-zero status.
    failing_script = mock.patch.object(
        ExternalPythonOperator,
        "_external_airflow_version_script",
        new_callable=mock.PropertyMock,
        return_value="raise SystemExit('Something went wrong')",
    )
    with failing_script:
        detected_version = operator._get_airflow_version_from_target_env()
        assert detected_version is None
        assert "Something went wrong" in caplog.text
        assert "returned non-zero exit status" in caplog.text


class BaseTestBranchPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
@pytest.fixture(autouse=True)
Expand Down

0 comments on commit 29a5467

Please sign in to comment.