Skip to content

Commit

Permalink
feat(migration): drop fkeys without DELETE CASCADE and add new fkey
Browse files Browse the repository at this point in the history
  • Loading branch information
dlbrittain committed Apr 4, 2023
1 parent d70ad15 commit f97d1f9
Showing 1 changed file with 80 additions and 10 deletions.
90 changes: 80 additions & 10 deletions dynamicannotationdb/migration/migrate.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import logging

from dynamicannotationdb.database import DynamicAnnotationDB
from dynamicannotationdb.models import AnnoMetadata
from dynamicannotationdb.schema import DynamicSchemaClient
from emannotationschemas.errors import UnknownAnnotationTypeException
from emannotationschemas.migrations.run import run_migration
from geoalchemy2.types import Geometry
from psycopg2.errors import DuplicateSchema
from sqlalchemy import MetaData, create_engine
from sqlalchemy import MetaData, create_engine, ForeignKeyConstraint
from sqlalchemy.engine.url import make_url
from sqlalchemy.pool import NullPool
from sqlalchemy import MetaData, Table
import logging
from sqlalchemy.sql.ddl import AddConstraint
from sqlalchemy.schema import DropConstraint
from sqlalchemy.exc import ProgrammingError

from dynamicannotationdb.database import DynamicAnnotationDB
from dynamicannotationdb.models import AnnoMetadata
from dynamicannotationdb.schema import DynamicSchemaClient
from emannotationschemas.errors import UnknownAnnotationTypeException
from emannotationschemas.migrations.run import run_migration

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


# SQL commands
def alter_column_name(table_name: str, current_col_name: str, new_col_name: str) -> str:
return f"ALTER TABLE {table_name} RENAME {current_col_name} TO {new_col_name}"
Expand Down Expand Up @@ -77,7 +81,6 @@ def __init__(
)

with temp_engine.connect() as connection:

connection.execute("commit")
database_exists = connection.execute(
f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{schema_db}'"
Expand Down Expand Up @@ -194,12 +197,81 @@ def upgrade_table_from_schema(self, table_name: str, dry_run: bool = True):
for index_name, sql_command in index_sql_commands.items():
logging.info(f"Creating index: {index_name}")
conn.execute(sql_command)

self.target_database.base.metadata.reflect()
return migration_map
except Exception as e:
self.target_database.cached_session.rollback()
raise e

def apply_cascade_option_to_tables(self, dry_run: bool = True):
metadata = MetaData(bind=self.target_database.engine)
metadata.reflect(bind=self.target_database.engine)
for table in metadata.tables:
table_metadata = self.target_database.get_table_metadata(table)
if table_metadata:
table = metadata.tables[table]
try:
fkey_mappings = self.add_cascade_delete_to_fkey(table, dry_run)
except Exception as error:
raise error
return fkey_mappings

def add_cascade_delete_to_fkey(self, table: Table, dry_run: bool = True):
table_name = table.name
fkeys_to_drop = {}
fkey_to_add = {}
for fk in self.target_inspector.get_foreign_keys(table_name):
# check if the foreign key has no 'ondelete' option
if not fk["options"].get("ondelete"):
# drop the foreign key constraint
fkey = ForeignKeyConstraint(
[table.c[c] for c in fk["constrained_columns"]],
[fk["referred_table"] + "." + c for c in fk["referred_columns"]],
name=fk["name"],
)
drop_constraint = DropConstraint(fkey)
fkeys_to_drop[fkey.name] = str(drop_constraint)

if not dry_run:
with self.target_database.engine.connect() as conn:
try:
conn.execute(drop_constraint)
except ProgrammingError as error:
logging.error(f"{fkey.name} not present in the database: {error}")

# create a new foreign key constraint with the specified 'ondelete' option
new_fkey = ForeignKeyConstraint(
[table.c[c] for c in fk["constrained_columns"]],
[fk["referred_table"] + "." + c for c in fk["referred_columns"]],
name=fk["name"],
ondelete="CASCADE",
)
add_constraint = AddConstraint(new_fkey)

fkey_to_add[new_fkey.name] = str(add_constraint)

if not dry_run:
with self.target_database.engine.connect() as conn:
# start a transaction
trans = conn.begin()

try:
conn.execute(add_constraint)
trans.commit()
logging.info(
f"Table {table_name} altered with CASCADE DELETE"
)
except Exception as error:
trans.rollback()
raise error
return {
f"Table Name: {table_name}": {
"Fkeys to drop": fkeys_to_drop,
"Fkeys to add": fkey_to_add,
}
}

def upgrade_annotation_models(self, dry_run: bool = True):
"""Upgrades annotation models present in the database
if underlying schemas have changed.
Expand All @@ -218,7 +290,6 @@ def upgrade_annotation_models(self, dry_run: bool = True):
return migrations

def get_table_diff(self, table_name):

target_model_schema = (
self.target_database.cached_session.query(AnnoMetadata.schema_type)
.filter(AnnoMetadata.table_name == table_name)
Expand Down Expand Up @@ -363,7 +434,6 @@ def get_index_from_model(self, table_name: str, model):
}
index_map[sptial_index_name] = spatial_index_map
if column.foreign_keys:

metadata_obj = MetaData()
metadata_obj.reflect(bind=self.target_database.engine)
target_table = metadata_obj.tables.get(table_name)
Expand Down

0 comments on commit f97d1f9

Please sign in to comment.