
[AIRFLOW-6544] add log_id to end_of_log mark log record #7141

Closed
wants to merge 32 commits

Conversation

larryzhu2018

@larryzhu2018 larryzhu2018 commented Jan 11, 2020

The “end of log” marker does not include the primary key of the logs, log_id. As a result, airflow-web does not know when to stop tailing the logs.
We print the end-of-log marker using the handler's emit() method, which adds the dag_id, task_id, execution_date and try_number to the log record. Previously we did not have tests to validate the log_id lookup logic against elasticsearch; such a test is added here.
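The mechanism described here can be sketched roughly as follows. This is a minimal illustration, not Airflow's actual handler code: the `ContextFormatter` class and the field values are invented for the sketch; only `logging.makeLogRecord` and `Handler.emit` are standard-library calls.

```python
import io
import logging

END_OF_LOG_MARK = "end_of_log"

class ContextFormatter(logging.Formatter):
    """Stamp task context (dag_id/task_id/try_number) onto every record,
    loosely the way a task handler's formatter would."""
    def __init__(self, context):
        super().__init__("%(dag_id)s %(task_id)s %(try_number)s %(message)s")
        self.context = context

    def format(self, record):
        for key, value in self.context.items():
            setattr(record, key, value)
        return super().format(record)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(ContextFormatter({"dag_id": "d", "task_id": "t", "try_number": 1}))

# Emitting the marker as a LogRecord (rather than writing the bare string)
# routes it through the formatter, so the marker line carries the same
# context fields that the log_id is built from.
handler.emit(logging.makeLogRecord({"msg": END_OF_LOG_MARK}))

assert stream.getvalue().strip() == "d t 1 end_of_log"
```

The point of the sketch: a bare `stream.write(mark)` produces a line with no context fields, while an emitted record gets the same metadata as every other log line, so the marker can be found under the task's log_id.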


Issue link: AIRFLOW-6544

  • Description above provides context of the change

  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*

  • Unit tests coverage for changes (not needed for documentation changes)

  • Commits follow "How to write a good git commit message"

  • Relevant documentation is updated including usage instructions.

  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg

boring-cyborg bot commented Jan 11, 2020

Congratulations on your first Pull Request and welcome to the Apache Airflow community!
If you have any issues or are unsure about anything, please check our
Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)

Here are some useful points:

  • Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits
    will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory).
    Adding a new operator? Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing
    locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from
    Committers.

Apache Airflow is a community-driven project and together we are making it better 🚀.

In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://apache-airflow-slack.herokuapp.com/

@larryzhu2018 larryzhu2018 changed the title add log_id to end-of-file mark and also add an index config for logs [AIRFLOW-6544] add log_id to end-of-file mark and also add an index config for logs Jan 12, 2020
@codecov-io

codecov-io commented Jan 13, 2020

Codecov Report

Merging #7141 into master will increase coverage by 0.76%.
The diff coverage is 66.66%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #7141      +/-   ##
=========================================
+ Coverage   85.24%     86%   +0.76%     
=========================================
  Files         683     866     +183     
  Lines       39155   40559    +1404     
=========================================
+ Hits        33378   34884    +1506     
+ Misses       5777    5675     -102
Impacted Files Coverage Δ
airflow/config_templates/airflow_local_settings.py 70.21% <0%> (-1.53%) ⬇️
airflow/utils/log/es_task_handler.py 93.57% <80%> (+0.18%) ⬆️
airflow/operators/postgres_operator.py 0% <0%> (-100%) ⬇️
airflow/operators/mysql_to_hive.py 0% <0%> (-100%) ⬇️
...rflow/contrib/sensors/sagemaker_training_sensor.py 0% <0%> (-100%) ⬇️
airflow/contrib/operators/snowflake_operator.py 0% <0%> (-95.84%) ⬇️
airflow/operators/s3_to_hive_operator.py 0% <0%> (-93.97%) ⬇️
airflow/contrib/hooks/azure_data_lake_hook.py 0% <0%> (-93.11%) ⬇️
airflow/contrib/hooks/grpc_hook.py 0% <0%> (-91.94%) ⬇️
airflow/contrib/sensors/azure_cosmos_sensor.py 0% <0%> (-81.25%) ⬇️
... and 722 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d654d69...1bb12f3. Read the comment docs.

airflow/config_templates/default_airflow.cfg (outdated)
airflow/utils/log/es_task_handler.py
self.handler.stream.write(self.end_of_log_mark)
if self.write_stdout:
    print()
self.handler.emit(logging.makeLogRecord({'msg': self.end_of_log_mark}))

Member

Doesn't this change where the end of log mark goes?

Author

Yes, I sent an email to them suggesting those changes are bad, and copied that message to you. Please see the email I sent to you and the authors of 5528.

Author

Sorry, I meant that I sent the email to Andrii and the two authors of 5528.

Author

From 5528:

When the end_of_log_mark is wrapped in a log record, the end_of_log_mark can no longer match the log line in _read:

    metadata['end_of_log'] = False if not logs \
        else logs[-1].message == self.end_of_log_mark.strip()

This leads to the UI repeatedly calling the backend, which generates a lot of load on ES.

Removing the log_id from the end-of-log mark would make it worse: the UI would keep trying to find the end-of-log mark and never find it, because it searches for the end-of-log mark by log_id.

I am not sure what the sentence "When the end_of_log_mark is wrapped in a log record" means. I also observed that the end-of-log mark might end up on the same line as other log lines, which would prevent us from finding it. To address that, I always add an obnoxious print right in front of the end-of-log mark line, to ensure the end-of-log mark is always on a separate line when printing to the console. This is important for filebeat/logstash on Kubernetes to pick up the end-of-log mark log line as a separate document.
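The separate-line requirement can be illustrated with a small sketch. This is not Airflow code; the helper function is invented to mimic a line-based reader that detects the marker by exact comparison.

```python
# Sketch (not Airflow code): why the marker must land on its own line.
END_OF_LOG_MARK = "end_of_log"

def found_end_of_log(raw: str) -> bool:
    # A line-based reader matches the marker with an exact comparison,
    # so a marker glued to the tail of another line is never found.
    lines = raw.splitlines()
    return bool(lines) and lines[-1].strip() == END_OF_LOG_MARK.strip()

# Previous writer left the stream without a trailing newline:
assert found_end_of_log("INFO task done" + END_OF_LOG_MARK) is False

# A print()/newline before the marker guarantees it starts a fresh line:
assert found_end_of_log("INFO task done\n" + END_OF_LOG_MARK) is True
```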

Member

It means the last line would be something like [2020-01-16 07:58:32,712] {es_task_handler.py:XXX} INFO [end_of_log_mark], and thus the reader is unable to recognize it.

I'm a bit lost on how this removed the log_id from the end_of_log_mark. Isn't the log_id we construct in this file only for log fetching? My understanding is that the log_id is determined when we upload the log, e.g. when we pipe stdout to logstash or when we upload the file through filebeat to logstash.

Maybe I understood this wrong and there is indeed a bug. In that case I would agree on splitting this change into two PRs for sanity's sake.

Contributor

@pingzh pingzh Jan 23, 2020

Hi @larryzhu2018, for your description:
[screenshot]

It looks like you have write_stdout as true; in fact, my PR #7199 fixed write_stdout always being true. Could you please test the case when write_stdout is False?

Our setup is:
write_stdout is False
json is False
END_OF_LOG_MARK = u'\u0004\n'

This is what we see in Kibana:
[screenshot]

I remember that if the end_of_log is wrapped in a logging.makeLogRecord, it will start with:
[screenshot]

Contributor

> this works as long as you do not put white spaces into your end_of_log_mark. I think you would just ask for troubles by putting white space characters into the end-of-log mark.

I am not sure why using whitespace will cause trouble.

Author

> Hi @larryzhu2018 for your description:
> [screenshot]
>
> It looks like you have write_stdout as true, in fact, my pr: #7199 fixed the always true for write_stdout. could you please test the case when write_stdout is False.

[larryzhu2018]: the test case I have in the PR, test_close_with_log_id, sets write_stdout to False, so please check.

> Our set up is:
> write_stdout is False
> json is False
> END_OF_LOG_MARK = u'\u0004\n'
>
> This is what we see in the Kibana
> [screenshot]
>
> I remember if the end_of_log is wrapped in a logging.makeLogRecord it will start with
> [screenshot]

It is confusing in Kibana because you use whitespace as the mark. What do you think this would look like if you used "end_of_log_for_airflow_task_instance", as that is what I use in my deployment?


Author

> this works as long as you do not put white spaces into your end_of_log_mark. I think you would just ask for troubles by putting white space characters into the end-of-log mark.
>
> I am not sure why using white space will cause troubles.

See the .strip() call I quoted earlier. It basically means that if you have whitespace in your end-of-log mark, it won't match, and it will confuse users:

    else logs[-1].message == self.end_of_log_mark.strip()

Does this make sense now?

But please do not remove the .strip() call to "fix" this. It is hard to guarantee that event pipelines preserve whitespace, so it is best not to use whitespace in your end-of-log mark.
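A small sketch of that comparison (illustrative only; the helper name is invented, the comparison mirrors the `.strip()` line quoted above):

```python
# Why a marker containing whitespace (e.g. u'\u0004\n') is fragile: the
# configured mark is stripped before comparison, while the stored message
# may or may not keep the whitespace, depending on the ingest pipeline.
def is_end_of_log(message: str, end_of_log_mark: str) -> bool:
    return message == end_of_log_mark.strip()

# Matches only if ES stored exactly '\u0004' with the newline dropped:
assert is_end_of_log("\u0004", "\u0004\n") is True
# If the pipeline preserved the trailing newline, the match fails:
assert is_end_of_log("\u0004\n", "\u0004\n") is False

# A plain marker with no whitespace behaves the same either way:
assert is_end_of_log("end_of_log", "end_of_log") is True
```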

Author

    else logs[-1].message == self.end_of_log_mark.strip()

I think this will not work if the end_of_log_mark is wrapped as a log record, as the message will be in the format Kevin mentioned, and it also depends on the log formatter.
I did not change any production code, but I did add a test case to show how this works.

In short, you will need to deploy an ingest log processor; the one we use looks like this:

"description" : "cluster json log Pipeline",
"processors" : [
{
"rename" : {
"field" : "message",
"target_field" : "raw_message"
}
},
{
"json" : {
"field" : "raw_message",
"add_to_root" : false,
"target_field" : "json_target"
}
},
{
"grok" : {
"field" : "json_target.message",
"patterns" : [
"Job %{DATA:job_id}: Subtask %{DATA} %{GREEDYDATA:json_msg}",
"%{GREEDYDATA}"
]
}
},
{
"json" : {
"field" : "json_msg",
"add_to_root" : true,
"if" : "ctx.job_id != null"
}
},
{
"json" : {
"field" : "raw_message",
"add_to_root" : true,
"if" : "ctx.job_id == null"
}
},
{
"remove" : {
"field" : "json_msg",
"ignore_missing" : true
}
},
{
"remove" : {
"field" : "json_target"
}
},
{
"set" : {
"field" : "event.kind",
"value" : "tasks",
"if" : "ctx.message != null"
}
},
{
"set" : {
"field" : "event.dataset",
"value" : "airflow",
"if" : "ctx.dag_id != null && ctx.task_id != null"
}
},
{
"set" : {
"field" : "log_id",
"value" : "{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}",
"if" : "ctx.event?.dataset == 'airflow'"
}
},
{
"set" : {
"field" : "offset",
"value" : "{{log.offset}}",
"if" : "ctx.event?.dataset == 'airflow'"
}
}
],
"on_failure" : [
{
"set" : {
"field" : "error.message",
"value" : "{{ _ingest.on_failure_message }}"
}
}
]
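As a rough illustration of what the final `set` processor produces, the same template can be rendered in plain Python. The field values below are invented; only the template string comes from the pipeline above.

```python
# Sketch: the ingest pipeline's log_id template maps a log document's
# context fields to the id the webserver later queries by.
doc = {
    "dag_id": "example_dag",
    "task_id": "example_task",
    "execution_date": "2020_01_11T00_00_00",
    "try_number": "1",
}

log_id = "{dag_id}-{task_id}-{execution_date}-{try_number}".format(**doc)
assert log_id == "example_dag-example_task-2020_01_11T00_00_00-1"
```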

@ashb
Member

ashb commented Jan 14, 2020

cc @andriisoldatenko @schnie

@andriisoldatenko
Contributor

andriisoldatenko commented Jan 14, 2020

@ashb IMO it is better to split the 2 unrelated changes into 2 tickets:

  • add index (good idea)
  • fix the bug in another ticket?

General note: I would like to see a test case to prove the bug and improve test coverage after this change.

@ash

ash commented Jan 14, 2020

If you can spell @ashb correctly, that would be great.

@andriisoldatenko
Contributor

> If you can spell @ashb correctly, that would be great.

Sorry, my mistake :(

@ashb
Member

ashb commented Jan 14, 2020

@larryzhu2018 Yup, this is two tickets as Andrii mentions.

And this seems to revert the change from #6159 -- so you will need to explain in much more detail why you think your version is now right.

@larryzhu2018 larryzhu2018 changed the title [AIRFLOW-6544] add log_id to end-of-file mark and also add an index config for logs [AIRFLOW-6544] add log_id to end_of_log mark log record and also add an index config for logs Jan 18, 2020
Author

@larryzhu2018 larryzhu2018 left a comment

Issue link: AIRFLOW-6544
The “end of log” marker does not include the primary key of the logs, log_id. As a result, airflow-web does not know when to stop tailing the logs.
We print the end-of-log marker using the handler's emit() method, which adds the dag_id, task_id, execution_date and try_number to the log record. Previously we did not have tests to validate the log_id lookup logic against elasticsearch; such a test is added here.

@larryzhu2018 larryzhu2018 changed the title [AIRFLOW-6544] add log_id to end_of_log mark log record and also add an index config for logs [AIRFLOW-6544] add log_id to end_of_log mark log record Jan 19, 2020
@turbaszek
Member

> reviewers please help to check the latest iteration

You have to add a description above the `---` in the PR message:

Your description goes here.
---
Issue link: [AIRFLOW-6544](https://issues.apache.org/jira/browse/AIRFLOW-6544)

@larryzhu2018
Author

> reviewers please help to check the latest iteration
>
> You have to add a description above --- in Pr message:
>
> Your description goes here.
> ---
> Issue link: [AIRFLOW-6544](https://issues.apache.org/jira/browse/AIRFLOW-6544)

Issue link: [[AIRFLOW-6544](https://issues.apache.org/jira/browse/AIRFLOW-6544)](https://issues.apache.org/jira/browse/[AIRFLOW-6544](https://issues.apache.org/jira/browse/AIRFLOW-6544))

is it correct now?

@ashb
Member

ashb commented Jan 29, 2020

There's a lot of disagreement going on here.

@larryzhu2018 Can you please give me DETAILED instructions on how to test this bug and your fix locally, assuming I have no elasticsearch or kibana set up currently. (Docker would be preferred.)

@larryzhu2018
Author

> There's a lot of disagreement going on here.
>
> @larryzhu2018 Can you please give me DETAILED instructions on how to test this bug and your fix locally, assuming I have no elasticsearch or kibana set up currently. (Docker would be preferred.)

Please note that I did not change the production code here. I only reverted Ping's recent fix that broke logging with elasticsearch. I also provided a unit test case that tests the log_id logic and shows how log_id is used, and I showed the ingest processor we use, so anyone who has an elasticsearch cluster can copy, paste, and try it out. Is this sufficient? I won't have time to add Docker, elastic ingestion nodes, etc., as I did not add the elasticsearch logging support myself. I only reverted a recent change because the authors did not understand the code logic or how it is supposed to work. Do you see the community service I provided here?

@ashb
Member

ashb commented Jan 29, 2020

> I only reverted Ping's recent fix that broke logging using elastic search.

The disagreement is whether it's even broken or not. You say it is, Ping says it isn't.

If Ping/Kevin don't respond soon then I'll try to follow your ingest instructions and check out the before and after.

@larryzhu2018
Author

larryzhu2018 commented Jan 29, 2020 via email

@larryzhu2018
Author

larryzhu2018 commented Jan 31, 2020 via email

@derlaft

derlaft commented Apr 1, 2020

Any updates on this? Not sure why, but this seems stuck. Any help wanted?

@larryzhu2018
Author

> Any updates on this? Not sure why, but this seems stuck. Any help wanted?

Can you please help to move this forward?

@larryzhu2018
Author

Can you please help to move this forward?

@pingzh
Contributor

pingzh commented Apr 26, 2020

> Can you please help to move this forward?

Testing now.

@pingzh
Contributor

pingzh commented Apr 26, 2020

I replicated the change in our internal staging env; the webserver does not stop fetching the log until it times out. As you can see, the last line is
[screenshot]

Our setting is END_OF_LOG_MARK = u'\u0004\n'


[elasticsearch]
# Elasticsearch host
host =
# Format of the log_id, which is used to query for a given tasks logs
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
# Used to mark the end of a log stream for a task
end_of_log_mark = end_of_log
# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
# Code will construct log_id using the log_id template from the argument above.
# NOTE: The code will prefix the https:// automatically, don't include that here.
frontend =
# Write the task logs to the stdout of the worker, rather than the default files
write_stdout = False
# Instead of the default log formatter, write the log lines as JSON
json_format = False
# Log fields to also attach to the json output, if enabled
json_fields = asctime, filename, lineno, levelname, message

This is the log from the log file:
[screenshot]


One thing I also noticed is that in your code, ELASTICSEARCH_WRITE_STDOUT: str = conf.get('elasticsearch', 'WRITE_STDOUT') is always truthy, since conf.get returns a string. This is fixed in PR #7199.
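A quick sketch of the truthiness issue, using the standard-library `configparser` as a stand-in for Airflow's conf object (the section and option below are illustrative):

```python
# Why reading a boolean option with a plain string getter makes the
# option effectively always true.
import configparser

cfg = configparser.ConfigParser()
cfg.read_string("[elasticsearch]\nwrite_stdout = False\n")

as_string = cfg.get("elasticsearch", "write_stdout")
assert as_string == "False"
assert bool(as_string) is True     # non-empty string: truthy despite "False"

as_bool = cfg.getboolean("elasticsearch", "write_stdout")
assert as_bool is False            # parsed correctly as a boolean
```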

@larryzhu2018
Author

> I replicated the change in our internal staging env, the webserver does not stop fetching the log until it timed out. As you can see the last line is
> [screenshot]
>
> Our setting is END_OF_LOG_MARK = u'\u0004\n'

Logging pipelines in general do not work well with whitespace. Can you please change this to "end_of_log_for_airflow_task_instance"? Also, as I mentioned before, you will need to turn on JSON format for the elasticsearch scenario to work well, because otherwise parsing the dag_id, task_id, etc. would be harder in elasticsearch. Please see the ingestion pipeline that I shared earlier.

Here are the configurations I use for enabling logging for elasticsearch

config:
AIRFLOW__CORE__REMOTE_LOGGING: "True"
AIRFLOW__ELASTICSEARCH__HOST: "dev-iad-cluster-ingest.controltower:9200"
AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: "{dag_id}-{task_id}-{execution_date}-{try_number}"
AIRFLOW__ELASTICSEARCH__END_OF_LOG_MARK: "end_of_log_for_airflow_task_instance"
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
AIRFLOW__ELASTICSEARCH__JSON_FIELDS: "asctime, filename, lineno, levelname, message"
AIRFLOW__ELASTICSEARCH__INDEX: "filebeat-*"
AIRFLOW__LOGGING__COLORED_CONSOLE_LOG: "False"


[elasticsearch]
# Elasticsearch host
host =
# Format of the log_id, which is used to query for a given tasks logs
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
# Used to mark the end of a log stream for a task
end_of_log_mark = end_of_log
# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
# Code will construct log_id using the log_id template from the argument above.
# NOTE: The code will prefix the https:// automatically, don't include that here.
frontend =
# Write the task logs to the stdout of the worker, rather than the default files
write_stdout = False
# Instead of the default log formatter, write the log lines as JSON
json_format = False
# Log fields to also attach to the json output, if enabled
json_fields = asctime, filename, lineno, levelname, message

> this is the log from the log file:
> [screenshot]
>
> One thing i also noticed is that in your code, the ELASTICSEARCH_WRITE_STDOUT: str = conf.get('elasticsearch', 'WRITE_STDOUT') is always true, since it is using the conf.get. this is fixed in this PR: #7199

Thanks. I did not change this; it does not impact my scenarios, as I deploy Airflow on Kubernetes and I need write_stdout to always be true.

@stale

stale bot commented Jun 10, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jun 10, 2020
@ashb ashb removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Jun 11, 2020
@kaxil kaxil added the pinned Protect from Stalebot auto closing label Jul 14, 2020
@potiuk potiuk closed this Sep 18, 2021
Labels
area:logging pinned Protect from Stalebot auto closing