Creating production-ready, dynamic code blocks that are modular and reusable

**Step 1:**
Define the raw CSV source path and the bronze Delta target path

In [0]:
raw_path="dbfs:/FileStore/tables/orders.csv"
bronze_path="/tmp/bronze/orders"
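Hardcoded paths work for a single table. To make this cell reusable for other tables, the paths could instead be derived from a table name. The helper below, `build_paths`, is a hypothetical sketch; its default roots are simply the two paths used above, and the `<table>.csv` naming convention is an assumption.

```python
# Hypothetical helper: derive raw and bronze paths from a table name so the
# same notebook cells can be reused for other tables. The default roots come
# from the paths defined above; adjust them for your workspace.
def build_paths(table, raw_root="dbfs:/FileStore/tables", bronze_root="/tmp/bronze"):
    """Return (raw_path, bronze_path) for the given table name."""
    raw_root = raw_root.rstrip("/")        # tolerate a trailing slash
    bronze_root = bronze_root.rstrip("/")
    return f"{raw_root}/{table}.csv", f"{bronze_root}/{table}"


raw_path, bronze_path = build_paths("orders")
print(raw_path)     # dbfs:/FileStore/tables/orders.csv
print(bronze_path)  # /tmp/bronze/orders
```

Keeping path construction in one function means a new table only needs a new name, not a new cell.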

**Step 2:**
Read the CSV into a DataFrame

In [0]:
df_raw = spark.read.option("header", True).csv(raw_path)
df_raw.show()
df_raw.printSchema()

+--------+-----------+----------+---------+------------+
|order_id|customer_id|order_date|   status|total_amount|
+--------+-----------+----------+---------+------------+
|    1001|        201|2024-05-01|  Shipped|       250.5|
|    1002|        202|2024-05-02|  Pending|       120.0|
|    1003|        203|2024-05-02|Delivered|       300.0|
|    1004|        204|2024-05-03|  Shipped|      175.75|
|    1005|        205|2024-05-03|Cancelled|        90.0|
+--------+-----------+----------+---------+------------+

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- total_amount: string (nullable = true)
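The schema shows every column as `string`: the Spark CSV reader does not infer types unless the `inferSchema` option is set to true or an explicit schema is passed with `.schema(...)`. The plain-Python sketch below only mimics what an explicit schema does, mapping each column to a type. The target types (integers for the IDs, a date for `order_date`, a float for `total_amount`) are assumptions; in Spark the same intent would be expressed as a `StructType`.

```python
from datetime import date

# Assumed target types per column. In Spark you would express this as a
# StructType and pass it to spark.read.schema(...).
ORDER_SCHEMA = {
    "order_id": int,
    "customer_id": int,
    "order_date": date.fromisoformat,
    "status": str,
    "total_amount": float,
}


def cast_row(row, schema=ORDER_SCHEMA):
    """Convert a dict of string values into typed values using `schema`."""
    return {col: convert(row[col]) for col, convert in schema.items()}


# First row of the sample data above, as the CSV reader delivers it (all strings).
raw = {"order_id": "1001", "customer_id": "201", "order_date": "2024-05-01",
       "status": "Shipped", "total_amount": "250.5"}
typed = cast_row(raw)
print(typed)
```

Declaring types up front fails fast on malformed values instead of letting them flow downstream as strings.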



**Step 3:** Quick quality checks

In [0]:
print("Row count:", df_raw.count())
df_raw.select("status").distinct().show()

Row count: 5
+---------+
|   status|
+---------+
|  Shipped|
|Cancelled|
|Delivered|
|  Pending|
+---------+
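The checks above are printed and inspected by eye. A reusable version could return a small report instead, so a pipeline can act on it. The sketch below is a hypothetical plain-Python helper; `ALLOWED_STATUSES` is an assumption based on the four values seen in the sample. In PySpark the same checks map to `df_raw.count()`, `df_raw.select("status").distinct()`, and a filter such as `~F.col("status").isin(...)`.

```python
# Hypothetical reusable quality check. ALLOWED_STATUSES is an assumption
# based on the four status values seen in the sample data above.
ALLOWED_STATUSES = {"Shipped", "Pending", "Delivered", "Cancelled"}


def quality_report(rows, allowed_statuses=ALLOWED_STATUSES):
    """Summarise row count, distinct statuses and any unexpected status values."""
    statuses = {row["status"] for row in rows}
    return {
        "row_count": len(rows),
        "distinct_statuses": sorted(statuses),
        "unexpected_statuses": sorted(statuses - set(allowed_statuses)),
    }


# The five sample orders from the output above (only the columns we check).
sample = [
    {"order_id": "1001", "status": "Shipped"},
    {"order_id": "1002", "status": "Pending"},
    {"order_id": "1003", "status": "Delivered"},
    {"order_id": "1004", "status": "Shipped"},
    {"order_id": "1005", "status": "Cancelled"},
]
report = quality_report(sample)
print(report)
```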



**Step 4:**
Write to Delta format

In [0]:
df_raw.write.format("delta").mode("overwrite").save(bronze_path)

This stores the data in Delta Lake format. Delta tables keep a versioned transaction log, support ACID transactions, and can be read by Spark as well as by tools such as Microsoft Fabric and Power BI.
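To make "versioned" concrete, here is a toy in-memory model, not Delta's implementation: each overwrite commits a new snapshot while older snapshots remain readable. This is the idea behind Delta time travel, where an older version can be read with the `versionAsOf` read option. The class `ToyVersionedTable` is hypothetical and exists only for illustration.

```python
class ToyVersionedTable:
    """Toy model of a versioned table: each write commits a new snapshot.

    This only illustrates the idea behind Delta Lake versioning. Delta itself
    stores data as Parquet files plus a _delta_log directory of commits.
    """

    def __init__(self):
        self._versions = []  # committed snapshots; list index == version number

    def overwrite(self, rows):
        # Commit a full copy so later changes cannot mutate earlier versions.
        self._versions.append([dict(r) for r in rows])
        return len(self._versions) - 1  # version number just committed

    def read(self, version=None):
        """Read the latest snapshot, or a specific version ("time travel")."""
        if not self._versions:
            raise ValueError("table has no committed versions")
        snapshot = self._versions[-1 if version is None else version]
        return [dict(r) for r in snapshot]


table = ToyVersionedTable()
v0 = table.overwrite([{"order_id": "1001", "status": "Shipped"}])
v1 = table.overwrite([{"order_id": "1001", "status": "Delivered"}])
print(table.read()[0]["status"])            # Delivered
print(table.read(version=v0)[0]["status"])  # Shipped
```

Because `mode("overwrite")` creates a new version rather than destroying the old one, an accidental overwrite can be inspected or rolled back.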


**Step 5:**
Read the data back and validate it

In [0]:
df_bronze = spark.read.format("delta").load(bronze_path)
df_bronze.show()

+--------+-----------+----------+---------+------------+
|order_id|customer_id|order_date|   status|total_amount|
+--------+-----------+----------+---------+------------+
|    1001|        201|2024-05-01|  Shipped|       250.5|
|    1002|        202|2024-05-02|  Pending|       120.0|
|    1003|        203|2024-05-02|Delivered|       300.0|
|    1004|        204|2024-05-03|  Shipped|      175.75|
|    1005|        205|2024-05-03|Cancelled|        90.0|
+--------+-----------+----------+---------+------------+
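`show()` only lets you eyeball the result. A programmatic check could compare the source and the reloaded data; in PySpark this might be `df_raw.count() == df_bronze.count()` together with `df_raw.exceptAll(df_bronze).count() == 0` (and the reverse). The function below is a hypothetical plain-Python sketch of that comparison, treating rows as multisets so order is ignored but duplicates count.

```python
from collections import Counter


def validate_roundtrip(source_rows, loaded_rows):
    """Return a list of problems found when comparing source and loaded rows.

    Rows are compared as multisets: order does not matter, duplicates do,
    similar to applying DataFrame.exceptAll in both directions.
    """
    problems = []
    if len(source_rows) != len(loaded_rows):
        problems.append(f"row count mismatch: {len(source_rows)} != {len(loaded_rows)}")
    src = Counter(tuple(sorted(r.items())) for r in source_rows)
    dst = Counter(tuple(sorted(r.items())) for r in loaded_rows)
    if src - dst:
        problems.append(f"missing after load: {sum((src - dst).values())} row(s)")
    if dst - src:
        problems.append(f"unexpected after load: {sum((dst - src).values())} row(s)")
    return problems


source = [{"order_id": "1001", "total_amount": "250.5"},
          {"order_id": "1002", "total_amount": "120.0"}]
loaded = list(reversed(source))  # same rows, different order
print(validate_roundtrip(source, loaded))  # []
```

Returning a list of problems, rather than raising immediately, lets the caller decide whether to fail the job or only log a warning.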



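**Step 6:**
Wrap the read and write logic in reusable functions

In [0]:
def load_csv(path, header=True):
    """Read a CSV file (first row as header by default) into a DataFrame."""
    return spark.read.option("header", header).csv(path)

def write_delta(df, path, mode="overwrite"):
    """Write df to path in Delta format (save() returns None, so nothing is returned)."""
    df.write.format("delta").mode(mode).save(path)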
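With `load_csv` and `write_delta` defined, the whole ingestion step could be wrapped in one function. The sketch below is a hypothetical `run_ingest` that takes the loader, the writer, and optional checks as parameters. In the notebook you would pass `load_csv` and `write_delta`; here in-memory stand-ins are used so the flow can be exercised without Spark.

```python
def run_ingest(raw_path, bronze_path, load_fn, write_fn, checks=()):
    """Load raw data, run optional checks, then write it to the bronze path.

    load_fn(path) returns the data, write_fn(data, path) persists it, and each
    check(data) returns a list of problem strings (empty when the data is OK).
    """
    data = load_fn(raw_path)
    problems = [p for check in checks for p in check(data)]
    if problems:
        # Fail fast so bad data never reaches the bronze layer.
        raise ValueError("quality checks failed: " + "; ".join(problems))
    write_fn(data, bronze_path)
    return data


# Usage with in-memory stand-ins for the Spark helpers (hypothetical paths).
store = {"raw/orders.csv": [{"order_id": "1001", "status": "Shipped"}]}
written = {}
non_empty = lambda rows: [] if rows else ["no rows loaded"]
run_ingest(
    "raw/orders.csv",
    "bronze/orders",
    load_fn=lambda path: store[path],
    write_fn=lambda data, path: written.__setitem__(path, data),
    checks=[non_empty],
)
print(written)
```

Passing the I/O functions in as parameters keeps the orchestration logic independent of Spark, so it can be unit-tested and reused for any table.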
