
Scheduler getting crashed when downgrading from 2.8.0b1 to 2.7.3 #35914

Closed · vatsrahul1001 opened this issue Nov 28, 2023 · 8 comments · Fixed by #35959

Labels: area:core, area:Scheduler, kind:bug


@vatsrahul1001 (Collaborator) commented Nov 28, 2023

Apache Airflow version

2.8.0b1

What happened

The scheduler crashes when downgrading from 2.8.0b1 to 2.7.3.
We had some running TIs when the downgrade happened; it looks like adopting tasks is what crashes the scheduler.
This could be due to this PR.

What you think should happen instead

No response

How to reproduce

Create a 2.8.0b1 deployment
Execute a couple of DAGs
Downgrade to 2.7.3
The scheduler goes into a crash loop

Logs:

[2023-11-28T07:14:26.927+0000] {process_utils.py:131} INFO - Sending 15 to group 32. PIDs of all processes in the group: [32]
[2023-11-28T07:14:26.927+0000] {process_utils.py:86} INFO - Sending the signal 15 to group 32
[2023-11-28T07:14:27.140+0000] {process_utils.py:79} INFO - Process psutil.Process(pid=32, status='terminated', exitcode=0, started='07:14:25') (32) terminated with exit code 0
[2023-11-28T07:14:27.140+0000] {scheduler_job_runner.py:874} INFO - Exited execute loop
[2023-11-28T07:14:27.145+0000] {scheduler_command.py:49} ERROR - Exception when running scheduler job
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 47, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/job.py", line 289, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/job.py", line 318, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/astronomer/airflow/version_check/plugin.py", line 30, in run_before
    fn(*args, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 845, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 927, in _run_scheduler_loop
    self.adopt_or_reset_orphaned_tasks()
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1601, in adopt_or_reset_orphaned_tasks
    for attempt in run_with_db_retries(logger=self.log):
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1645, in adopt_or_reset_orphaned_tasks
    tis_to_adopt_or_reset = session.scalars(tis_to_adopt_or_reset).all()
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1476, in all
    return self._allrows()
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 401, in _allrows
    rows = self._fetchall_impl()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1389, in _fetchall_impl
    return self._real_result._fetchall_impl()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1813, in _fetchall_impl
    return list(self.iterator)
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 147, in chunks
    fetch = cursor._raw_all_rows()
            ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 393, in _raw_all_rows
    return [make_row(row) for row in rows]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 393, in <listcomp>
    return [make_row(row) for row in rows]
            ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/sqltypes.py", line 1870, in process
    return loads(value)
           ^^^^^^^^^^^^
AttributeError: Can't get attribute 'ConfDict' on <module 'airflow.models.dagrun' from '/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py'>

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@vatsrahul1001 vatsrahul1001 added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Nov 28, 2023
@RNHTTR RNHTTR added area:Scheduler Scheduler or dag parsing Issues and removed needs-triage label for new issues that we didn't triage yet labels Nov 28, 2023
@ephraimbuddy (Contributor) commented:

I have tried to work around this, but there seems to be no option other than reverting #35096. To start with, the field is designated as PickleType and not JSON, and forcing it to be JSON is causing this downgrade issue, which is a breaking change. WDYT @uranusjr @potiuk @jscheffl?
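For illustration, here is a minimal sketch of the failure mode behind the AttributeError in the traceback above (this is not Airflow code; the class is a stand-in): the newer code pickles dag_run.conf as an instance of a wrapper class, and after a downgrade the older code can no longer resolve that class when SQLAlchemy's PickleType unpickles the column.

```python
# Minimal sketch, not the real model: why unpickling fails after a downgrade.
import pickle


class ConfDict(dict):
    """Hypothetical stand-in for the wrapper class the newer code pickles."""


# What the newer code effectively stores in the PickleType column.
payload = pickle.dumps(ConfDict({"key": "value"}))

# Simulate the downgraded code, where the class no longer exists.
del ConfDict

try:
    pickle.loads(payload)  # what the 2.7.3 scheduler does when reading the row back
except AttributeError as exc:
    print(exc)  # Can't get attribute 'ConfDict' on <module '__main__' ...>
```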

@ephraimbuddy (Contributor) commented:

I’m getting a different error than the one reported in #35095; that error was reported on 2.5.3.
When I revert the change, main currently reports this:

  [2023-11-29T13:49:52.170+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/aaabug/dagRuns/manual__2023-11-29T13:48:51.980966+00:00/taskInstances/extract/logs/1 [GET]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/usr/local/lib/python3.8/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.8/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.8/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
  File "/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line 113, in wrapper
    return _wrapper(request, response)
  File "/usr/local/lib/python3.8/site-packages/connexion/decorators/response.py", line 90, in _wrapper
    self.operation.api.get_connexion_response(response, self.mimetype)
  File "/usr/local/lib/python3.8/site-packages/connexion/apis/abstract.py", line 366, in get_connexion_response
    return cls._framework_to_connexion_response(response=response, mimetype=mimetype)
  File "/usr/local/lib/python3.8/site-packages/connexion/apis/flask_api.py", line 165, in _framework_to_connexion_response
    body=response.get_data() if not response.direct_passthrough else None,
  File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 314, in get_data
    self._ensure_sequence()
  File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 376, in _ensure_sequence
    self.make_sequence()
  File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 391, in make_sequence
    self.response = list(self.iter_encoded())
  File "/usr/local/lib/python3.8/site-packages/werkzeug/wrappers/response.py", line 50, in _iter_encoded
    for item in iterable:
  File "/opt/airflow/airflow/utils/log/log_reader.py", line 87, in read_log_stream
    logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
  File "/opt/airflow/airflow/utils/log/log_reader.py", line 64, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/opt/airflow/airflow/utils/log/file_task_handler.py", line 447, in read
    log, out_metadata = self._read(task_instance, try_number_element, metadata)
  File "/opt/airflow/airflow/utils/log/file_task_handler.py", line 335, in _read
    worker_log_rel_path = self._render_filename(ti, try_number)
  File "/opt/airflow/airflow/utils/log/file_task_handler.py", line 263, in _render_filename
    context = ti.get_template_context(session=session)
  File "/opt/airflow/airflow/models/taskinstance.py", line 2830, in get_template_context
    return _get_template_context(
  File "/opt/airflow/airflow/models/taskinstance.py", line 576, in _get_template_context
    validated_params = process_params(dag, task, dag_run, suppress_exception=ignore_param_exceptions)
  File "/opt/airflow/airflow/models/param.py", line 356, in process_params
    params.update(dag_run.conf)
  File "/opt/airflow/airflow/models/param.py", line 268, in update
    super().update(*args, **kwargs)
  File "/usr/local/lib/python3.8/_collections_abc.py", line 837, in update
    for key, value in other:
TypeError: 'numpy.int64' object is not iterable

The UI issue is that you can't see the logs.

@potiuk (Member) commented Nov 29, 2023

I had not seen the change before, but this looks like a DB structure change implemented without a corresponding migration (which is why it was not detected by our tests: they perform forwards <-> backwards migrations and did not catch this). I think it is indeed generally fishy, and possibly the different kind of error you see now is the result of data written by the new version being handled by the old code.

Yes. I'd be for reverting it.

@ephraimbuddy (Contributor) commented:

cc @PashkPashk

@jscheffl (Contributor) commented Nov 29, 2023

Sorry, I'm late to the game; I only just saw this.

I am not sure about this; maybe when I reviewed it I was not considering the "downgrade" case. Is that something we always promise?
I would understand it as a breaking-change problem if a running task/DAG could not be fetched from the DB after an upgrade.

The technical detail here: it is not a classic structural change of the DB by adding/altering/dropping a column, but rather that the plain dict previously stored in dag_run.conf now uses a custom ConfDict class, which wraps the dict and adds validation for serialization. Nothing more.
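(For illustration only, a rough sketch of that wrapper idea; the actual class introduced in #35096 may look different:)

```python
# Rough sketch of a validating dict wrapper for dag_run.conf; names and details
# are assumptions, not the real implementation.
import json


class ConfDict(dict):
    """A dict that refuses values which cannot be serialized to JSON."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._validate()

    def _validate(self) -> None:
        try:
            json.dumps(self)
        except (TypeError, ValueError) as err:
            raise ValueError(f"dag_run.conf must be JSON-serializable: {err}") from err
```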

But in contrast: if you did an upgrade, we implemented (any kind of) new feature, and you then downgrade, is it always promised that whatever the new features produced can be safely downgraded?

In this case it is not a classic "migration" but a new wrapper class that just adds validation. A "migration" here would mean de-serializing all values in the DB and writing the content back.
The same would be true if the content of the serialized class had changed.

Since this improvement is not too critical, we might be able to discuss how to handle it; technically it can be re-implemented similarly without generating the breaking change (with only a bit of extra complexity).

@ephraimbuddy (Contributor) commented:

@jscheffl being able to downgrade after upgrading is something we promise; it's not written down, but we have tests that run migration upgrades and downgrades across different Airflow versions.

In my view, the field type here is PickleType, not JSON, and validating it as JSON is a breaking change. Apart from that, once you downgrade after upgrading to 2.8 with a running task, the scheduler enters a crash loop. Even when you disable task adoption, the scheduler won't recover, because the field uses a synonym and calls ConfDict, which no longer exists.

In my view, this should be fixed on the params side, since in 2.7.3 it's not causing the same problem it caused in 2.5.3, where the problem was reported. I tried different implementations on the dagrun side and all of them failed. I don't think there's anything else to do there other than changing the field type to JSON, which would also cause a breaking change.
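For context, a rough sketch of the synonym pattern referred to above, showing why even plain reads end up depending on the wrapper class; the model, names, and wrapper here are assumptions for illustration, not the actual DagRun code:

```python
# Rough sketch, not the actual DagRun model: a PickleType column exposed through
# a synonym whose setter wraps assignments in ConfDict, so pickled rows written
# by the newer code reference a class the older code no longer defines.
from sqlalchemy import Column, Integer, PickleType
from sqlalchemy.orm import declarative_base, synonym

Base = declarative_base()


class ConfDict(dict):
    """Hypothetical validating wrapper (see the sketch above)."""


class DagRunSketch(Base):
    __tablename__ = "dag_run_sketch"

    id = Column(Integer, primary_key=True)
    _conf = Column("conf", PickleType)

    def _get_conf(self):
        return self._conf

    def _set_conf(self, value):
        # Every write goes through the wrapper, so the pickled payload
        # stores a reference to ConfDict rather than a plain dict.
        self._conf = ConfDict(value or {})

    conf = synonym("_conf", descriptor=property(_get_conf, _set_conf))
```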

@Gal40n04ek commented:

Hello! I'm trying to run airflow db downgrade==2.5.1 after upgrading the image to 2.7.3 (as a step of a possible rollback in case something goes wrong), and I get a similar error crashing the scheduler:

[2023-12-11T08:48:06.678+0000] {scheduler_job_runner.py:861} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column slot_pool.include_deferred does not exist
LINE 1: SELECT slot_pool.pool, slot_pool.slots, slot_pool.include_de...
                                                ^


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py", line 844, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py", line 976, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py", line 1089, in _do_scheduling
    num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py", line 667, in _critical_section_enqueue_task_instances
    queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
  File "/usr/local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py", line 320, in _executable_task_instances_to_queued
    pools = Pool.slots_stats(lock_rows=True, session=session)
  File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/pool.py", line 175, in slots_stats
    pool_rows = session.execute(query)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column slot_pool.include_deferred does not exist
LINE 1: SELECT slot_pool.pool, slot_pool.slots, slot_pool.include_de...
                                                ^

[SQL: SELECT slot_pool.pool, slot_pool.slots, slot_pool.include_deferred 
FROM slot_pool FOR UPDATE NOWAIT]
(Background on this error at: https://sqlalche.me/e/14/f405)

@potiuk (Member) commented Dec 11, 2023

It's a different issue, @Gal40n04ek. I suggest that you open a new discussion and explain the exact circumstances, the database you used, and the error messages you got when downgrading/upgrading, rather than commenting on a closed issue; then there is a chance someone will help you there.
