# OHDSI Mapping Reverse Engineering

### Setup
Import necessary files, set Python path

In [None]:
import sys
import os

# Make the sibling "src" directory (one level above the notebook's working
# directory) importable, so the project modules imported below can be found.
current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))
src_dir = os.path.join(parent_dir, "src")
sys.path.append(src_dir)

In [None]:
import polars as pl
from femr.ontology import Ontology
import pandas as pd
import os

from file_paths import MAPPING_DIR, ATHENA_PATH, MIMIC_DIR
from models.clmbr_t_base import get_tokenizer

In [None]:
# Tokenizer from the project helper; its code/numeric/string lookup tables are
# read further down to collect every code it knows.
# NOTE(review): the meaning of the `None` argument is defined in
# models.clmbr_t_base and is not visible here — confirm.
tokenizer = get_tokenizer(None)
# Ontology built from the Athena vocabulary path; queried later through
# `get_all_children` when deciding which mappings are relevant.
ontology = Ontology(ATHENA_PATH)

In [None]:
# Lazily scan the tab-separated Athena CONCEPT table. infer_schema_length=0
# is passed so that no dtype inference happens (columns are read as strings).
concept = pl.scan_csv(
    os.path.join(ATHENA_PATH, "CONCEPT.csv"),
    separator="\t",
    infer_schema_length=0,
)

# "<vocabulary_id>/<concept_code>". No alias is given, so the resulting column
# keeps the name of the left-most input column: "vocabulary_id".
code_col = pl.col("vocabulary_id") + "/" + pl.col("concept_code")
# Integer concept id, renamed to match the join key used by the mapping files.
concept_id_col = pl.col("concept_id").cast(pl.Int64).alias("target_concept_id")
# Human-readable concept name.
description_col = pl.col("concept_name").alias("ontology_concept_name")

# Materialize the three derived columns once; every mapping below joins on it.
processed_omop_concepts = concept.select(
    [code_col, concept_id_col, description_col]
).collect()

Get all tokenizer codes

In [None]:
# Every code the tokenizer knows: the keys of its code and numeric lookups,
# plus the first element of each (code, value) key of the string lookup.
tokenizer_codes = set(tokenizer.code_lookup.keys()).union(
    tokenizer.numeric_lookup.keys(),
    (code for (code, _) in tokenizer.string_lookup.keys()),
)

In [None]:
def contains_relevant_mappings(mapping_df):
    """Return the codes of *mapping_df* that are relevant for the tokenizer.

    A code (taken from the "vocabulary_id" column) is relevant when it is in
    ``tokenizer_codes`` itself, or when at least one code returned by
    ``ontology.get_all_children(code)`` is. Duplicates and input order are
    preserved.
    """

    def is_relevant(code):
        # Cheap direct lookup first; the ontology is only consulted on a miss.
        if code in tokenizer_codes:
            return True
        return any(
            child_code in tokenizer_codes
            for child_code in ontology.get_all_children(code)
        )

    return [
        code for code in mapping_df["vocabulary_id"].to_list() if is_relevant(code)
    ]


def filter_dataframe_with_relevant_mappings(mapping_df):
    """Keep only the rows of *mapping_df* whose "vocabulary_id" is relevant.

    Relevance is decided by ``contains_relevant_mappings``.
    """
    codes_to_keep = contains_relevant_mappings(mapping_df)
    is_relevant_row = pl.col("vocabulary_id").is_in(codes_to_keep)
    return mapping_df.filter(is_relevant_row)

In [None]:
# Column dtypes forced when a mapping CSV is read (passed as schema_overrides),
# so these columns do not depend on polars' dtype inference.
schema = {
    "concept_name": pl.Utf8,
    "source_concept_id": pl.Int64,
    "target_concept_id": pl.Int64,
    "source_vocabulary_id": pl.Utf8,
    "source_domain_id": pl.Utf8,
    "source_concept_class_id": pl.Utf8,
    "concept_code": pl.Utf8,
}


def _build_mapping_rows(source_name, mappings):
    """Return one output row per row of *mappings*.

    Each row is ``[source_name, concept_code, vocabulary_id,
    target_concept_id, ontology_concept_name]``.
    """
    return [
        [
            source_name,
            row["concept_code"],
            row["vocabulary_id"],
            row["target_concept_id"],
            row["ontology_concept_name"],
        ]
        for row in mappings.iter_rows(named=True)
    ]


def check_mapping_file(filename: str):
    """Find the tokenizer-relevant mappings of one mapping CSV and log them.

    The file is read from MAPPING_DIR, joined with ``processed_omop_concepts``
    on "target_concept_id" and reduced to the rows kept by
    ``filter_dataframe_with_relevant_mappings``. A ``<name>.log`` file with a
    summary and a per-code table is written to ``MAPPING_DIR/mapping_results``.

    Args:
        filename: Name of the mapping CSV inside MAPPING_DIR.

    Returns:
        A tuple ``(all_relevant_mappings, brute_force_mappings,
        manual_mappings)`` of row lists. Files with fewer than 10 relevant
        mappings fill ``brute_force_mappings``; all others fill
        ``manual_mappings``. The unused list of the two is empty.
    """
    # splitext only strips the final extension, so names that contain further
    # dots are not truncated at the first one.
    name_without_ending = os.path.splitext(filename)[0]
    result_dir = os.path.join(MAPPING_DIR, "mapping_results")
    os.makedirs(result_dir, exist_ok=True)

    file_path = os.path.join(parent_dir, os.path.join(MAPPING_DIR, filename))
    mapping_df = pl.read_csv(file_path, schema_overrides=schema)

    processed_mappings = mapping_df.join(
        processed_omop_concepts, on="target_concept_id", how="left", coalesce=True
    )
    filtered_df = filter_dataframe_with_relevant_mappings(processed_mappings)
    relevant_mappings = filtered_df.shape[0]

    # Rows are built separately per returned list so the lists never share
    # row objects.
    all_relevant_mappings = _build_mapping_rows(name_without_ending, filtered_df)
    brute_force_mappings = []
    manual_mappings = []

    summary_logs = f"Mapping file: {filename}\n\n"
    if relevant_mappings < 10:
        summary_logs += f"Found {relevant_mappings} relevant mappings in tokenizer. Adding to brute force mapping list\n"
        brute_force_mappings = _build_mapping_rows(name_without_ending, filtered_df)
    else:
        summary_logs += f"Found {relevant_mappings} relevant mappings in tokenizer. Please try to manually map this file!!!\n"
        manual_mappings = _build_mapping_rows(name_without_ending, filtered_df)

    # Three left-aligned columns, 60 characters wide each.
    row_format = "{:<60} {:<60} {:<60}\n"
    detail_lines = [
        "Found codes in tokenizer:\n",
        row_format.format("Code", "Domain", "OMOP Code"),
    ]
    detail_columns = filtered_df.select(
        ["concept_code", "source_domain_id", "target_concept_id"]
    )
    for values in detail_columns.iter_rows():
        # str() first: formatting None with a width spec raises TypeError,
        # and null cells are possible in these columns.
        detail_lines.append(row_format.format(*(str(value) for value in values)))
    detailed_logs = "".join(detail_lines)

    with open(os.path.join(result_dir, name_without_ending + ".log"), "w") as log_file:
        log_file.write(summary_logs)
        log_file.write("\n\n\n")
        log_file.write(detailed_logs)

    return (all_relevant_mappings, brute_force_mappings, manual_mappings)


# Every CSV directly inside MAPPING_DIR is treated as a mapping file.
mapping_files = [f for f in os.listdir(MAPPING_DIR) if f.endswith(".csv")]


brute_force_mapping_data = []
manual_mapping_data = []
all_relevant_mappings_data = []

for filename in mapping_files:
    (added_relevant_mappings, added_brute_force_mappings, added_manual_mappings) = (
        check_mapping_file(filename)
    )
    brute_force_mapping_data.extend(added_brute_force_mappings)
    manual_mapping_data.extend(added_manual_mappings)
    all_relevant_mappings_data.extend(added_relevant_mappings)

# Column names of the rows returned by check_mapping_file. The collected lists
# hold data rows only (no header row), so the header is supplied here.
header = [
    "mapping_source_table",
    "source_code",
    "vocabulary_id",
    "target_concept_id",
    "ontology_concept_name",
]

# check_mapping_file creates this directory, but only if at least one mapping
# file was processed; make sure it exists before writing.
results_dir = os.path.join(MAPPING_DIR, "mapping_results")
os.makedirs(results_dir, exist_ok=True)

# Write each collected list as a CSV. orient="row" states explicitly that each
# inner list is one record instead of leaving the orientation to be guessed.
for rows, output_name in (
    (brute_force_mapping_data, "brute_force_mapping.csv"),
    (manual_mapping_data, "manual_mapping.csv"),
    (all_relevant_mappings_data, "all_relevant_mappings.csv"),
):
    pl.DataFrame(rows, schema=header, orient="row").write_csv(
        os.path.join(results_dir, output_name)
    )

## Brute Force Mapping

In [None]:
# Step 1: Load the Mapping File
# source_code is forced to string: the MIMIC files are scanned as all-string
# columns below, and dtype inference would turn all-numeric codes into integers
# (dropping leading zeros) that can never equal those strings.
mapping_file = os.path.join(MAPPING_DIR, "mapping_results", "brute_force_mapping.csv")
mapping_df = pl.read_csv(mapping_file, schema_overrides={"source_code": pl.Utf8})
# These two mapping tables are excluded from the brute-force search.
mapping_df = mapping_df.filter(
    ~pl.col("mapping_source_table").is_in(["gcpt_vis_admission", "gcpt_per_ethnicity"])
)

# Step 2: Build a Set of Source Codes
source_codes = set(mapping_df["source_code"].to_list())


# Step 3: Function to check for source codes in a file
def check_file_for_codes(filepath, source_codes):
    """Find which values of *source_codes* occur in any column of a CSV file.

    The file is read in batches with every column as a string, and each column
    is tested for membership in *source_codes*. The file path is printed as a
    progress indicator.

    Args:
        filepath: Path of the CSV file to scan.
        source_codes: Collection of codes to look for.

    Returns:
        A list of unique ``(file basename, column name, matched code)``
        tuples, in order of first appearance.
    """
    matched_codes = []
    # Set companion of matched_codes: O(1) duplicate checks instead of
    # scanning the growing list for every candidate.
    seen = set()
    file_name = os.path.basename(filepath)
    print(filepath)
    reader = pl.read_csv_batched(filepath, batch_size=25_000, infer_schema_length=0)
    batches = reader.next_batches(20)
    # next_batches yields nothing truthy once the file is exhausted.
    while batches:
        df_current_batches = pl.concat(batches)
        for col in df_current_batches.columns:
            # Cast only the column being tested rather than rebuilding the
            # whole frame once per column.
            column = df_current_batches[col].cast(pl.Utf8)
            matches = column.is_in(source_codes)
            if not matches.any():
                continue
            for code in column.filter(matches).unique():
                triple = (file_name, col, code)
                if triple not in seen:
                    seen.add(triple)
                    matched_codes.append(triple)
        batches = reader.next_batches(20)
    return matched_codes


# Step 4: Iterate Over Files in 'hosp' and 'icu' Directories
base_dir = MIMIC_DIR
matched_codes_summary = []

for subdir in ["hosp", "icu"]:
    dir_path = os.path.join(base_dir, subdir)
    for root, dirs, files in os.walk(dir_path):
        for file in files:
            if file.endswith(".csv"):
                filepath = os.path.join(root, file)
                # An empty result extends by nothing, so no emptiness check
                # is needed.
                matched_codes_summary.extend(
                    check_file_for_codes(filepath, source_codes)
                )

# Step 5: Save or Display the Results
# Each entry is one (filename, column, code) record. All three fields are
# strings, so orientation guessing is unreliable here; state it explicitly.
result_df = pl.DataFrame(
    matched_codes_summary,
    schema=["filename", "column", "matched_code"],
    orient="row",
)
result_df.write_csv(
    os.path.join(MAPPING_DIR, "mapping_results", "matched_codes_summary.csv")
)
print(result_df)

No code exists that can be mapped exactly to one code. Therefore, we ignore these mappings.

In [None]:
# Count how many rows of result_df (one per file/column hit) each matched
# code has; displayed as the cell output.
result_df.group_by("matched_code").len()

## Manual Mapping
gcpt_drug_ndc.csv

In [None]:
# Force the three columns we need to be read as strings.
prescriptions_dtypes = {"drug": pl.Utf8, "prod_strength": pl.Utf8, "ndc": pl.Utf8}

# Load only the relevant columns from the CSV file with the specified data
# types. `schema_overrides` is the current name of the deprecated `dtypes`
# argument and is what the other read_csv calls in this notebook use.
df_prescriptions = pl.read_csv(
    os.path.join(MIMIC_DIR, "hosp", "prescriptions.csv"),
    schema_overrides=prescriptions_dtypes,
    columns=["drug", "prod_strength", "ndc"],
)

# "drug" values that are left out of the generated source code: when a
# prescription's drug is one of these, only the product strength is used.
specific_drug_types = [
    "Bag",
    "Vial",
    "Syringe",
    "Syringe.",
    "Syringe (Neonatal)",
    "Syringe (Chemo)",
    "Soln",
    "Soln.",
    "Sodium Chloride 0.9%  Flush",
]


def create_gcpt_source_code(drug, prod_strength):
    """Build the source code string for one prescription.

    The result is "<drug> <prod_strength>" with surrounding whitespace
    stripped. The drug part is dropped when *drug* is ``None`` or listed in
    ``specific_drug_types``; a ``None`` *prod_strength* counts as empty.
    Returns an empty string when nothing remains.
    """
    name_part = "" if drug is None or drug in specific_drug_types else drug
    strength_part = "" if prod_strength is None else prod_strength
    return f"{name_part} {strength_part}".strip()


# Derive the mapping key ("concept_code") for every prescription row.
# `map_elements` is the current name of the deprecated `Expr.apply`; the
# return dtype is stated so polars does not have to infer it.
df_prescriptions = df_prescriptions.with_columns(
    pl.struct(["drug", "prod_strength"])
    .map_elements(
        lambda row: create_gcpt_source_code(row["drug"], row["prod_strength"]),
        return_dtype=pl.Utf8,
    )
    .alias("concept_code")
)

# Keep the relevant fields, one row per distinct (drug, ndc, concept_code).
df_prescriptions_filtered = df_prescriptions.select(
    ["drug", "ndc", "concept_code"]
).unique()

# Columns kept from every mapping file after the join with the OMOP concepts.
# Reused by the cells for the other mapping files below.
relevant_mapping_cols = [
    "concept_name",
    "source_concept_id",
    "source_vocabulary_id",
    "source_domain_id",
    "source_concept_class_id",
    "concept_code",
    "target_concept_id",
    "vocabulary_id",
    "ontology_concept_name",
]
drug_mapping_df = pl.read_csv(os.path.join(MAPPING_DIR, "gcpt_drug_ndc.csv"))
drug_mapping_df = drug_mapping_df.join(
    processed_omop_concepts, on="target_concept_id", how="left", coalesce=True
).select(relevant_mapping_cols)

joined_drug_mapping = drug_mapping_df.join(
    df_prescriptions_filtered, on="concept_code", how="left"
)

# Filter out rows where both 'ndc' and 'drug' are null
joined_drug_mapping = joined_drug_mapping.filter(
    ~(pl.col("ndc").is_null() & pl.col("drug").is_null())
)

# Step 1: Rows with a usable NDC (an NDC of "0" is treated like a missing one)
filtered_ndc_df = joined_drug_mapping.filter(
    (pl.col("ndc") != "0") & (pl.col("ndc").is_not_null())
)

# Step 2: Count the mappings for each NDC code.
# group_by/pl.len replace the deprecated groupby/count spelling.
ndc_counts = filtered_ndc_df.group_by("ndc").agg(pl.len().alias("count"))

# Step 3: Filter for NDCs with exactly one mapping
ndc_filtered = filtered_ndc_df.join(
    ndc_counts.filter(pl.col("count") == 1), on="ndc", how="inner"
)

# Step 4: Rows without a usable NDC fall back to the drug name
filtered_drug_df = joined_drug_mapping.filter(
    (pl.col("ndc") == "0") | (pl.col("ndc").is_null())
)

# Step 5: Count the mappings for each drug where NDC is "0" or null
drug_counts = filtered_drug_df.group_by("drug").agg(pl.len().alias("count"))

# Step 6: Filter for drugs with exactly one mapping where NDC is "0" or null
drug_filtered = filtered_drug_df.join(
    drug_counts.filter(pl.col("count") == 1), on="drug", how="inner"
)

# Step 7: Combine both filters (both frames carry the same columns, including
# the joined "count")
combined_filtered = pl.concat([ndc_filtered, drug_filtered])

# Define the table names
table_name = "prescriptions"
mapping_source_table = "gcpt_drug_ndc"

# Iterate over the DataFrame and construct the final list
final_mappings = []
for row in combined_filtered.iter_rows(named=True):
    # Prefer the NDC as source code; otherwise use the drug name.
    if row["ndc"] != "0" and row["ndc"] is not None:
        etl_prefix = "NDC/"
        source_code = row["ndc"]
    else:
        etl_prefix = "MIMIC_IV_Drug/"
        source_code = row["drug"]

    final_mappings.append(
        [
            mapping_source_table,
            table_name,
            etl_prefix,
            source_code,
            row["vocabulary_id"],
            row["target_concept_id"],
            row["ontology_concept_name"],
        ]
    )

gcpt_meas_chartevents_main_mod

In [None]:
# Item ids defined in icu/d_items.csv, renamed to the mapping files' key name.
items_def_df = pl.read_csv(
    os.path.join(MIMIC_DIR, "icu", "d_items.csv"), columns=["itemid"]
).rename({"itemid": "concept_code"})

# Mapping file enriched with the OMOP code and concept name of each target.
chartevents_mapping_df = pl.read_csv(
    os.path.join(MAPPING_DIR, "gcpt_meas_chartevents_main_mod.csv")
).join(processed_omop_concepts, on="target_concept_id", how="left")

# Only mappings whose concept_code is an item id present in d_items are kept.
joined_chartevents_df = items_def_df.join(
    chartevents_mapping_df, on="concept_code", how="inner"
).select(relevant_mapping_cols)

final_mappings.extend(
    [
        "gcpt_meas_chartevents_main_mod",
        "d_items",
        "MIMIC_IV_ITEM/",
        row["concept_code"],
        row["vocabulary_id"],
        row["target_concept_id"],
        row["ontology_concept_name"],
    ]
    for row in joined_chartevents_df.iter_rows(named=True)
)

gcpt_meas_lab_loinc_mod

In [None]:
# Lab item definitions from hosp/d_labitems.csv, keyed like the mapping file.
lab_items_scan = pl.scan_csv(os.path.join(MIMIC_DIR, "hosp", "d_labitems.csv"))
lab_def_df = lab_items_scan.rename({"itemid": "concept_code"}).collect()

# Mapping file enriched with the OMOP code and concept name of each target.
lab_loinc_mapping_df = (
    pl.scan_csv(os.path.join(MAPPING_DIR, "gcpt_meas_lab_loinc_mod.csv"))
    .collect()
    .join(processed_omop_concepts, on="target_concept_id", how="left")
)

# Only mappings whose concept_code is an item id present in d_labitems are kept.
joined_lab_df = lab_def_df.join(
    lab_loinc_mapping_df, on="concept_code", how="inner"
).select(relevant_mapping_cols)

final_mappings.extend(
    [
        "gcpt_meas_lab_loinc_mod",
        "d_labitems",
        "MIMIC_IV_LABITEM/",
        row["concept_code"],
        row["vocabulary_id"],
        row["target_concept_id"],
        row["ontology_concept_name"],
    ]
    for row in joined_lab_df.iter_rows(named=True)
)

gcpt_micro_microtest

In [None]:
# Load the microbiology_events_df with specified columns
microbiology_events_df = pl.read_csv(
    os.path.join(MIMIC_DIR, "hosp", "microbiologyevents.csv"),
    columns=["spec_itemid", "test_itemid", "test_name"],
)

# Collect spec_itemid and test_itemid of each event into one list column
microbiology_events_df = microbiology_events_df.with_columns(
    [pl.concat_list(["spec_itemid", "test_itemid"]).alias("concept_code_list")]
)

# Explode the concept_code_list to have one concept_code per row
microbiology_events_df = microbiology_events_df.explode("concept_code_list").rename(
    {"concept_code_list": "concept_code"}
)

# Drop duplicates
microbiology_events_df = microbiology_events_df.unique()

# Load the microtest_mapping_df and attach the OMOP code and concept name
microtest_mapping_df = pl.read_csv(
    os.path.join(MAPPING_DIR, "gcpt_micro_microtest.csv")
).join(processed_omop_concepts, on="target_concept_id", how="left")

# Join the microbiology_events_df with microtest_mapping_df on the 'concept_code'
joined_microtest_df = microbiology_events_df.join(
    microtest_mapping_df, on="concept_code", how="inner"
)

# Group by 'test_name' and count the occurrences.
# group_by/pl.len replace the deprecated groupby/pl.count spelling.
test_name_counts = joined_microtest_df.group_by("test_name").agg(
    pl.len().alias("count")
)

# Filter to keep only those 'test_name' with exactly one match
test_name_filtered = test_name_counts.filter(pl.col("count") == 1).select("test_name")

# Join back to get the rows with exactly one match
result_df = joined_microtest_df.join(test_name_filtered, on="test_name", how="inner")

# The test name, not the item id, is used as the source code here.
for row in result_df.iter_rows(named=True):
    final_mappings.append(
        [
            "gcpt_micro_microtest",
            "microbiologyevents",
            "MIMIC_IV_MicrobiologyTest/",
            row["test_name"],
            row["vocabulary_id"],
            row["target_concept_id"],
            row["ontology_concept_name"],
        ]
    )

gcpt_obs_drgcodes

In [None]:
# drg_code must be read as a string so it can be concatenated with drg_type.
drgschema = {"drg_code": pl.Utf8}

# Distinct (drg_type, drg_code, description) rows; the description is the key
# that is matched against the mapping file's concept_code.
drgcodes_df = (
    pl.read_csv(
        os.path.join(MIMIC_DIR, "hosp", "drgcodes.csv"),
        columns=["drg_type", "drg_code", "description"],
        schema_overrides=drgschema,
    )
    .unique()
    .rename({"description": "concept_code"})
)

# Reduce to the two columns needed: "<drg_type>/<drg_code>" as source_code and
# the lower-cased description, so that the join below is case-insensitive.
drgcodes_df = drgcodes_df.select(
    (pl.col("drg_type") + "/" + pl.col("drg_code")).alias("source_code"),
    pl.col("concept_code").str.to_lowercase().alias("concept_code"),
)

# The mapping file's concept_code is lower-cased the same way.
drg_mapping_df = pl.read_csv(
    os.path.join(MAPPING_DIR, "gcpt_obs_drgcodes.csv")
).with_columns(pl.col("concept_code").str.to_lowercase().alias("concept_code"))

# First match DRG descriptions to mapping entries, then attach the OMOP code
# and concept name of each target concept.
joined_drg_df = drgcodes_df.join(
    drg_mapping_df, on="concept_code", how="inner"
).join(processed_omop_concepts, on="target_concept_id", how="left")

# Keep the relevant columns and drop rows without a matching OMOP concept.
result_df = joined_drg_df.select([*relevant_mapping_cols, "source_code"]).filter(
    pl.col("vocabulary_id").is_not_null()
)

# source_code already carries its "<drg_type>/" prefix, hence the empty
# etl_prefix.
final_mappings.extend(
    [
        "gcpt_obs_drgcodes",
        "drgcodes",
        "",
        row["source_code"],
        row["vocabulary_id"],
        row["target_concept_id"],
        row["ontology_concept_name"],
    ]
    for row in result_df.iter_rows(named=True)
)

gcpt_proc_datetimeevents

In [None]:
# Item ids defined in icu/d_items.csv, renamed to the mapping files' key name.
items_def_df = pl.read_csv(
    os.path.join(MIMIC_DIR, "icu", "d_items.csv"), columns=["itemid"]
).rename({"itemid": "concept_code"})

# Mapping file enriched with the OMOP code and concept name of each target.
datetimeevents_mapping_df = pl.read_csv(
    os.path.join(MAPPING_DIR, "gcpt_proc_datetimeevents.csv")
).join(processed_omop_concepts, on="target_concept_id", how="left")

# Only mappings whose concept_code is an item id present in d_items are kept.
joined_datetimeevents_df = items_def_df.join(
    datetimeevents_mapping_df, on="concept_code", how="inner"
).select(relevant_mapping_cols)

final_mappings.extend(
    [
        "gcpt_proc_datetimeevents",
        "d_items",
        "MIMIC_IV_ITEM/",
        row["concept_code"],
        row["vocabulary_id"],
        row["target_concept_id"],
        row["ontology_concept_name"],
    ]
    for row in joined_datetimeevents_df.iter_rows(named=True)
)

gcpt_proc_itemid

In [None]:
# Item ids defined in icu/d_items.csv, renamed to the mapping files' key name.
items_def_df = pl.read_csv(
    os.path.join(MIMIC_DIR, "icu", "d_items.csv"), columns=["itemid"]
).rename({"itemid": "concept_code"})

# Mapping file enriched with the OMOP code and concept name of each target.
proc_itemid_mapping_df = pl.read_csv(
    os.path.join(MAPPING_DIR, "gcpt_proc_itemid.csv")
).join(processed_omop_concepts, on="target_concept_id", how="left")

# Only mappings whose concept_code is an item id present in d_items are kept.
joined_proc_itemid_df = items_def_df.join(
    proc_itemid_mapping_df, on="concept_code", how="inner"
).select(relevant_mapping_cols)

final_mappings.extend(
    [
        "gcpt_proc_itemid",
        "d_items",
        "MIMIC_IV_ITEM/",
        row["concept_code"],
        row["vocabulary_id"],
        row["target_concept_id"],
        row["ontology_concept_name"],
    ]
    for row in joined_proc_itemid_df.iter_rows(named=True)
)

gcpt_vis_admission.log

In [None]:
admission_mapping_df = pl.read_csv(os.path.join(MAPPING_DIR, "gcpt_vis_admission.csv"))
# Check which source vocabularies the mapping file contains
print(admission_mapping_df.select("source_vocabulary_id").unique())

# Verify that admission_type and the values of mimiciv_vis_admission_type match
print(
    pl.read_csv(os.path.join(MIMIC_DIR, "hosp", "admissions.csv"))
    .select("admission_type")
    .unique()
)


def _admission_vocabulary(vocabulary_name):
    """Return the admission mapping rows of one source vocabulary."""
    return admission_mapping_df.filter(
        pl.col("source_vocabulary_id") == vocabulary_name
    )


def _attach_omop_concepts(mapping_subset):
    """Left-join the OMOP concepts and keep only the relevant mapping columns."""
    return mapping_subset.join(
        processed_omop_concepts, on="target_concept_id", how="left"
    ).select(relevant_mapping_cols)


def _admission_final_rows(joined_df, table_name, etl_prefix):
    """Return one final-mapping row per row of *joined_df*."""
    return [
        [
            "gcpt_vis_admission",
            table_name,
            etl_prefix,
            row["concept_code"],
            row["vocabulary_id"],
            row["target_concept_id"],
            row["ontology_concept_name"],
        ]
        for row in joined_df.iter_rows(named=True)
    ]


# Admission type
admission_type_df = _admission_vocabulary("mimiciv_vis_admission_type")
print(admission_type_df["concept_code"])
joined_admission_type_df = _attach_omop_concepts(admission_type_df)
final_mappings.extend(
    _admission_final_rows(
        joined_admission_type_df, "admissions", "MIMIC_IV_Admission/"
    )
)

# Discharge location
discharge_location_df = _admission_vocabulary("mimiciv_vis_discharge_location")
joined_discharge_location_df = _attach_omop_concepts(discharge_location_df)
final_mappings.extend(
    _admission_final_rows(
        joined_discharge_location_df, "admissions", "MIMIC_IV_Discharge_Location/"
    )
)

# Service (the only vocabulary of this file assigned to the "services" table)
service_df = _admission_vocabulary("mimiciv_vis_service")
joined_service_df = _attach_omop_concepts(service_df)
final_mappings.extend(
    _admission_final_rows(joined_service_df, "services", "MIMIC_IV_Service/")
)

# Admission location
admission_location_df = _admission_vocabulary("mimiciv_vis_admission_location")
joined_admission_location_df = _attach_omop_concepts(admission_location_df)
final_mappings.extend(
    _admission_final_rows(
        joined_admission_location_df, "admissions", "MIMIC_IV_Admission_Location/"
    )
)

We also add the ethnicity file, as it is clear where its data came from
gcpt_per_ethnicity

In [None]:
ethnicity_mapping_df = pl.read_csv(os.path.join(MAPPING_DIR, "gcpt_per_ethnicity.csv"))

# Attach the OMOP code and concept name of each target concept.
joined_ethnicity_mapping_df = ethnicity_mapping_df.join(
    processed_omop_concepts, on="target_concept_id", how="left"
).select(relevant_mapping_cols)

# Iterate the ethnicity frame built above. Looping over the admission-type
# frame instead would re-add admission types labelled as race mappings and
# leave the ethnicity mappings out entirely.
for row in joined_ethnicity_mapping_df.iter_rows(named=True):
    final_mappings.append(
        [
            "gcpt_per_ethnicity",
            "admissions",
            "MIMIC_IV_Race/",
            row["concept_code"],
            row["vocabulary_id"],
            row["target_concept_id"],
            row["ontology_concept_name"],
        ]
    )

### Individual Mapping
We also add limited individual mappings

Check available gender column values

In [None]:
# Show the distinct values of the `gender` column of hosp/patients.csv.
pl.read_csv(os.path.join(MIMIC_DIR, "hosp", "patients.csv")).select(["gender"]).unique()

In [None]:
# Hand-written mappings that do not come from any mapping file. Each row uses
# the final_mappings column order: mapping_source_table, table_name,
# etl_prefix, source_code, vocabulary_id, target_concept_id,
# ontology_concept_name.
individual_mappings = [
    ["individual", "patients", "MIMIC_IV_Gender/", "F", "Gender/F", 8532, "FEMALE"],
    ["individual", "patients", "MIMIC_IV_Gender/", "M", "Gender/M", 8507, "MALE"],
]

final_mappings.extend(individual_mappings)

### Store final mappings in result

In [None]:
# Column names of the rows collected in final_mappings.
final_columns = [
    "mapping_source_table",
    "table_name",
    "etl_prefix",
    "source_code",
    "vocabulary_id",
    "target_concept_id",
    "ontology_concept_name",
]
source_code_index = final_columns.index("source_code")

# source_code is filled from several kinds of values (NDC and drug strings,
# item ids, test names), so one column may hold both strings and integers.
# Normalize it to strings so the column has a single dtype; the text written
# to the CSV is the same either way. Missing values stay None.
normalized_mappings = [
    [
        str(value) if position == source_code_index and value is not None else value
        for position, value in enumerate(mapping)
    ]
    for mapping in final_mappings
]

# Convert the final mappings list into a Polars DataFrame. orient="row" states
# explicitly that each inner list is one record.
final_mappings_df = pl.DataFrame(
    normalized_mappings,
    schema=final_columns,
    orient="row",
)

# Save the DataFrame to a CSV file
final_mappings_df.write_csv(
    os.path.join(MAPPING_DIR, "mapping_results", "final_mappings.csv")
)