
[AIRFLOW-6704] Copy common TaskInstance attributes from Task #7324

Merged
merged 6 commits into apache:master from yuqian90:operator_none Feb 21, 2020

Conversation


@yuqian90 yuqian90 commented Feb 1, 2020

Certain attributes of TaskInstance, such as operator and queue, are either not updated at all or only updated when the task is executed.
This causes some issues:

  • If a task is marked success or failed, the operator field is left as None. This causes bugs when other code tries to use the operator field to find the name of the operator class.
  • When the pool or queue of a task is changed in the DAG definition, there is no way to re-run existing TaskInstances with the updated values. Clearing the TaskInstance does not update these attributes.

The fix is to consistently copy these attributes from the Task onto the TaskInstance in a single function, refresh_from_task().
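
For illustration, a minimal sketch of what such a helper could look like. The exact attribute list below is an assumption for this sketch, not the merged diff:

# On class TaskInstance in airflow/models/taskinstance.py (sketch only).
def refresh_from_task(self, task, pool_override=None):
    """Copy the common attributes from the given task onto this TaskInstance."""
    self.queue = task.queue
    self.pool = pool_override or task.pool
    self.priority_weight = task.priority_weight_total
    self.run_as_user = task.run_as_user
    self.max_tries = task.retries
    self.executor_config = task.executor_config
    self.operator = task.task_type  # e.g. "PythonOperator"; no longer left as None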


Issue link: AIRFLOW-6704

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@yuqian90 yuqian90 force-pushed the operator_none branch 2 times, most recently from 8d5959e to 273e79b on February 2, 2020 02:54

codecov-io commented Feb 2, 2020

Codecov Report

Merging #7324 into master will decrease coverage by 0.39%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #7324     +/-   ##
=========================================
- Coverage   86.59%   86.19%   -0.4%     
=========================================
  Files         871      871             
  Lines       40660    40660             
=========================================
- Hits        35209    35047    -162     
- Misses       5451     5613    +162
Impacted Files                                            Coverage Δ
airflow/models/taskinstance.py                            94.28% <100%> (ø) ⬆️
...w/providers/apache/hive/operators/mysql_to_hive.py     100% <0%> (ø) ⬆️
airflow/operators/generic_transfer.py                     100% <0%> (ø) ⬆️
airflow/kubernetes/volume_mount.py                        44.44% <0%> (-55.56%) ⬇️
airflow/providers/postgres/operators/postgres.py          50% <0%> (-50%) ⬇️
airflow/kubernetes/volume.py                              52.94% <0%> (-47.06%) ⬇️
airflow/security/kerberos.py                              76.08% <0%> (ø) ⬆️
airflow/kubernetes/pod_launcher.py                        47.18% <0%> (-45.08%) ⬇️
airflow/providers/mysql/operators/mysql.py                100% <0%> (ø) ⬆️
...roviders/google/cloud/operators/postgres_to_gcs.py     52.94% <0%> (-32.36%) ⬇️
... and 9 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 288a50a...278638d.


@ashb ashb left a comment


Should we remove the other place where it was set?


yuqian90 commented Feb 2, 2020

Should we remove the other place where it was set?

I think we should keep them unless it's obvious that some are redundant. From what I can tell, the constructor __init__() is not the only way a TaskInstance can be created. E.g. TaskInstance.task is sometimes not set when the TaskInstance is created through deserialization or read from the database via sqlalchemy.

One place where I think it should be kept is _run_raw_task. If the DAG author changes the class of an existing task that has been executed before, without changing its task_id, we probably want the next execution of the task to update TaskInstance.operator to the new type.

@ashb what do you think?

@yuqian90 yuqian90 changed the title [AIRFLOW-6704] Set TaskInstance.operator in constructor [AIRFLOW-6704] Copy common TaskInstance attributes from Task Feb 8, 2020

yuqian90 commented Feb 8, 2020

@ashb I actually noticed similar issues with other attributes too. For example, if the pool of an existing task is changed in the DAG code, there's no way to clear existing TaskInstances and re-run them in the new pool. So I extracted the repeated code that copies TaskInstance attributes from the task into a function refresh_from_task() and used it everywhere.

Please take another look.

airflow/models/taskinstance.py
self.test_mode = test_mode
self.refresh_from_task(task, pool_override=pool)

Is it needed here if we call self.refresh_from_db immediately?


This is mostly trying to preserve the existing behavior while moving some duplicated code into refresh_from_task(). However, you are right that this part is not perfect:

Ideally we should first call refresh_from_db() and then refresh_from_task(). The call to refresh_from_db() loads cumulative values such as self.try_number and self.max_tries from the db so that individual runs of the task can increment them. The call to refresh_from_task() picks up the configurable values from the latest DAG definition. At the moment, however, refresh_from_db() loads both the cumulative values and the configurable attributes, so it also sets configurable values such as self.queue and self.operator, which would most likely be more useful read from the DAG definition via refresh_from_task().

This PR is not trying to fix everything. It only consolidates some duplicated code and makes attributes such as self.queue and self.pool updatable when tasks are cleared in clear_task_instances(). It's probably worth a separate, bigger PR to make sure refresh_from_db() only reads the attributes that really should come from the db and leaves the rest to refresh_from_task().
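
To illustrate the ordering described above, a hypothetical sketch of the direction suggested for a follow-up PR, not what this PR implements:

def refresh(ti, session):
    # Cumulative state (try_number, max_tries, ...) should come from the db
    # so that successive runs can keep incrementing it.
    ti.refresh_from_db(session=session)
    # Configurable attributes (queue, pool, operator, ...) should then be
    # overwritten from the latest DAG definition.
    ti.refresh_from_task(ti.task)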

@kaxil kaxil merged commit 3ef7118 into apache:master Feb 21, 2020

kaxil commented Feb 21, 2020

Thanks @yuqian90 🎉

@yuqian90 yuqian90 deleted the operator_none branch February 22, 2020 03:58
galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this pull request Mar 5, 2020
kaxil pushed a commit that referenced this pull request Mar 23, 2020
kaxil pushed a commit that referenced this pull request Mar 30, 2020
kaxil pushed a commit to astronomer/airflow that referenced this pull request Mar 30, 2020

shanit-saha commented Jun 9, 2020

On migrating Airflow from v1.10.2 to v1.10.10, one of our DAGs has a task of dagrun_operator type.

A code snippet of the task is shown below. Please assume that the DAG dag_process_pos exists:

import os
from datetime import datetime

from airflow.operators.dagrun_operator import TriggerDagRunOperator


def set_up_dag_run_preprocessing(context, dag_run_obj):
    ti = context['ti']
    dag_name = context['ti'].task.trigger_dag_id
    dag_run = context['dag_run']
    trans_id = dag_run.conf['transaction_id']
    routing_info = ti.xcom_pull(task_ids="json_validation", key="route_info")
    new_file_path = routing_info['file_location']
    new_file_name = os.path.basename(routing_info['new_file_name'])
    file_path = os.path.join(new_file_path, new_file_name)
    batch_id = "123-AD-FF"
    dag_run_obj.payload = {'inputfilepath': file_path,
                           'transaction_id': trans_id,
                           'Id': batch_id}


task_trigger_dag_positional = TriggerDagRunOperator(
    trigger_dag_id="dag_process_pos",
    python_callable=set_up_dag_run_preprocessing,
    task_id="trigger_preprocess_dag",
    on_failure_callback=log_failure,  # log_failure is defined elsewhere in the DAG file
    execution_date=datetime.now(),
    provide_context=False,
    owner='airflow')

The DAG runs fine; in fact the python callable of the task above runs all the way through its last line. Then it errors out:

[2020-06-09 11:36:22,838] {taskinstance.py:1145} ERROR - No row was found for one()
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 95, in execute
    replace_microseconds=False)
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 141, in trigger_dag
    replace_microseconds=replace_microseconds,
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 98, in _trigger_dag
    external_trigger=True,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1471, in create_dagrun
    run.refresh_from_db()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagrun.py", line 109, in refresh_from_db
    DR.run_id == self.run_id
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3446, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()

After that, the on_failure_callback of the task executes and all the code in that callable runs perfectly, as expected. The question here is why the dagrun_operator failed after the python callable completed.
P.S.: The DAG being triggered by the TriggerDagRunOperator, in this case dag_process_pos, starts with a task of type dummy_operator.

@shanit-saha

To add to my post above: we have found that it works when we remove the execution_date parameter from the TriggerDagRunOperator. Is that a bug or a change in the syntax? Also, the example DAGs shipped with the Airflow 1.10.10 repository do not include sample code for TriggerDagRunOperator.

@yuqian90

Hi @shanit-saha

execution_date=datetime.now()

I think this line is the cause of the problem. I believe what you actually want is to trigger dag_process_pos with an execution_date equal to the time at which the TriggerDagRunOperator actually executes.

However, passing execution_date=datetime.now() when constructing the TriggerDagRunOperator means the value is computed at the time the DAG containing the TriggerDagRunOperator is parsed. Since the value of execution_date therefore changes each time the DAG is parsed, the result is unexpected errors like the one you saw.

I think you already realized that if you leave execution_date as None, TriggerDagRunOperator defaults to timezone.utcnow().isoformat(), which does what you need. Alternatively, you can set execution_date to a jinja template string yourself. The gist is that you should not pass an argument whose value changes every time the DAG is parsed.
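
For example, either of these avoids a value frozen at parse time (a sketch against the Airflow 1.10.x API; the DAG id and task ids are taken from your snippet above):

from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Option 1: leave execution_date unset; the operator picks the current
# time when it actually executes.
trigger_default = TriggerDagRunOperator(
    task_id="trigger_preprocess_dag",
    trigger_dag_id="dag_process_pos",
)

# Option 2: a jinja template string, rendered at execution time rather
# than at parse time.
trigger_templated = TriggerDagRunOperator(
    task_id="trigger_preprocess_dag_templated",
    trigger_dag_id="dag_process_pos",
    execution_date="{{ execution_date }}",
)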

If you think the documentation of TriggerDagRunOperator needs to be improved, please create a separate issue for that.

@shanit-saha

@yuqian90: Thank you for your response!
If that is the case, I am just wondering why the same code with execution_date works perfectly fine on the earlier version of Airflow that we are using, i.e. v1.10.2. If the dagrun_operator construct changed, I could not find any documentation about it.

@yuqian90

@yuqian90: Thank you for your response!
If that is the case, I am just wondering why the same code with execution_date works perfectly fine on the earlier version of Airflow that we are using, i.e. v1.10.2. If the dagrun_operator construct changed, I could not find any documentation about it.

Many things changed between 1.10.2 and 1.10.10, and I haven't looked closely at your error, so I can't tell whether this PR is related. Is there a reason you think this PR is the cause?
