Skip to content

Commit

Permalink
Add missing AUTOINC/SERIAL for FAB tables (#26885)
Browse files Browse the repository at this point in the history
* Add missing AUTOINC/SERIAL for FAB tables

In 1.10.13 we introduced a migration that creates the tables with the
server_default but that migration only did anything if the tables didn't
already exist. But the tables created by the FAB model have a default
(but not a server_default).

Oh, and the final piece of the puzzle: in 2.4 we finally "took control" of
the FAB security models by moving them into Airflow, and those do not have
the default set.

* Update airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py

* Fix static checks

* Run migrations with a pool of a single connection.

Without this `create_session()` will open a new connection, and that causes mysql
to hang waiting to get a "metadata lock on table".

Using the "stock" pool with size=1 and max_overflow=0 doesn't work, that
instead times out if you try to get a new connection from the pool.
SingletonThreadPool instead returns the existing active connection which
is what we want.

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 7efdeed)
  • Loading branch information
ashb authored and ephraimbuddy committed Oct 18, 2022
1 parent 79047c5 commit 7f48924
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 11 deletions.
8 changes: 6 additions & 2 deletions airflow/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import contextlib
from logging.config import fileConfig

from alembic import context
Expand Down Expand Up @@ -89,9 +90,12 @@ def run_migrations_online():
and associate a connection with the context.
"""
connectable = settings.engine
with contextlib.ExitStack() as stack:
connection = config.attributes.get('connection', None)

if not connection:
connection = stack.push(settings.engine.connect())

with connectable.connect() as connection:
context.configure(
connection=connection,
transaction_per_migration=True,
Expand Down
78 changes: 78 additions & 0 deletions airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add missing auto-increment to columns on FAB tables
Revision ID: b0d31815b5a6
Revises: ecb43d2a1842
Create Date: 2022-10-05 13:16:45.638490
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = 'b0d31815b5a6'
down_revision = 'ecb43d2a1842'
branch_labels = None
depends_on = None
airflow_version = '2.4.2'


def upgrade():
    """Apply migration.

    If these columns are already of the right type (i.e. created by our
    migration in 1.10.13 rather than FAB itself in an earlier version), this
    migration will issue an alter statement to change them to what they already
    are -- i.e. it's a no-op.

    These tables are small (100 to low 1k rows at most), so it's not too costly
    to change them.
    """
    conn = op.get_bind()
    dialect_name = conn.dialect.name
    if dialect_name in ('mssql', 'sqlite'):
        # 1.10.12 didn't support SQL Server, so it couldn't have gotten this wrong --> nothing to correct
        # SQLite autoinc was "implicit" for an INTEGER NOT NULL PRIMARY KEY
        return

    for table in (
        'ab_permission',
        'ab_view_menu',
        'ab_role',
        'ab_permission_view',
        'ab_permission_view_role',
        'ab_user',
        'ab_user_role',
        'ab_register_user',
    ):
        with op.batch_alter_table(table) as batch:
            kwargs = {}
            if dialect_name == 'postgresql':
                # The sequence's next value is a server-side default for the column, so it
                # must be passed as ``server_default``; ``type_`` expects a SQLAlchemy type.
                # NOTE(review): presumably the ``<table>_id_seq`` sequence already exists
                # (this migration does not create it) -- confirm.
                kwargs['server_default'] = sa.Sequence(f'{table}_id_seq').next_value()
            else:
                kwargs['autoincrement'] = True
            batch.alter_column("id", existing_type=sa.Integer(), existing_nullable=False, **kwargs)


def downgrade():
    """Revert add_missing_autoinc_fab; deliberately a no-op."""
    # Nothing to undo: these defaults _should_ already have been applied from 1.10.13
    # onwards and were only missing because of a previous bug.
15 changes: 9 additions & 6 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,14 @@ def configure_vars():
DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)


def configure_orm(disable_connection_pool=False):
def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy"""
from airflow.utils.log.secrets_masker import mask_secret

log.debug("Setting up DB connection pool (PID %s)", os.getpid())
global engine
global Session
engine_args = prepare_engine_args(disable_connection_pool)
engine_args = prepare_engine_args(disable_connection_pool, pool_class)

if conf.has_option('database', 'sql_alchemy_connect_args'):
connect_args = conf.getimport('database', 'sql_alchemy_connect_args')
Expand Down Expand Up @@ -319,7 +319,7 @@ def configure_orm(disable_connection_pool=False):
}


def prepare_engine_args(disable_connection_pool=False):
def prepare_engine_args(disable_connection_pool=False, pool_class=None):
"""Prepare SQLAlchemy engine args"""
default_args = {}
for dialect, default in DEFAULT_ENGINE_ARGS.items():
Expand All @@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False):
'database', 'sql_alchemy_engine_args', fallback=default_args
) # type: ignore

if disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'):
if pool_class:
# Don't use separate settings for size etc, only those from sql_alchemy_engine_args
engine_args['poolclass'] = pool_class
elif disable_connection_pool or not conf.getboolean('database', 'SQL_ALCHEMY_POOL_ENABLED'):
engine_args['poolclass'] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
elif not SQL_ALCHEMY_CONN.startswith('sqlite'):
Expand Down Expand Up @@ -413,10 +416,10 @@ def dispose_orm():
engine = None


def reconfigure_orm(disable_connection_pool=False):
def reconfigure_orm(disable_connection_pool=False, pool_class=None):
"""Properly close database connections and re-configure ORM"""
dispose_orm()
configure_orm(disable_connection_pool=disable_connection_pool)
configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
Expand Down
17 changes: 16 additions & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1527,8 +1527,23 @@ def upgradedb(
initdb(session=session, load_connections=False)
return
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
import sqlalchemy.pool

log.info("Creating tables")
command.upgrade(config, revision=to_revision or 'heads')
val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
try:
# Reconfigure the ORM to use _EXACTLY_ one connection, otherwise some db engines hang forever
# trying to ALTER TABLEs
os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1'
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
command.upgrade(config, revision=to_revision or 'heads')
finally:
if val is None:
os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
else:
os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val
settings.reconfigure_orm()

reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
543fe0db520047b59d9d036b17d54b07f1031a2b6ef1f46dcd2ae970f14ab0e6
030fee7fdf6b154b107a12331400a26843f4ebb4abaab84566aeca7c26d79450
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
| ``ecb43d2a1842`` (head) | ``1486deb605b4`` | ``2.4.0`` | Add processor_subdir column to DagModel, SerializedDagModel |
| ``b0d31815b5a6`` (head) | ``ecb43d2a1842`` | ``2.4.2`` | Add missing auto-increment to columns on FAB tables |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``ecb43d2a1842`` | ``1486deb605b4`` | ``2.4.0`` | Add processor_subdir column to DagModel, SerializedDagModel |
| | | | and CallbackRequest tables. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``1486deb605b4`` | ``f4ff391becb5`` | ``2.4.0`` | add dag_owner_attributes table |
Expand Down

0 comments on commit 7f48924

Please sign in to comment.