# Migrate OMOP v5.4 Vocabulary and Concept Tables to DuckDB

## Overview

This notebook demonstrates how to migrate the OMOP v5.4 vocabulary and concept tables from a [Broadsea](https://github.com/OHDSI/Broadsea) PostgreSQL database, already loaded with the OMOP vocabularies, to a local DuckDB database.

### Import Libraries

We use Ibis as the primary library for connecting to both PostgreSQL and DuckDB. The PostgreSQL connection settings are stored in a TOML file (`config.toml`) and read with Python's built-in `tomllib` module.

In [2]:
import tomllib

import ibis

# Load the connection settings from the TOML file
with open("config.toml", "rb") as file:
    config = tomllib.load(file)

POSTGRES_USER = config['broadsea_postgresql']['POSTGRES_USER']
POSTGRES_PASSWORD = config['broadsea_postgresql']['POSTGRES_PASSWORD']
POSTGRES_DB = config['broadsea_postgresql']['POSTGRES_DB']
POSTGRES_SCHEMA = config['broadsea_postgresql']['POSTGRES_SCHEMA']
POSTGRES_HOST = config['broadsea_postgresql']['POSTGRES_HOST']
POSTGRES_PORT = config['broadsea_postgresql']['POSTGRES_PORT']

# Connect to the Broadsea PostgreSQL database.
# POSTGRES_PORT is passed explicitly so a non-default port in config.toml is honored.
con = ibis.postgres.connect(
    schema=POSTGRES_SCHEMA,
    database=POSTGRES_DB,
    host=POSTGRES_HOST,
    port=POSTGRES_PORT,
    user=POSTGRES_USER,
    password=POSTGRES_PASSWORD,
)

con.list_tables()


['attribute_definition',
 'care_site',
 'cdm_source',
 'cohort_definition',
 'concept',
 'concept_ancestor',
 'concept_class',
 'concept_recommended',
 'concept_relationship',
 'concept_synonym',
 'condition_era',
 'condition_occurrence',
 'cost',
 'death',
 'device_exposure',
 'domain',
 'dose_era',
 'drug_era',
 'drug_exposure',
 'drug_strength',
 'fact_relationship',
 'location',
 'measurement',
 'metadata',
 'note',
 'note_nlp',
 'observation',
 'observation_period',
 'payer_plan_period',
 'person',
 'procedure_occurrence',
 'provider',
 'relationship',
 'source_to_concept_map',
 'specimen',
 'visit_detail',
 'visit_occurrence',
 'vocabulary']
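The listing mixes clinical event tables with the vocabulary tables named in the title. If only the vocabulary is needed, the list can be filtered before copying. A minimal sketch, assuming the ten standardized vocabulary tables of OMOP CDM v5.4. The names `OMOP_VOCABULARY_TABLES` and `select_vocabulary_tables` are hypothetical, and the notebook's own cells copy a broader list of tables.

```python
# Standardized vocabulary tables in OMOP CDM v5.4 (assumed set for this sketch).
OMOP_VOCABULARY_TABLES = {
    "concept",
    "vocabulary",
    "domain",
    "concept_class",
    "concept_relationship",
    "relationship",
    "concept_synonym",
    "concept_ancestor",
    "source_to_concept_map",
    "drug_strength",
}


def select_vocabulary_tables(available):
    """Return the vocabulary tables present in `available`, sorted by name."""
    return sorted(OMOP_VOCABULARY_TABLES & set(available))


# Stand-in for con.list_tables(); an abbreviated copy of the listing above.
available_tables = ["care_site", "concept", "concept_ancestor", "person", "vocabulary"]

print(select_vocabulary_tables(available_tables))
# → ['concept', 'concept_ancestor', 'vocabulary']
```

With a live connection, `select_vocabulary_tables(con.list_tables())` would produce the list to pass to the copy step.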

In [3]:
%pip install tqdm

import os

# Suppress the debugger's frozen-modules validation warning in the notebook kernel
os.environ['PYDEVD_DISABLE_FILE_VALIDATION'] = '1'




### Copy Tables to DuckDB

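Next, we create the corresponding tables in DuckDB and copy each table from the Broadsea PostgreSQL database into a local DuckDB file (`omop54.ddb`). The tables are copied in parallel, with a progress bar tracking how many have finished.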

In [5]:
from concurrent.futures import ThreadPoolExecutor, as_completed

import ibis
from tqdm import tqdm

# List of table names
table_names = [
    "concept",
    "procedure_occurrence",
    "observation",
    "death",
    "note_nlp",
    "specimen",
    "location",
    "fact_relationship",
    "cost",
    "condition_era",
    "person",
    "observation_period",
    "visit_occurrence",
    "visit_detail",
    "condition_occurrence",
    "drug_exposure",
    "device_exposure",
    "measurement",
    "note",
    "care_site",
    "provider",
    "payer_plan_period",
    "drug_era",
    "dose_era",
    "cdm_source",
    "concept_ancestor",
    "vocabulary",
    "metadata",
    "domain",
    "concept_class",
    "concept_relationship",
    "relationship",
    "concept_synonym",
    "source_to_concept_map",
    "drug_strength",
    "cohort_definition",
    "attribute_definition",
]


def create_duckdb_table(table_name, con):
    # Fetch the table expression from the PostgreSQL connection
    t = con.table(table_name)

    # Open a DuckDB connection for this table
    # Note: Each thread creates its own connection. Adjust the parameters as necessary.
    duckdb_con = ibis.connect("duckdb://omop54.ddb", temp_directory='./temp', threads=8, memory_limit='4G')
    
    # Create table in DuckDB
    duckdb_con.create_table(table_name, t.to_pyarrow(), overwrite=True)
    return f"Table {table_name} processed."

def create_duckdb_tables(table_names, con, max_workers=8):
    # Setup progress bar
    pbar = tqdm(total=len(table_names), desc='Processing tables')
    
    # Use ThreadPoolExecutor to process tables in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Create a future for each table creation task
        futures = {executor.submit(create_duckdb_table, table_name, con): table_name for table_name in table_names}
        
        # As each future completes, update progress bar
        for future in as_completed(futures):
            table_name = futures[future]
            try:
                result = future.result()
                pbar.write(result)
            except Exception as exc:
                pbar.write(f'{table_name} generated an exception: {exc}')
            finally:
                pbar.update(1)
    
    pbar.close()

# `con` is the PostgreSQL connection created earlier in this notebook.
# Copy all tables into DuckDB using a thread pool.
create_duckdb_tables(table_names, con)

# Connect to DuckDB for further operations
voc_db = ibis.connect("duckdb://omop54.ddb")




Processing tables:   3%|▎         | 1/37 [00:00<00:22,  1.57it/s]

Table note_nlp processed.
Table specimen processed.


Processing tables:   8%|▊         | 3/37 [00:13<02:42,  4.78s/it]

Table procedure_occurrence processed.


Processing tables:  11%|█         | 4/37 [00:14<01:56,  3.52s/it]

Table observation processed.


Processing tables:  14%|█▎        | 5/37 [00:15<01:23,  2.62s/it]

Table fact_relationship processed.


Processing tables:  16%|█▌        | 6/37 [00:16<01:11,  2.29s/it]

Table location processed.


Processing tables:  19%|█▉        | 7/37 [00:17<00:56,  1.89s/it]

Table death processed.


                                                                 

Table cost processed.


Processing tables:  24%|██▍       | 9/37 [00:19<00:35,  1.28s/it]

Table condition_era processed.


Processing tables:  27%|██▋       | 10/37 [00:24<01:10,  2.60s/it]

Table person processed.


Processing tables:  30%|██▉       | 11/37 [00:26<00:56,  2.16s/it]

Table observation_period processed.


Processing tables:  30%|██▉       | 11/37 [00:26<00:56,  2.16s/it]

Table device_exposure processed.


Processing tables:  35%|███▌      | 13/37 [00:28<00:37,  1.58s/it]

Table visit_detail processed.


Processing tables:  38%|███▊      | 14/37 [00:29<00:31,  1.37s/it]

Table condition_occurrence processed.


Processing tables:  41%|████      | 15/37 [00:30<00:30,  1.40s/it]

Table visit_occurrence processed.


Processing tables:  49%|████▊     | 18/37 [00:32<00:14,  1.27it/s]

Table drug_exposure processed.
Table measurement processed.
Table note processed.


Processing tables:  51%|█████▏    | 19/37 [00:32<00:11,  1.59it/s]

Table care_site processed.
Table provider processed.


Processing tables:  62%|██████▏   | 23/37 [00:32<00:04,  3.31it/s]

Table payer_plan_period processed.
Table drug_era processed.
Table dose_era processed.


Processing tables:  65%|██████▍   | 24/37 [00:32<00:03,  3.64it/s]

Table cdm_source processed.



Processing tables:  68%|██████▊   | 25/37 [00:42<00:27,  2.32s/it]

Table concept processed.


Processing tables:  70%|███████   | 26/37 [01:11<01:38,  8.94s/it]

Table vocabulary processed.


Processing tables:  73%|███████▎  | 27/37 [01:11<01:07,  6.78s/it]

Table metadata processed.


Processing tables:  76%|███████▌  | 28/37 [01:12<00:46,  5.17s/it]

Table concept_class processed.


Processing tables:  78%|███████▊  | 29/37 [01:13<00:31,  3.94s/it]

Table domain processed.


Processing tables:  78%|███████▊  | 29/37 [02:12<00:31,  3.94s/it]

Table relationship processed.


Processing tables:  81%|████████  | 30/37 [02:16<02:19, 19.87s/it]

drug_strength generated an exception: Rescaling Decimal128 value would cause data loss


Processing tables:  86%|████████▋ | 32/37 [02:20<00:59, 11.86s/it]

Table cohort_definition processed.


Processing tables:  86%|████████▋ | 32/37 [02:24<00:59, 11.86s/it]

Table concept_synonym processed.


Processing tables:  92%|█████████▏| 34/37 [02:29<00:24,  8.04s/it]

Table attribute_definition processed.


Processing tables:  92%|█████████▏| 34/37 [02:33<00:24,  8.04s/it]

Table source_to_concept_map processed.


Processing tables:  95%|█████████▍| 35/37 [02:33<00:13,  6.91s/it]


Processing tables:  97%|█████████▋| 36/37 [03:47<00:26, 26.88s/it]

Table concept_ancestor processed.



Processing tables: 100%|██████████| 37/37 [04:32<00:00,  7.38s/it]

Table concept_relationship processed.
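The log above reports one failure: `drug_strength` raised `Rescaling Decimal128 value would cause data loss`. Arrow typically reports this when a decimal value cannot be converted to the target scale without dropping digits. The sketch below illustrates that condition with Python's standard `decimal` module. The helper `fits_decimal` and the sample values are hypothetical, and which column of `drug_strength` triggers the error is not established here.

```python
from decimal import Decimal


def fits_decimal(value: Decimal, precision: int, scale: int) -> bool:
    """Return True if `value` fits DECIMAL(precision, scale) without losing digits."""
    if not value.is_finite():
        return False  # NaN and Infinity have no DECIMAL representation
    if value == 0:
        return True
    # normalize() strips trailing zeros so only significant digits are counted.
    _sign, digits, exponent = value.normalize().as_tuple()
    fractional_digits = max(-exponent, 0)
    integer_digits = max(len(digits) + exponent, 0)
    return fractional_digits <= scale and integer_digits <= precision - scale


# Hypothetical sample values, not taken from drug_strength.
samples = [Decimal("12.345"), Decimal("0.5"), Decimal("1234567.8")]

for sample in samples:
    status = "fits" if fits_decimal(sample, precision=8, scale=2) else "would lose data"
    print(f"{sample}: {status}")
```

Running a check like this over the numeric columns of a failing table can point to the values that need a wider decimal type or an explicit cast before the copy is retried.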





In [9]:
import ibis

# Reconnect to the DuckDB file and inspect the copied tables
voc_db = ibis.connect("duckdb://omop54.ddb")

voc_db.list_tables()

concepts_tbl = voc_db.table("concept")


ValueError: Unexpected value for 'dtype': 'datetime64[D]'. Must be 'datetime64[s]', 'datetime64[ms]', 'datetime64[us]', 'datetime64[ns]' or DatetimeTZDtype'.
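
The `ValueError` above names `datetime64[D]`, a day-resolution dtype, and lists only second, millisecond, microsecond, and nanosecond units as accepted. That matches the datetime resolutions supported by pandas 2.x, so a version mismatch between Ibis and pandas is one plausible cause, though nothing in this notebook confirms it. A small sketch for recording the installed versions before investigating further; the package list and the helper name `installed_versions` are assumptions.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Distributions assumed to be relevant to the error; adjust as needed.
versions = installed_versions(["ibis-framework", "pandas", "pyarrow", "duckdb", "numpy"])

for name, ver in versions.items():
    print(f"{name}: {ver or 'not installed'}")
```

Including this output alongside the traceback makes the failure easier to reproduce or report.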