In [None]:
import sqlalchemy as db

In [None]:
# Open the SQLite metadata store and reflect its existing "databases" table.
DB_URL = "sqlite:///resources/meta-db/sdp-metadata.db"

engine = db.create_engine(DB_URL)
metadata = db.MetaData()
# NOTE(review): no cell visible here uses or closes this connection — confirm it is still needed.
connection = engine.connect()
# autoload_with makes SQLAlchemy read the column definitions from the live database.
databases = db.Table("databases", metadata, autoload_with=engine)
print("✓ databases table loaded successfully")

In [None]:
# Create the tables registered on the schema_sentinel Base.metadata, then
# list which tables the database now contains.
from sqlalchemy import inspect

from schema_sentinel.metadata_manager.model import Base

Base.metadata.create_all(engine)
print("✓ All tables created successfully")

# Ask the engine's inspector for the current table names.
inspector = inspect(engine)
table_names = inspector.get_table_names()
print(f"Available tables: {table_names}")

In [None]:
# Show the column names of the reflected "databases" table.
column_names = databases.columns.keys()
print(column_names)

In [None]:
from sqlalchemy import text

# Read every row of the "databases" table. The connection only lives for the
# duration of the ``with`` block, so the fetched rows are bound to a name that
# outlives it.
with engine.connect() as conn:
    database_rows = conn.execute(text("SELECT * FROM databases")).fetchall()

# Last expression of the cell: lets the notebook render the rows instead of
# silently discarding the query result.
database_rows

In [None]:
from datetime import datetime

import pandas as pd

# Timestamp shared by the "created" and "last_altered" columns.
now = datetime.now()

# Field values of the single SDP / cert database row, in column order.
record = {
    "version": "0.1.0",
    "environment": "cert",
    "database_name": "SDP",
    "database_owner": "US_CERT_DEV_USER",
    "is_transient": "FALSE",
    "comment": "SDP Database",
    "created": now,
    "last_altered": now,
    "retention_time": 30,
}

# One-row frame: each scalar is wrapped in a single-element list.
df = pd.DataFrame({column: [value] for column, value in record.items()})

In [None]:
# Use SQLAlchemy ORM for better compatibility
from sqlalchemy.orm import Session
from schema_sentinel.metadata_manager.model.database import Database as DatabaseModel

# Store the first row of ``df`` through the ORM, unless a record with the same
# (version, environment, database_name) is already present.
with Session(engine) as session:
    row = df.iloc[0]

    # The three fields used both to look up and to create the record.
    identity = {
        "version": row['version'],
        "environment": row['environment'],
        "database_name": row['database_name'],
    }

    existing = session.query(DatabaseModel).filter_by(**identity).first()

    if existing:
        print(f"⚠ Record already exists: {existing.database_name} v{existing.version} ({existing.environment})")
    else:
        db_record = DatabaseModel(
            database_id=None,  # placeholder; overwritten with __get_id__() right below
            database_owner=row['database_owner'],
            is_transient=row['is_transient'],
            comment=row['comment'],
            # Timestamps and retention time are handed to the model as strings.
            created=str(row['created']),
            last_altered=str(row['last_altered']),
            retention_time=str(row['retention_time']),
            **identity,
        )
        db_record.database_id = db_record.__get_id__()
        session.add(db_record)
        session.commit()
        print(f"✓ Inserted record: {db_record.database_name} v{db_record.version} ({db_record.environment})")

In [None]:
from datetime import datetime

import pandas as pd

# Timestamp shared by the "created" and "last_altered" columns.
now = datetime.now()

# The two rows differ only in their owner.
owners = ("US_CERT_DEV_USER", "US_PROD_SYSADMIN_USER")
column_names = [
    "database_name",
    "database_owner",
    "is_transient",
    "comment",
    "created",
    "last_altered",
    "retention_time",
]
rows = [("SDP", owner, "FALSE", "SDP Database", now, now, 30) for owner in owners]

# reset_index() turns the default 0..n-1 index into a leading "index" column.
df = pd.DataFrame(rows, columns=column_names).reset_index()

In [None]:
# Add a constant "version" column at position 1.
# DataFrame.insert raises ValueError when the column already exists, so the
# guard keeps this cell safe to re-run against the same frame.
if "version" not in df.columns:
    df.insert(loc=1, column="version", value="0.1.0")

In [None]:
from datetime import datetime

import pandas as pd

# Timestamp shared by the "created" and "last_altered" columns.
now = datetime.now()

# Columns that together form the frame's index.
index_columns = ["version", "environment", "database_name"]

# Two SDP rows that differ only in owner; reset_index() adds a leading
# "index" column holding the original 0..n-1 positions.
df = pd.DataFrame(
    [
        ("SDP", owner, "FALSE", "SDP Database", now, now, 30)
        for owner in ("US_CERT_DEV_USER", "US_PROD_SYSADMIN_USER")
    ],
    columns=[
        "database_name",
        "database_owner",
        "is_transient",
        "comment",
        "created",
        "last_altered",
        "retention_time",
    ],
).reset_index()

# Same version for both rows, one environment per row.
df.insert(1, "version", "0.1.0")
df.insert(2, "environment", ["cert", "prod"])

df = df.set_index(index_columns)
print(df)

In [None]:
# Replace pandas to_sql with ORM Session approach for SQLAlchemy 1.4/Pandas 3.0 compatibility
with Session(engine) as session:
    # Delete all existing records (simulating if_exists="replace")
    session.query(DatabaseModel).delete()

    # Reset index to access version, environment, database_name as columns
    df_reset = df.reset_index()

    # Add new records
    for _, row in df_reset.iterrows():
        # Named ``db_record`` rather than ``db``: ``db`` is the alias under which
        # sqlalchemy is imported in the first cell, and rebinding it here would
        # break ``db.create_engine`` / ``db.Table`` when earlier cells are re-run.
        db_record = DatabaseModel(
            version=row['version'],
            environment=row['environment'],
            database_name=row['database_name'],
            database_owner=row['database_owner'],
            is_transient=row['is_transient'],
            comment=row['comment'],
            # Timestamps and retention time are handed to the model as strings.
            created=str(row['created']),
            last_altered=str(row['last_altered']),
            retention_time=str(row['retention_time'])
        )
        # Set the database_id using the model's __get_id__ method
        db_record.database_id = db_record.__get_id__()
        session.add(db_record)

    # The delete and all inserts are committed together.
    session.commit()

In [None]:
# Build the dev / non_prod rows and index them by
# (version, environment, database_name). The frame needs no database access,
# so it is assembled before the session is opened.
df = pd.DataFrame(
    [
        ("SDP", "US_DEV_DEV_USER", "FALSE", "SDP Database", now, now, 30),
        ("SDP", "US_NON_PROD_DEV_USER", "FALSE", "SDP Database", now, now, 30),
    ],
    columns=("database_name", "database_owner", "is_transient", "comment", "created", "last_altered", "retention_time"),
)
df.reset_index(inplace=True)
df.insert(loc=1, column="version", value="0.1.0")
df.insert(loc=2, column="environment", value=["dev", "non_prod"])
df.set_index(["version", "environment", "database_name"], inplace=True)

# Replace pandas to_sql with ORM Session approach for SQLAlchemy 1.4/Pandas 3.0 compatibility
# Reset index to access version, environment, database_name as columns
df_reset = df.reset_index()

with Session(engine) as session:
    # Add new records (if_exists="append" means don't delete existing, but skip duplicates)
    for _, row in df_reset.iterrows():
        # Named ``db_record`` rather than ``db``: ``db`` is the alias under which
        # sqlalchemy is imported in the first cell, and rebinding it here would
        # break ``db.create_engine`` / ``db.Table`` when earlier cells are re-run.
        db_record = DatabaseModel(
            version=row['version'],
            environment=row['environment'],
            database_name=row['database_name'],
            database_owner=row['database_owner'],
            is_transient=row['is_transient'],
            comment=row['comment'],
            # Timestamps and retention time are handed to the model as strings.
            created=str(row['created']),
            last_altered=str(row['last_altered']),
            retention_time=str(row['retention_time'])
        )
        # Set the database_id using the model's __get_id__ method
        db_record.database_id = db_record.__get_id__()

        # Check if record already exists to avoid duplicate key error
        existing = session.query(DatabaseModel).filter_by(database_id=db_record.database_id).first()
        if not existing:
            session.add(db_record)

    session.commit()

In [None]:
import json

from schema_sentinel.metadata_manager.model.database import Database

# Sample dev database record. It is named ``dev_database`` rather than ``db``
# because ``db`` is the alias under which sqlalchemy is imported in the first
# cell; rebinding it would break ``db.create_engine`` / ``db.Table`` when
# earlier cells are re-run.
dev_database = Database(
    version="0.1.0",
    environment="dev",
    database_name="SDP",
    database_owner="US_DEV_DEV_USER",
    is_transient="NO",
    comment=None,
    created="2023-04-01",
    last_altered="2023-09-01",
    retention_time=30,
)
dev_database.database_id = dev_database.__get_id__()


def get_database_id(database: Database) -> str:
    """Return the ``database_id`` attribute of ``database``."""
    return database.database_id


def get_schema_id(database: Database, schema_name: str) -> str:
    """Build a schema id from a database id.

    Args:
        database: Object whose ``database_id`` is a JSON-encoded object.
        schema_name: Value stored under the ``"schema_name"`` key.

    Returns:
        The database id re-encoded as JSON with ``"schema_name"`` added
        (or overwritten, if the key was already present).

    Raises:
        json.JSONDecodeError: If ``database.database_id`` is not valid JSON.
    """
    # Local name avoids shadowing the ``id`` builtin.
    schema_key = json.loads(database.database_id)
    schema_key["schema_name"] = schema_name
    return json.dumps(schema_key)


# Display database and get schema id
print(dev_database)
schema_id = get_schema_id(dev_database, "AUDIT")
print(schema_id)

In [None]:
from schema_sentinel.metadata_manager.model.database import Database

# Fields that are identical in both snapshots of the SDP dev database.
shared_fields = {
    "environment": "dev",
    "database_name": "SDP",
    "database_owner": "US_DEV_DEV_USER",
    "is_transient": "NO",
    "comment": None,
    "created": "2023-04-01",
    "retention_time": 30,
}

# The two snapshots differ only in version and last_altered.
left = Database(version="0.1.0", last_altered="2023-09-01", **shared_fields)
left.database_id = left.__get_id__()

right = Database(version="0.1.3", last_altered="2023-09-20", **shared_fields)
right.database_id = right.__get_id__()

In [None]:
# Bare last expression: the notebook renders whatever left.__get_df__() returns
# (presumably a DataFrame view of the record — confirm against the model).
left.__get_df__()

In [None]:
# Bare last expression: the notebook renders whatever right.__get_df__() returns
# (presumably a DataFrame view of the record — confirm against the model).
right.__get_df__()

In [None]:
# Bare last expression: the notebook renders the result of comparing ``left``
# with ``right`` via __side_by_side__ (output format defined by the model — not visible here).
left.__side_by_side__(right)

In [None]:
# Column names passed to Database.__to_df__ for the combined view.
database_columns = [
    'database_id',
    'version',
    'environment',
    'database_name',
    'database_owner',
    'is_transient',
    'comment',
    'created',
    'last_altered',
    'retention_time',
]

# Bare last expression so the notebook renders the result for both records.
Database.__to_df__([left, right], columns=database_columns)

In [None]:
from schema_sentinel.metadata_manager.model.comparison import Comparison

def _sdp_dev_database_id(version):
    """Return the JSON-encoded id of the SDP dev database at ``version``."""
    return json.dumps({"database_name": "SDP", "version": version, "environment": "dev"})


# --- ENVIRONMENT column compared from 0.1.3 (source) to 0.1.1 (target) ---
key_from_0_1_3 = "MIGRATIONS.SCHEMA_DISCREPANCY.ENVIRONMENT [SDP:0.1.3->0.1.1]"
value_from_0_1_3 = {
    "key": key_from_0_1_3,
    "comparison": {
        "left": "Column",
        "right": "Column",
        "differences": {
            "ordinal_position": [18, 2],
            "is_nullable": ["YES", "NO"],
            "character_maximum_length": [10, 16777216],
            "character_octet_length": [40, 16777216],
            "column_default": ["NULL", "'DEV'"],
            "comment": ["NULL", "One of DEV, NONPROD, CERT or PROD"],
        },
    },
}
comparison = Comparison(
    object_type="column",
    comparison_key=key_from_0_1_3,
    source_database_id=_sdp_dev_database_id("0.1.3"),
    target_database_id=_sdp_dev_database_id("0.1.1"),
    comparison_value=json.dumps(value_from_0_1_3),
    comparison_performed_by="user@example.com",
    created="2023-09-20 13:17:59.920765",
)
# Holds only this first comparison; the one built below is not appended.
comparisons = [comparison]

# --- Same column compared the other way round: 0.1.1 (source) to 0.1.3 (target) ---
key_from_0_1_1 = "MIGRATIONS.SCHEMA_DISCREPANCY.ENVIRONMENT [SDP:0.1.1->0.1.3]"
value_from_0_1_1 = {
    "key": key_from_0_1_1,
    "comparison": {
        "left": "Column",
        "right": "Column",
        "differences": {
            "ordinal_position": [2, 18],
            "is_nullable": ["NO", "YES"],
            "character_maximum_length": [16777216, 10],
            "character_octet_length": [16777216, 40],
            "column_default": ["'DEV'", "NULL"],
            "comment": ["One of DEV, NONPROD, CERT or PROD", "NULL"],
        },
    },
}
comparison = Comparison(
    object_type="column",
    comparison_key=key_from_0_1_1,
    source_database_id=_sdp_dev_database_id("0.1.1"),
    target_database_id=_sdp_dev_database_id("0.1.3"),
    comparison_value=json.dumps(value_from_0_1_1),
    comparison_performed_by="user@example.com",
    created="2023-09-20 13:17:59.920765",
)

# Bare last expression: the notebook renders one_diffs of the second comparison.
comparison.one_diffs

In [None]:
import json

from schema_sentinel.metadata_manager.model.comparison import Comparison

# Column-constraint comparison whose payload carries only a "left" side.
constraint_key = "CORE.CUSTOMER_ACCOUNT.UNIVERSE.FK_DARE_DEPOSIT_SUCCESS_CUSTOMER_ACCOUNT_ID [SDP:0.1.3->0.1.1]"

# JSON-encoded ids of the two SDP dev database versions being compared.
source_id = json.dumps({"database_name": "SDP", "version": "0.1.3", "environment": "dev"})
target_id = json.dumps({"database_name": "SDP", "version": "0.1.1", "environment": "dev"})

constraint_value = json.dumps(
    {
        "key": constraint_key,
        "comparison": {"left": "ColumnConstraint"},
    }
)

comparison = Comparison(
    object_type="column_constraint",
    comparison_key=constraint_key,
    source_database_id=source_id,
    target_database_id=target_id,
    comparison_value=constraint_value,
    comparison_performed_by="user@example.com",
    created="2023-09-20 13:17:59.920765",
)

# Bare last expression: the notebook renders one_diffs for this comparison.
comparison.one_diffs