
Conversation

@asaf400

@asaf400 asaf400 commented Oct 17, 2021

closes: #18874

This PR is an attempt at fixing the DockerOperator XCom behaviour. The fix keeps the existing attach to Docker's log stream for 'live' Airflow logging of long-running containers with progress output, but asks for the container logs again after the container has exited for XCom usage (either the last line or all lines),
thus restoring the behavior documented here:
https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html#airflow.providers.docker.operators.docker.DockerOperator

xcom_all (bool) -- Push all the stdout or just the last line. The default is False (last line).

Please note, I am not familiar with how the mocks in the test files work, so I don't think I can do much about tests for this PR.
Tests for this functionality exist and are currently (before this PR) passing,
even though they shouldn't, as described in #18874.

The tests are located here:

xcom_push_operator = DockerOperator(**kwargs, do_xcom_push=True, xcom_all=False)
xcom_all_operator = DockerOperator(**kwargs, do_xcom_push=True, xcom_all=True)
no_xcom_push_operator = DockerOperator(**kwargs, do_xcom_push=False)
xcom_push_result = xcom_push_operator.execute(None)
xcom_all_result = xcom_all_operator.execute(None)
no_xcom_push_result = no_xcom_push_operator.execute(None)
assert xcom_push_result == 'container log 2'
assert xcom_all_result == ['container log 1', 'container log 2']
assert no_xcom_push_result is None

So in summary, the current tests have been useless for discovering this issue, and I personally don't know enough about the Airflow pytest/unittest setup to propose a change that would make them better. Any help or additional commit to this PR is welcome.
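One way the existing tests could miss #18874: a bare Mock returns the same canned lines no matter which client method the code under test calls. A minimal, Airflow-free illustration of the pattern (the client object here is a stand-in, not the actual test fixture):

```python
from unittest import mock

# Stand-in for the docker APIClient the operator talks to.
cli = mock.Mock()
cli.attach.return_value = [b"container log 1", b"container log 2"]
cli.logs.return_value = [b"container log 1", b"container log 2"]

# Simulate two hypothetical implementations of the XCom push:
xcom_via_attach = cli.attach(container="abc")[-1]
xcom_via_logs = cli.logs(container="abc")[-1]

# Both produce the same value, so a test asserting only on the pushed
# value cannot tell which docker API the operator actually used.
assert xcom_via_attach == xcom_via_logs == b"container log 2"

# A regression test for #18874 would therefore also pin the API call:
cli.logs.assert_called_once_with(container="abc")
```

Asserting on the mocked call itself, not just the returned value, is what would have made the test fail before this fix.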

Keeps the existing attach to Docker's log stream for 'live' Airflow logging of long-running containers with progress output, but asks for the Docker logs after the container has exited for XCom usage.
@boring-cyborg

boring-cyborg bot commented Oct 17, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide, and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally. It's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

asaf400 added a commit to asaf400/airflow that referenced this pull request Oct 17, 2021
DockerOperator has* the option to push the entire container log into the xcom return_value, not just the last line. EcsOperator doesn't have this option; it only pushes the last line. This commit changes that behaviour, adding an xcom_all parameter to the operator to enable pushing all log lines.

In addition, the get_last_log_message function has been refactored. In an edge case where the log is bigger than 1 MB (up to 10,000 log events, per the AWS docs), the function could cause more than one 'GetLogEvents' AWS API call, because it defaults to reading the log stream/group from the beginning. Using a deque with a maximum size of 1, it would iterate over everything the generator yields, calling get_log_events as many times as needed, just so that when the generator is exhausted the deque keeps only the last event. This is a waste of time and data transfer. AWS's get_log_events method has a start_from_head parameter, which can be used to actually tail the event log instead of reading from the head; in combination with a single next() call, we can ensure get_log_events is called only once and grab the last message from the response.

* DockerOperator is actually also unstable in regard to xcoms, see issue apache#18874 and PR apache#19027
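The tail-reading refactor described above can be sketched roughly as follows (an illustrative version, not the exact EcsOperator code; the boto3 get_log_events call and its startFromHead/limit parameters are real):

```python
def get_last_log_message(logs_client, log_group, log_stream):
    """Fetch only the newest log event with a single GetLogEvents call.

    startFromHead=False reads the stream from the tail, so one call with
    limit=1 returns just the most recent event, instead of paging from
    the beginning and discarding everything but the last line.
    """
    response = logs_client.get_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        startFromHead=False,  # tail the stream instead of reading from the head
        limit=1,
    )
    events = response.get("events", [])
    return events[-1]["message"] if events else None
```

With boto3, logs_client would be boto3.client('logs'); any object with a compatible get_log_events method works here.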
Comment on lines +322 to +324
# after container has exited, grab the entire log ignoring the chunked log stream that was used with attach
# self.cli.logs uses docker's /containers/{id}/logs, while self.cli.attach uses /containers/{id}/attach
lines = self.cli.logs(container=self.container['Id'], stdout=True, stderr=True, stream=True)
Member

I feel we should utilise the tail argument to fetch less data when only the last line is needed. Or even skip this additional call entirely if retrieve_output is set and the container exited successfully.

The _get_return_value_from_logs can be removed entirely (it is only used once, here), and it's "private", so we are allowed to break it (or we can always bump the major version of the Docker provider if needed).
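A sketch of what using tail could look like (a hypothetical helper, not the PR's code; docker-py's APIClient.logs does accept a tail argument):

```python
def fetch_xcom_lines(cli, container_id, xcom_all):
    """Fetch container logs for the XCom push in a single API call.

    When only the last line is needed (xcom_all=False), tail=1 asks the
    Docker daemon for just that line instead of transferring the whole log.
    """
    kwargs = {"container": container_id, "stdout": True, "stderr": True}
    if not xcom_all:
        kwargs["tail"] = 1  # only the newest line crosses the wire
    return cli.logs(**kwargs)
```

And, as suggested, when retrieve_output is set and the container exited successfully, the call could be skipped altogether.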

Author

Yeah, tail looks great. I didn't know it was an argument of logs(), even though it was right there in the docs...

Author

Suggested change
# after container has exited, grab the entire log ignoring the chunked log stream that was used with attach
# self.cli.logs uses docker's /containers/{id}/logs, while self.cli.attach uses /containers/{id}/attach
lines = self.cli.logs(container=self.container['Id'], stdout=True, stderr=True, stream=True)

Contributor

@uranusjr - I'm going to finish this one up. Is there a reason we're pulling the logs again instead of reusing the parsed logs from above?

Member

@uranusjr uranusjr Jan 26, 2022

logstream only contains logs starting from when the attach happens, and may lose lines between it and container startup. I'm not sure if it makes a difference, but honestly the additional API call is pretty cheap anyway, and might actually be more efficient for xcom_all=False because we don't need to keep all the previous logs in memory.

Contributor

Sure, but we're calling attach before we call cli.start(), so the stream should have everything from the beginning. I'm happy to go either way; I just thought it might be simplified while we were in there.

Member

According to #9164, attaching too slowly and missing a line is a real problem, though. I feel it's just much safer to have a separate API call.

Comment on lines +334 to 335
ret = self._get_return_value_from_logs(lines, lines[-1])
return ret
Author

@asaf400 asaf400 Oct 18, 2021

@uranusjr

so you were thinking of doing a flavor of this,
combined with the previously suggested edit (removing lines earlier in the code)?

It kind of looks bad because it's too long, but it should work.
Feel free to give another suggestion to make it prettier; I can't see how to prettify it, if that's what you meant by removing _get_return_value_from_logs.

Suggested change
ret = self._get_return_value_from_logs(lines, lines[-1])
return ret
return self.cli.logs(container=self.container['Id'], stdout=True, stderr=True, stream=True) if self.xcom_all else self.cli.logs(container=self.container['Id'], stdout=True, stderr=True, stream=True, tail=1)

Member

Yeah, something like this. We also need to deal with the AirflowException message above.

Member

I wonder if "".join(logstream).splitlines() would achieve the same as the tail-less call.

Author

Maybe. I tested adding random \n's into one of the bash 'echo' (without -e) commands in my example in the issue discussion.
I tried using logstream.split('\n'), but it had undesired results: some of the random \n's were converted to newlines in the output. I forgot to test splitlines(), I guess.

Author

I'll test these suggestions shortly

Author

@asaf400 asaf400 Oct 18, 2021

I guess this small test helps to understand what I meant:

#!/usr/bin/python3

text = r"""this is
an 'herestring'
with some lines,
but now\n there's even more \n lines, but are they real?\n
string example....\nwow!!!"""
print(text.splitlines())

In this case, the difference between expected and unexpected results lies in the r prefix:
without r""", the inline \n's would be real newline characters, and splitlines would split on them too.

I'll test how logstream handles splitlines.
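For what it's worth, splitlines() splits only on real newline characters; a literal backslash-n printed by the container (echo without -e) stays intact as two characters. A quick self-contained check:

```python
# Decoded log output containing a literal backslash-n (two characters,
# as echo without -e would print it) and one real newline.
logstream = "literal \\n stays put\nreal newline splits"

lines = logstream.splitlines()
assert lines == ["literal \\n stays put", "real newline splits"]
```

So "".join(logstream).splitlines() would preserve the literal sequences while still splitting the stream into lines.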

@asaf400
Author

asaf400 commented Oct 18, 2021

Tests are failing with:

tests/providers/docker/operators/test_docker.py::TestDockerOperator::test_execute_xcom_behavior: TypeError: 'Mock' object does not support indexing

and similar. Is that my mistake, or just bad luck?
I didn't touch the test files.

@eladkal
Contributor

eladkal commented Oct 21, 2021

Tests are failing with:

tests/providers/docker/operators/test_docker.py::TestDockerOperator::test_execute_xcom_behavior: TypeError: 'Mock' object does not support indexing

and similar. Is that my mistake, or just bad luck? I didn't touch the test files.

Probably not bad luck, as you are changing the XCom functionality of DockerOperator. The tests are there to avoid regressions, so either there is an issue with the functionality you added, or you simply need to fix the tests.
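That TypeError usually means the code under test now indexes the return value of a mocked call, while a default Mock is not subscriptable. A minimal, Airflow-free illustration of the failure and the fix (the client object here is illustrative, not the actual test code):

```python
from unittest import mock

cli = mock.Mock()

# By default, a mocked call returns another Mock, which cannot be
# indexed -- this is what raises the TypeError in the test run.
try:
    cli.attach(container="abc")[-1]
    raise AssertionError("expected a TypeError")
except TypeError:
    pass

# Fix: configure the mocked call with a real, indexable return value.
cli.logs.return_value = [b"container log 1", b"container log 2"]
assert cli.logs(container="abc")[-1] == b"container log 2"
```

So the existing tests likely need their mocks updated to return real lists wherever the new code path indexes the logs.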

@asaf400
Author

asaf400 commented Oct 28, 2021

Okay, I'll be able to invest more time into this PR in the next day or two;
I wasn't actually able to work on it over the last week.

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 13, 2021
@github-actions github-actions bot closed this Dec 18, 2021
ferruzzi added a commit to ferruzzi/airflow that referenced this pull request Jan 28, 2022
Please see: apache#19027

Co-Author: asaf400@gmail.com
potiuk added a commit to potiuk/airflow that referenced this pull request Feb 8, 2022
The fix from apache#21175 did not actually fix the logging behaviour with non-ASCII characters returned by docker logs when XCom push was enabled. The problem is that DockerOperator uses one method to stream the logs as they come (the attach stream) and a different method to retrieve the logs that actually become the XCom value; the latter uses the "logs" method of the docker client. The tests had not caught it because the two methods were mocked in two different places.

This PR uses the same "stringify()" function to convert both the "logged" logs and those that are pushed as XCom. It also adds a test for the "no lines returned" case.

Fixes: apache#19027
potiuk added a commit that referenced this pull request Feb 8, 2022
* Fix docker behaviour with byte lines returned

The fix from #21175 did not actually fix the logging behaviour with non-ASCII characters returned by docker logs when XCom push was enabled. The problem is that DockerOperator uses one method to stream the logs as they come (the attach stream) and a different method to retrieve the logs that actually become the XCom value; the latter uses the "logs" method of the docker client. The tests had not caught it because the two methods were mocked in two different places.

This PR uses the same "stringify()" function to convert both the "logged" logs and those that are pushed as XCom. It also adds a test for the "no lines returned" case.

Fixes: #19027

* Update tests/providers/docker/operators/test_docker.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
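The shared stringify() approach the commit message describes can be illustrated like this (a simplified sketch; the provider's actual helper may differ):

```python
def stringify(line):
    """Normalise a docker log line to str.

    The attach stream and the logs call can hand back either bytes or
    str; decoding both through one helper keeps the streamed logs and
    the XCom value consistent, including non-ASCII characters.
    """
    if isinstance(line, bytes):
        return line.decode("utf-8")
    return line
```

Routing both code paths through one conversion function is what keeps the live log output and the XCom push from diverging on byte lines.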

Labels

area:providers stale Stale PRs per the .github/workflows/stale.yml policy file

Development

Successfully merging this pull request may close these issues.

DockerOperator xcom is broken

4 participants