Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,15 @@ def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
self.log.info("New Python virtual environment created in %s", venv_path)
return venv_path

def _cleanup_python_pycache_dir(self, cache_dir_path: Path) -> None:
try:
shutil.rmtree(cache_dir_path)
self.log.debug("The directory %s has been deleted.", cache_dir_path)
except FileNotFoundError:
self.log.warning("Fail to delete %s. The directory does not exist.", cache_dir_path)
except PermissionError:
self.log.warning("Permission denied to delete the directory %s.", cache_dir_path)

def _retrieve_index_urls_from_connection_ids(self):
"""Retrieve index URLs from Package Index connections."""
if self.index_urls is None:
Expand All @@ -880,9 +889,13 @@ def execute_callable(self):

with TemporaryDirectory(prefix="venv") as tmp_dir:
tmp_path = Path(tmp_dir)
tmp_dir, temp_venv_dir = tmp_path.relative_to(tmp_path.anchor).parts
Copy link
Member

@gopidesupavan gopidesupavan Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not guarantee always two values :)

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added fix here #54214

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not guarantee always two values :)

image

Oh, didn't expect that 😢 Thank you for the test and fix!

Copy link
Contributor Author

@olegkachur-e olegkachur-e Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, can you please share a way to reproduce it, as for with TemporaryDirectory(prefix="venv") as tmp_dir: I was expecting a path like tmp_dir_path/venv<random_str>, as both parts are 'static'. What is a way to add additional nested dirs there? @gopidesupavan

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you can do it simply in local. your assumption will work inside container. but it may not work if it installed standalone in local.

you can follow these steps:

uv venv --python 3.10
source .venv/bin/activate
mkdir airflow_home
export AIRFLOW_HOME=<pwd>/airflow_home
uv pip install -U apache-airflow==3.0.4rc2 --pre --constraints https://github.com/apache/airflow/blob/constraints-3.0.4rc2/constraints-3.10.txt
airflow standalone

pwd -> repalce full path
And you can run an example with PythonVirtualenvOperator

custom_pycache_prefix = Path(sys.pycache_prefix or "")
venv_python_cache_dir = Path.cwd() / custom_pycache_prefix / tmp_dir / temp_venv_dir
self._prepare_venv(tmp_path)
python_path = tmp_path / "bin" / "python"
result = self._execute_python_callable_in_subprocess(python_path)
self._cleanup_python_pycache_dir(venv_python_cache_dir)
return result

def _iter_serializable_context_keys(self):
Expand Down
36 changes: 36 additions & 0 deletions providers/standard/tests/unit/standard/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,42 @@ def func():
msg = str(exc_info.value)
assert f"Invalid requirement '{invalid_requirement}'" in msg

@mock.patch("airflow.providers.standard.operators.python.PythonVirtualenvOperator._prepare_venv")
@mock.patch(
"airflow.providers.standard.operators.python.PythonVirtualenvOperator._execute_python_callable_in_subprocess"
)
@mock.patch(
"airflow.providers.standard.operators.python.PythonVirtualenvOperator._cleanup_python_pycache_dir"
)
def test_execute_callable_pycache_cleanup(
self, pycache_cleanup_mock, execute_in_subprocess_mock, prepare_venv_mock
):
custom_pycache_prefix = "custom/__pycache__"
tempdir_name = "tmp"
venv_dir_temp_name = "venvrandom"
venv_path_tmp = f"/{tempdir_name}/{venv_dir_temp_name}"
expected_cleanup_path = Path.cwd() / custom_pycache_prefix / tempdir_name / venv_dir_temp_name

def f():
return 1

op = PythonVirtualenvOperator(
task_id="task",
python_callable=f,
system_site_packages=False,
)
with mock.patch.object(sys, "pycache_prefix", new=custom_pycache_prefix):
with mock.patch(
"airflow.providers.standard.operators.python.TemporaryDirectory"
) as mock_temp_dir:
mock_context = mock_temp_dir.return_value.__enter__
mock_context.return_value = venv_path_tmp
op.execute_callable()

execute_in_subprocess_mock.assert_called_once()
prepare_venv_mock.assert_called_once_with(Path(venv_path_tmp))
pycache_cleanup_mock.assert_called_once_with(expected_cleanup_path)


# when venv tests are run in parallel to other test they create new processes and this might take
# quite some time in shared docker environment and get some contention even between different containers
Expand Down