
Fix decryption of trigger kwargs when downgrading. #38743

Merged
merged 2 commits into apache:main from fix-migration-issue-decryption on Apr 4, 2024

Conversation

ephraimbuddy (Contributor)

This failed because the query for Triggers after the downgrade lazy loads the task_instance table (ORM), which doesn't have the task_display_name column at that downgraded point.

The fix was to query specifically on the encrypted_kwargs column.
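
A minimal sketch of the idea (illustrative only, not the actual diff from this PR; the decrypt() helper is hypothetical, while Trigger, Trigger.id and Trigger.encrypted_kwargs come from the discussion below):

    # Before: iterating full ORM objects also pulls in the related task_instance
    # columns (see the stack trace later in this thread), which no longer all
    # exist in the downgraded schema.
    for trigger in session.query(Trigger):
        ...

    # After: select only the column that needs rewriting, so the generated SQL
    # never touches task_instance.
    for trigger_id, encrypted_kwargs in session.query(Trigger.id, Trigger.encrypted_kwargs):
        session.query(Trigger).filter(Trigger.id == trigger_id).update(
            {Trigger.encrypted_kwargs: decrypt(encrypted_kwargs)},  # decrypt() is a hypothetical helper
            synchronize_session=False,
        )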

@ephraimbuddy (Contributor Author)

Figuring out why this was not detected in our tests...

@potiuk (Member) commented Apr 4, 2024

Figuring out why this was not detected in our tests...

🤔

@potiuk (Member) commented Apr 4, 2024

@ephraimbuddy

Because we are running up-down migration with empty tables I think.

We should likely add some steps in our migration up/down to generate some data -> i.e. run a bunch of example dags - both before migration up and after migration up and before migration down.

@potiuk (Member) commented Apr 4, 2024

Created an issue for that #38744

@ephraimbuddy (Contributor Author) commented Apr 4, 2024

@ephraimbuddy

Because we are running up-down migration with empty tables I think.

We should likely add some steps in our migration up/down to generate some data -> i.e. run a bunch of example dags - both before migration up and after migration up and before migration down.

I reproduced it in breeze without having data in the tables.

@potiuk (Member) commented Apr 4, 2024

Strange. It definitely runs in the main migration:

[2024-04-04T02:07:28.405+0000] {db.py:1745} INFO - Applying downgrade migrations.
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running downgrade 677fdbb7fc54 -> 1949afb29106, add new executor field to db

Here:

INFO [alembic.runtime.migration] Running downgrade 1949afb29106 -> ee1467d4aa35, update trigger kwargs type

@potiuk (Member) commented Apr 4, 2024

Are you sure when you reproduced it your Trigger table was empty? Looking at the fix, the whole for loop would be skipped if it was empty. Maybe you ran it with start-airflow and a trigger managed to create something there? Or maybe you had old data? Breeze by default keeps the data in specially kept volumes until you run breeze with --db-reset or run breeze down, so there could be some dangling data there.

@ephraimbuddy (Contributor Author)

Are you sure when you reproduced it your Trigger table was empty? Looking at the fix, the whole for loop would be skipped if it was empty. Maybe you ran it with start-airflow and a trigger managed to create something there? Or maybe you had old data? Breeze by default keeps the data in specially kept volumes until you run breeze with --db-reset or run breeze down, so there could be some dangling data there.

I reconfirmed that it was empty. I reproduced it. The error happens on the query itself, not on the decryption. The session.query(Trigger) causes the task_instance table to be loaded, and because the ORM already has task_display_name but it no longer exists in the DB after the downgrade, the error occurs. Below is the stack trace:

INFO  [alembic.runtime.migration] Running downgrade 1fd565369930 -> 88344c1d9134, Add rendered_map_index to TaskInstance.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedColumn: column task_instance_1.task_display_name does not exist
LINE 1: ...tance_1.try_number AS task_instance_1_try_number, task_insta...
                                                             ^


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/db_command.py", line 182, in downgrade
    db.downgrade(to_revision=to_revision, from_revision=from_revision, show_sql_only=args.show_sql_only)
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/utils/db.py", line 1752, in downgrade
    decrypt_trigger_kwargs(session=session)
  File "/opt/airflow/airflow/utils/db.py", line 996, in decrypt_trigger_kwargs
    for trigger in session.query(Trigger):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2901, in __iter__
    result = self._iter()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
    result = self.session.execute(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column task_instance_1.task_display_name does not exist
LINE 1: ...tance_1.try_number AS task_instance_1_try_number, task_insta...
                                                             ^

[SQL: SELECT trigger.kwargs AS trigger_kwargs, trigger.id AS trigger_id, trigger.classpath AS trigger_classpath, trigger.created_date AS trigger_created_date, trigger.triggerer_id AS trigger_triggerer_id, task_instance_1.try_number AS task_instance_1_try_number, task_instance_1.task_display_name AS task_instance_1_task_display_name, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash, dag_run_1.log_template_id AS dag_run_1_log_template_id, dag_run_1.updated_at AS dag_run_1_updated_at, dag_run_1.clear_number AS dag_run_1_clear_number, task_instance_1.task_id AS task_instance_1_task_id, task_instance_1.dag_id AS task_instance_1_dag_id, task_instance_1.run_id AS task_instance_1_run_id, task_instance_1.map_index AS task_instance_1_map_index, task_instance_1.start_date AS task_instance_1_start_date, task_instance_1.end_date AS task_instance_1_end_date, task_instance_1.duration AS task_instance_1_duration, task_instance_1.state AS task_instance_1_state, task_instance_1.max_tries AS task_instance_1_max_tries, task_instance_1.hostname AS task_instance_1_hostname, task_instance_1.unixname AS task_instance_1_unixname, task_instance_1.job_id AS task_instance_1_job_id, task_instance_1.pool AS task_instance_1_pool, task_instance_1.pool_slots AS task_instance_1_pool_slots, task_instance_1.queue AS task_instance_1_queue, task_instance_1.priority_weight AS task_instance_1_priority_weight, task_instance_1.operator AS task_instance_1_operator, task_instance_1.custom_operator_name AS task_instance_1_custom_operator_name, task_instance_1.queued_dttm AS task_instance_1_queued_dttm, task_instance_1.queued_by_job_id AS task_instance_1_queued_by_job_id, task_instance_1.pid AS task_instance_1_pid, task_instance_1.executor AS task_instance_1_executor, task_instance_1.executor_config AS task_instance_1_executor_config, task_instance_1.updated_at AS task_instance_1_updated_at, task_instance_1.rendered_map_index AS task_instance_1_rendered_map_index, task_instance_1.external_executor_id AS task_instance_1_external_executor_id, task_instance_1.trigger_id AS task_instance_1_trigger_id, task_instance_1.trigger_timeout AS task_instance_1_trigger_timeout, task_instance_1.next_method AS task_instance_1_next_method, task_instance_1.next_kwargs AS task_instance_1_next_kwargs 
FROM trigger LEFT OUTER JOIN (task_instance AS task_instance_1 JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance_1.dag_id AND dag_run_1.run_id = task_instance_1.run_id) ON trigger.id = task_instance_1.trigger_id]
(Background on this error at: https://sqlalche.me/e/14/f405)

This can only be reproduced when you downgrade half-way, to 2.8.4 - that's why we missed it. In CI, we downgrade from latest to 2.0.0 and then back up.
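
For context, a self-contained SQLAlchemy sketch of the failure mode (generic model names, not the actual Airflow models): a relationship configured with lazy="joined" makes every SELECT for the parent also select the child's mapped columns, producing the LEFT OUTER JOIN visible in the stack trace above - so a column the ORM knows about but the downgraded database no longer has breaks even queries that never reference it, and it fails even with empty tables:

    from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, text
    from sqlalchemy.orm import Session, declarative_base, relationship

    Base = declarative_base()

    class Child(Base):
        __tablename__ = "child"
        id = Column(Integer, primary_key=True)
        parent_id = Column(Integer, ForeignKey("parent.id"))
        display_name = Column(String)  # the ORM still maps this column...

    class Parent(Base):
        __tablename__ = "parent"
        id = Column(Integer, primary_key=True)
        # lazy="joined": every SELECT for Parent eagerly joins child and selects
        # all of its mapped columns, including display_name.
        children = relationship(Child, lazy="joined")

    engine = create_engine("sqlite://")
    with engine.begin() as conn:
        # ...but the "downgraded" database does not have it.
        conn.execute(text("CREATE TABLE parent (id INTEGER PRIMARY KEY)"))
        conn.execute(text("CREATE TABLE child (id INTEGER PRIMARY KEY, parent_id INTEGER)"))

    with Session(engine) as session:
        list(session.query(Parent.id))  # fine: only the requested column is selected
        list(session.query(Parent))     # fails: no such column: child.display_name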

@potiuk (Member) commented Apr 4, 2024

Hmm. I still don't get it - why does it not fail when we downgrade to 2.0.0? I thought alembic was executing the downgrades step-by-step and it goes 2.9.0 -> 2.8.4 -> 2.8.3 and so on... So why did it not fail in the 2.9.0 -> 2.8.4 step?

@potiuk (Member) commented Apr 4, 2024

🤔

@potiuk (Member) commented Apr 4, 2024

aaaaa

    if _revision_greater(
        config,
        _REVISION_HEADS_MAP["2.9.0"],
        to_revision,
    ):
        decrypt_trigger_kwargs(session=session)

@potiuk (Member) commented Apr 4, 2024

But this condition is likely broken? (Why would it run when migrating to 2.8.4 and not to 2.0.0?)

@ephraimbuddy (Contributor Author) commented Apr 4, 2024

But this condition is likely broken? (Why would it run when migrating to 2.8.4 and not to 2.0.0?)

Sorry, I see now. It's not running it because the trigger table can't be found when 2.0.0 is specified (

if not inspect(session.bind).has_table(Trigger.__tablename__):
)

@potiuk (Member) commented Apr 4, 2024

Sorry, I see now. It's not running it because the trigger table can't be found when 2.0.0 is specified (

Interesting, I did not realise we do that - I was under the impression that each step of downgrade (or upgrade) is done separately - and in isolation - no matter what the target version is. So I guess (cc: @hussein-awala) this is a kind of optimisation (we do not need to decrypt if we know the table will be dropped)? Was that the purpose of it?

@potiuk (Member) left a comment

LGTM for now. I do not think it should block rc2, but we likely have to get some improvements as a follow-up.

@potiuk mentioned this pull request Apr 4, 2024
@potiuk (Member) commented Apr 4, 2024

I updated the description of #38744 to reflect that "follow up" need.

@hussein-awala (Member) commented Apr 4, 2024

Sorry, I see now. It's not running it because the trigger table can't be found when 2.0.0 is specified (

Interesting, I did not realise we do that - I was under the impression that each step of downgrade (or upgrade) is done separately - and in isolation - no matter what the target version is. So I guess (cc: @hussein-awala) this is a kind of optimisation (we do not need to decrypt if we know the table will be dropped)? Was that the purpose of it?

It's necessary because we execute the method each time we run the downgrade command, so later when we want to downgrade 2.9.1+ to 2.9.0+, we should skip the decryption

@potiuk (Member) commented Apr 4, 2024

It's necessary because we execute the method each time we run the downgrade command, so later when we want to downgrade 2.9.1+ to 2.9.0, we should skip the decryption

Yeah, the condition to run decrypt is right - but as @ephraimbuddy said, it's not the condition to run the method that is wrong; it's the if not inspect(session.bind).has_table(Trigger.__tablename__): check that caused the query to be skipped, which is really why it has not been found before, I think.

@ephraimbuddy (Contributor Author)

It's necessary because we execute the method each time we run the downgrade command, so later when we want to downgrade 2.9.1+ to 2.9.0, we should skip the decryption

Yeah, the condition to run decrypt is right - but as @ephraimbuddy said, it's not the condition to run the method that is wrong; it's the if not inspect(session.bind).has_table(Trigger.__tablename__): check that caused the query to be skipped, which is really why it has not been found before, I think.

I think the code is right. If the table has been dropped, the decrypt is not run. Because in 2.8.4 we still have the trigger table, the decryption ran and failed. The code path is only evaluated at the end of the migrations, so if a user is going from 2.9.0 to 2.0.0, it will only run when the migration completes and the DB is in the 2.0.0 state.
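
Putting the pieces from this thread together, the flow is roughly as follows (a simplified reconstruction from the snippets quoted above, not the verbatim airflow/utils/db.py code):

    def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session=None):
        ...  # alembic first walks every migration step down to `to_revision`

        # Only afterwards, with the schema already in its final downgraded state,
        # is the decryption helper considered:
        if _revision_greater(config, _REVISION_HEADS_MAP["2.9.0"], to_revision):
            decrypt_trigger_kwargs(session=session)

    def decrypt_trigger_kwargs(*, session):
        # When the target is old enough (e.g. 2.0.0) the trigger table has already
        # been dropped, so this returns early - which is why CI never hit the bug.
        if not inspect(session.bind).has_table(Trigger.__tablename__):
            return
        # At 2.8.4 the table still exists, so the query runs - and failed before this fix.
        for trigger in session.query(Trigger):
            ...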

@ephraimbuddy merged commit 567246f into apache:main Apr 4, 2024
41 checks passed
@ephraimbuddy deleted the fix-migration-issue-decryption branch April 4, 2024 17:24
@hussein-awala (Member)

It's necessary because we execute the method each time we run the downgrade command, so later when we want to downgrade 2.9.1+ to 2.9.0, we should skip the decryption

Yeah, the condition to run decrypt is right - but as @ephraimbuddy said, it's not the condition to run the method that is wrong; it's the if not inspect(session.bind).has_table(Trigger.__tablename__): check that caused the query to be skipped, which is really why it has not been found before, I think.

I think the code is right. If the table has been dropped, the decrypt is not run. Because in 2.8.4 we still have the trigger table, the decryption ran and failed. The code path is only evaluated at the end of the migrations, so if a user is going from 2.9.0 to 2.0.0, it will only run when the migration completes and the DB is in the 2.0.0 state.

Exactly, I added the condition to fix a failing test.

@ephraimbuddy added the type:bug-fix (Changelog: Bug Fixes) label Apr 4, 2024
ephraimbuddy added a commit that referenced this pull request Apr 4, 2024
* Fix decryption of trigger kwargs when downgrading.

This failed because the query for Triggers after the downgrade lazy
loads the task_instance table (ORM) which doesn't have the task_display_name
column at that downgraded point.

The fix was to query specifically on the encrypted_kwargs column.

* Properly position the decryption after downgrade and not in offline migration

(cherry picked from commit 567246f)
@potiuk (Member) commented Apr 4, 2024

I think the code is right. If the table has been dropped, the decrypt is not run. Because in 2.8.4 we still have the trigger table, the decryption ran and failed. The code path is only evaluated at the end of the migrations, so if a user is going from 2.9.0 to 2.0.0, it will only run when the migration completes and the DB is in the 2.0.0 state.

Ah, OK - then our migration to 2.0.0 and back is not really testing all the code paths. Maybe we should consider doing migration tests also down to the previous MINOR version. I think we rarely change past migration scripts - so if we just add a migration down one MINOR version, we should catch all such cases (eventually, over time, we will know 2.9 -> 2.8 works because we tested it now, 2.10 -> 2.9 works because we will test it in the future, and so on).

utkarsharma2 pushed a commit to astronomer/airflow that referenced this pull request Apr 22, 2024