In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 3, Finished, Available)

In [2]:
# Explicit schema for the bronze holiday CSV files, so column types are
# fixed up front rather than inferred from the data.
holidaysSchema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ('Country_or_region', StringType()),
        ('Holiday_name', StringType()),
        ('Norm_Holiday_name', StringType()),
        ('Paid_time_off', StringType()),
        ('Country_region_code', StringType()),
        ('Date', DateType()),
    ]
])

# The trailing wildcard picks up every file whose name starts with
# "holidays_", so newly landed files are included on the next run.
df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .schema(holidaysSchema)
    .load('Files/bronze/holidays_*')
)

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 4, Finished, Available)

In [3]:
# Preview the first five rows to sanity-check the load and the applied schema.
display(df.head(5))

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5e780f92-5106-4182-a4eb-06bc4c321e69)

In [4]:
# Add lineage/audit columns: the source file each row was read from, plus
# creation and modification timestamps (both set to the load time here).
df = (
    df
    .withColumn('FileName', input_file_name())
    .withColumn('CreatedUTC', current_timestamp())
    .withColumn('ModifiedUTC', current_timestamp())
)

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 6, Finished, Available)

In [5]:
# Consistency check: list the distinct values present in Paid_time_off.
column_name = 'Paid_time_off'
un = (
    df
    .select(column_name)
    .distinct()
)
un.show()

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 7, Finished, Available)

+-------------+
|Paid_time_off|
+-------------+
|        False|
|         null|
|         True|
+-------------+



In [6]:
# The distinct-value check showed nulls in Paid_time_off; treat a missing
# value as 'False' and keep every non-null value unchanged.
df = df.withColumn(
    'Paid_time_off',
    when(col('Paid_time_off').isNotNull(), col('Paid_time_off'))
    .otherwise(lit('False')),
)

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 8, Finished, Available)

In [7]:
# Re-run the distinct-value check to confirm the nulls in the column are gone.
fix = (
    df
    .select(column_name)
    .distinct()
)
fix.show()

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 9, Finished, Available)

+-------------+
|Paid_time_off|
+-------------+
|        False|
|         True|
+-------------+



In [8]:
# Remove fully identical rows. Spark DataFrames are immutable, so
# dropDuplicates() returns a new DataFrame; the result must be assigned back
# or the de-duplication is silently discarded.
# Note: with no subset, every column (including FileName) is compared, so the
# same holiday loaded from two different files is NOT treated as a duplicate.
df = df.dropDuplicates()

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 10, Finished, Available)

DataFrame[Country_or_region: string, Holiday_name: string, Norm_Holiday_name: string, Paid_time_off: string, Country_region_code: string, Date: date, FileName: string, CreatedUTC: timestamp, ModifiedUTC: timestamp]

In [9]:
# Rows that identify no country at all are unusable: drop a row only when
# BOTH Country_region_code and Country_or_region are null (how="all").
# dropna() tests for nulls only; empty strings are not treated as missing here.
columns_to_check = ['Country_region_code', 'Country_or_region']

df = df.dropna(how="all", subset=columns_to_check)

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 11, Finished, Available)

In [10]:
# Inspect the distinct country codes that remain (show() prints the first 20).
c_name = 'Country_region_code'
a = (
    df
    .select(c_name)
    .distinct()
)
a.show()

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 12, Finished, Available)

+-------------------+
|Country_region_code|
+-------------------+
|                 FI|
|                 UA|
|                 NL|
|                 PL|
|                 MX|
|                 AT|
|                 HR|
|                 CZ|
|               null|
|                 PT|
|                 AU|
|                 CA|
|                 GB|
|                 BR|
|                 BY|
|                 DE|
|                 ES|
|                 ZA|
|                 US|
|                 IN|
+-------------------+
only showing top 20 rows



In [11]:
# Pull out the rows whose country code is still null so they can be reviewed
# before deciding how to fill them.
column_to_check_nulls = 'Country_region_code'
null_code_condition = col(column_to_check_nulls).isNull()
filter_nulls = df.filter(null_code_condition)

display(filter_nulls)

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, d8c6c195-97f9-4803-b884-39a9f0fd5527)

In [12]:
# Replace the remaining null country codes with 'GB'.
# NOTE(review): presumably chosen after inspecting the null rows displayed
# earlier — confirm that this holds for newly arriving files too.
df = df.fillna('GB', subset=['Country_region_code'])

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 14, Finished, Available)

CREATING DELTA TABLE

In [13]:
from delta.tables import *

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 15, Finished, Available)

In [14]:
# Create the holidays_silver Delta table if it does not exist yet.
# CreatedUTC / ModifiedUTC are populated from current_timestamp(), so they are
# declared as TimestampType; DateType would silently drop the time of day.
# NOTE(review): createIfNotExists presumably leaves an already existing table
# untouched, so a table created earlier with DateType audit columns would need
# to be recreated or migrated for this change to apply — confirm.
(
    DeltaTable.createIfNotExists(spark)
    .tableName('holidays.holidays_silver')
    .addColumn('Country_or_region', StringType())
    .addColumn('Holiday_name', StringType())
    .addColumn('Norm_Holiday_name', StringType())
    .addColumn('Paid_time_off', BooleanType())
    .addColumn('Country_region_code', StringType())
    .addColumn('Date', DateType())
    .addColumn('CreatedUTC', TimestampType())
    .addColumn('ModifiedUTC', TimestampType())
    .execute()
)

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 16, Finished, Available)

<delta.tables.DeltaTable at 0x7f25110c0100>

In [15]:
# Insert-only merge into holidays_silver: a source row is inserted when no
# existing row has the same Holiday_name, Country_region_code and Date.
# Rows that already exist are left untouched (there is no whenMatched clause),
# and the FileName column is not written to the table.

deltaTable = DeltaTable.forPath(spark, 'Tables/holidays_silver')

dfUpdates = df

(
    deltaTable.alias('silver')
    .merge(
        dfUpdates.alias('updates'),
        'silver.Holiday_name = updates.Holiday_name and silver.Country_region_code = updates.Country_region_code and silver.Date = updates.Date',
    )
    .whenNotMatchedInsert(
        values={
            'Country_or_region': 'updates.Country_or_region',
            'Holiday_name': 'updates.Holiday_name',
            'Norm_Holiday_name': 'updates.Norm_Holiday_name',
            'Paid_time_off': 'updates.Paid_time_off',
            'Country_region_code': 'updates.Country_region_code',
            'Date': 'updates.Date',
            'CreatedUTC': 'updates.CreatedUTC',
            'ModifiedUTC': 'updates.ModifiedUTC',
        }
    )
    .execute()
)

StatementMeta(, 1f62512f-52a4-4d7a-98b4-bfe80c8e9970, 17, Finished, Available)

Additionally:

1. Introduce the Great Expectations framework to validate data as it moves from the bronze layer to the silver layer
