Emit "logs not found" message when ES logs appear to be missing #21261

Merged
7 commits merged on Feb 7, 2022
Changes from all commits
19 changes: 14 additions & 5 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -187,15 +187,24 @@ def _read(
         metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0

         cur_ts = pendulum.now()
-        # Assume end of log after not receiving new log for 5 min,
-        # as executor heartbeat is 1 min and there might be some
-        # delay before Elasticsearch makes the log available.
         if 'last_log_timestamp' in metadata:
             last_log_ts = timezone.parse(metadata['last_log_timestamp'])
+
+            # if we are not getting any logs at all after more than N seconds of trying,
+            # assume logs do not exist
+            if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+                metadata['end_of_log'] = True
+                missing_log_message = (
+                    f"*** Log {log_id} not found in Elasticsearch. "
+                    "If your task started recently, please wait a moment and reload this page. "
+                    "Otherwise, the logs for this task instance may have been removed."
+                )
+                return [('', missing_log_message)], metadata
             if (
+                # Assume end of log after not receiving new log for N min,
                 cur_ts.diff(last_log_ts).in_minutes() >= 5
-                or 'max_offset' in metadata
-                and int(offset) >= int(metadata['max_offset'])
+                # if max_offset specified, respect it
+                or ('max_offset' in metadata and int(offset) >= int(metadata['max_offset']))
             ):
                 metadata['end_of_log'] = True
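In plain terms, the new early return treats "offset still 0 after more than five seconds of polling" as "the log was never shipped", while the pre-existing five-minute rule still covers the case where some logs arrived and then stopped. A minimal standalone sketch of that decision logic, assuming only pendulum (the decide function, its arguments, and the truncated message are illustrative, not Airflow's actual handler API):

import pendulum


def decide(next_offset, offset, metadata, log_id='example-log-id'):
    # Sketch of the two timeouts above: a short 5-second window when nothing
    # has ever been received, and the original 5-minute window otherwise.
    cur_ts = pendulum.now()
    if 'last_log_timestamp' in metadata:
        last_log_ts = pendulum.parse(metadata['last_log_timestamp'])
        # Never received a byte and >5s have passed: assume the log is missing.
        if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
            metadata['end_of_log'] = True
            return [('', f"*** Log {log_id} not found in Elasticsearch. ...")], metadata
        # Some logs were seen earlier: only give up after 5 minutes of silence,
        # or once the reader has passed an explicit max_offset.
        if cur_ts.diff(last_log_ts).in_minutes() >= 5 or (
            'max_offset' in metadata and int(offset) >= int(metadata['max_offset'])
        ):
            metadata['end_of_log'] = True
    return [[]], metadata


# Started polling 6 seconds ago and nothing ever arrived -> "not found" message.
stale = {'last_log_timestamp': str(pendulum.now().subtract(seconds=6)), 'end_of_log': False}
logs, meta = decide(next_offset=0, offset=0, metadata=stale)
print(logs[0][0][1])       # *** Log example-log-id not found in Elasticsearch. ...
print(meta['end_of_log'])  # True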
69 changes: 60 additions & 9 deletions tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -19,6 +19,7 @@
 import json
 import logging
 import os
+import re
 import shutil
 from unittest import mock
 from urllib.parse import quote
@@ -38,6 +39,19 @@
 from .elasticmock import elasticmock


+def get_ti(dag_id, task_id, execution_date, create_task_instance):
+    ti = create_task_instance(
+        dag_id=dag_id,
+        task_id=task_id,
+        execution_date=execution_date,
+        dagrun_state=DagRunState.RUNNING,
+        state=TaskInstanceState.RUNNING,
+    )
+    ti.try_number = 1
+    ti.raw = False
+    return ti
+
+
Comment on lines +42 to +52
Member:

How about turning this into a fixture?

@pytest.fixture()
def create_running_task_instance(create_task_instance):
    def _create_ti(**kwargs):
        ti = create_task_instance(
            dagrun_state=DagRunState.RUNNING,
            state=TaskInstanceState.RUNNING,
            **kwargs,
        )
        ti.try_number = 1
        ti.raw = False
        return ti

    return _create_ti

Contributor Author:

It was a fixture, but I pulled it out so I could make a TI with different params...

But I guess you can parametrize a fixture like so?

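(For context, pytest does let you parametrize a fixture via indirect parametrization. A hypothetical sketch; the fixture and test names are illustrative, not from this PR:)

import pytest


@pytest.fixture()
def execution_date(request):
    # With indirect=True, request.param carries the value from the test's
    # parametrize list into the fixture.
    return request.param


@pytest.mark.parametrize('execution_date', ['2022-01-01', '2022-01-02'], indirect=True)
def test_with_parametrized_fixture(execution_date):
    assert execution_date.startswith('2022-')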
Member:

You can, but it generally shouldn't be needed; it's easier to make the fixture return a function that takes arguments instead (like this one here). I searched your changes, and this implementation seems to be good enough for the usages in this PR. You'd do:

@pytest.fixture()
def ti(self, create_running_task_instance):
    yield create_running_task_instance(
        dag_id=self.DAG_ID,
        task_id=self.TASK_ID,
        execution_date=self.EXECUTION_DATE,
    )
    clear_db_runs()
    clear_db_dags()

Contributor Author:

I'm confused, @uranusjr; I'm not seeing how this fixture helps me. There is already a fixture like this here now. I just pulled a portion of it out (and still use it in the existing fixture) because I want to be able to specify a different execution date in my specific test than the one used by the fixture.

Contributor Author:

If you are saying I should just change the fixture so that it returns a create_ti(execution_date) function, and update all the other tests to call that function (instead of just using a returned TI), then I can do that -- let me know.
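(For what it's worth, a sketch of that factory-style version; create_task_instance is the existing Airflow test fixture, while create_ti and its signature are illustrative only:)

import pytest

from airflow.utils.state import DagRunState, TaskInstanceState


@pytest.fixture()
def create_ti(create_task_instance):
    def _create_ti(execution_date, **kwargs):
        # Build a running TI for the given execution_date, mirroring get_ti above.
        ti = create_task_instance(
            execution_date=execution_date,
            dagrun_state=DagRunState.RUNNING,
            state=TaskInstanceState.RUNNING,
            **kwargs,
        )
        ti.try_number = 1
        ti.raw = False
        return ti

    return _create_ti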



 class TestElasticsearchTaskHandler:
     DAG_ID = 'dag_for_testing_es_task_handler'
     TASK_ID = 'task_for_testing_es_log_handler'
@@ -47,16 +61,12 @@ class TestElasticsearchTaskHandler:

     @pytest.fixture()
     def ti(self, create_task_instance):
-        ti = create_task_instance(
+        yield get_ti(
             dag_id=self.DAG_ID,
             task_id=self.TASK_ID,
             execution_date=self.EXECUTION_DATE,
-            dagrun_state=DagRunState.RUNNING,
-            state=TaskInstanceState.RUNNING,
+            create_task_instance=create_task_instance,
         )
-        ti.try_number = 1
-        ti.raw = False
-        yield ti
         clear_db_runs()
         clear_db_dags()

@@ -131,6 +141,38 @@ def test_read(self, ti):
         assert '1' == metadatas[0]['offset']
         assert timezone.parse(metadatas[0]['last_log_timestamp']) > ts

+    @pytest.mark.parametrize('seconds', [3, 6])
+    def test_read_missing_logs(self, seconds, create_task_instance):
+        """
+        When the log actually isn't there to be found, we only want to wait for 5 seconds.
+        In this case we expect to receive a message of the form 'Log {log_id} not found in elasticsearch ...'
+        """
+        ti = get_ti(
+            self.DAG_ID,
+            self.TASK_ID,
+            pendulum.instance(self.EXECUTION_DATE).add(days=1),  # so logs are not found
+            create_task_instance=create_task_instance,
+        )
+        ts = pendulum.now().add(seconds=-seconds)
+        logs, metadatas = self.es_task_handler.read(ti, 1, {'offset': 0, 'last_log_timestamp': str(ts)})
+
+        assert 1 == len(logs)
+        if seconds > 5:
+            # we expect a "log not found" message when checking began more than 5 seconds ago
+            assert len(logs[0]) == 1
+            actual_message = logs[0][0][1]
+            expected_pattern = r'^\*\*\* Log .* not found in Elasticsearch.*'
+            assert re.match(expected_pattern, actual_message) is not None
+            assert metadatas[0]['end_of_log'] is True
+        else:
+            # we've "waited" less than 5 seconds, so it should not be end of log and there should be no log message
+            assert len(logs[0]) == 0
+            assert logs == [[]]
+            assert metadatas[0]['end_of_log'] is False
+        assert len(logs) == len(metadatas)
+        assert '0' == metadatas[0]['offset']
+        assert timezone.parse(metadatas[0]['last_log_timestamp']) == ts
+
     def test_read_with_match_phrase_query(self, ti):
         similar_log_id = (
             f'{TestElasticsearchTaskHandler.TASK_ID}-'
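(As a quick sanity check on the two parametrized cases above, the 5-second threshold splits them exactly as the test expects; a tiny sketch assuming only pendulum:)

import pendulum

for seconds in (3, 6):
    ts = pendulum.now().add(seconds=-seconds)  # same trick as the test above
    elapsed = pendulum.now().diff(ts).in_seconds()
    # 3 -> False (keep waiting), 6 -> True (emit the "not found" message)
    print(seconds, elapsed > 5)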
@@ -210,15 +252,24 @@ def test_read_timeout(self, ti):
         ts = pendulum.now().subtract(minutes=5)

         self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        # in the below call, offset=1 implies that we have already retrieved something
+        # if we had never retrieved any logs at all (offset=0), then we would have gotten
+        # a "logs not found" message after 5 seconds of trying
+        offset = 1
         logs, metadatas = self.es_task_handler.read(
-            ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+            task_instance=ti,
+            try_number=1,
+            metadata={
+                'offset': offset,
+                'last_log_timestamp': str(ts),
+                'end_of_log': False,
+            },
         )
         assert 1 == len(logs)
         assert len(logs) == len(metadatas)
         assert [[]] == logs
         assert metadatas[0]['end_of_log']
-        # offset should be initialized to 0 if not provided.
-        assert '0' == metadatas[0]['offset']
+        assert str(offset) == metadatas[0]['offset']
         assert timezone.parse(metadatas[0]['last_log_timestamp']) == ts

     def test_read_as_download_logs(self, ti):