Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a migration script for encrypted trigger kwargs #38358

Merged
merged 9 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.

"""encrypt trigger kwargs
"""update trigger kwargs type

Revision ID: 1949afb29106
Revises: ee1467d4aa35
Expand Down Expand Up @@ -45,26 +45,10 @@ def get_session() -> sa.orm.Session:


def upgrade():
"""Apply encrypt trigger kwargs"""
session = get_session()
try:
op.alter_column(table_name="trigger", column_name="kwargs", type_=sa.String())
for trigger in session.query(Trigger).all():
# convert dict to string and encrypt it
trigger.encrypted_kwargs = trigger._encrypt_kwargs(trigger.encrypted_kwargs)
session.commit()
finally:
session.close()
"""Update trigger kwargs type to string"""
op.alter_column(table_name="trigger", column_name="kwargs", type_=sa.String())
hussein-awala marked this conversation as resolved.
Show resolved Hide resolved


def downgrade():
"""Unapply encrypt trigger kwargs"""
session = get_session()
try:
op.alter_column(table_name="trigger", column_name="kwargs", type_=ExtendedJSON())
for trigger in session.query(Trigger).all():
# decrypt string and convert it to dict
trigger.encrypted_kwargs = trigger._decrypt_kwargs(trigger.encrypted_kwargs)
session.commit()
finally:
session.close()
"""Unapply update trigger kwargs type to string"""
op.alter_column(table_name="trigger", column_name="kwargs", type_=ExtendedJSON())
32 changes: 32 additions & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,26 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))


def encrypt_trigger_kwargs(*, session: Session) -> None:
"""Encrypt trigger kwargs."""
from airflow.models.trigger import Trigger

for trigger in session.query(Trigger):
# convert dict to string and encrypt it
trigger.encrypted_kwargs = trigger._encrypt_kwargs(trigger.encrypted_kwargs)
session.commit()


def decrypt_trigger_kwargs(*, session: Session) -> None:
"""Decrypt trigger kwargs."""
from airflow.models.trigger import Trigger

for trigger in session.query(Trigger):
# decrypt the string and convert it to dict
trigger.encrypted_kwargs = trigger._decrypt_kwargs(trigger.encrypted_kwargs)
session.commit()


def check_conn_id_duplicates(session: Session) -> Iterable[str]:
"""
Check unique conn_id in connection table.
Expand Down Expand Up @@ -1639,6 +1659,12 @@ def upgradedb(
_reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)
if _revision_greater(
config,
_REVISION_HEADS_MAP["2.9.0"],
_get_current_revision(session=session),
):
encrypt_trigger_kwargs(session=session)


@provide_session
Expand Down Expand Up @@ -1711,6 +1737,12 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
else:
log.info("Applying downgrade migrations.")
command.downgrade(config, revision=to_revision, sql=show_sql_only)
if _revision_greater(
config,
_REVISION_HEADS_MAP["2.9.0"],
to_revision,
):
decrypt_trigger_kwargs(session=session)


def drop_airflow_models(connection):
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
be94cc607bc5f9d98dffe83b79b26dacebe351c1b2686cf68266032bad5a04ae
038a5b1e0dc0555e22139727d954d36c7c7749ffc6297054df53b42cb5e48587
4 changes: 2 additions & 2 deletions docs/apache-airflow/img/airflow_erd.svg
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
| ``1949afb29106`` (head) | ``ee1467d4aa35`` | ``2.9.0`` | encrypt trigger kwargs |
| ``1949afb29106`` (head) | ``ee1467d4aa35`` | ``2.9.0`` | update trigger kwargs type |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``ee1467d4aa35`` | ``b4078ac230a1`` | ``2.9.0`` | add display name for dag and task instance |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
Expand Down