In [None]:
# verify loaded packages
import pkg_resources

# print one line per distribution that pkg_resources reports as installed in this session
for installedDist in pkg_resources.working_set :
    print( installedDist )

In [1]:
# resources :
#   https://learn.microsoft.com/en-us/azure/synapse-analytics/sql/query-parquet-files
#   https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-what-is-delta-lake
#   https://learn.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema


# import necessary packages and libraries
import json, datetime, time

import pandas as pd

from azure.storage.blob import BlobServiceClient, BlobClient

from pyspark.sql.types import *

from pyspark.sql.functions import *

from delta import *

In [2]:
# storage connection settings ( source and destination )
# the abfss scheme is used because the storage account has HNS enabled
adlsAcct = "<storage account url>"
adlsSas = "<sas token>"
adlsCont = "insiders"

# identifiers for the blob store connection
storage_acct = "sparkmtndatalake"
container_name = "insiders"
linked_svc = "SparkMtnLake"

# layer folders inside the container
rawFldr = "/raw/"
bronzeFldr =  "/bronze/"
silverFldr = "/silver/"

# the sas token is read from the linked service definition instead of being hard-coded
sas_token = mssparkutils.credentials.getConnectionStringOrCreds( linked_svc )

# register the sas token with spark for this container / account pair
# NOTE(review): the endpoint suffix in this key differs from the one used to build
# httpsUrl and abfssUrl below - confirm both refer to the same storage endpoint
sasConfKey = "fs.azure.sas.%s.%s.dfs.core.usgovcloudapi.net" % ( container_name, storage_acct)
spark.conf.set( sasConfKey, sas_token )

# base urls : https for the blob sdk client, abfss for spark reads and writes
httpsUrl = "https://%s.x.y.z.abc/" % storage_acct
abfssUrl = "abfss://%s@%s.x.y.z.abc/" % ( container_name, storage_acct )

# blob sdk clients : account level first, then scoped to the container
blobSvcConn = BlobServiceClient( httpsUrl, credential = sas_token )
contClient = blobSvcConn.get_container_client( container_name )

In [None]:
##############################
# bronze layer steps, part 1 #
##############################
# Walk the two folder levels found under the raw folder. Patient files are
# copied server-side into bronze/reference_data/, while the names of the
# Observation and Claim files are collected for the partitioning step.

obsFileList = []
claimFileList = []

# ( destination blob name, last seen status ) for copies that did not reach "success"
failedCopyList = []

# the sas token is used as the query string of the copy source url, so make sure
# exactly one "?" separates it from the blob path whether or not the token
# already starts with one
sasQuery = "?" + sas_token.lstrip( "?" )

# copy states after which further polling cannot change the outcome
terminalCopyStates = ( "success", "failed", "aborted" )

for item in contClient.walk_blobs( rawFldr ) :

    for subItem in contClient.walk_blobs( item.name ) :

        subItemName = subItem.name

        # the last path segment is the file name; compare it case-insensitively
        fileName = subItemName.split( "/" )[ -1 ]
        fileNameUpper = fileName.upper()

        ######################################
        # copy patient files to bronze layer #
        ######################################

        if fileNameUpper == "PATIENT.NDJSON" :

            srcBlobUrl = httpsUrl + container_name + "/" + subItemName
            srcBlob = srcBlobUrl + sasQuery

            # swap only the first "raw/" so any later path segment is left alone
            newSIName = subItemName.replace( "raw/", "bronze/reference_data/", 1 )

            # log the source without its query string so the sas token does not
            # end up in the notebook output
            print( "Copying ", srcBlobUrl, " to ", newSIName )

            destBlob = blobSvcConn.get_blob_client( container_name, newSIName )

            destBlob.start_copy_from_url( srcBlob )

            # poll the copy status, at most 100 times in ten second increments
            status = None

            for i in range( 100 ) :

                status = destBlob.get_blob_properties().copy.status

                print( "Copy Status: ", status )

                # stop on failure as well as success; a failed or aborted copy
                # will never turn into a successful one
                if status in terminalCopyStates :

                    break

                time.sleep( 10 )

            if status != "success" :

                failedCopyList.append( ( newSIName, status ) )

        elif fileNameUpper == "OBSERVATION.NDJSON" :

            obsFileList.append( subItemName )

        elif fileNameUpper == "CLAIM.NDJSON" :

            claimFileList.append( subItemName )

# surface incomplete copies instead of letting them pass unnoticed
if failedCopyList :

    print( "WARNING: patient file copies that did not complete successfully :" )

    for failedName, lastStatus in failedCopyList :

        print( "  ", failedName, " last status: ", lastStatus )

In [None]:
########################
# bronze layer, part 2 #
########################
# Load every Claim and Observation file collected in part 1 into one dataframe
# each and add the year / month / day columns used to partition the bronze
# layer. The write statements themselves are kept commented out.
print( "*****" )
print( "Partiioning Claims Files by Issued Year :")
print( "*****" )

# the schema is inferred from one hard-coded sample folder rather than from the
# file lists built in part 1
writeUrlPath = abfssUrl + "bronze/"
firstFilePath = abfssUrl + "raw/000047ca-00c7-492b-bf65-740805144cd2/"

# create schema from first Claims file
# NOTE(review): the inputs are named .ndjson - confirm the multiline option is
# intended for line-delimited files
pathClaimSchema = firstFilePath + "Claim.ndjson"
claimSchema = spark.read.option( "multiline", "true" ).json( pathClaimSchema ).schema

# full abfss path for every Claim file found in part 1
filesToLoadList = [ abfssUrl + claimFile for claimFile in claimFileList ]

# single Claims dataframe with all Claims files
claimsDf = spark.read.option( "multiline", "true" ).option( "columnNameOfCorruptRecord", "corruptRecord" ).schema( claimSchema ).json( filesToLoadList )

# historic data write ( partitioned by year only )
exportClaimsDf = claimsDf.withColumn( "year", date_format( col( "created" ), "yyyy" ) ).repartition( "year" )

writePath = writeUrlPath + "historic_data/Claim/"

# exportClaimsDf.filter( ( exportClaimsDf.year >= 2016 ) & ( exportClaimsDf.year < 2021 ) ).write.partitionBy( "year" ).mode( "overwrite" ).json( writePath )

# incremental data write ( partitioned by year, month and day )
# "MM" is month-of-year; lowercase "mm" would yield minute-of-hour
exportClaimsDf = (
    claimsDf
    .withColumn( "year", date_format( col( "created" ), "yyyy" ) )
    .withColumn( "month", date_format( col( "created" ), "MM" ) )
    .withColumn( "day", date_format( col( "created" ), "dd" ) )
    .repartition( "year", "month", "day" )
)

writePath = writeUrlPath + "incremental_data/Claim/"

# exportClaimsDf.filter( "year >= 2021" ).write.partitionBy( "year", "month", "day" ).mode( "overwrite" ).json( writePath )

print( "*****" )
print( "Partitioning Observations Files by Issued Year :" )
print( "*****" )

# create schema from first Observation file ( same hard-coded sample folder )
pathObsSchema = firstFilePath + "Observation.ndjson"
obSchema = spark.read.option( "multiline", "true" ).json( pathObsSchema ).schema

# full abfss path for every Observation file found in part 1
filesToLoadList = [ abfssUrl + obsFile for obsFile in obsFileList ]

# single dataframe for all Observation records
obsDf = spark.read.option( "multiline", "true" ).option( "columnNameOfCorruptRecord", "corruptRecord" ).schema( obSchema ).json( filesToLoadList )

# historic data write ( partitioned by year only )
exportObsDf = obsDf.withColumn( "year", date_format( col( "issued" ), "yyyy" ) ).repartition( "year" )

writePath = writeUrlPath + "historic_data/Observation/"

# exportObsDf.filter( ( exportObsDf.year >= 2016 ) & ( exportObsDf.year < 2021 ) ).write.partitionBy( "year" ).mode( "overwrite" ).json( writePath )

# incremental data write ( partitioned by year, month and day )
# "MM" is month-of-year; lowercase "mm" would yield minute-of-hour
exportObsDf = (
    obsDf
    .withColumn( "year", date_format( col( "issued" ), "yyyy" ) )
    .withColumn( "month", date_format( col( "issued" ), "MM" ) )
    .withColumn( "day", date_format( col( "issued" ), "dd" ) )
    .repartition( "year", "month", "day" )
)

writePath = writeUrlPath + "incremental_data/Observation/"

# exportObsDf.filter( "year >= 2021" ).write.partitionBy( "year", "month", "day" ).mode( "overwrite" ).json( writePath )