
DAGRun API endpoint returns status 500 for some run states #19836

Closed

frapa-az opened this issue Nov 26, 2021 · 14 comments
Labels
area:core, kind:bug

Comments

@frapa-az

Apache Airflow version

2.0.2

Operating System

Unknown

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

I am using the REST API.

When I make a request to the DAGRun endpoint and one DAGRun is in the skipped state, the request fails with a 500 Internal Server Error:

{
    "detail": "'skipped' is not one of ['success', 'running', 'failed']\n\nFailed validating 'enum' in schema['allOf'][0]['properties']['dag_runs']['items']['properties']['state']:\n    {'description': 'DAG State.',\n     'enum': ['success', 'running', 'failed'],\n     'readOnly': True,\n     'type': 'string',\n     'x-scope': ['',\n                 '#/components/schemas/DAGRunCollection',\n                 '#/components/schemas/DAGRun']}\n\nOn instance['dag_runs'][1]['state']:\n    'skipped'",
    "status": 500,
    "title": "Response body does not conform to specification",
    "type": "https://airflow.apache.org/docs/2.0.2/stable-rest-api-ref.html#section/Errors/Unknown"
}

This seemingly happens because the run's state is skipped, which is not one of the three states allowed by the response schema.
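
For reference, this is roughly how the failing request can be made (a sketch only; the host, credentials, and dag_id below are placeholders, not values from the actual deployment):

import requests

# Hypothetical host, credentials, and dag_id; substitute your own deployment's values.
resp = requests.get(
    "http://localhost:8080/api/v1/dags/my_dag/dagRuns",
    auth=("admin", "admin"),
)
print(resp.status_code)  # 500 once any run of this DAG is in the 'skipped' state
print(resp.text)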

What you expected to happen

I would expect state=skipped to be returned by the API.

How to reproduce

No response

Anything else

I have no access to the system and do not know how Airflow was set up in it.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

frapa-az added the area:core and kind:bug labels on Nov 26, 2021
@boring-cyborg

boring-cyborg bot commented Nov 26, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@uranusjr
Member

WTH, a DAG run is not supposed to be skipped. There’s a serious bug somewhere.

@ephraimbuddy
Contributor

@frapa-az How can we reproduce a DagRun ending up in the skipped state?

@frapa-az
Author

frapa-az commented Nov 29, 2021

@ephraimbuddy Thanks for answering!

I managed to reproduce it by simply pausing the DAG (not the DAG run). I do not know whether this is timing-dependent, but for me it seems rather easy to reproduce. Usually I pause it just after the run has been scheduled.
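
In case it helps, pausing can also be done through the stable REST API (a sketch; the host, credentials, and dag_id are placeholders):

import requests

# Hypothetical host, credentials, and dag_id; this pauses the DAG itself, not a single run.
requests.patch(
    "http://localhost:8080/api/v1/dags/my_dag",
    json={"is_paused": True},
    auth=("admin", "admin"),
)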

[screenshot]

I have had a few of these in the last few days, so it cannot be that hard to hit:

[screenshot]

@uranusjr
Member

Just curious, do any of your DAGs happen to be subDAGs?

@frapa-az
Author

frapa-az commented Nov 29, 2021

@uranusjr What do you mean by subDAGs? Unfortunately I do not know the details, because we have an abstraction layer on top, but I think there's actually a single DAG that runs all pipelines, creating tasks dynamically based on a JSON spec. Does that help?

@uranusjr
Member

I think there's actually a single DAG that runs all pipelines,

Ah!

SubDAGs are DAGs that belong to another DAG, created with SubDagOperator: https://www.astronomer.io/guides/subdags
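
For illustration, a subDAG is typically wired up roughly like this (a sketch; the DAG ids, schedule, and factory function are made-up placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.subdag import SubDagOperator

default_args = {"start_date": datetime(2021, 1, 1)}

def build_subdag(parent_dag_id, child_id, args):
    # The child DAG's dag_id must be "<parent>.<task_id>" for Airflow to treat it as a subDAG.
    return DAG(dag_id=f"{parent_dag_id}.{child_id}", default_args=args, schedule_interval=None)

with DAG("parent_dag", default_args=default_args, schedule_interval=None) as dag:
    section_1 = SubDagOperator(
        task_id="section_1",
        subdag=build_subdag("parent_dag", "section_1", default_args),
    )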

I’m guessing your abstraction layer is implemented exactly with this—the top-level DAG creates a task for each of the DAGs you create through the layer, and those DAGs (except the top one) are actually subDAGs from Airflow’s perspective.

From my code search, there is one single possible code path that would set a DAG run’s state to skipped. If a SubDagOperator task is skipped, the task scheduler would set its state to skipped (obviously), and the operator would also set its controlling subDAG’s state to SKIPPED.

This is the exact function that does it:

def verify_dagruns(dag_runs, commit, state, session, current_task):
    """Verifies integrity of dag_runs.

    :param dag_runs: dag runs to verify
    :param commit: whether dag runs state should be updated
    :param state: state of the dag_run to set if commit is True
    :param session: session to use
    :param current_task: current task
    :return:
    """
    for dag_run in dag_runs:
        dag_run.dag = current_task.subdag
        dag_run.verify_integrity()
        if commit:
            dag_run.state = state
            session.merge(dag_run)

This function is only called by get_subdag_runs, which is only called by set_state (all in the same module), which is used to set a task instance’s state.

A DAG run can only be QUEUED, RUNNING, SUCCESS, or FAILED. I think setting the state to FAILED makes the most sense in this case?
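
A minimal sketch of that idea, assuming the mapping would happen before the state is assigned to the run (the helper name is made up):

from airflow.utils.state import State

def dag_run_state_for(task_state):
    # DAG runs do not support SKIPPED, so fold a task-level SKIPPED into FAILED
    # before assigning it to the subDAG's run.
    if task_state == State.SKIPPED:
        return State.FAILED
    return task_state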

@frapa-az
Author

frapa-az commented Nov 29, 2021

@uranusjr Wow, thanks for tracking that down! I can ask the team responsible for this for more details, to confirm that this is really what is happening. Most probably I'll come back tomorrow with an answer.

I guess setting it to FAILED makes the most sense among those options.

@uranusjr
Member

When you do, it’d be most awesome if you could also tell them we appreciate them pushing Airflow to the point where this obscure bug can surface 😛

@frapa-az
Author

I dug into the code. We're not using subDAGs, but we do something really weird with callbacks. So after all, this is our misuse of Airflow and not a bug in Airflow itself. Here is the relevant snippet (I believe):

# Imports the snippet relies on (TaskCanceledException is defined elsewhere in our code).
from airflow import settings
from airflow.utils.state import State

def on_failure_callback(context):
    if isinstance(context['exception'], TaskCanceledException):
        # Write SKIPPED for both the task instance and the whole DAG run
        # directly to the metadata database.
        session = settings.Session()
        context['task_instance'].set_state(State.SKIPPED, session=session)
        context['dag_run'].state = State.SKIPPED
        session.merge(context['dag_run'])
        session.commit()

task.on_failure_callback = on_failure_callback

However, maybe it would be good to add some validation of the states before saving them, to avoid ending up with such inconsistent values? I find it a little strange that Airflow accepts this when saving to the database, but then validates it when returning it from the API.

@potiuk
Member

potiuk commented Nov 30, 2021

However, maybe it would be good to add some validation of the states before saving them, to avoid ending up with such inconsistent values? I find it a little strange that Airflow accepts this when saving to the database, but then validates it when returning it from the API.

Currently you are fully in control of what you do with Airflow code, but power also means responsibility. This is the basic premise of the "open DB access" mode that Airflow currently supports. There is nothing to prevent arbitrary DB operations from the code's point of view. You'd have to implement complex database triggers or tap into SQLAlchemy logic to prevent this (and the latter could be bypassed). Airflow supports "raw" DB operations. Use those powers wisely.

And this one is not a very disruptive operation. Currently in Airflow you can literally delete the whole database from a callback if you want. So preventing this case is like trying to plug a small hole with your finger while half of the bottom of your boat is missing.

However, we are already working on the multitenancy implementation, and specifically it includes a DB-isolation mode, where you won't be able to execute DB operations directly but will instead have API methods to call (including setting the state of your task).

Until then, it makes little sense to do anything about it.

See the notes here: (https://docs.google.com/document/d/19d0jQeARnWm8VTVc_qSIEDldQ81acRk0KuhVwAd96wo/edit).

@potiuk
Member

potiuk commented Nov 30, 2021

Closing this one for now then

@uranusjr
Member

FWIW, we can pretty trivially validate state values on assignment (both on DagRun and TaskInstance, even though the latter isn't the problem here). Feel free to create a pull request for this; I created #19898 to track it.
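
A minimal sketch of what such on-assignment validation could look like with SQLAlchemy's validates hook (the model, table name, and allowed-states set below are simplified stand-ins, not Airflow's real DagRun model):

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base, validates

Base = declarative_base()

# Illustrative allowed DAG-run states; Airflow exposes the real set via State.dag_states.
ALLOWED_DAG_RUN_STATES = {"queued", "running", "success", "failed"}

class DagRunDemo(Base):
    __tablename__ = "dag_run_demo"  # stand-in table, not Airflow's dag_run table
    id = Column(Integer, primary_key=True)
    state = Column(String(50))

    @validates("state")
    def validate_state(self, key, value):
        # Reject invalid values at assignment time, so a callback that sets
        # state to "skipped" fails fast instead of persisting an inconsistent value.
        if value is not None and value not in ALLOWED_DAG_RUN_STATES:
            raise ValueError(f"{value!r} is not a valid DagRun state")
        return value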

@potiuk
Member

potiuk commented Nov 30, 2021

FWIW, we can pretty trivially validate state values on assignment (both on DagRun and TaskInstance, even though the latter isn't the problem here). Feel free to create a pull request for this; I created #19898 to track it.

Ah right!
