Skip to content

Commit

Permalink
Bugfix/prevent concurrency with cached venv (#35258)
Browse files Browse the repository at this point in the history
Revert #35252 and ensure venv cache files are created in a dedicated tmp folder
  • Loading branch information
jscheffl committed Oct 30, 2023
1 parent 03b8acb commit 2f48bc9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 87 deletions.
105 changes: 51 additions & 54 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,57 +422,56 @@ def __deepcopy__(self, memo):
memo[id(self.pickling_library)] = self.pickling_library
return super().__deepcopy__(memo)

def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Path):
op_kwargs: dict[str, Any] = dict(self.op_kwargs)
if self.templates_dict:
op_kwargs["templates_dict"] = self.templates_dict
input_path = tmp_dir / "script.in"
output_path = tmp_dir / "script.out"
string_args_path = tmp_dir / "string_args.txt"
script_path = tmp_dir / "script.py"
termination_log_path = tmp_dir / "termination.log"

self._write_args(input_path)
self._write_string_args(string_args_path)
write_python_script(
jinja_context={
"op_args": self.op_args,
"op_kwargs": op_kwargs,
"expect_airflow": self.expect_airflow,
"pickling_library": self.pickling_library.__name__,
"python_callable": self.python_callable.__name__,
"python_callable_source": self.get_python_source(),
},
filename=os.fspath(script_path),
render_template_as_native_obj=self.dag.render_template_as_native_obj,
)
# For cached venv we need to make sure that the termination log does not exist
# Before process starts (it could be a left-over from a previous run)
if termination_log_path.exists():
termination_log_path.unlink()
try:
execute_in_subprocess(
cmd=[
os.fspath(python_path),
os.fspath(script_path),
os.fspath(input_path),
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
]
def _execute_python_callable_in_subprocess(self, python_path: Path):
with TemporaryDirectory(prefix="venv-call") as tmp:
tmp_dir = Path(tmp)
op_kwargs: dict[str, Any] = dict(self.op_kwargs)
if self.templates_dict:
op_kwargs["templates_dict"] = self.templates_dict
input_path = tmp_dir / "script.in"
output_path = tmp_dir / "script.out"
string_args_path = tmp_dir / "string_args.txt"
script_path = tmp_dir / "script.py"
termination_log_path = tmp_dir / "termination.log"

self._write_args(input_path)
self._write_string_args(string_args_path)
write_python_script(
jinja_context={
"op_args": self.op_args,
"op_kwargs": op_kwargs,
"expect_airflow": self.expect_airflow,
"pickling_library": self.pickling_library.__name__,
"python_callable": self.python_callable.__name__,
"python_callable_source": self.get_python_source(),
},
filename=os.fspath(script_path),
render_template_as_native_obj=self.dag.render_template_as_native_obj,
)
except subprocess.CalledProcessError as e:
if e.returncode in self.skip_on_exit_code:
raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
elif termination_log_path.exists() and termination_log_path.stat().st_size > 0:
error_msg = f"Process returned non-zero exit status {e.returncode}.\n"
with open(termination_log_path) as file:
error_msg += file.read()
raise AirflowException(error_msg) from None
else:
raise

return self._read_result(output_path)
try:
execute_in_subprocess(
cmd=[
os.fspath(python_path),
os.fspath(script_path),
os.fspath(input_path),
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
]
)
except subprocess.CalledProcessError as e:
if e.returncode in self.skip_on_exit_code:
raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
elif termination_log_path.exists() and termination_log_path.stat().st_size > 0:
error_msg = f"Process returned non-zero exit status {e.returncode}.\n"
with open(termination_log_path) as file:
error_msg += file.read()
raise AirflowException(error_msg) from None
else:
raise

return self._read_result(output_path)

def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing()
Expand Down Expand Up @@ -704,13 +703,13 @@ def execute_callable(self):
if self.venv_cache_path:
venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path))
python_path = venv_path / "bin" / "python"
return self._execute_python_callable_in_subprocess(python_path, venv_path)
return self._execute_python_callable_in_subprocess(python_path)

with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
self._prepare_venv(tmp_path)
python_path = tmp_path / "bin" / "python"
result = self._execute_python_callable_in_subprocess(python_path, tmp_path)
result = self._execute_python_callable_in_subprocess(python_path)
return result

def _iter_serializable_context_keys(self):
Expand Down Expand Up @@ -848,9 +847,7 @@ def execute_callable(self):
f"Sys version: {sys.version_info}. "
f"Virtual environment version: {python_version_as_list_of_strings}"
)
with TemporaryDirectory(prefix="tmd") as tmp_dir:
tmp_path = Path(tmp_dir)
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
return self._execute_python_callable_in_subprocess(python_path)

def _get_python_version_from_environment(self) -> list[str]:
try:
Expand Down
33 changes: 0 additions & 33 deletions tests/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import warnings
from collections import namedtuple
from datetime import date, datetime, timedelta
from pathlib import Path
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Generator
Expand Down Expand Up @@ -999,38 +998,6 @@ def f(a):
with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
self.run_as_task(f, venv_cache_path=tmp_dir, op_args=[4])

def test_no_side_effect_of_caching_and_termination_log(self):
def termination_log(a):
import sys
from pathlib import Path

assert "pytest_venv_1234" in sys.executable
venv_cache_dir_name = Path(sys.executable).parent.parent.name
raise Exception(f"Should produce termination log. Subdir = {venv_cache_dir_name}")

def no_termination_log(a):
import sys

assert "pytest_venv_1234" in sys.executable
raise SystemExit(1)

with TemporaryDirectory(prefix="pytest_venv_1234") as tmp_dir:
with pytest.raises(AirflowException, match="Should produce termination log") as exc:
self.run_as_task(termination_log, venv_cache_path=tmp_dir, op_args=[4])
venv_dir_cache = exc.value.args[0].split(" ")[-1]
termination_log_path = Path(tmp_dir) / venv_dir_cache / "termination.log"
assert termination_log_path.exists()
assert "Should produce termination log" in termination_log_path.read_text()
clear_db_runs()

# termination log from previous run should not produce side effects in another task
# Using the same cached venv

assert termination_log_path.exists()
with pytest.raises(CalledProcessError):
self.run_as_task(no_termination_log, venv_cache_path=tmp_dir, op_args=[4])
assert not termination_log_path.exists()

# This tests might take longer than default 60 seconds as it is serializing a lot of
# context using dill (which is slow apparently).
@pytest.mark.execution_timeout(120)
Expand Down

0 comments on commit 2f48bc9

Please sign in to comment.