In [1]:
import sqlalchemy as sa
import sqlalchemy.orm as so
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from sqlalchemy.exc import IntegrityError
from pathlib import Path
import pandas as pd
import numpy as np
import tempfile, logging, os
from orm_loader.tables.base import CSVLoadableTableInterface  
from orm_loader.loaders import LoaderContext
from orm_loader.loaders.loader_interface import ParquetLoader, LoaderInterface, PandasLoader

from orm_loader.helpers import configure_logging, bootstrap, explain_sqlite_fk_error, bulk_load_context, configure_logging

from omop_alchemy import get_engine_name, load_environment, TEST_PATH, ROOT_PATH
from omop_alchemy.cdm.model.vocabulary import (
    Domain,
    Vocabulary,
    Concept_Class,
    Relationship,
    Concept,
    Concept_Ancestor,
    Concept_Relationship,
    Concept_Synonym,
)


In [2]:
# Send INFO-level records from all loggers to the notebook output.
logging.basicConfig(level=logging.INFO)


class Base(DeclarativeBase):
    """Declarative base for the scratch tables defined in this notebook."""


# Connection URL for the scratch database. It can be overridden with the
# ORM_LOADER_TEST_ENGINE environment variable so credentials and host do not
# have to be edited in the notebook; the default is the local development
# instance this notebook was originally written against.
engine_string = os.environ.get(
    "ORM_LOADER_TEST_ENGINE",
    "postgresql+psycopg2://airflow:airflow@0.0.0.0:5433/mosaiq",
)
engine = sa.create_engine(engine_string, echo=False, future=True)
# NOTE(review): SQLAlchemy 2.0 no longer consults MetaData.bind, so this only
# stores a plain attribute. Kept in case orm_loader reads it -- confirm and
# drop if unused.
Base.metadata.bind = engine

class TestTable(Base, CSVLoadableTableInterface):
    """Two-column scratch table used to exercise ``load_csv`` in this notebook."""
    __tablename__ = "test_table"

    # Integer primary key.
    id: so.Mapped[int] = so.mapped_column(primary_key=True)
    # Required string column (declared with nullable=False).
    name: so.Mapped[str] = so.mapped_column(nullable=False)

# Create the tables registered on Base.metadata (here: test_table) on the scratch engine.
Base.metadata.create_all(engine)

In [3]:
# Fresh scratch directory that holds the tab-separated fixtures for TestTable.
tmp = Path(tempfile.mkdtemp())

csv_initial, csv_replace, csv_empty = (
    tmp / file_name
    for file_name in (
        "test_table.csv",
        "test_table_replace.csv",
        "test_table_empty.csv",
    )
)

# Three seed rows for the first load.
initial_rows = pd.DataFrame(
    {"id": [1, 2, 3], "name": ["alpha", "beta", "gamma"]}
)
initial_rows.to_csv(csv_initial, sep="\t", index=False)

# Rows that reuse ids 2 and 3 with changed names.
replacement_rows = pd.DataFrame(
    {"id": [2, 3], "name": ["beta_updated", "gamma_updated"]}
)
replacement_rows.to_csv(csv_replace, sep="\t", index=False)

# Zero-byte file: no header, no rows.
csv_empty.touch()


In [4]:
# Session on the scratch engine, used for the TestTable load.
session = Session(bind=engine)

In [5]:
# Load the three seed rows; the call's return value is shown as the cell output.
TestTable.load_csv(session=session, path=csv_initial)

INFO:orm_loader.loaders.loading_helpers:Bulk loading _staging_test_table via COPY (encoding=utf-8, delimiter=	)


3

In [6]:
# Commit the pending transaction on the scratch-database session.
session.commit()

In [7]:
# Vocabulary models loaded in the first pass, in this order.
# NOTE(review): presumably ordered so referenced tables come first -- confirm.
ATHENA_INITIAL_LOAD = [Domain, Vocabulary, Concept_Class, Relationship, Concept]

# Models loaded in the second pass.
ATHENA_SUBSEQUENT_LOAD = [Concept_Ancestor, Concept_Relationship, Concept_Synonym]

# Set up the project's logging and load the environment configuration.
# NOTE(review): each record in the output below is printed in two formats --
# presumably because logging.basicConfig() was also called earlier; confirm.
configure_logging()
load_environment()

2026-01-22 17:10:53,957 | INFO     | sql_loader.omop_alchemy.config | Environment variables loaded from .env file
INFO:sql_loader.omop_alchemy.config:Environment variables loaded from .env file


In [8]:
# Rebinds engine_string (previously the scratch-database URL) to the value
# returned by get_engine_name for 'cdm'.
engine_string = get_engine_name('cdm')

2026-01-22 17:10:54,687 | INFO     | sql_loader.omop_alchemy.config | Database engine configured for schema 'cdm'
INFO:sql_loader.omop_alchemy.config:Database engine configured for schema 'cdm'


In [9]:
# Engine for the CDM database; replaces the scratch engine bound earlier.
engine = sa.create_engine(engine_string, future=True, echo=False)
bootstrap(engine, create=True)

# Give the factory its own name instead of rebinding `Session`: shadowing the
# imported sqlalchemy.orm.Session class makes an earlier `Session(engine)` cell
# fail with TypeError when re-run, because a sessionmaker only accepts keyword
# arguments.
SessionFactory = sessionmaker(bind=engine, future=True)
session = SessionFactory()

INFO:orm_loader.helpers.bootstrap:Bootstrapping schema (create=True)


In [None]:
# Directory holding the vocabulary files; SOURCE_PATH must be present in the
# environment (a missing variable raises KeyError here).
source_path = Path(os.environ["SOURCE_PATH"])

# Loader instance handed to load_csv via `loader=` in the load cells.
p = ParquetLoader()

In [None]:
# Whatever load_csv returns for each first-pass table, keyed by table name.
# Stored under a real name rather than `_`, which IPython uses for the last
# cell output and stops updating once it has been assigned.
initial_load_counts = {}

with bulk_load_context(session):
    for model in ATHENA_INITIAL_LOAD:
        # Source files are named after the upper-cased table name.
        initial_load_counts[model.__tablename__] = model.load_csv(
            session,
            source_path / f"{model.__tablename__.upper()}.csv",
            dedupe=False,
            merge_strategy="upsert",
            loader=p,
        )
    # Single commit once every first-pass table has been loaded.
    session.commit()

initial_load_counts

2026-01-22 17:12:05,251 | INFO     | sql_loader.orm_loader.helpers.bulk | Disabled foreign key checks for bulk load
INFO:sql_loader.orm_loader.helpers.bulk:Disabled foreign key checks for bulk load
INFO:orm_loader.loaders.loading_helpers:Bulk loading _staging_domain via COPY (encoding=utf-8, delimiter=	)
INFO:orm_loader.loaders.loading_helpers:Bulk loading _staging_vocabulary via COPY (encoding=utf-8, delimiter=	)
INFO:orm_loader.loaders.loading_helpers:Bulk loading _staging_concept_class via COPY (encoding=utf-8, delimiter=	)
INFO:orm_loader.loaders.loading_helpers:Bulk loading _staging_relationship via COPY (encoding=utf-8, delimiter=	)
INFO:orm_loader.loaders.loading_helpers:Bulk loading _staging_concept via COPY (encoding=utf-8, delimiter=	)


In [None]:
# Chunk size passed to load_csv; with the parquet loader the chunk is
# measured in bytes, not rows.
PARQUET_CHUNK_BYTES = 60_000_000

# Whatever load_csv returns for each second-pass table, keyed by table name.
# Stored under a real name rather than `_`, which IPython uses for the last
# cell output and stops updating once it has been assigned.
subsequent_load_counts = {}

with bulk_load_context(session):
    for model in ATHENA_SUBSEQUENT_LOAD:
        # Source files are named after the upper-cased table name.
        subsequent_load_counts[model.__tablename__] = model.load_csv(
            session,
            source_path / f"{model.__tablename__.upper()}.csv",
            dedupe=False,
            chunksize=PARQUET_CHUNK_BYTES,
            merge_strategy="replace",
            loader=p,
        )
    # Single commit once every second-pass table has been loaded.
    session.commit()

subsequent_load_counts