Skip to content

Commit

Permalink
Store dataset_events on the operator instead
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Apr 3, 2024
1 parent 822983f commit c3cd16d
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,10 @@ def execute(self, context: Context) -> Any:
self.op_kwargs = self.determine_kwargs(context)

try:
dataset_events = context["dataset_events"]
self._dataset_events = context["dataset_events"]
except KeyError:
dataset_events = context["dataset_events"] = DatasetEventAccessors()
return_value = self.execute_callable(dataset_events)
self._dataset_events = context["dataset_events"] = DatasetEventAccessors()
return_value = self.execute_callable()
if self.show_return_value_in_logs:
self.log.info("Done. Returned value was: %s", return_value)
else:
Expand All @@ -247,13 +247,13 @@ def execute(self, context: Context) -> Any:
def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking()

def execute_callable(self, dataset_events: DatasetEventAccessors) -> Any:
def execute_callable(self) -> Any:
"""
Call the python callable with the given arguments.
:return: the return value of the call.
"""
runner = ExecutionCallableRunner(self.python_callable, dataset_events, logger=self.log)
runner = ExecutionCallableRunner(self.python_callable, self._dataset_events, logger=self.log)
return runner.run(*self.op_args, **self.op_kwargs)


Expand Down Expand Up @@ -753,7 +753,7 @@ def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
self.log.info("New Python virtual environment created in %s", venv_path)
return venv_path

def execute_callable(self, dataset_events: DatasetEventAccessors) -> Any:
def execute_callable(self) -> Any:
if self.venv_cache_path:
venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path))
python_path = venv_path / "bin" / "python"
Expand Down Expand Up @@ -881,7 +881,7 @@ def __init__(
**kwargs,
)

def execute_callable(self, dataset_events: DatasetEventAccessors) -> Any:
def execute_callable(self) -> Any:
python_path = Path(self.python)
if not python_path.exists():
raise ValueError(f"Python Path '{python_path}' must exists")
Expand Down

0 comments on commit c3cd16d

Please sign in to comment.