# updateECRdatastore

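This notebook updates the ECR datastore Delta tables with new ECR records read from `PARSED_ECR_PATH`; a Delta table is created if one does not already exist. After each table is updated, it is also exported as a daily extract in every format listed in `DAILY_EXTRACT_FORMATS`.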

In [None]:
# Storage locations and extract settings shared by the cells below.
# NOTE(review): "$STORAGE_ACCOUNT" looks like a placeholder that is presumably
# substituted before the notebook runs — confirm against the deployment tooling.
STORAGE_ACCOUNT = "$STORAGE_ACCOUNT"
BASE_DATASTORE_DIRECTORY = "ecr-datastore"

# Root of the "delta-tables" filesystem; the value ends with a trailing slash.
DELTA_TABLES_FILESYSTEM = "abfss://delta-tables@" + STORAGE_ACCOUNT + ".dfs.core.windows.net/"

# Locations inside the delta-tables filesystem.
ECR_DATASTORE_PATH = f"{DELTA_TABLES_FILESYSTEM}ecr-datastore"
ECR_DATASTORE_DAILY_EXTRACT_PATH = f"{DELTA_TABLES_FILESYSTEM}ecr-datastore"
PARSED_ECR_PATH = f"{DELTA_TABLES_FILESYSTEM}raw_data"

# File formats written for each daily extract.
DAILY_EXTRACT_FORMATS = ["parquet", "csv"]

In [None]:
from notebookutils import mssparkutils

# Set up for writing to blob storage
delta_bucket_name = "delta-tables"
linked_service_name = "$BLOB_STORAGE_LINKED_SERVICE"
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
wasb_path = f"wasbs://{delta_bucket_name}@{STORAGE_ACCOUNT}.blob.core.windows.net/"

# Register the credential returned for the linked service as the SAS setting
# for this container/account pair.
spark.conf.set(
    f"fs.azure.sas.{delta_bucket_name}.{STORAGE_ACCOUNT}.blob.core.windows.net",
    blob_sas_token,
)

# Try mounting the remote storage directory at the mount point.
# A failure is treated as best-effort (the common cause being that the storage
# is already mounted), but only `Exception` is caught so that interrupts still
# propagate, and the underlying reason is printed instead of being hidden.
try:
    mssparkutils.fs.mount(
        wasb_path,
        "/",
        {"LinkedService": linked_service_name},
    )
except Exception as mount_error:
    print(f"Already mounted (or mount failed): {mount_error}")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
    BooleanType,
    DateType,
    TimestampType,
    ArrayType
)
from pyspark.sql.functions import explode_outer

# Load the datastore schema configuration, reading the JSON with multiLine enabled
ecr_schema_path = (
    "abfss://delta-tables@" + STORAGE_ACCOUNT + ".dfs.core.windows.net/ecr_datastore_config.json"
)
ecr_schema = spark.read.option("multiLine", True).json(ecr_schema_path)

def prepare_schemas(ecr_schema):
    """
    Build the per-table schema definitions from the datastore configuration.

    The first row of `ecr_schema` is read as a mapping of column name to column
    settings. Columns whose `data_type` is not "array" are added to the "core"
    table. Each "array" column becomes its own table, named after the column,
    holding an `eicr_id` column plus the fields listed in the column's
    `secondary_schema`.

    :param ecr_schema: Object whose `collect()[0].asDict()` yields the column
        settings (in this notebook, the DataFrame loaded from the config file).
    :return: A dictionary keyed by table name. Plain fields map to
        `[<data type>, <nullable>]`; a nested field maps to a dictionary of
        such entries.
    """
    table_schemas = {
        "core": {
            "patient_id": ["string", False],
            "person_id": ["string", False],
            "person_id_date_added": ["timestamp", True],
            "iris_id": ["string", True],
            "iris_id_date_added": ["timestamp", True],
            "incident_id": ["string", True],
            "incident_id_date_added": ["timestamp", True],
        }
    }

    config = ecr_schema.collect()[0].asDict()
    for name, settings in config.items():
        if settings['data_type'] == "array":
            # A nested column gets a table of its own: the record id plus the
            # fields described by its secondary schema.
            nested_fields = {
                sub_name: [sub_settings['data_type'], sub_settings['nullable']]
                for sub_name, sub_settings in settings['secondary_schema'].asDict().items()
            }
            table_schemas[name] = {'eicr_id': ['string', False], name: nested_fields}
        else:
            table_schemas["core"][name] = [settings['data_type'], settings['nullable']]

    return table_schemas

def get_schemas(table_schemas: dict) -> dict[str, dict]:
    """
    Convert table schema definitions into Spark schemas and merge mappings.

    :param table_schemas: A dictionary keyed by table name. Each value maps a
        field name to `[<data type>, <nullable?(True/False)>]` or, for a nested
        (array) field, to a dictionary of such entries keyed by sub-field name.
    :return: A dictionary keyed by table name. Each value is a dictionary with:
        "spark_schema" (StructType in which nested fields are arrays of structs),
        "merge_schema" (field name -> "new.<field name>", for merge operations),
        and "flattened_df_schema" (StructType with nested sub-fields promoted to
        top-level fields).
    :raises KeyError: If a field uses a data type missing from the type map.
    """

    schema_type_map = {
        "string": StringType(),
        "integer": IntegerType(),
        "float": FloatType(),
        "boolean": BooleanType(),
        "date": DateType(),
        "datetime": TimestampType(),
        # prepare_schemas() declares the core "*_date_added" columns as
        # "timestamp"; without this entry building the core schema raises
        # KeyError.
        "timestamp": TimestampType(),
        "number": IntegerType()
    }
    spark_schemas = {}

    for table_name, schema in table_schemas.items():
        spark_schema = StructType()
        flattened_df_schema = StructType()
        merge_schema = {}

        for field, definition in schema.items():
            if isinstance(definition, dict):
                # Collect the struct fields per nested field so that one nested
                # field never inherits the sub-fields of a previous one.
                array_fields = []
                for array_field, (data_type, nullable) in definition.items():
                    spark_type = schema_type_map[data_type]
                    array_fields.append(StructField(array_field, spark_type, nullable))
                    flattened_df_schema.add(StructField(array_field, spark_type, nullable))
                    merge_schema[array_field] = "new." + array_field
                spark_schema.add(StructField(field, ArrayType(
                    StructType(array_fields)
                )))
            else:
                data_type, nullable = definition[0], definition[1]
                spark_type = schema_type_map[data_type]
                spark_schema.add(StructField(field, spark_type, nullable))
                flattened_df_schema.add(StructField(field, spark_type, nullable))
                merge_schema[field] = "new." + field

        spark_schemas[table_name] = {
            "spark_schema": spark_schema,
            "merge_schema": merge_schema,
            "flattened_df_schema": flattened_df_schema
        }

    return spark_schemas

# Build the per-table schema definitions from the loaded configuration
table_schemas = prepare_schemas(ecr_schema)

# Format table schemas for spark: each table gets its Spark schema, its merge
# mapping and its flattened schema
spark_schemas = get_schemas(table_schemas)

# Get or create a Spark session configured with the Delta Lake SQL extension
# and the Delta catalog.
# NOTE(review): `spark` is already used earlier in this file, so a session
# presumably exists in the runtime and getOrCreate() would return it rather
# than start a new one — confirm.
spark = (
    SparkSession.builder.master("local[*]")
    .appName("Update eCR Datastore")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

In [None]:
from delta import DeltaTable

def _get_nested_columns(table_name, table_schema):
    """
    Find the nested (array) column of a table schema and its field names.

    :param table_name: Name of the table; used only in the error message.
    :param table_schema: Mapping of column name to `[<data type>, <nullable>]`
        or, for the nested column, to a dictionary of such entries.
    :return: A tuple of the nested column's name and the list of its field names.
    :raises ValueError: If the schema contains no nested column.
    """
    explode_col = None
    explode_cols = []
    for col, col_data in table_schema.items():
        if isinstance(col_data, dict):
            explode_col = col
            explode_cols.extend(col_data)
    if explode_col is None:
        raise ValueError(
            f"No nested column found in the schema for table '{table_name}'."
        )
    return explode_col, explode_cols


def _write_daily_extract(records, table_path, table_name, extract_format, DELTA_TABLES_FILESYSTEM):
    """
    Write `records` as a single file in `extract_format` and copy it to the
    `updated_ecr_datastore` folder.

    :param records: DataFrame to export.
    :param table_path: Path of the table; the temporary output folder is
        `<table_path>.<extract_format>`.
    :param table_name: Table name used in the name of the copied file.
    :param extract_format: Output format passed to the DataFrame writer.
    :param DELTA_TABLES_FILESYSTEM: Prefix of the destination path.
    :raises FileNotFoundError: If the writer produced no `part-*` data file.
    """
    # Write standard pyspark directories for each file format
    # Force pyspark to coalesce the results into a single file
    format_path = table_path + "." + extract_format
    records.coalesce(1).write.format(extract_format).option("header", True).mode('overwrite').save(format_path)

    try:
        # Locate the file which actually has the data amidst the pyspark kruft
        suffix = "." + extract_format
        partial_file = ""
        for f in mssparkutils.fs.ls(format_path):
            file_in_namespace = f.path.split("/")[-1]
            if file_in_namespace.startswith("part-") and file_in_namespace.endswith(suffix):
                partial_file = f.path

        # Fail with a clear message rather than handing an empty path to cp
        if not partial_file:
            raise FileNotFoundError(
                f"No 'part-*{suffix}' file was written to {format_path}."
            )

        # Create a copy of just the data at the root level, formatted appropriately
        mssparkutils.fs.cp(partial_file, DELTA_TABLES_FILESYSTEM + f"/updated_ecr_datastore/updated_ecr_datastore-{table_name}." + extract_format)
    finally:
        # Now delete the pyspark junk folder by deleting all virtual filepaths,
        # even when locating or copying the data file failed
        mssparkutils.fs.rm(format_path, recurse=True)


def update_ecr_datastore(schemas, table_name, table_schemas, ECR_DATASTORE_PATH, DELTA_TABLES_FILESYSTEM):
    """
    Load the parsed ECR records into one datastore Delta table and export it.

    Records are read from `PARSED_ECR_PATH` with the table's Spark schema. For
    tables other than "core", rows whose nested column is null are dropped and
    the nested column is exploded into one row per element. An existing "core"
    Delta table is merged on `eicr_id`, inserting only unmatched records; in
    every other case the table is written in overwrite mode. The table is then
    compacted and exported once per entry in `DAILY_EXTRACT_FORMATS`.

    :param schemas: Dictionary for this table with the keys "spark_schema"
        (used for reading) and "merge_schema" (insert mapping for the merge).
    :param table_name: Name of the table; appended to `ECR_DATASTORE_PATH` as
        `-<table_name>` to form the Delta table path.
    :param table_schemas: Schema definitions keyed by table name; used to find
        the nested column of non-core tables.
    :param ECR_DATASTORE_PATH: Base path of the datastore Delta tables.
    :param DELTA_TABLES_FILESYSTEM: Prefix of the daily extract destination.
    :raises ValueError: If a non-core table's schema has no nested column.
    :raises FileNotFoundError: If an extract write produces no data file.
    """
    schema = schemas['spark_schema']
    merge_schema = schemas['merge_schema']

    # Read JSON files into a DataFrame with the specified schema
    print("reading raw JSON data...")
    new_ecr_records = spark.read.schema(schema).json(PARSED_ECR_PATH, multiLine=True)

    print("Preparing data to be written to table...")
    # Transform non-core tables
    if table_name != "core":
        # Prep for exploding nested data
        new_ecr_records = new_ecr_records.filter(new_ecr_records[table_name].isNotNull())
        explode_col, explode_cols = _get_nested_columns(table_name, table_schemas[table_name])
        new_ecr_records = new_ecr_records.selectExpr(
            "eicr_id", f"explode({explode_col}) as alias"
        ).select(["eicr_id"] + [f"alias.{column}" for column in explode_cols])

    # Temporary correction for quantitative values that are currently being parsed incorrectly as strings
    if table_name == "labs":
        string_to_float_columns = ['test_result_quantitative', 'test_result_ref_range_high', 'test_result_ref_range_low']
        for column in string_to_float_columns:
            new_ecr_records = new_ecr_records.withColumn(column, new_ecr_records[column].cast("float"))

    # Check if the core table exists merge records otherwise create/overwrite the table
    # TODO implement a primary key other tables similar to ecir_id in core.
    #   Then we can stop overwriting in favor of a merge and ultimately only read in new or changed records as opposed to all of them as we are currently doing.
    print("writing new records...")
    table_path = ECR_DATASTORE_PATH + f"-{table_name}"
    # The name check comes first so non-core tables skip the storage lookup
    if table_name == "core" and DeltaTable.isDeltaTable(spark, table_path):
        ecr_datastore = DeltaTable.forPath(spark, table_path)

        ecr_datastore.alias("old").merge(
            new_ecr_records.alias("new"), "old.eicr_id = new.eicr_id"
        ).whenNotMatchedInsert(values=merge_schema).execute()
    else:
        # Core table doesn't exist yet, or this is a non-core table: create/overwrite it.
        new_ecr_records.write.format("delta").mode("overwrite").save(table_path)

    # Optimize query performance by compacting the deltatable
    print("Optimizing...")
    ecr_datastore = DeltaTable.forPath(spark, table_path)
    ecr_datastore.optimize().executeCompaction()

    print("Writing daily extracts...")
    # Make a copy of the Delta table in each daily extract format for easy access.
    ecr_datastore_df = ecr_datastore.toDF()

    for extract_format in DAILY_EXTRACT_FORMATS:
        _write_daily_extract(
            ecr_datastore_df,
            table_path,
            table_name,
            extract_format,
            DELTA_TABLES_FILESYSTEM,
        )


In [None]:
# Update every table for which a Spark schema was prepared
for table_name, schemas in spark_schemas.items():
    update_ecr_datastore(
        schemas=schemas,
        table_name=table_name,
        table_schemas=table_schemas,
        ECR_DATASTORE_PATH=ECR_DATASTORE_PATH,
        DELTA_TABLES_FILESYSTEM=DELTA_TABLES_FILESYSTEM,
    )
