Skip to content

Conversation

@kwkwc
Copy link

@kwkwc kwkwc commented Nov 11, 2021

I use task_instance_mutation_hook in Airflow 2.2.1 to dynamically assign queue:

def task_instance_mutation_hook(task_instance: TaskInstance):

    from airflow.settings import Session
    session = Session()
    dag_run = task_instance.get_dagrun(session)
    conf = dag_run.conf
    if conf and conf.get('queue'):
        task_instance.queue = conf.get('queue')

Doing so allows me to specify the queue normally.

This is my database data:

pic_2021-11-11 224552

But when I click on Instance Details or Clear in the UI, the queue is restored to its default value.

pic_2021-11-11 224648

This is the database data that was restored after I clicked on one's Instance Details:

pic_2021-11-11 225112

I found that the problem is that both Instance Details and Clear call the refresh_from_task and the current queue is overwritten.

def refresh_from_task(self, task, pool_override=None):
"""
Copy common attributes from the given task.
:param task: The task object to copy from
:type task: airflow.models.BaseOperator
:param pool_override: Use the pool_override instead of task's pool
:type pool_override: str
"""
self.task = task
self.queue = task.queue

If the queue is overwritten but the dag here is not changed, task_instance_mutation_hook will not be triggered, it is in verify_integrity.

def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session=None):
"""Only run DagRun.verify integrity if Serialized DAG has changed since it is slow"""
latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
if dag_run.dag_hash == latest_version:
self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
return
dag_run.dag_hash = latest_version
# Refresh the DAG
dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
# Verify integrity also takes care of session.flush
dag_run.verify_integrity(session=session)


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@kwkwc kwkwc requested review from XD-DENG, ashb and kaxil as code owners November 11, 2021 16:18
@boring-cyborg
Copy link

boring-cyborg bot commented Nov 11, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@kwkwc kwkwc closed this Nov 13, 2021
@kwkwc kwkwc force-pushed the fix_dynamic_queue_loss branch from eda8284 to 9519bf6 Compare November 13, 2021 17:53
@kwkwc kwkwc reopened this Nov 13, 2021
@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 29, 2021
@github-actions github-actions bot closed this Jan 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

stale Stale PRs per the .github/workflows/stale.yml policy file

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant