In [14]:
from omop_constructs.alchemy.adapters import map_lookup_view, make_and_map_lookup_view, make_source_lookup
from omop_alchemy.cdm.model.clinical import Condition_Occurrence, Measurement
from omop_alchemy.cdm.model.vocabulary import Concept
import sqlalchemy as sa
from orm_loader.helpers import Base

In [2]:
# The factory form is useful when you want a lookup that matches directly
# against your source data. In this example the value stored in
# condition_status_source_value is exposed under the label `med_id`, so rows
# can be linked back to the source system, and it is cast to an integer up
# front for convenience.

# It also records where the source ID has been stored in your schema, which
# exposes some non-standard usage.

condition_source_lookup = make_source_lookup(
    model=Condition_Occurrence,
    source_column=Condition_Occurrence.condition_status_source_value,
    source_label="med_id",
    cast_to=sa.Integer(),
    name="condition_source_lookup",  # optional but nice
)

# Map the lookup onto a class on the shared Base, using
# condition_occurrence_id as its primary key.
Condition_Source = map_lookup_view(
    base=Base,
    subquery=condition_source_lookup,
    name="Condition_Source",
    pk_cols=["condition_occurrence_id"],
)

In [3]:
# Inspect the columns exposed by the mapped Condition_Source view.
list(Condition_Source.__table__.columns)

[Column('person_id', Integer(), ForeignKey('person.person_id'), table=<condition_source_lookup>, nullable=False),
 Column('condition_occurrence_id', Integer(), table=<condition_source_lookup>, primary_key=True, nullable=False),
 Column('condition_concept_id', Integer(), ForeignKey('concept.concept_id'), table=<condition_source_lookup>, nullable=False),
 Column('condition_start_date', Date(), table=<condition_source_lookup>, nullable=False),
 Column('condition_end_date', Date(), table=<condition_source_lookup>),
 <sqlalchemy.sql.elements.ColumnClause at 0x132d282f0; med_id>]

In [4]:
# Alternatively, build the source lookup and map it in a single step.
# Here measurement_source_value is exposed under the label `obd_id` and cast
# to a string, with measurement_id as the primary key.

Measurement_Source = make_and_map_lookup_view(
    base=Base,
    name="Measurement_Source",
    model=Measurement,
    source_column=Measurement.measurement_source_value,
    source_label="obd_id",
    cast_to=sa.String(),
    pk_cols=["measurement_id"],
)

In [5]:
from omop_alchemy.cdm.handlers import make_concept_resolver, ConceptResolverRegistry
from omop_alchemy.cdm.handlers.vocabs_and_mappers import ConceptResolver, strip_uicc, make_stage
from omop_semantics.runtime.default_valuesets import runtime
from omop_semantics.runtime.unknown_handlers import UNKNOWN
import sqlalchemy.orm as so

# Specific target constructs may need parent concepts and/or exhaustive
# lists of concepts to resolve against. Whether it is better to resolve
# them at runtime or as a pre-processing step depends on how they are
# used downstream.

# Show the concept IDs configured for T stage.
runtime.staging.t_stage_concepts.ids

{1634213, 1634376, 1634530, 1634654, 1635114, 1635562, 1635564, 1635682}

In [6]:
def build_stage_resolver(session: so.Session, parent_list: list[int], stage_name: str) -> ConceptResolver:
    """Create the concept resolver for one TNM staging axis.

    Delegates to ``make_concept_resolver`` with a resolver name of the form
    ``tnm_<stage_name>_stage``.

    Args:
        session: SQLAlchemy session passed straight through to
            ``make_concept_resolver``.
        parent_list: Concept IDs supplied as the resolver's ``parents``.
        stage_name: Short label for the axis (this notebook uses "t", "n",
            "m" and "group"); interpolated into the resolver name.

    Returns:
        The ``ConceptResolver`` produced by ``make_concept_resolver``.
    """
    resolver_name = f"tnm_{stage_name}_stage"
    # Concept ID handed to the resolver as its ``unknown`` value.
    unknown_concept_id = UNKNOWN["generic"].concept_id
    # Correction callables, passed in this order.
    text_corrections = [strip_uicc, make_stage]
    return make_concept_resolver(
        session,
        name=resolver_name,
        unknown=unknown_concept_id,
        parents=parent_list,
        include_synonyms=True,
        corrections=text_corrections,
    )

In [None]:
from orm_loader.helpers import configure_logging, bootstrap, explain_sqlite_fk_error, bulk_load_context
from omop_alchemy import get_engine_name, load_environment, TEST_PATH, ROOT_PATH

# Set up logging first so the environment/engine messages below are visible.
configure_logging()
# Per the log output of this cell, this loads environment variables from a .env file.
load_environment()
# Connection string for the configured database engine.
engine_string = get_engine_name()
# Shared engine used by the resolver registry and the sessions in later cells.
engine = sa.create_engine(engine_string, future=True, echo=False)

2026-02-16 12:22:33,612 | INFO     | sql_loader.omop_alchemy.config | Environment variables loaded from .env file
2026-02-16 12:22:33,613 | INFO     | sql_loader.omop_alchemy.config | Default database engine configured


In [9]:
# Registry that the stage resolvers below are requested from; it is bound to the shared engine.
resolver_registry = ConceptResolverRegistry(engine)

In [11]:
def _register_stage_resolver(stage_name: str):
    """Request the TNM resolver for one staging axis from ``resolver_registry``.

    The registry key is ``tnm_<stage_name>_stage`` and the parent concepts
    come from ``runtime.staging.<stage_name>_stage_concepts.ids``.

    Args:
        stage_name: Staging axis label: "t", "n", "m" or "group".

    Returns:
        Whatever ``resolver_registry.get`` returns for that key.
    """

    def _factory(session: so.Session) -> ConceptResolver:
        # The concept IDs are looked up inside the factory, so they are only
        # read when the registry actually calls it (same as the inline lambdas
        # this helper replaces).
        stage_concepts = getattr(runtime.staging, f"{stage_name}_stage_concepts")
        return build_stage_resolver(
            session,
            parent_list=list(stage_concepts.ids),
            stage_name=stage_name,
        )

    return resolver_registry.get(f"tnm_{stage_name}_stage", _factory)


t_stage_resolver = _register_stage_resolver("t")
n_stage_resolver = _register_stage_resolver("n")
m_stage_resolver = _register_stage_resolver("m")
group_stage_resolver = _register_stage_resolver("group")

In [18]:
import pandas as pd
with so.Session(engine) as session:
    # Select every Concept column...
    concept_query = session.query(*Concept.__table__.columns)
    # ...restricted to the concept IDs held by the T-stage resolver.
    t_stage_only = concept_query.filter(
        Concept.concept_id.in_(t_stage_resolver.all_concepts)
    )
    # Building the frame iterates the query, so it must happen while the session is open.
    all_stages = pd.DataFrame(t_stage_only)

In [None]:
# This structure is mostly a convenience for downstream operations.

# t_stage_resolver.all_concepts gives us the set of all valid T-stage concepts at runtime from a cached object, so it
# only hits the database once. Because the resolver is lazily constructed and cached, we never pay the cost of
# querying if we never use it.

# We could instead create a construct that itself joins to concept ancestor and resolves on the fly, but that is much
# more expensive to do repeatedly; it is likely better to resolve once and cache the results in memory.

# If delivering as materialised views, this could be done once at build time without this layer of abstraction, but
# this way we can support both use cases without changing downstream code.

all_stages

Unnamed: 0,concept_id,concept_name,domain_id,vocabulary_id,concept_class_id,standard_concept,concept_code,valid_start_date,valid_end_date,invalid_reason
0,1633268,AJCC/UICC 7th pathological T1a Category,Measurement,Cancer Modifier,Staging/Grading,S,p-7th_AJCC/UICC-T1a,2022-05-09,2099-12-31,
1,1633269,AJCC/UICC 8th post therapy pathological T1mi C...,Measurement,Cancer Modifier,Staging/Grading,S,yp-8th_AJCC/UICC-T1mi,2022-05-09,2099-12-31,
2,1633278,AJCC/UICC pathological T2d Category,Measurement,Cancer Modifier,Staging/Grading,S,p-AJCC/UICC-T2d,2022-05-09,2099-12-31,
3,1633279,AJCC/UICC 6th pathological T0 Category,Measurement,Cancer Modifier,Staging/Grading,S,p-6th_AJCC/UICC-T0,2022-05-09,2099-12-31,
4,1633280,AJCC/UICC 6th post therapy clinical Tis(Paget)...,Measurement,Cancer Modifier,Staging/Grading,S,yc-6th_AJCC/UICC-Tis(Paget),2022-05-09,2099-12-31,
...,...,...,...,...,...,...,...,...,...,...
795,1635888,AJCC/UICC pathological Ta Category,Measurement,Cancer Modifier,Staging/Grading,S,p-AJCC/UICC-Ta,2022-05-09,2099-12-31,
796,1635889,AJCC/UICC 7th pathological Tis(DCIS) Category,Measurement,Cancer Modifier,Staging/Grading,S,p-7th_AJCC/UICC-Tis(DCIS),2022-05-09,2099-12-31,
797,1635891,AJCC/UICC post therapy pathological T4c Category,Measurement,Cancer Modifier,Staging/Grading,S,yp-AJCC/UICC-T4c,2022-05-09,2099-12-31,
798,1635892,AJCC/UICC 7th post therapy clinical T3e Category,Measurement,Cancer Modifier,Staging/Grading,S,yc-7th_AJCC/UICC-T3e,2022-05-09,2099-12-31,


In [None]:
from omop_constructs.alchemy.joins.condition_modifier_joins import staging_subqueries, make_condition_modifier_fanout

# One mapped view per staging subquery, keyed by the subquery's name.
# The class name is the title-cased key with underscores removed plus
# "Modifier"; the primary key is (modifier_of_event_id, <key>_concept_id).
staging_mapped_views = {
    stage_key: map_lookup_view(
        base=Base,
        subquery=stage_subquery,
        name=stage_key.title().replace('_', '') + "Modifier",
        pk_cols=["modifier_of_event_id", stage_key + "_concept_id"],
    )
    for stage_key, stage_subquery in staging_subqueries.items()
}

In [None]:
# Inspect the columns exposed by the mapped Measurement_Source view.
list(Measurement_Source.__table__.columns)

In [None]:
# Display the laterality entry of the runtime condition-modifier values.
runtime.condition_modifiers.condition_modifier_values.laterality

In [None]:
# Inspect the columns of the view mapped for the 't_stage' staging subquery.
list(staging_mapped_views['t_stage'].__table__.columns)

In [None]:
# Show the full mapping of staging name -> mapped view.
staging_mapped_views

In [None]:
# Build the fan-out construct from Condition_Occurrence and the mapped staging views.
# NOTE(review): the shape of the result (e.g. one row per condition) is decided inside
# make_condition_modifier_fanout and is not visible from this notebook.
condition_modifier_query = make_condition_modifier_fanout(
    base_cls=Condition_Occurrence,
    modifiers=staging_mapped_views,
    name="Condition_Modifier_Fanout",
)

In [None]:
class Condition_With_Modifiers(Base):
    """ORM class mapped directly onto ``condition_modifier_query``.

    NOTE(review): the primary key is taken from the ``cancer_diagnosis_id``
    column of that selectable; confirm the fan-out actually exposes a column
    with this name.
    """
    __table__ = condition_modifier_query
    # Tell the mapper explicitly which column to treat as the primary key.
    __mapper_args__ = {
        "primary_key": [
            condition_modifier_query.c.cancer_diagnosis_id
        ]
    }

In [None]:
# Inspect the columns exposed by the mapped Condition_With_Modifiers class.
list(Condition_With_Modifiers.__table__.columns)

In [None]:
# Display the runtime staging object that the stage resolvers read their concept IDs from.
runtime.staging