Notebook: GSA_SAM_Update_SCD_PUBLIC<br>
Created by: Joshua Wilshere<br>
Created On: 4/3/24<br><br>
Synapse Spark Pool Version: 3.4<br><br>
Dependencies: 
1. Unzipped pipe-delimited SAM monthly file in source_dir
    - Historical Public Files available here: https://sam.gov/data-services/Entity%20Registration/Public%20-%20Historical?privacy=Public
    - Current Public Files available here: https://sam.gov/data-services/Entity%20Registration/Public%20V2?privacy=Public
2. Accurate list of SAM column headers in GSA_SAM_COLUMN_HEADERS_PUBLIC.csv
3. Data already written to the target (silver_path) by the GSA_SAM_Initial_Delta_Table_Creation_PUBLIC notebook<br><br>
Purpose: Incrementally write data from an incoming daily or monthly SAM file to the target (silver) Delta table, tracking changes as a Type 2 slowly changing dimension (SCD)

In [62]:
# Processes in this notebook
# 1. Load SAM file column headers into Pandas dataframe
# 2. Create dictionary of column headers with all dtypes set to string
# 3. Load new monthly SAM file into Pandas dataframe using dictionary of column headers and datatypes
# 4. Convert Pandas dataframe to Spark dataframe
# 5. Combine composite key columns into single unique entity key
# 6. Add initial audit columns
# 7. Convert NullType() Columns to StringType()
# 8. Create hash code of fields to track changes in (AUD_HASH_CODE)
# 9. Create surrogate unique key (AUD_SEQ_ID) and derived partition key (PARTITION_KEY)
# 10. Create spark dataframe from target (silver) delta lake data set
# 11. Define and prepare the rows in the incoming monthly file that are new or updates to existing records
# 12. Update the AUD_ACTIVE_FLAG of updated records in the target (silver) from 'Y' to 'N'
#   Option 1: Using Merge Command. This option is slower than Option 2 on this data set.
#   Option 2: Using replaceWhere Command. This option is faster than Option 1 on this data set.
#       2(a): Create dataframe of records to change from Active to Inactive via left semi join on target data
#       2(b): Update records in target using "replaceWhere" with the AUD_SEQ_ID filter defined in the previous step
# 13. Insert new and updated records to target via delta append

# Adapted from: https://iterationinsights.com/article/how-to-implement-slowly-changing-dimensions-scd-type-2-using-delta-table/

In [63]:
# To Do: 
# 1. Add in code to flag entities as deleted
# 2. Test process to overwrite entire dataframe with new, inserted, and updated records
#    a. Also test using delta files partitioned on AUD_ACTIVE_FLAG. Can append inactive records to 'N' partition, drop from 'Y' partition
#       and then insert new active records

# <font size="5">0. Initialize packages, file names, and file paths

In [64]:
import pandas as pd
import os

# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html
# Import only what this notebook uses; "import *" from pyspark.sql.functions shadows Python builtins such as max, min, sum, and round
from delta.tables import DeltaTable
from pyspark.sql.functions import coalesce, col, concat, concat_ws, lit, row_number, sha2, substring, to_date
from pyspark.sql.window import Window
from datetime import datetime


pd.options.display.max_columns = None

In [65]:
# Set the file name of column header definitions
header_csv = 'GSA_SAM_COLUMN_HEADERS_PUBLIC.csv'

# Set the source file to be loaded
source_file = 'SAM_PUBLIC_MONTHLY_V2_20250504.dat'

# Set the source_date as the file date
source_date = source_file[-12:-4]

# Convert source date from YYYYMMDD to YYYY-MM-DD for implicit conversion in merge and filter statements
source_dt = datetime.strptime(source_date, '%Y%m%d').strftime('%Y-%m-%d')

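The slice `source_file[-12:-4]` assumes the filename always ends in an 8-digit date followed by a 4-character extension. As a hedged alternative, here is a minimal plain-Python sketch that extracts the trailing `YYYYMMDD` with a regular expression and fails loudly when it is missing. `parse_source_date` is a hypothetical helper, not part of the original notebook, and the `<prefix>_YYYYMMDD.<extension>` filename shape is an assumption based on the monthly filename used here.

```python
import re
from datetime import datetime

# Assumed filename shape: <prefix>_YYYYMMDD.<extension>, as in 'SAM_PUBLIC_MONTHLY_V2_20250504.dat'
_DATE_SUFFIX = re.compile(r"_(\d{8})\.[A-Za-z0-9]+$")


def parse_source_date(filename):
    """Return (YYYYMMDD, YYYY-MM-DD) parsed from the trailing date in a SAM extract filename."""
    match = _DATE_SUFFIX.search(filename)
    if match is None:
        raise ValueError(f"No _YYYYMMDD.<ext> suffix found in {filename!r}")
    source_date = match.group(1)
    # strptime raises ValueError if the digits are not a real calendar date
    source_dt = datetime.strptime(source_date, "%Y%m%d").strftime("%Y-%m-%d")
    return source_date, source_dt


source_date, source_dt = parse_source_date("SAM_PUBLIC_MONTHLY_V2_20250504.dat")
print(source_date, source_dt)  # 20250504 2025-05-04
```

Unlike fixed slicing, this also rejects impossible dates such as month 13 instead of silently passing them on.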
In [None]:
source_dir = 'abfss://bronze@<storage account name>.dfs.core.usgovcloudapi.net/GSA_SAM_PUBLIC/'
silver_path = 'abfss://silver@<storage account name>.dfs.core.usgovcloudapi.net/GSA_SAM_PUBLIC/'
gold_path = 'abfss://gold@<storage account name>.dfs.core.usgovcloudapi.net/GSA_SAM_PUBLIC/'

# <font size="5">1. Load SAM file column headers into Pandas dataframe

In [None]:
# Create dataframe with column names
col_df = pd.read_csv(os.path.join(source_dir, header_csv),
    storage_options = {'linked_service' : '<linked service name>'})

In [68]:
#col_df

# <font size="5">2. Create dictionary of column headers with all dtypes set to string 

In [69]:
# Create list of column names and dictionary of column names with all datatypes set to 'str'
var_list = list(col_df.columns)
var_dict = {column: 'str' for column in var_list}

In [70]:
#print(var_dict)

In [71]:
#print(var_list)

In [72]:
source_path = os.path.join(source_dir, source_file)
#print(source_path)

# <font size="5">3. Load monthly SAM file into Pandas dataframe using dictionary of column headers and datatypes

In [None]:
# Create dataframe from datafile using columns and datatypes set above
pdf = pd.read_csv(source_path,
    sep='|',
    names=var_list,
    dtype=var_dict,
    quoting=3,  # 3 = csv.QUOTE_NONE: quote characters are treated as ordinary data
    doublequote=False,
    # skip the non-data header and footer rows
    skiprows=1,
    skipfooter=1,
    # skipfooter is only supported by the python engine
    engine='python',
    ### for testing errors/rejected records ###
    # 'error': raise an Exception when a bad line is encountered
    # 'warn': raise a warning when a bad line is encountered and skip that line
    # 'skip': skip bad lines without raising or warning when they are encountered
    on_bad_lines='warn',
    storage_options={'linked_service': '<linked service name>'})

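To show what the `read_csv` options above do, here is a small stdlib-only analogue built on Python's `csv` module. It drops the first and last lines (like `skiprows=1`/`skipfooter=1`), splits on `|` with quote processing disabled (like `quoting=3`), keeps every value as a string, and records rows with the wrong field count instead of loading them, roughly as `on_bad_lines='warn'` does. `parse_pipe_extract` is a hypothetical helper, and the sample text, IDs, and names are invented, not real SAM content. One difference from pandas: empty fields stay empty strings here rather than becoming `NaN`.

```python
import csv


def parse_pipe_extract(text, columns):
    """Parse a pipe-delimited extract that has one header and one footer line.

    Returns (rows, skipped): rows are dicts of strings; skipped lists the line numbers
    whose field count did not match `columns` (the analogue of on_bad_lines='warn').
    """
    lines = text.splitlines()[1:-1]  # like skiprows=1 and skipfooter=1
    # QUOTE_NONE: quote characters are ordinary data, so an unbalanced '"' cannot swallow delimiters
    reader = csv.reader(lines, delimiter="|", quoting=csv.QUOTE_NONE)
    rows, skipped = [], []
    for line_no, fields in enumerate(reader, start=2):  # line 1 was the header
        if len(fields) != len(columns):
            skipped.append(line_no)
            continue
        rows.append(dict(zip(columns, fields)))
    return rows, skipped


# Invented sample: a header line, three records (the second is malformed), and a footer line
sample = 'HEADER\nAAAA11112222|ACME "WIDGETS LLC|!end\nBBBB33334444|!end\nCCCC55556666||!end\nFOOTER'
rows, skipped = parse_pipe_extract(sample, ["UNIQUE_ENTITY_ID", "LEGAL_BUSINESS_NAME", "END_OF_RECORD_INDICATOR"])
print(len(rows), skipped)              # 2 [3]
print(rows[0]["LEGAL_BUSINESS_NAME"])  # ACME "WIDGETS LLC
```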
In [74]:
# Confirm that all dtypes are set to 'object'
# Expected output: Series([], dtype: object)
print(pdf.dtypes[pdf.dtypes != 'object'])

Series([], dtype: object)


In [75]:
pdf.head(2)

Unnamed: 0,UNIQUE_ENTITY_ID,BLANK_DEPRECATED,ENTITY_EFT_INDICATOR,CAGE_CODE,DODAAC,SAM_EXTRACT_CODE,PURPOSE_OF_REGISTRATION,INITIAL_REGISTRATION_DATE,REGISTRATION_EXPIRATION_DATE,LAST_UPDATE_DATE,ACTIVATION_DATE,LEGAL_BUSINESS_NAME,DBA_NAME,ENTITY_DIVISION_NAME,ENTITY_DIVISION_NUMBER,PHYSICAL_ADDRESS_LINE_1,PHYSICAL_ADDRESS_LINE_2,PHYSICAL_ADDRESS_CITY,PHYSICAL_ADDRESS_PROVINCE_OR_STATE,PHYSICAL_ADDRESS_ZIP/POSTAL_CODE,PHYSICAL_ADDRESS_ZIP_CODE_4,PHYSICAL_ADDRESS_COUNTRY_CODE,PHYSICAL_ADDRESS_CONGRESSIONAL_DISTRICT,D&B_OPEN_DATA_FLAG,ENTITY_START_DATE,FISCAL_YEAR_END_CLOSE_DATE,ENTITY_URL,ENTITY_STRUCTURE,STATE_OF_INCORPORATION,COUNTRY_OF_INCORPORATION,BUSINESS_TYPE_COUNTER,BUS_TYPE_STRING,PRIMARY_NAICS,NAICS_CODE_COUNTER,NAICS_CODE_STRING,PSC_CODE_COUNTER,PSC_CODE_STRING,CREDIT_CARD_USAGE,CORRESPONDENCE_FLAG,MAILING_ADDRESS_LINE_1,MAILING_ADDRESS_LINE_2,MAILING_ADDRESS_CITY,MAILING_ADDRESS_ZIP/POSTAL_CODE,MAILING_ADDRESS_ZIP_CODE_4,MAILING_ADDRESS_COUNTRY,MAILING_ADDRESS_STATE_OR_PROVINCE,GOVT_BUS_POC_FIRST_NAME,GOVT_BUS_POC_MIDDLE_INITIAL,GOVT_BUS_POC_LAST_NAME,GOVT_BUS_POC_TITLE,GOVT_BUS_POC_ST_ADD_1,GOVT_BUS_POC_ST_ADD_2,GOVT_BUS_POC_CITY,GOVT_BUS_POC_ZIP/POSTAL_CODE,GOVT_BUS_POC_ZIP_CODE_4,GOVT_BUS_POC_COUNTRY_CODE,GOVT_BUS_POC_STATE_OR_PROVINCE,ALT_GOVT_BUS_POC_FIRST_NAME,ALT_GOVT_BUS_POC_MIDDLE_INITIAL,ALT_GOVT_BUS_POC_LAST_NAME,ALT_GOVT_BUS_POC_TITLE,ALT_GOVT_BUS_POC_ST_ADD_1,ALT_GOVT_BUS_POC_ST_ADD_2,ALT_GOVT_BUS_POC_CITY,ALT_GOVT_BUS_POC_ZIP/POSTAL_CODE,ALT_GOVT_BUS_POC_ZIP_CODE_4,ALT_GOVT_BUS_POC_COUNTRY_CODE,ALT_GOVT_BUS_POC_STATE_OR_PROVINCE,PAST_PERF_POC_POC_FIRST_NAME,PAST_PERF_POC_POC_MIDDLE_INITIAL,PAST_PERF_POC_POC_LAST_NAME,PAST_PERF_POC_POC_TITLE,PAST_PERF_POC_ST_ADD_1,PAST_PERF_POC_ST_ADD_2,PAST_PERF_POC_CITY,PAST_PERF_POC_ZIP/POSTAL_CODE,PAST_PERF_POC_ZIP_CODE_4,PAST_PERF_POC_COUNTRY_CODE,PAST_PERF_POC_STATE_OR_PROVINCE,ALT_PAST_PERF_POC_FIRST_NAME,ALT_PAST_PERF_POC_MIDDLE_INITIAL,ALT_PAST_PERF_POC_LAST_NAME,ALT_PAST_PERF_POC_TITLE,ALT_PAST_PERF_POC_ST_ADD_1,ALT_PAST_PERF_POC_ST_ADD_2,ALT_PAST_PERF_POC_CITY,ALT_PAST_PERF_POC_ZIP/POSTAL_CODE,ALT_PAST_PERF_POC_ZIP_CODE_4,ALT_PAST_PERF_POC_COUNTRY_CODE,ALT_PAST_PERF_POC_STATE_OR_PROVINCE,ELEC_BUS_POC_FIRST_NAME,ELEC_BUS_POC_MIDDLE_INITIAL,ELEC_BUS_POC_LAST_NAME,ELEC_BUS_POC_TITLE,ELEC_BUS_POC_ST_ADD_1,ELEC_BUS_POC_ST_ADD_2,ELEC_BUS_POC_CITY,ELEC_BUS_POC_ZIP/POSTAL_CODE,ELEC_BUS_POC_ZIP_CODE_4,ELEC_BUS_POC_COUNTRY_CODE,ELEC_BUS_POC_STATE_OR_PROVINCE,ALT_ELEC_POC_BUS_POC_FIRST_NAME,ALT_ELEC_POC_BUS_POC_MIDDLE_INITIAL,ALT_ELEC_POC_BUS_POC_LAST_NAME,ALT_ELEC_POC_BUS_POC_TITLE,ALT_ELEC_POC_BUS_ST_ADD_1,ALT_ELEC_POC_BUS_ST_ADD_2,ALT_ELEC_POC_BUS_CITY,ALT_ELEC_POC_BUS_ZIP/POSTAL_CODE,ALT_ELEC_POC_BUS_ZIP_CODE_4,ALT_ELEC_POC_BUS_COUNTRY_CODE,ALT_ELEC_POC_BUS_STATE_OR_PROVINCE,NAICS_EXCEPTION_COUNTER,NAICS_EXCEPTION_STRING,DEBT_SUBJECT_TO_OFFSET_FLAG,EXCLUSION_STATUS_FLAG,SBA_BUSINESS_TYPES_COUNTER,SBA_BUSINESS_TYPES_STRING,NO_PUBLIC_DISPLAY_FLAG,DISASTER_RESPONSE_COUNTER,DISASTER_RESPONSE_STRING,ENTITY_EVS_SOURCE,FLEX_FIELD_1,FLEX_FIELD_2,FLEX_FIELD_3,FLEX_FIELD_4,FLEX_FIELD_5,FLEX_FIELD_6,FLEX_FIELD_7,FLEX_FIELD_8,FLEX_FIELD_9,FLEX_FIELD_10,FLEX_FIELD_11,FLEX_FIELD_12,FLEX_FIELD_13,FLEX_FIELD_14,FLEX_FIELD_15,FLEX_FIELD_16,FLEX_FIELD_17,FLEX_FIELD_18,FLEX_FIELD_19,END_OF_RECORD_INDICATOR
0,C111ATT311C8,,,53YC5,,A,Z2,20131112,20250625,20240627,20240627,K & K CONSTRUCTION SUPPLY INC,,,,11400 WHITE ROCK RD,,RANCHO CORDOVA,CA,95742,6600,USA,6,,20060525,1215,www.kkconstructionsupply.com,2L,NV,USA,5,2X~8W~A2~HQ~XS,423390.0,9,423310Y~423320Y~423390Y~423510Y~423710Y~423990...,1,5680.0,Y,,11400 WHITE ROCK ROAD,,RANCHO CORDOVA,95742,7518,USA,CA,TRACY,,LOVELAND,DIRECTOR,11400 WHITE ROCK ROAD,,RANCHO CORDOVA,95742,,USA,CA,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,TRACY,,LOVELAND,PRESIDENT,11400 WHITE ROCK ROAD,,RANCHO CORDOVA,95742,,USA,CA,,,,,,,,,,,,0,,N,,0,,,0,,E&Y,,,,,,,,,,,,,,,,,,,,!end
1,C111BG66D155,,,6M9A6,,A,Z1,20111228,20250724,20240729,20240726,NEW ADVANCES FOR PEOPLE WITH DISABILITIES,NAPD,NAPD,,3400 N SILLECT AVE,,BAKERSFIELD,CA,93308,6363,USA,20,,19750301,922,www.napd-bak.org,8H,CA,USA,1,A8,,0,,0,,Y,,3400 N. SILLECT AVENUE,,BAKERSFIELD,93308,1815,USA,CA,RICHARD,,BARENCHI,DIRECTOR OF OPERATIONS & FINANCE,3400 N. SILLECT AVENUE,,BAKERSFIELD,93308,,USA,CA,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,RICHARD,,BARENCHI,DIRECTOR OF OPERATIONS & FINANCE,3400 N. SILLECT AVENUE,,BAKERSFIELD,93308,,USA,CA,,,,,,,,,,,,0,,N,,0,,,0,,E&Y,,,,,,,,,,,,,,,,,,,,!end


In [76]:
pdf.shape[0]

877338

# <font size="5">4. Convert Pandas dataframe to Spark dataframe

In [77]:
# Convert pandas dataframe to spark dataframe
sam_monthly_df = spark.createDataFrame(pdf)

In [78]:
base_cols = sam_monthly_df.columns

In [79]:
#display(sam_monthly_df.limit(10))

# <font size="5">5. Combine composite key columns into single unique entity key

In [80]:
# Create unique entity key by concatenating UNIQUE_ENTITY_ID,CAGE_CODE,DODAAC

# Add column entityKey to dataframe
# Replace null/undefined values with '' in concat expression
sam_monthly_df= sam_monthly_df.withColumn("entityKey",concat(coalesce(col("UNIQUE_ENTITY_ID"),lit('')),
    coalesce(col("CAGE_CODE"),lit('')),
    coalesce(col("DODAAC"),lit(''))))

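The `coalesce(..., lit(''))` wrappers matter because Spark's `concat` returns null if any of its inputs is null, so an entity with no DODAAC would otherwise get a null `entityKey` and could never match in the joins in steps 11 and 12. Below is a plain-Python model of that behavior, assuming an empty DODAAC arrives in Spark as null. The helper names are illustrative and are not Spark APIs.

```python
def spark_concat(*values):
    """Model of Spark's concat(): the result is null if any input is null."""
    if any(v is None for v in values):
        return None
    return "".join(values)


def coalesce_empty(value):
    """Model of coalesce(col, lit('')): replace a null with an empty string."""
    return "" if value is None else value


def entity_key(uei, cage, dodaac):
    """entityKey as built in the notebook: null-safe concatenation of the three key columns."""
    return spark_concat(coalesce_empty(uei), coalesce_empty(cage), coalesce_empty(dodaac))


# First record shown by pdf.head(2): UEI and CAGE code present, DODAAC empty
print(spark_concat("C111ATT311C8", "53YC5", None))  # None
print(entity_key("C111ATT311C8", "53YC5", None))    # C111ATT311C853YC5
```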
In [81]:
# Confirm column was added
#sam_monthly_df.columns

In [82]:
# Confirm contents of entityKey
display(sam_monthly_df.limit(10).select('entityKey', 'UNIQUE_ENTITY_ID','CAGE_CODE','DODAAC'))

# <font size="5">6. Add initial audit columns

In [83]:
# Add audit directory and filename columns
# Spark SQL date/time patterns: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
sam_monthly_df = sam_monthly_df.withColumn("AUD_DIRECTORY",lit(source_dir)).\
                withColumn("AUD_FILENAME",lit(source_file)).\
                withColumn("AUD_ACTIVE_FLAG", lit("Y")).\
                withColumn("EffectiveFromDate", to_date(lit(source_date), "yyyyMMdd")).\
                withColumn("EffectiveToDate", lit(None).cast("date"))

In [84]:
# Confirm column was added
#sam_monthly_df.columns

In [85]:
# Create list of audit columns shared across tables
aud_cols = ['AUD_DIRECTORY', 'AUD_FILENAME', 'AUD_ACTIVE_FLAG', 'EffectiveFromDate', 'EffectiveToDate']

In [86]:
# Confirm column contents
display(sam_monthly_df.limit(2).select(*aud_cols))

In [87]:
# Assign the spark dataframe schema to variable schemaStruct
# .schema returns a StructType object
schemaStruct = sam_monthly_df.schema

# <font size="5">7. Convert NullType() Columns to StringType()

In [88]:
# Convert the spark dataframe schema into a pandas dataframe for inspection
# .fields returns the list of StructField objects that make up the StructType
# https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.types.StructType.html
# https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.types.StructField.html#pyspark.sql.types.StructField
schemaList = []
for field in schemaStruct.fields:
    schemaList.append(
        {
            'fieldname': field.name,
            'datatype': str(field.dataType),
            'nullable': field.nullable
        }
    )

schema_df = pd.DataFrame(schemaList)
#schema_df

In [89]:
# Check whether any columns are NullType() before writing the spark dataframe to the delta lake
# NullType() columns will not be written to the delta table, so must be converted to another data type
schema_df.loc[schema_df['datatype']==r'NullType()']

Unnamed: 0,fieldname,datatype,nullable
1,BLANK_DEPRECATED,NullType(),True
23,D&B_OPEN_DATA_FLAG,NullType(),True
38,CORRESPONDENCE_FLAG,NullType(),True
122,FLEX_FIELD_1,NullType(),True
123,FLEX_FIELD_2,NullType(),True
124,FLEX_FIELD_3,NullType(),True
125,FLEX_FIELD_4,NullType(),True
126,FLEX_FIELD_5,NullType(),True
127,FLEX_FIELD_6,NullType(),True
128,FLEX_FIELD_7,NullType(),True


In [90]:
# Iterate over the dataframe's schema and cast any NullType() columns to StringType()
from pyspark.sql.types import NullType

for field in schemaStruct.fields:
    if isinstance(field.dataType, NullType):
        sam_monthly_df = sam_monthly_df.withColumn(field.name, col(field.name).cast('string'))

# <font size="5">8. Create hash code of fields to track changes in (AUD_HASH_CODE)

In [91]:
# Compile list of columns to exclude from the AUD_HASH_CODE of the sam_main table
key_list = ['UNIQUE_ENTITY_ID', 'CAGE_CODE', 'DODAAC']
remove_list = ['SAM_EXTRACT_CODE', 'END_OF_RECORD_INDICATOR']
remove_list.extend(key_list)

In [92]:
# Create list of columns (hash_cols) that should have changes tracked in sam_main
hash_cols = [c for c in var_list if c not in remove_list]

In [93]:
#print(hash_cols)

In [94]:
# Create the audit hash code of change-tracked columns
# sha2() already returns a Column, so it does not need to be wrapped in lit()
sam_monthly_df = sam_monthly_df.withColumn("AUD_HASH_CODE", sha2(concat_ws("~", *hash_cols), 256))

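`AUD_HASH_CODE` is a SHA-256 digest of the change-tracked columns joined with `~`. The plain-Python model of `sha2(concat_ws("~", ...), 256)` below assumes UTF-8 strings, and `aud_hash` is an illustrative name. It shows one property worth knowing: `concat_ws` skips nulls instead of emitting an empty slot, so a value that moves into an adjacent, previously null column yields the same hash and the change goes undetected. Mapping each null to `''` first, as `coalesce(col, lit(''))` would, keeps every column in its own position. Treat that variant as a design option, not a drop-in fix: adopting it would change every existing hash in the target, so every active record would look updated on the next run.

```python
import hashlib


def concat_ws(sep, *values):
    """Model of Spark's concat_ws(): null inputs are skipped, not rendered as empty strings."""
    return sep.join(v for v in values if v is not None)


def aud_hash(values):
    """Model of sha2(concat_ws('~', *cols), 256): lowercase hex SHA-256 of the UTF-8 bytes."""
    return hashlib.sha256(concat_ws("~", *values).encode("utf-8")).hexdigest()


def aud_hash_null_safe(values):
    """Variant that maps null to '' first, like coalesce(col, lit('')), so each column keeps its slot."""
    return aud_hash(["" if v is None else v for v in values])


# Toy values for three adjacent tracked columns
before = ["ACME LLC", None, "VA"]
after = [None, "ACME LLC", "VA"]  # value moved into the neighboring column
print(aud_hash(before) == aud_hash(after))                      # True: change not detected
print(aud_hash_null_safe(before) == aud_hash_null_safe(after))  # False: change detected
```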
# <font size="5">9. Create surrogate unique key (AUD_SEQ_ID) and derived Partition Key

In [122]:
# Create the temporary surrogate key/sequence id for the record
# Note: a Window with orderBy() but no partitionBy() moves all rows into a single partition;
#   this works at the current file size but may become a bottleneck for much larger files
window_sort = ['entityKey','AUD_HASH_CODE']
w = Window().orderBy(*window_sort)
sam_monthly_df = sam_monthly_df.withColumn("AUD_SEQ_ID", row_number().over(w).cast('long'))

In [181]:
# Create Partition Key column based on the first letter of the Unique Entity ID, which has a relatively equal distribution across possible values
sam_monthly_df = sam_monthly_df.withColumn("PARTITION_KEY", substring(col("UNIQUE_ENTITY_ID"), 1, 1))

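Partitioning on the first character of the UEI only helps if rows spread evenly across those characters, as the comment above states. One way to check is to count rows per key; in the notebook that would be `sam_monthly_df.groupBy("PARTITION_KEY").count()`. The plain-Python sketch below models `substring(UNIQUE_ENTITY_ID, 1, 1)`, including Spark's null result for a null input, and reports the ratio of the largest to the smallest partition. `partition_balance` is a hypothetical helper, and the sample is just the three UEIs that appear in this notebook plus a null, so the numbers illustrate the calculation, not the real distribution.

```python
from collections import Counter


def partition_key(uei):
    """Model of substring(UNIQUE_ENTITY_ID, 1, 1); Spark returns null for a null input."""
    return None if uei is None else uei[:1]


def partition_balance(ueis):
    """Return (row count per partition key, largest-to-smallest partition ratio)."""
    counts = Counter(partition_key(u) for u in ueis)
    return counts, max(counts.values()) / min(counts.values())


counts, ratio = partition_balance(["C111ATT311C8", "C111BG66D155", "F743Q1LZ3VN9", None])
print(dict(counts), ratio)  # {'C': 2, 'F': 1, None: 1} 2.0
```

A ratio near 1.0 on the full file would support the even-distribution assumption; a null key count above zero would flag rows without a UEI.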
In [182]:
display(sam_monthly_df.limit(2).select("PARTITION_KEY"))

In [183]:
print(sam_monthly_df.columns)

['entityKey', 'AUD_HASH_CODE', 'AUD_SEQ_ID', 'AUD_ACTIVE_FLAG', 'AUD_DIRECTORY', 'AUD_FILENAME', 'EffectiveFromDate', 'EffectiveToDate', 'UNIQUE_ENTITY_ID', 'BLANK_DEPRECATED', 'ENTITY_EFT_INDICATOR', 'CAGE_CODE', 'DODAAC', 'SAM_EXTRACT_CODE', 'PURPOSE_OF_REGISTRATION', 'INITIAL_REGISTRATION_DATE', 'REGISTRATION_EXPIRATION_DATE', 'LAST_UPDATE_DATE', 'ACTIVATION_DATE', 'LEGAL_BUSINESS_NAME', 'DBA_NAME', 'ENTITY_DIVISION_NAME', 'ENTITY_DIVISION_NUMBER', 'PHYSICAL_ADDRESS_LINE_1', 'PHYSICAL_ADDRESS_LINE_2', 'PHYSICAL_ADDRESS_CITY', 'PHYSICAL_ADDRESS_PROVINCE_OR_STATE', 'PHYSICAL_ADDRESS_ZIP/POSTAL_CODE', 'PHYSICAL_ADDRESS_ZIP_CODE_4', 'PHYSICAL_ADDRESS_COUNTRY_CODE', 'PHYSICAL_ADDRESS_CONGRESSIONAL_DISTRICT', 'D&B_OPEN_DATA_FLAG', 'ENTITY_START_DATE', 'FISCAL_YEAR_END_CLOSE_DATE', 'ENTITY_URL', 'ENTITY_STRUCTURE', 'STATE_OF_INCORPORATION', 'COUNTRY_OF_INCORPORATION', 'BUSINESS_TYPE_COUNTER', 'BUS_TYPE_STRING', 'PRIMARY_NAICS', 'NAICS_CODE_COUNTER', 'NAICS_CODE_STRING', 'PSC_CODE_COUNTER',

In [152]:
# Create list with order of audit columns to appear at beginning of the table
initial_aud_cols = ['entityKey','AUD_HASH_CODE', 'AUD_SEQ_ID',  'AUD_ACTIVE_FLAG', 'AUD_DIRECTORY', 'AUD_FILENAME','EffectiveFromDate', 'EffectiveToDate']

In [184]:
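# After generating all audit columns, reorder the column list
# By default Delta Lake collects data-skipping statistics (min/max values, null counts) only on the first 32 columns
#   (delta.dataSkippingNumIndexedCols), so join and filter columns should come first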
sam_monthly_df = sam_monthly_df.select(*initial_aud_cols,*base_cols, "PARTITION_KEY")

In [185]:
# Convert INITIAL_REGISTRATION_DATE to date type
sam_monthly_df = sam_monthly_df.withColumn("INITIAL_REGISTRATION_DATE",to_date(col("INITIAL_REGISTRATION_DATE"), "yyyyMMdd"))
display(sam_monthly_df.limit(2).select("INITIAL_REGISTRATION_DATE"))

In [186]:
#display(sam_monthly_df.limit(2))

In [187]:
# Ensure no columns in sam_monthly_df are NullType()
# This cell will only print output if NullType() columns are found
for i in sam_monthly_df.schema.fieldNames():
    if str(sam_monthly_df.schema[i].dataType) == 'NullType()':
        print('sam_monthly_df column {} is NullType()'.format(i))

In [157]:
# # Write first sam_main data frame to silver as the baseline
# # Write Dataframe as Delta Table (silver)
# if not DeltaTable.isDeltaTable(spark, silver_path):
#     sam_monthly_df.write.format("delta").mode("overwrite").save(silver_path)

In [188]:
# Create list of columns used in join condition
condition_cols = ['entityKey', 'AUD_HASH_CODE', 'AUD_ACTIVE_FLAG']

In [189]:
# Create list of all columns in silver sam_monthly_df dataframe
silver_columns = sam_monthly_df.columns

In [190]:
print(silver_columns)

['entityKey', 'AUD_HASH_CODE', 'AUD_SEQ_ID', 'AUD_ACTIVE_FLAG', 'AUD_DIRECTORY', 'AUD_FILENAME', 'EffectiveFromDate', 'EffectiveToDate', 'UNIQUE_ENTITY_ID', 'BLANK_DEPRECATED', 'ENTITY_EFT_INDICATOR', 'CAGE_CODE', 'DODAAC', 'SAM_EXTRACT_CODE', 'PURPOSE_OF_REGISTRATION', 'INITIAL_REGISTRATION_DATE', 'REGISTRATION_EXPIRATION_DATE', 'LAST_UPDATE_DATE', 'ACTIVATION_DATE', 'LEGAL_BUSINESS_NAME', 'DBA_NAME', 'ENTITY_DIVISION_NAME', 'ENTITY_DIVISION_NUMBER', 'PHYSICAL_ADDRESS_LINE_1', 'PHYSICAL_ADDRESS_LINE_2', 'PHYSICAL_ADDRESS_CITY', 'PHYSICAL_ADDRESS_PROVINCE_OR_STATE', 'PHYSICAL_ADDRESS_ZIP/POSTAL_CODE', 'PHYSICAL_ADDRESS_ZIP_CODE_4', 'PHYSICAL_ADDRESS_COUNTRY_CODE', 'PHYSICAL_ADDRESS_CONGRESSIONAL_DISTRICT', 'D&B_OPEN_DATA_FLAG', 'ENTITY_START_DATE', 'FISCAL_YEAR_END_CLOSE_DATE', 'ENTITY_URL', 'ENTITY_STRUCTURE', 'STATE_OF_INCORPORATION', 'COUNTRY_OF_INCORPORATION', 'BUSINESS_TYPE_COUNTER', 'BUS_TYPE_STRING', 'PRIMARY_NAICS', 'NAICS_CODE_COUNTER', 'NAICS_CODE_STRING', 'PSC_CODE_COUNTER',

# <font size="5">10. Create spark dataframe from target (silver) delta lake data set

In [191]:
# Initialize silver/target data frame for lookup
sam_main_target_df = spark.read.format("delta").load(silver_path)

# <font size="5">11. Define and prepare the rows in the incoming monthly file that are new or updates to existing records

In [192]:
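# Define the rows to insert (new entities and new versions of changed entities) via a left anti join of the
#   incoming monthly file against the target on entityKey, AUD_HASH_CODE, and AUD_ACTIVE_FLAG
# For daily files, use .where("AUD_ACTIVE_FLAG = 'Y' AND SAM_EXTRACT_CODE = '3'") instead;
#   SAM_EXTRACT_CODE = '3' is only valid for daily files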
RowsToInsert = sam_monthly_df \
            .alias("source") \
            .where("AUD_ACTIVE_FLAG = 'Y'") \
            .join(sam_main_target_df.alias("target"),condition_cols,'leftanti') \
            .select(*silver_columns) \
            .orderBy(col('source.entityKey'))

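The left anti join keeps only incoming rows that have no target row with the same `entityKey`, `AUD_HASH_CODE`, and `AUD_ACTIVE_FLAG`. That single condition captures both brand-new entities and new versions of existing ones, while dropping unchanged records. The plain-Python sketch below applies the same set logic to toy rows; the keys, hashes, and `rows_to_insert` name are invented for illustration.

```python
def rows_to_insert(source, target):
    """Left anti join on (entityKey, hash, active flag): keep active source rows with no exact match in target."""
    target_keys = {(t["entityKey"], t["hash"], t["active"]) for t in target}
    return [s for s in source
            if s["active"] == "Y" and (s["entityKey"], s["hash"], s["active"]) not in target_keys]


target = [
    {"entityKey": "E1", "hash": "h1", "active": "Y"},  # unchanged in the new file
    {"entityKey": "E2", "hash": "h2", "active": "Y"},  # changed in the new file
    {"entityKey": "E4", "hash": "h4", "active": "N"},  # older version of E4...
    {"entityKey": "E4", "hash": "h5", "active": "Y"},  # ...superseded by h5
]
source = [
    {"entityKey": "E1", "hash": "h1", "active": "Y"},
    {"entityKey": "E2", "hash": "h2b", "active": "Y"},
    {"entityKey": "E3", "hash": "h3", "active": "Y"},  # brand-new entity
    {"entityKey": "E4", "hash": "h4", "active": "Y"},  # reverts to its earlier values
]
print([s["entityKey"] for s in rows_to_insert(source, target)])  # ['E2', 'E3', 'E4']
```

Because `AUD_ACTIVE_FLAG` is part of the join key, E4 is still inserted as a new active row even though its values match an earlier, now-inactive version.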
In [193]:
display(RowsToInsert.limit(1))

In [194]:
print(RowsToInsert.count())

567967


In [195]:
display(RowsToInsert.select("UNIQUE_ENTITY_ID", "LEGAL_BUSINESS_NAME", "entityKey", "AUD_SEQ_ID", "AUD_HASH_CODE", "AUD_DIRECTORY", "AUD_FILENAME", "AUD_ACTIVE_FLAG", "EffectiveFromDate", "EffectiveToDate").filter(col("UNIQUE_ENTITY_ID") == lit('F743Q1LZ3VN9')))

In [196]:
# Retrieve maximum surrogate key (AUD_SEQ_ID) in silver delta table
maxTableKey = DeltaTable.forPath(spark, silver_path).toDF().agg({"AUD_SEQ_ID":"max"}).collect()[0][0]
print(maxTableKey)

848539


In [197]:
# Offset the surrogate keys of the rows to insert by maxTableKey so they don't collide with existing AUD_SEQ_IDs
RowsToInsert = RowsToInsert.withColumn("AUD_SEQ_ID", col("AUD_SEQ_ID") + maxTableKey)

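Together, the `row_number()` window in step 9 and the `maxTableKey` offset here give new `AUD_SEQ_ID` values that cannot collide with existing ones. Row numbers start at 1, so every new key is strictly greater than the current maximum. Because the numbers are assigned before the anti join drops unchanged records, the inserted keys are unique but not contiguous. A plain-Python sketch of the scheme on toy data (`row_numbers` and `offset_ids` are illustrative names):

```python
def row_numbers(rows):
    """Model of row_number() over Window.orderBy('entityKey', 'AUD_HASH_CODE'): 1-based positions."""
    ordered = sorted(rows, key=lambda r: (r["entityKey"], r["hash"]))
    return [dict(r, seq_id=i) for i, r in enumerate(ordered, start=1)]


def offset_ids(rows, max_table_key):
    """Shift sequence IDs past the largest AUD_SEQ_ID already in the target."""
    return [dict(r, seq_id=r["seq_id"] + max_table_key) for r in rows]


monthly = [{"entityKey": "E2", "hash": "b"}, {"entityKey": "E1", "hash": "a"}, {"entityKey": "E3", "hash": "c"}]
numbered = row_numbers(monthly)                              # E1 -> 1, E2 -> 2, E3 -> 3
to_insert = [r for r in numbered if r["entityKey"] != "E2"]  # suppose E2 was unchanged and filtered out
final = offset_ids(to_insert, max_table_key=100)
print([(r["entityKey"], r["seq_id"]) for r in final])        # [('E1', 101), ('E3', 103)]
```

This relies on loads not overlapping: two concurrent runs that read the same `maxTableKey` could assign duplicate IDs.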
In [198]:
# Run this check if a write below fails with a schema mismatch error against the silver table
# Prints the columns that exist in only one of sam_main_target_df and RowsToInsert (expected output: [])
res = sorted(set(sam_main_target_df.columns) ^ set(RowsToInsert.columns))
print(res)

[]


# <font size="5">12. Update the AUD_ACTIVE_FLAG of updated records in the target (silver) from 'Y' to 'N'

## <font size="4"> Option 1: Using Merge Command. This option is slower than Option 2 on this data set.

In [199]:
# # Merge statement to expire old records
# # Before Z Ordering this took 3 min 58 seconds
# # After Z ordering was 3 min 16 seconds
# DeltaTable.forPath(spark, silver_path).alias("original").merge(
#     source = RowsToInsert.alias("updates"),
#     condition = 'original.entityKey = updates.entityKey'
# ).whenMatchedUpdate(
#     condition = "original.AUD_ACTIVE_FLAG = 'Y' AND original.AUD_HASH_CODE <> updates.AUD_HASH_CODE",
#     set = {                                      
#         "AUD_ACTIVE_FLAG": "'N'",
#         "EffectiveToDate": lit(source_dt)
#     }
# ).execute()

## <font size="4"> Option 2: Using replaceWhere Command. This option is faster than Option 1 on this data set.

### <font size="3"> 2(a): Create dataframe of records to change from Active to Inactive via left semi join on target data

In [200]:
# Create dataframe of target rows to flag as inactive
inactive_condition = [sam_main_target_df.entityKey == sam_monthly_df.entityKey, sam_main_target_df.AUD_HASH_CODE != sam_monthly_df.AUD_HASH_CODE]
InactiveRows = sam_main_target_df \
            .alias("target") \
            .where("AUD_ACTIVE_FLAG = 'Y'") \
            .join(sam_monthly_df.alias("source"), inactive_condition,'leftsemi') \
            .select(*silver_columns) \
            .orderBy(col('entityKey')) 
#display(InactiveRows.orderBy("entityKey"))

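The left semi join returns target rows, not source rows: the active versions that have an incoming row with the same `entityKey` but a different `AUD_HASH_CODE`. These are exactly the versions that the new rows from step 11 supersede. Unlike an inner join, a semi join never duplicates a target row, even if several source rows match it. A plain-Python model of the same predicate on toy data (the function name is illustrative):

```python
def rows_to_inactivate(target, source):
    """Left semi join: active target rows with a same-key, different-hash row in the source."""
    incoming = {}
    for r in source:
        incoming.setdefault(r["entityKey"], set()).add(r["hash"])
    return [t for t in target
            if t["active"] == "Y"
            and any(h != t["hash"] for h in incoming.get(t["entityKey"], ()))]


target = [
    {"entityKey": "E1", "hash": "h1", "active": "Y"},  # unchanged
    {"entityKey": "E2", "hash": "h2", "active": "Y"},  # changed
    {"entityKey": "E2", "hash": "h0", "active": "N"},  # older, already inactive version
    {"entityKey": "E4", "hash": "h4", "active": "Y"},  # absent from the new file
]
source = [{"entityKey": "E1", "hash": "h1"}, {"entityKey": "E2", "hash": "h2b"}, {"entityKey": "E3", "hash": "h3"}]
print([(t["entityKey"], t["hash"]) for t in rows_to_inactivate(target, source)])  # [('E2', 'h2')]
```

E4 illustrates the open item in the To Do cell: entities missing from the new file stay active, because deletions are not yet flagged.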
In [201]:
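# Count records to be flagged as inactive for comparison after the write
# Note: take this count before the write. DataFrames are evaluated lazily, so re-evaluating InactiveRows after the
#   write most likely re-reads the updated target (where these rows are no longer 'Y') and returns 0 rows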
inactive_count = InactiveRows.count()
print(inactive_count)
print(RowsToInsert.count())

482495
567967


In [203]:
# Flag rows as inactive and update EffectiveToDate
InactiveRows = InactiveRows.withColumn("AUD_ACTIVE_FLAG", lit('N')).withColumn("EffectiveToDate", to_date(lit(source_date), "yyyyMMdd"))

In [205]:
# Create list of AUD_SEQ_IDs in the InactiveRows dataframe and convert them to a string to use in a FILTER clause
## Ex: [1, 2, 3, 4] becomes "AUD_SEQ_ID IN (1, 2, 3, 4)"
# replace_ids = []
# for i in InactiveRows.collect():
#     replace_ids.append(i['AUD_SEQ_ID'])
# Note - using rdd+flatMap instead of the loop method above decreased operation time significantly
replace_ids = InactiveRows.select("AUD_SEQ_ID").rdd.flatMap(lambda x: x).collect()
replace_clause = "AUD_SEQ_ID IN (" + str(replace_ids).replace('[','').replace(']','') + ")"

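The `replaceWhere` predicate is built by inlining every ID into an `IN (...)` list; at 482,495 IDs that is a very long SQL string. The hedged sketch below (`build_in_clause` is a hypothetical helper) makes two edge cases explicit. An empty ID list would otherwise produce the invalid clause `AUD_SEQ_ID IN ()`, so it returns `None` and the caller can skip the write. Non-integer values are rejected so nothing unexpected is spliced into the SQL.

```python
def build_in_clause(column, ids):
    """Return 'column IN (1, 2, ...)' for integer IDs, or None when there is nothing to replace."""
    ids = list(ids)
    if not ids:
        return None  # caller should skip the replaceWhere write entirely
    if not all(isinstance(i, int) for i in ids):
        raise TypeError("IN-clause IDs must be integers")
    return f"{column} IN ({', '.join(str(i) for i in ids)})"


print(build_in_clause("AUD_SEQ_ID", [1, 2, 3, 4]))  # AUD_SEQ_ID IN (1, 2, 3, 4)
print(build_in_clause("AUD_SEQ_ID", []))            # None
```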
### <font size="3"> 2(b): Update records in target using "replaceWhere" with the AUD_SEQ_ID filter defined in the previous step

In [206]:
# Update records in target using "replaceWhere" with the AUD_SEQ_ID filter defined in the previous step
# Write operation took 1 min 7 sec to replace 8738 records (daily file)
# Write operation took 74 minutes to replace 482495 records (public monthly file) when partitioned on AUD_ACTIVE_FLAG
# Write operation took 25 minutes to replace 482495 records (public monthly file) when partitioned on the first character of UNIQUE_ENTITY_ID (16 partitions)
# Skip the write when there are no records to flag as inactive ("AUD_SEQ_ID IN ()" is not valid SQL)
if replace_ids:
    InactiveRows.write.format("delta").mode("overwrite").option("replaceWhere", replace_clause).save(silver_path)

In [207]:
# Display 5 records that were changed from active to inactive
display(spark.read.format("delta").load(silver_path).filter("AUD_ACTIVE_FLAG == 'N' AND EffectiveToDate == '"+str(source_dt)+"'").limit(5))

In [208]:
# Compare the count of records flagged as inactive as of the source file date in the target data to the count obtained earlier from the dataframe
# Should output True if write succeeded
print(spark.read.format("delta").load(silver_path).filter("AUD_ACTIVE_FLAG == 'N' AND EffectiveToDate == '"+str(source_dt)+"'").count() == inactive_count)

True


# <font size="5">13. Insert new and updated records to target via delta append

In [209]:
# Insert/append the new and updated records to the target delta file
RowsToInsert.write.format("delta").mode("append").save(silver_path)

In [210]:
# Display 5 of the newly inserted records
display(spark.read.format("delta").load(silver_path).filter("AUD_ACTIVE_FLAG == 'Y' AND EffectiveFromDate == '"+str(source_dt)+"'").limit(5))
