Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-5478] Decode PythonVirtualenvOperator Output to Logs #6097

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 14 additions & 2 deletions airflow/operators/python_operator.py
Expand Up @@ -244,8 +244,10 @@ class PythonVirtualenvOperator(PythonOperator):
:param templates_exts: a list of file extensions to resolve while
processing templated fields, for examples ``['.sql', '.hql']``
:type templates_exts: list[str]
:param output_encoding: The string representing how to decode the output of the
python_callable running in virtualenv.
:type output_encoding: str
"""

@apply_defaults
def __init__(
self,
Expand All @@ -259,6 +261,7 @@ def __init__(
string_args: Optional[Iterable[str]] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[Iterable[str]] = None,
output_encoding: Optional[str] = None,
*args,
**kwargs
):
Expand All @@ -275,6 +278,8 @@ def __init__(
self.python_version = python_version
self.use_dill = use_dill
self.system_site_packages = system_site_packages
self.output_encoding = output_encoding

# check that dill is present if needed
dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
self.requirements)
Expand Down Expand Up @@ -335,10 +340,17 @@ def _execute_in_subprocess(self, cmd):
output = subprocess.check_output(cmd,
stderr=subprocess.STDOUT,
close_fds=True)
if self.output_encoding:
output = output.decode(self.output_encoding)

if output:
self.log.info("Got output\n%s", output)
except subprocess.CalledProcessError as e:
self.log.info("Got error output\n%s", e.output)
output = e.output
if self.output_encoding:
output = output.decode(self.output_encoding)

self.log.info("Got error output\n%s", output)
raise

def _write_string_args(self, filename):
Expand Down