Construct the vocabulary tables from the vocabulary files downloaded from the Athena website, which are also available at s3://hls-eng-data-public/data/rwe/omop-vocabs/. If you would like to use a different dataset, download the vocabularies from Athena and use the Databricks DBFS API utilities to upload them to DBFS under your vocab_path.

In [25]:


import os
import subprocess
import sys
from typing import List
from pyspark.sql import SparkSession
import logging

# Add the project root (four directories above the working directory) to
# sys.path so the project-local `c1_core` package can be imported.
# NOTE: '__file__' is a quoted string literal here, not the module attribute,
# so abspath() resolves it against the current working directory and
# notebook_dir ends up being that directory (presumably because notebooks do
# not define __file__ -- confirm before changing).
notebook_dir = os.path.dirname(os.path.abspath('__file__'))
project_root = os.path.abspath(os.path.join(notebook_dir, '..', '..', '..', '..'))
# Guard the append so that re-running this cell does not pile up duplicates.
if project_root not in sys.path:
    sys.path.append(project_root)

from c1_core.aws_storage_service.aws_storage_bucket import AwsStorageBucket
from c1_core.delta_lake_service.delta_table import DeltaTable


# Configure logging
# Set up the root logger at INFO level (basicConfig is a no-op if the root
# logger already has handlers), then create this module's own logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the base directory


def create_spark_session(app_name="EHR Data Loader", aws_access_key=None, aws_secret_key=None):
    """
    Create and return a Spark session configured for Delta Lake with S3 access.

    Any Spark session that is currently active is stopped first. The function
    then verifies the required JARs, writes ``hadoop-conf/core-site.xml``
    (relative to the current working directory), sets the ``HADOOP_CONF_DIR``,
    ``SPARK_HOME``, ``SPARK_CLASSPATH`` and ``HADOOP_CLASSPATH`` environment
    variables, and builds a local-mode session with Hive support.

    Args:
        app_name: Spark application name.
        aws_access_key: S3A access key; falls back to ``'minioadmin'`` when
            falsy.
        aws_secret_key: S3A secret key; falls back to ``'minioadmin'`` when
            falsy.

    Returns:
        The ``SparkSession`` produced by ``builder.getOrCreate()``.

    Raises:
        FileNotFoundError: If the JAR directory or any required JAR is missing.
    """
    # Local import: only needed to escape the credentials embedded in the XML.
    from xml.sax.saxutils import escape

    # Stop an already-active session, if any. getActiveSession() is used
    # instead of builder.getOrCreate() so that no throw-away session is
    # started before the classpath below has been configured.
    try:
        active_session = SparkSession.getActiveSession()
        if active_session is not None:
            active_session.stop()
            print("Stopped existing Spark session")
        else:
            print("No existing Spark session to stop")
    except Exception as exc:  # best effort: never block session creation
        print(f"Could not stop existing Spark session: {exc}")

    # Fixed, machine-specific location of the Delta/Hadoop JAR files
    jars_home = '/home/developer/projects/delta-spark-handbook/delta-jars'

    print(f"Jars home is set to: {jars_home}")

    if not os.path.exists(jars_home):
        raise FileNotFoundError(f"JAR directory not found at: {jars_home}")

    # Required JARs
    jar_locations = [
        f"{jars_home}/delta-spark_2.12-3.3.0.jar",
        f"{jars_home}/delta-storage-3.3.0.jar",
        f"{jars_home}/hadoop-aws-3.3.4.jar",
        f"{jars_home}/bundle-2.24.12.jar",
        # Hadoop client JARs
        f"{jars_home}/hadoop-client-3.3.4.jar",
        f"{jars_home}/hadoop-client-runtime-3.3.4.jar",
        f"{jars_home}/hadoop-client-api-3.3.4.jar",
    ]

    # Verify all JARs exist
    for jar in jar_locations:
        if not os.path.exists(jar):
            raise FileNotFoundError(f"Required JAR not found: {jar}")

    # One classpath string, shared by the environment variables and the
    # driver/executor settings below.
    classpath = ":".join(os.path.abspath(jar) for jar in jar_locations)

    access_key = aws_access_key or "minioadmin"
    secret_key = aws_secret_key or "minioadmin"

    # Create Hadoop configuration directory
    hadoop_conf_dir = "hadoop-conf"
    os.makedirs(hadoop_conf_dir, exist_ok=True)

    # Write core-site.xml with S3 configuration. The credentials are
    # XML-escaped so that characters such as '&' or '<' cannot corrupt the file.
    core_site_xml = f"""<?xml version="1.0"?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://localhost:9000</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>{escape(access_key)}</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>{escape(secret_key)}</value>
    </property>
    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
    </property>
</configuration>"""

    with open(os.path.join(hadoop_conf_dir, "core-site.xml"), "w", encoding="utf-8") as f:
        f.write(core_site_xml)

    # Set environment variables
    os.environ["HADOOP_CONF_DIR"] = os.path.abspath(hadoop_conf_dir)
    os.environ["SPARK_HOME"] = "/opt/spark"
    os.environ["SPARK_CLASSPATH"] = classpath
    os.environ["HADOOP_CLASSPATH"] = classpath

    # Create Spark session with comprehensive configuration
    builder = (SparkSession.builder
               .appName(app_name)
               # Local mode; a standalone master would be "spark://localhost:7077"
               .master("local[*]")
               # Delta Lake SQL extension and catalog
               .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
               .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
               # Hive metastore connection settings
               .config("spark.sql.catalogImplementation", "hive")
               .config("spark.hadoop.javax.jdo.option.ConnectionURL", "jdbc:postgresql://localhost:5432/metastore_db")
               .config("spark.hadoop.javax.jdo.option.ConnectionDriverName", "org.postgresql.Driver")
               .config("spark.hadoop.javax.jdo.option.ConnectionUserName", "admin")
               .config("spark.hadoop.javax.jdo.option.ConnectionPassword", "admin")
               .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
               .config("spark.sql.warehouse.dir", "s3a://wba/warehouse")
               # Classpath for driver and executors
               .config("spark.driver.extraClassPath", classpath)
               .config("spark.executor.extraClassPath", classpath)
               .config("spark.jars.excludes", "org.slf4j:slf4j-log4j12,org.slf4j:slf4j-reload4j,org.slf4j:log4j-slf4j-impl")
               # S3A endpoint and credentials (same values as core-site.xml)
               .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
               .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
               .config("spark.hadoop.fs.s3a.access.key", access_key)
               .config("spark.hadoop.fs.s3a.secret.key", secret_key)
               .config("spark.hadoop.fs.s3a.path.style.access", "true")
               .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
               # S3A upload, threading, timeout and retry tuning
               .config("spark.hadoop.fs.s3a.fast.upload", "true")
               .config("spark.hadoop.fs.s3a.multipart.size", "5242880")
               .config("spark.hadoop.fs.s3a.block.size", "5242880")
               .config("spark.hadoop.fs.s3a.multipart.threshold", "5242880")
               .config("spark.hadoop.fs.s3a.threads.core", "10")
               .config("spark.hadoop.fs.s3a.threads.max", "20")
               .config("spark.hadoop.fs.s3a.max.total.tasks", "50")
               .config("spark.hadoop.fs.s3a.connection.timeout", "60000")
               .config("spark.hadoop.fs.s3a.connection.establish.timeout", "60000")
               .config("spark.hadoop.fs.s3a.socket.timeout", "60000")
               .config("spark.hadoop.fs.s3a.connection.maximum", "50")
               .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
               .config("spark.hadoop.fs.s3a.fast.upload.active.blocks", "2")
               .config("spark.hadoop.fs.s3a.multipart.purge", "false")
               .config("spark.hadoop.fs.s3a.multipart.purge.age", "86400000")
               .config("spark.hadoop.fs.s3a.retry.limit", "10")
               .config("spark.hadoop.fs.s3a.retry.interval", "1000")
               .config("spark.hadoop.fs.s3a.attempts.maximum", "10")
               .config("spark.hadoop.fs.s3a.connection.request.timeout", "60000")
               .config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000")
               .enableHiveSupport())

    return builder.getOrCreate()



In [26]:
import boto3
from urllib.parse import urlparse
from botocore import UNSIGNED
from botocore.config import Config


project_name = 'omop-cdm-100K'
vocab_s3_path = "s3://hls-eng-data-public/data/rwe/omop-vocabs/"

print(f"Using vocabulary tables in {vocab_s3_path}")

# Parse S3 path into bucket name and key prefix
parsed = urlparse(vocab_s3_path)
bucket = parsed.netloc
prefix = parsed.path.lstrip('/')

# Initialize an unsigned (anonymous) S3 client; no credentials are sent
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# List objects under the prefix. A paginator is used because a single
# list_objects_v2 call returns at most 1000 keys.
paginator = s3.get_paginator('list_objects_v2')
found_any = False
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        found_any = True
        print(obj['Key'])

if not found_any:
    print("No files found in the specified S3 path.")


Using vocabulary tables in s3://hls-eng-data-public/data/rwe/omop-vocabs/
data/rwe/omop-vocabs/CONCEPT.csv.gz
data/rwe/omop-vocabs/CONCEPT_ANCESTOR.csv.gz
data/rwe/omop-vocabs/CONCEPT_CLASS.csv.gz
data/rwe/omop-vocabs/CONCEPT_RELATIONSHIP.csv.gz
data/rwe/omop-vocabs/CONCEPT_SYNONYM.csv.gz
data/rwe/omop-vocabs/DOMAIN.csv.gz
data/rwe/omop-vocabs/DRUG_STRENGTH.csv.gz
data/rwe/omop-vocabs/RELATIONSHIP.csv.gz
data/rwe/omop-vocabs/VOCABULARY.csv.gz


In [27]:
from pyspark.sql.functions import to_date
import tempfile, os, io, gzip, time
from urllib.parse import urlparse
import boto3
from botocore import UNSIGNED
from botocore.config import Config



In [28]:
def ingest():
    """Copy the OMOP vocabulary files from the public S3 bucket into MinIO.

    Delegates to ``AwsStorageBucket.download_files_from_s3_to_minio`` with the
    source S3 path and the target MinIO bucket/endpoint/credentials defined
    below.

    Returns:
        Whatever ``download_files_from_s3_to_minio`` returns (its shape is
        defined by that helper, not here).
    """
    # Configuration
    MINIO_BUCKET = "omop531"
    S3_BUCKET = "hls-eng-data-public"
    S3_PREFIX = "data/rwe/omop-vocabs/"

    # S3_PREFIX already ends with "/"; strip it before re-adding one so the
    # path does not end in a double slash.
    s3_path = f"s3://{S3_BUCKET}/{S3_PREFIX.rstrip('/')}/"

    result = AwsStorageBucket.download_files_from_s3_to_minio(
        minio_bucket_name=MINIO_BUCKET,
        s3_path=s3_path,
        data_product_id="test-product",
        environment="test",
        minio_endpoint="http://localhost:9000",  # Use localhost since running on host
        minio_access_key="minioadmin",
        minio_secret_key="minioadmin",
    )
    return result


In [29]:


def data_load():
    """Load every ``.gz`` vocabulary file found in MinIO into a Delta table.

    Steps:
      1. Create a Spark session via ``create_spark_session``.
      2. Create the ``omop531`` database if it does not exist.
      3. List the ``.gz`` files under ``s3://omop531/data/rwe/omop-vocabs/``
         through ``AwsStorageBucket.list_s3_files_boto`` (pointed at the local
         MinIO endpoint).
      4. For each file, call ``DeltaTable.load_file_to_delta`` with a table
         name derived from the file name (text before the first ".",
         lower-cased, e.g. ``CONCEPT.csv.gz`` -> ``concept``).

    Returns:
        None.
    """
    aws_access_key = "minioadmin"
    aws_secret_key = "minioadmin"

    spark = create_spark_session(
        app_name="OMOP Vocab Setup", aws_access_key=aws_access_key, aws_secret_key=aws_secret_key)

    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

    spark.sql("SHOW DATABASES").show()

    database_name = "omop531"

    # Create database if it doesn't exist
    print(f"Creating database {database_name} if it doesn't exist...")
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

    # Location of the vocabulary files inside the local MinIO instance
    S3_LOCAL_BUCKET = "omop531"
    S3_PREFIX = "data/rwe/omop-vocabs"

    tablelist = AwsStorageBucket.list_s3_files_boto(
        s3_dir_path=f"s3://{S3_LOCAL_BUCKET}/{S3_PREFIX}/",
        file_extension=".gz",
        data_product_id="test-product",
        environment="test",
        public_access=True,
        endpoint_url="http://localhost:9000/",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin"
    )

    print(tablelist)

    # Nothing to do (and nothing to iterate) when the listing is empty or None
    if not tablelist:
        print("No vocabulary files found; nothing to load.")
        return

    # Update paths that start with 's3://' to 's3a://'; only a leading scheme
    # is rewritten, all other paths are passed through unchanged.
    s3_scheme = "s3://"
    tablelist = [
        "s3a://" + table_path[len(s3_scheme):] if table_path.startswith(s3_scheme) else table_path
        for table_path in tablelist
    ]

    # Process each table
    for table_path in tablelist:
        print(f"\n📦 Processing {table_path}...")
        table_file = os.path.basename(table_path)
        # Keep the text before the first "." (drops ".csv.gz") as the table name
        table_name = table_file.split(".")[0].lower()
        DeltaTable.load_file_to_delta(spark, table_path, database_name, table_name)


In [None]:
def main() -> None:
    """Run the vocabulary load.

    Only ``data_load()`` is executed; the ``ingest()`` step is currently
    disabled.
    """
    # ingest()
    data_load()


# Run the load when executed as a script (or as a notebook cell, where
# __name__ is "__main__" as well).
if __name__ == "__main__":
    main()

Stopped existing Spark session
Jars home is set to: /home/developer/projects/delta-spark-handbook/delta-jars
+---------+
|namespace|
+---------+
|  default|
|  omop531|
+---------+

Creating database omop531 if it doesn't exist...
2025-04-06 15:15:52 - INFO - Listing files in MinIO path: s3://omop531/data/rwe/omop-vocabs/ with extension: .gz


INFO:aws_storage_service:aws_storage_bucket.py:Listing files in MinIO path: s3://omop531/data/rwe/omop-vocabs/ with extension: .gz


2025-04-06 15:15:52 - INFO - Found 9 files.


INFO:aws_storage_service:aws_storage_bucket.py:Found 9 files.


['s3://omop531/data/rwe/omop-vocabs/CONCEPT.csv.gz', 's3://omop531/data/rwe/omop-vocabs/CONCEPT_ANCESTOR.csv.gz', 's3://omop531/data/rwe/omop-vocabs/CONCEPT_CLASS.csv.gz', 's3://omop531/data/rwe/omop-vocabs/CONCEPT_RELATIONSHIP.csv.gz', 's3://omop531/data/rwe/omop-vocabs/CONCEPT_SYNONYM.csv.gz', 's3://omop531/data/rwe/omop-vocabs/DOMAIN.csv.gz', 's3://omop531/data/rwe/omop-vocabs/DRUG_STRENGTH.csv.gz', 's3://omop531/data/rwe/omop-vocabs/RELATIONSHIP.csv.gz', 's3://omop531/data/rwe/omop-vocabs/VOCABULARY.csv.gz']

📦 Processing s3a://omop531/data/rwe/omop-vocabs/CONCEPT.csv.gz...


                                                                                

📁 Calculated table path: s3a://wba/warehouse/omop531.db/concept


25/04/06 15:16:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/04/06 15:18:43 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`omop531`.`concept` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
25/04/06 15:18:43 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


Successfully loaded s3a://omop531/data/rwe/omop-vocabs/CONCEPT.csv.gz into table omop531.concept

📦 Processing s3a://omop531/data/rwe/omop-vocabs/CONCEPT_ANCESTOR.csv.gz...


                                                                                

📁 Calculated table path: s3a://wba/warehouse/omop531.db/concept_ancestor


INFO:py4j.clientserver:Error while receiving.                       (0 + 1) / 1]
Traceback (most recent call last):
  File "/home/developer/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=71>
INFO:py4j.clientserver:Closing down clientserver connection
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/developer/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=71>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/developer/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/developer/

Error loading s3a://omop531/data/rwe/omop-vocabs/CONCEPT_ANCESTOR.csv.gz into table omop531.concept_ancestor: An error occurred while calling o709.save

📦 Processing s3a://omop531/data/rwe/omop-vocabs/CONCEPT_CLASS.csv.gz...


                                                                                

📁 Calculated table path: s3a://wba/warehouse/omop531.db/concept_class


25/04/06 15:28:51 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`omop531`.`concept_class` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


Successfully loaded s3a://omop531/data/rwe/omop-vocabs/CONCEPT_CLASS.csv.gz into table omop531.concept_class

📦 Processing s3a://omop531/data/rwe/omop-vocabs/CONCEPT_RELATIONSHIP.csv.gz...


                                                                                