In [1]:
from db_auth import connection_properties, jdbc_url, pyodbc_url

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyodbc

In [3]:
def delete_data(tableName):
    """Delete every row from ``tableName`` over a pyodbc connection.

    Args:
        tableName: Table name, interpolated verbatim into the ``DELETE FROM``
            statement. Identifiers cannot be bound as query parameters, so
            only pass trusted, hard-coded names here.

    Raises:
        Any exception raised by pyodbc while connecting, executing or
        committing. The cursor and the connection are closed in every case.
    """
    conn = pyodbc.connect(pyodbc_url)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('DELETE FROM {tableName}'.format(tableName=tableName))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the delete or commit failed.
        conn.close()

In [4]:
def read_table(query):
    """Run ``query`` through the JDBC source and return the resulting DataFrame.

    The SQL text is wrapped in parentheses and aliased as ``query`` so it can
    be passed where the JDBC reader expects a table. Uses the notebook-level
    ``spark`` session.
    """
    derived_table = f'({query}) as query'
    reader = spark.read
    return reader.jdbc(
        url=jdbc_url,
        table=derived_table,
        properties=connection_properties,
    )

In [5]:
def write_table(tableName, df):
    """Append the rows of ``df`` to ``tableName`` over the JDBC connection.

    Args:
        tableName: Destination table name.
        df: DataFrame whose rows are written in ``append`` mode.
    """
    writer = df.write
    writer.jdbc(
        url=jdbc_url,
        table=tableName,
        mode='append',
        properties=connection_properties,
    )

In [6]:
# Session used by read_table(); getOrCreate() reuses an existing session if there is one.
spark = (
    SparkSession.builder
    .appName("Silver_Layer")
    .getOrCreate()
)

Order Table Data

In [7]:
# De-duplicate raw orders: keep the row ranked first per order_id when ordered by
# ingestion_timestamp (ascending), then upper-case the key used in the join with silver.
# NOTE(review): ascending order keeps the earliest ingested row; if the latest record
# should win this needs ORDER BY ... DESC -- confirm intent.
orders_df = (
    read_table('SELECT *, ROW_NUMBER() OVER(PARTITION BY order_id ORDER BY ingestion_timestamp) as row_number FROM raw.orders')
    .filter(col('row_number') == 1)
    .withColumn('order_id', upper(col('order_id')))
)

In [8]:
# Current contents of the silver orders table. cache() is lazy; the count() below
# triggers the read and populates the cache.
silver_order_df = read_table('SELECT * FROM [silver].[Dim_Orders]')
silver_order_df = silver_order_df.cache()

print(silver_order_df.count())

200000


In [9]:
# Left join keeps every raw order; the silver side is null when the order is not yet in silver.
curated_order_df = (
    orders_df.alias('raw')
    .join(
        silver_order_df.alias('silver'),
        orders_df.order_id == silver_order_df.order_id,
        'left_outer',
    )
    # No silver match: stamp created_at now. Otherwise carry over the stored created_at.
    .withColumn(
        'created_at',
        when(col('silver.order_id').isNull(), current_timestamp())
        .otherwise(col('silver.created_at')),
    )
    # No silver match: updated_at stays null. Otherwise stamp it with the current time.
    .withColumn(
        'updated_at',
        when(col('silver.order_id').isNull(), lit(None))
        .otherwise(current_timestamp()),
    )
    .select('raw.order_id', 'raw.order_date', 'raw.shipping_address', 'created_at', 'updated_at')
)
curated_order_df.show()

+--------------------+-------------------+--------------------+--------------------+--------------------+
|            order_id|         order_date|    shipping_address|          created_at|          updated_at|
+--------------------+-------------------+--------------------+--------------------+--------------------+
|00011D63-9552-408...|2025-01-23 12:56:03|64417 Brooks Harb...|2025-02-09 15:54:...|2025-02-09 23:20:...|
|00026A9F-A28E-4A8...|2025-02-02 10:58:27|09160 Stevens Poi...|2025-02-09 15:54:...|2025-02-09 23:20:...|
|00037CE7-8A41-41A...|2025-01-27 15:12:06|14130 Natalie Inl...|2025-02-09 15:54:...|2025-02-09 23:20:...|
|00028F7A-21E2-445...|2025-01-22 06:42:04|5017 Lisa Burgs A...|2025-02-09 15:54:...|2025-02-09 23:20:...|
|00065BD1-7CDE-415...|2025-01-24 13:21:49|33974 Matthew Cre...|2025-02-09 15:54:...|2025-02-09 23:20:...|
|000276CB-A588-4BE...|2025-01-05 00:35:59|83452 Lopez Sprin...|2025-02-09 15:54:...|2025-02-09 23:20:...|
|00046F9B-8E73-496...|2025-01-24 08:35:31|9408

In [10]:
# updated_at is null only for rows without a silver match (new); matched rows have it set.
print('New data - ', curated_order_df.where(col('updated_at').isNull()).count())
print('Updated data - ', curated_order_df.where(col('updated_at').isNotNull()).count())

New data -  0
Updated data -  200000


In [11]:
# Empty the silver orders table; the curated rows are appended to it right after.
delete_data(tableName='silver.Dim_Orders')

In [12]:
# Append the curated orders to the (just emptied) silver table.
write_table(tableName='silver.Dim_Orders', df=curated_order_df)

Customer Table Data

In [13]:
# De-duplicate raw customers: keep the row ranked first per customer_id when ordered by
# ingestion_timestamp (ascending), then upper-case the key used in the join with silver.
# NOTE(review): ascending order keeps the earliest ingested row; if the latest record
# should win this needs ORDER BY ... DESC -- confirm intent.
customer_df = (
    read_table('SELECT *, ROW_NUMBER() OVER(PARTITION BY customer_id ORDER BY ingestion_timestamp) as row_number FROM raw.[customer]')
    .filter(col('row_number') == 1)
    .withColumn('customer_id', upper(col('customer_id')))
)

In [14]:
# Current contents of the silver customers table. cache() is lazy; the count() below
# triggers the read and populates the cache.
silver_customer_df = read_table('SELECT * FROM [silver].[Dim_Customers]')
silver_customer_df = silver_customer_df.cache()

print(silver_customer_df.count())

200000


In [15]:
# Left join keeps every raw customer; the silver side is null when the customer is not yet in silver.
curated_customer_df = (
    customer_df.alias('raw')
    .join(
        silver_customer_df.alias('silver'),
        customer_df.customer_id == silver_customer_df.customer_id,
        'left_outer',
    )
    # No silver match: stamp created_at now. Otherwise carry over the stored created_at.
    .withColumn(
        'created_at',
        when(col('silver.customer_id').isNull(), current_timestamp())
        .otherwise(col('silver.created_at')),
    )
    # No silver match: updated_at stays null. Otherwise stamp it with the current time.
    .withColumn(
        'updated_at',
        when(col('silver.customer_id').isNull(), lit(None))
        .otherwise(current_timestamp()),
    )
    .select('raw.customer_id', 'raw.name', 'raw.email', 'raw.address', 'raw.age', 'raw.gender', 'raw.signup_date', 'created_at', 'updated_at')
)

In [16]:
# updated_at is null only for rows without a silver match (new); matched rows have it set.
print('New data - ', curated_customer_df.where(col('updated_at').isNull()).count())
print('Updated data - ', curated_customer_df.where(col('updated_at').isNotNull()).count())

New data -  0
Updated data -  200000


In [17]:
# Empty the silver customers table; the curated rows are appended to it right after.
delete_data(tableName='silver.Dim_Customers')

In [18]:
# Append the curated customers to the (just emptied) silver table.
write_table(tableName='silver.Dim_Customers', df=curated_customer_df)

Product Table Data

In [19]:
# De-duplicate raw products: keep the row ranked first per product_id when ordered by
# ingestion_timestamp (ascending), then upper-case the key used in the join with silver.
# NOTE(review): ascending order keeps the earliest ingested row; if the latest record
# should win this needs ORDER BY ... DESC -- confirm intent.
product_df = (
    read_table('SELECT *, ROW_NUMBER() OVER(PARTITION BY product_id ORDER BY ingestion_timestamp) as row_number FROM raw.[products]')
    .filter(col('row_number') == 1)
    .withColumn('product_id', upper(col('product_id')))
)

In [20]:
# Current contents of the silver products table. cache() is lazy; the count() below
# triggers the read and populates the cache.
silver_product_df = read_table('SELECT * FROM [silver].[Dim_Products]')
silver_product_df = silver_product_df.cache()

print(silver_product_df.count())

200000


In [21]:
# Left join keeps every raw product; the silver side is null when the product is not yet in silver.
curated_product_df = (
    product_df.alias('raw')
    .join(
        silver_product_df.alias('silver'),
        product_df.product_id == silver_product_df.product_id,
        'left_outer',
    )
    # No silver match: stamp created_at now. Otherwise carry over the stored created_at.
    .withColumn(
        'created_at',
        when(col('silver.product_id').isNull(), current_timestamp())
        .otherwise(col('silver.created_at')),
    )
    # No silver match: updated_at stays null. Otherwise stamp it with the current time.
    .withColumn(
        'updated_at',
        when(col('silver.product_id').isNull(), lit(None))
        .otherwise(current_timestamp()),
    )
    .select('raw.product_id', 'raw.name', 'raw.category', 'raw.brand', 'raw.cost_price', 'raw.selling_price', 'created_at', 'updated_at')
)

In [22]:
# updated_at is null only for rows without a silver match (new); matched rows have it set.
print('New data - ', curated_product_df.where(col('updated_at').isNull()).count())
print('Updated data - ', curated_product_df.where(col('updated_at').isNotNull()).count())

New data -  0
Updated data -  200000


In [23]:
# Empty the silver products table; the curated rows are appended to it right after.
delete_data(tableName='silver.Dim_Products')

In [24]:
# Append the curated products to the (just emptied) silver table.
write_table(tableName='silver.Dim_Products', df=curated_product_df)

Fact Sales Table Data

In [25]:
# De-duplicate raw sales: keep the row ranked first per transaction_id when ordered by
# ingestion_timestamp (ascending), then upper-case all four id columns.
# NOTE(review): ascending order keeps the earliest ingested row; if the latest record
# should win this needs ORDER BY ... DESC -- confirm intent.
sales_df = (
    read_table('SELECT *, ROW_NUMBER() OVER(PARTITION BY transaction_id ORDER BY ingestion_timestamp) as row_number FROM raw.sales_transaction')
    .filter(col('row_number') == 1)
    .withColumn('transaction_id', upper(col('transaction_id')))
    .withColumn('order_id', upper(col('order_id')))
    .withColumn('customer_id', upper(col('customer_id')))
    .withColumn('product_id', upper(col('product_id')))
)

In [26]:
# Preview the de-duplicated sales rows (20 rows is show()'s default).
sales_df.show(n=20)

+--------------------+--------------------+--------------------+------------+--------+-----+-------------+-------------------+---------------+-------------------+--------------------+----------+
|      transaction_id|            order_id|         customer_id|total_amount|discount|  tax|refund_amount|   transaction_date|raw_data_source|ingestion_timestamp|          product_id|row_number|
+--------------------+--------------------+--------------------+------------+--------+-----+-------------+-------------------+---------------+-------------------+--------------------+----------+
|0000D098-A76D-44B...|24C5CAD7-9D99-4B4...|A2EDC8E4-71BF-436...|      496.21|   95.72|34.58|        11.96|2025-02-05 15:17:59|     POS System|2025-01-24 03:19:55|0D5ED4C5-8244-437...|         1|
|00028E8B-FAB2-4FB...|C2D1BC30-698D-4CD...|666156E3-A8D6-494...|      464.99|   65.44| 8.57|        34.85|2025-02-04 02:36:09|     POS System|2025-01-07 20:49:13|EA6912DB-C73C-446...|         1|
|0002BA29-481D-40E...|3AC

In [27]:
# Existing fact rows whose order_id, customer_id and product_id are all present in the
# silver dimension tables.
# The frame is cached, and the count() below materialises it, because the fact table is
# emptied before curated_sales_df (which joins against this frame) is written back.
# Without the cache the lazy join would re-read the already-emptied table and every
# stored created_at would be lost. This also matches how the dimension reads are handled.
silver_sales_df = read_table('SELECT * FROM [silver].[Fact_Sales_Transactions] fst WHERE fst.order_id IN ( SELECT order_id FROM [silver].[Dim_Orders]) and fst.customer_id IN (SELECT customer_id FROM [silver].[Dim_Customers]) and fst.product_id IN (SELECT product_id FROM [silver].Dim_Products)').cache()

print(silver_sales_df.count())

0


In [28]:
# Left join keeps every raw transaction; the silver side is null when it is not yet in silver.
curated_sales_df = (
    sales_df.alias('raw')
    .join(
        silver_sales_df.alias('silver'),
        sales_df.transaction_id == silver_sales_df.transaction_id,
        'left_outer',
    )
    # No silver match: stamp created_at now. Otherwise carry over the stored created_at.
    .withColumn(
        'created_at',
        when(col('silver.transaction_id').isNull(), current_timestamp())
        .otherwise(col('silver.created_at')),
    )
    # No silver match: updated_at stays null. Otherwise stamp it with the current time.
    .withColumn(
        'updated_at',
        when(col('silver.transaction_id').isNull(), lit(None))
        .otherwise(current_timestamp()),
    )
    .select('raw.transaction_id', 'raw.order_id', 'raw.customer_id', 'raw.product_id', 'raw.transaction_date', 'raw.discount', 'raw.tax', 'raw.refund_amount', 'raw.total_amount', 'created_at', 'updated_at')
)

In [29]:
# Replace null discount, tax and refund_amount values with 0.
curated_sales_df = (
    curated_sales_df
    .withColumn('discount', coalesce(col('discount'), lit(0)))
    .withColumn('tax', coalesce(col('tax'), lit(0)))
    .withColumn('refund_amount', coalesce(col('refund_amount'), lit(0)))
)

In [30]:
# Empty the silver fact table; the curated rows are appended to it right after.
delete_data(tableName='[silver].[Fact_Sales_Transactions]')

In [31]:
# Append the curated sales transactions to the (just emptied) silver fact table.
write_table(tableName='[silver].[Fact_Sales_Transactions]', df=curated_sales_df)