[AIRFLOW-244] Expose task/dag id/run data for debugging/monitoring purposes #1594
Conversation
Current coverage is 68.05%

@@           master   #1594   diff @@
==========================================
  Files         116     116
  Lines        8310    8321    +11
  Methods         0       0
  Messages        0       0
  Branches        0       0
==========================================
+ Hits         5654    5663     +9
- Misses       2656    2658     +2
  Partials        0       0
"""
def wrapped(self, *args, **kwargs):
    if self.dag:
        os.environ['AIRFLOW_AIRBNB_DAG_ID'] = self.dag.dag_id
Kill AIRBNB (here and elsewhere)
Thinking about this a little bit more, I think it might make sense to do away with the decorator and provide these environment variables to all operators. What do you think?
^ I would tend to agree with that. I don't see a reason not to, and as we roll out increased support that decorator is going to appear in more and more places.
        os.environ['AIRFLOW_AIRBNB_DAG_RUN_ID'] = self.dag.start_date.isoformat()
    os.environ['AIRFLOW_AIRBNB_TASK_ID'] = self.task_id
    if self.start_date:
        os.environ['AIRFLOW_AIRBNB_TASK_RUN_ID'] = self.start_date.isoformat()
s/TASK_RUN/TASK_INSTANCE, to be consistent with the nomenclature. Also, should ID be START_TIME?
raise NotImplementedError()

if self.dag:
    os.environ['AIRFLOW_DAG_ID'] = self.dag.dag_id
dagrun = context['dag_run']
@aoen this is the correct way to get a dag run, yes? As discussed it is needed in order to retrieve the execution date.
Yep, looks good
the context is prepared here:
https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1424
and documented here:
http://pythonhosted.org/airflow/code.html#macros
You'll see that you can use context['execution_date'] straight up.
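To illustrate the suggestion above, here is a minimal, self-contained sketch of an operator reading the execution date straight out of the context dict. The class and the shape of the context here are illustrative stand-ins, not Airflow's real BaseOperator; only the context['execution_date'] key comes from the docs linked above.

```python
from datetime import datetime

# Minimal sketch (not Airflow's real BaseOperator): execute() receives the
# context dict, so the execution date can be read directly from it.
class MyOperator:
    def execute(self, context):
        # context['execution_date'] is a datetime, per the macros docs above
        return context['execution_date'].isoformat()

op = MyOperator()
ctx = {'execution_date': datetime(2016, 6, 15)}
print(op.execute(ctx))  # 2016-06-15T00:00:00
```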
Pretty much looks good, just a few nits. Awesome and speedy work :). I would get one more +1 since this is a fairly core change, e.g. from Paul/Max or one of the other committers like bolkedebruin / criccomini / jlowin.
    os.environ['AIRFLOW_DAG_ID'] = self.dag.dag_id
dagrun = context['dag_run']
if dagrun and dagrun.execution_date:
    os.environ['AIRFLOW_DAGRUN'] = dagrun.execution_date.isoformat()
Not sure what happened to my old comments on GitHub, but here they are again:
I think we should add an _EXECUTION_DATE suffix to AIRFLOW_DAGRUN (and the same for TASK_INSTANCE below) in case we ever want other things from DAGRUN as env variables, e.g. _START_DATE.
Looks like my comments got deleted (maybe due to a rebase?), but I think the super() addition you made in the operators should also be made in the operators in contrib. Also passing tests.
I'm not big on the super call. Inheritance doesn't make sense to me here, as BaseOperator isn't meant to be executed. At first I was thinking that you could just call a […]. Then I thought that a better approach would be to call […]. Operators that want to log more than just the basics could override […].
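The alternative being discussed above is a template-method shape: the base class owns a non-overridden entry point that exports the context and then dispatches to execute(), so subclasses never need to remember a super() call. A hedged sketch, with illustrative names (_export_context, run) that are not Airflow's real API:

```python
import os

# Sketch of the template-method idea: the base class entry point (run)
# exports context to env vars, then calls execute(), which subclasses
# override. No super() call is needed in the subclass.
class BaseOperatorSketch:
    def __init__(self, task_id):
        self.task_id = task_id

    def _export_context(self, context):
        # publish identifying metadata before the real work runs
        os.environ['AIRFLOW_TASK_ID'] = self.task_id

    def run(self, context):
        # callers invoke run(); subclasses override execute(), not run()
        self._export_context(context)
        return self.execute(context)

    def execute(self, context):
        raise NotImplementedError()

class EchoOperator(BaseOperatorSketch):
    def execute(self, context):
        return os.environ['AIRFLOW_TASK_ID']

print(EchoOperator('echo_task').run({}))  # echo_task
```

Operators that want to export more than the basics could override _export_context (calling up the chain there instead), which keeps execute() free of bookkeeping.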
    os.environ['AIRFLOW_DAGRUN_EXECUTION_DATE'] = dagrun.execution_date.isoformat()
os.environ['AIRFLOW_TASK_ID'] = self.task_id
if self.start_date:
    os.environ['AIRFLOW_TASK_INSTANCE_EXECUTION_DATE'] = self.start_date.isoformat()
It'd be nice to have a 100% predictable pattern where you replace dots with double underscores: dag.dag_id becomes AIRFLOW_CTX__DAG__DAG_ID. I'd also go with a longer prefix, AIRFLOW_CTX__, to avoid potential collisions.
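The naming scheme proposed above is mechanical, so it can be captured in a tiny helper. This is an illustrative sketch of the reviewer's suggestion, not code from the PR; the function name is made up:

```python
def ctx_to_env_name(dotted):
    """Illustrative helper for the suggested naming scheme: dots become
    double underscores, everything is upper-cased, and the AIRFLOW_CTX__
    prefix is prepended to avoid collisions with other env vars."""
    return 'AIRFLOW_CTX__' + dotted.replace('.', '__').upper()

print(ctx_to_env_name('dag.dag_id'))  # AIRFLOW_CTX__DAG__DAG_ID
```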
Has any testing been done on this? If you're using this with Dr. Elephant, curious if it works as expected (especially if subprocesses are being spawned).
@criccomini I'm not sure if I understand exactly what you're asking, but yes, I tested the Airflow side of this. It's pretty simple, since all it does is add some Airflow context information to env variables, which can be picked up by the tool of your choice in a sub-process.
K, that's all I was asking. :)
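The mechanism the exchange above relies on is just standard environment inheritance: variables set in the operator's process are visible to any subprocess it spawns. A small self-contained demonstration (the variable name here is illustrative):

```python
import os
import subprocess
import sys

# Env vars set in the parent process are inherited by spawned subprocesses,
# so an external tool launched by an operator can read them.
os.environ['AIRFLOW_CTX_DAG_ID'] = 'example_dag'
out = subprocess.check_output(
    [sys.executable, '-c',
     "import os; print(os.environ['AIRFLOW_CTX_DAG_ID'])"]
)
print(out.decode().strip())  # example_dag
```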
Moved to #1607
Dear Airflow Maintainers,
Please accept this PR that addresses the following issues:
- https://issues.apache.org/jira/browse/AIRFLOW-244
This PR exposes task/dag id/run data for ingestion by performance analysis tools like Dr. Elephant through environment variables. This can be selectively applied to operators by decorating their execute functions with the new export_execute_debug_metadata decorator. Alternatively, if it seems that this is a feature that most operators are going to want, this could be implemented in such a way that the relevant data is always exposed before calling execute.
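A hedged sketch of what the export_execute_debug_metadata decorator described above could look like, based on the diff snippets in this thread. The FakeDag/FakeOperator classes are simplified stand-ins for Airflow's real models, used only to make the sketch runnable:

```python
import functools
import os

# Sketch of the decorator from the PR description: it wraps an operator's
# execute() and publishes identifying fields as environment variables
# before the real work runs, so external tools can pick them up.
def export_execute_debug_metadata(execute):
    @functools.wraps(execute)
    def wrapped(self, context, *args, **kwargs):
        if getattr(self, 'dag', None):
            os.environ['AIRFLOW_DAG_ID'] = self.dag.dag_id
        os.environ['AIRFLOW_TASK_ID'] = self.task_id
        return execute(self, context, *args, **kwargs)
    return wrapped

class FakeDag:
    def __init__(self, dag_id):
        self.dag_id = dag_id

class FakeOperator:
    def __init__(self, dag, task_id):
        self.dag = dag
        self.task_id = task_id

    @export_execute_debug_metadata
    def execute(self, context):
        # by the time execute() runs, the env vars are already set
        return os.environ['AIRFLOW_DAG_ID'], os.environ['AIRFLOW_TASK_ID']

op = FakeOperator(FakeDag('example_dag'), 'example_task')
print(op.execute({}))  # ('example_dag', 'example_task')
```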