-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-4084] Fix bug downloading incomplete logs from ElasticSearch #5177
[AIRFLOW-4084] Fix bug downloading incomplete logs from ElasticSearch #5177
Conversation
9050587
to
288437f
Compare
Codecov Report
@@ Coverage Diff @@
## master #5177 +/- ##
==========================================
+ Coverage 78.69% 78.69% +<.01%
==========================================
Files 472 472
Lines 30082 30103 +21
==========================================
+ Hits 23673 23691 +18
- Misses 6409 6412 +3
Continue to review full report at Codecov.
|
288437f
to
dc2d03c
Compare
cc @KevinYang21 |
@cong-zhu Do we have unit test for this feature before? If so we'll still need to update it so we capture the use case which this PR is fixing, if not we might want to add that test. Also the CI is failing now, if you're still working on it then you can mark the PR as a WIP PR by putting a |
@KevinYang21 Yes, we have unit test for this log download feature, though it's not testing against ElasticSearch. CI failed because of python version. I marked this PR as |
73d38a6
to
d13d81c
Compare
@KevinYang21 PTAL, I fixed CI failure and added unit tests. |
airflow/utils/log/es_task_handler.py
Outdated
try: | ||
metadata['max_offset'] = s[max_log_line - 1].execute()[-1].offset if max_log_line > 0 else 0 | ||
except Exception as e: | ||
msg = 'Could not get current log size with log_id: {}, ' \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think we can just pass e to exception and log the msg separately if you want to keep it
|
||
try: | ||
if ti is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: if ti:
LGTM |
d13d81c
to
3f5c600
Compare
@KevinYang21, I updated the way to handle exception. This PR has been verified on my local. |
(cherry picked from commit 56a038a)
(cherry picked from commit 56a038a)
I wonder why this change had to make changes to the I am working on documentation for this class and if we revert this change we will be able to do it as follows. In my opinion this is the original behavior of this code. class TaskHandler(logging.Handler, ABC):
"""
Handler that allows you to write and read information about a specific task.
"""
@abstractmethod
def read(
self, task_instance: TaskInstance, try_number: Optional[int] = None, metadata: Optional[Dict] = None
) -> Tuple[List[str], List[Dict]]:
"""
Read logs of given task instance.
It supports log pagination. To do this, the first call to this function contains an empty metadata
object. As a result, list of logs and list of metadata should be returned. The resulting
metadata should contain the key ``end_of_logs``, which determines whether pagination should be
continued. It is possible to return more metadata objects, but only the first is used, so
you should always return a list with one item.
The remaining keys in the dictionary are sent back to the method without changes, which means that
if you add an additional key with a token or page number, you can expect that the key will be
available in the next request for logs.
If the metadata in the call contains the ``download_logs'' key, then full logs should be
returned without pagination.
:param task_instance: task instance object
:param try_number: task instance try_number to read logs from. If None
it returns all logs separated by try_number
:param metadata: log metadata,
can be used for steaming log reading and auto-tailing.
:return: a list of logs and list of metadata objects.
"""
...
@abstractmethod
def set_context(self, task_instance: TaskInstance) -> None:
"""
Provide task_instance context to airflow task handler.
Different implementations provide different behavior. Examples of behavior are:
* in the case of handlers writing to a file, it may start writing to another file;
* for remote services, it can start adding labels to logs.
This allows us to later search for logs for a single task
:param task_instance: task instance object
"""
... I would like to point out that in this PR there is a duplicate mechanism of handling the case when try_numbers is empty, i.e. the log for all try numbers is downloaded. Previously it was part of the file_task_handler handler, and now it has been copied a second time to another place despite the es_task_handler extended from this handler. |
Jira
Description
Tests
Commits
Documentation
Code Quality
flake8
@pingzh