In [0]:
application_id="257f1bbb-d8da-46ed-9dc1-a417530e9324"
directory_id="654ad10e-3170-4844-bb2d-b2c7a9ae5a8d"
service_credential = dbutils.secrets.get('blob-scope-rawstorage123','service-credential-project')

spark.conf.set("fs.azure.account.auth.type.rawstorage123.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.rawstorage123.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.rawstorage123.dfs.core.windows.net", application_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.rawstorage123.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.rawstorage123.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

In [0]:
display(dbutils.fs.ls("abfss://processed@rawstorage123.dfs.core.windows.net/processed_renamed"))

path,name,size,modificationTime
abfss://processed@rawstorage123.dfs.core.windows.net/processed_renamed/processed_sales_data.csv,processed_sales_data.csv,8381,1733317256000


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

sales_schema = StructType([
    StructField("TransactionID",StringType(), True),
    StructField("CustomerName",StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity",DoubleType(), True),
    StructField("Region", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("TransactionDate", DateType(), True),
    StructField("TotalAmount", IntegerType(), True),
])

df = spark.read.csv(
    "abfss://processed@rawstorage123.dfs.core.windows.net/processed_renamed/processed_sales_data.csv",
    header=True,
    schema=sales_schema
)
df.printSchema()

root
 |-- TransactionID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- TransactionDate: date (nullable = true)
 |-- TotalAmount: integer (nullable = true)



In [0]:
from pyspark.sql.functions import current_timestamp

#df.withColumn("TransactionDate", current_timestamp()).write.mode("overwrite").option("mergeSchema", "true").option("overwriteSchema", "true").format("delta").save("abfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data").show()

df.withColumn("TimeStamp", current_timestamp()).show()

+--------------------+----------------+-------+--------+------+---------+---------------+-----------+--------------------+
|       TransactionID|    CustomerName|Product|Quantity|Region|UnitPrice|TransactionDate|TotalAmount|           TimeStamp|
+--------------------+----------------+-------+--------+------+---------+---------------+-----------+--------------------+
|c36c12af-d6ef-485...|   Darlene House| Laptop|    10.0| South|    372.0|     2024-03-01|       NULL|2024-12-09 12:19:...|
|6e0af861-fe73-43f...|   Stuart Barron| Laptop|     0.0| North|    481.0|     2024-04-21|       NULL|2024-12-09 12:19:...|
|a359713b-cad7-4e1...|   Maureen Lyons| Tablet|     2.0|  East|    490.0|     2024-06-03|       NULL|2024-12-09 12:19:...|
|fed09892-d3e8-492...|        John Lee| Laptop|     0.0| South|    345.0|     2024-04-03|       NULL|2024-12-09 12:19:...|
|998d01e2-7be6-445...|    Daniel Walls| Laptop|     7.0| South|    223.0|     2024-06-09|       NULL|2024-12-09 12:19:...|
|0c0c249a-20f3-4

In [0]:
from pyspark.sql.functions import expr, year,col

# Clean Price Per Unit and derive Total Amount
#df = df.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))
df = df.withColumn("DiscountedAmount", col("TotalAmount") * 0.9)

df = df.filter(col("Quantity") > 0)

# Add Transaction Year
#df = df.withColumn("TransactionYear", year(col("TransactionDate")))

# Write to Delta Lake
#df.write.format("csv").mode("overwrite").option("header", "true").save("abfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data.csv")

#output_path = "abfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data.csv"  # Replace with your desired output path

df.write.mode("overwrite").partitionBy("Region").format("csv").option("header", "true").save("abfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data.csv")

display(df)

TransactionID,CustomerName,Product,Quantity,Region,UnitPrice,TransactionDate,TotalAmount,DiscountedAmount
c36c12af-d6ef-485d-b81a-4dfd5016acba,Darlene House,Laptop,10.0,South,372.0,2024-03-01,,
a359713b-cad7-4e1a-8e9d-312ac06e65d8,Maureen Lyons,Tablet,2.0,East,490.0,2024-06-03,,
998d01e2-7be6-4456-9513-954309839c04,Daniel Walls,Laptop,7.0,South,223.0,2024-06-09,,
0c0c249a-20f3-422d-9699-532eee6d8874,Nicholas Cruz,Laptop,1.0,North,384.0,2024-10-10,,
8ec976ad-bd94-4694-983f-8f31d3dd5fbd,Andrea Morales,Laptop,3.0,East,361.0,2024-06-24,,
b01682d6-8b67-4156-bfee-5142b19ffae7,Ann Thomas,Laptop,1.0,East,379.0,2024-07-26,,
ae8ea305-06b2-428e-8bf9-c2c606d07170,Lisa Fowler,Laptop,9.0,North,295.0,2024-11-11,,
34be067d-e014-43a4-a40b-80a88483cf91,Christy Garcia,Laptop,3.0,North,103.0,2024-10-12,,
f116a929-0210-4c4a-818d-6486816b892b,Monica Lawson,Tablet,1.0,North,496.0,2024-06-12,,
c0869c82-9d9d-4374-b14a-ca8430def93e,Brandon Parker,Laptop,4.0,North,140.0,2024-03-21,,


Day 1

In [0]:
from pyspark.sql.functions import current_timestamp

# Add a new column with the current timestamp
df_with_date = df.withColumn("TransactionDate", current_timestamp())

# Write the DataFrame to Delta format with schema merge enabled
df_with_date.write.mode("overwrite").option("mergeSchema", "true").option("overwriteSchema", "true").format("delta").save("abfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data/")


#df.write.mode("overwrite").mode("overwrite").format("delta").save("abfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data")

Day 2

In [0]:
from delta.tables import DeltaTable

existing_data = DeltaTable.forPath(spark, "abfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data/")

existing_data.alias("existing").merge(df.alias("new"),"existing.OrderID=new.OrderID").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2153813398806621>, line 5[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mdelta[39;00m[38;5;21;01m.[39;00m[38;5;21;01mtables[39;00m [38;5;28;01mimport[39;00m DeltaTable
[1;32m      3[0m existing_data [38;5;241m=[39m DeltaTable[38;5;241m.[39mforPath(spark, [38;5;124m"[39m[38;5;124mabfss://staging@rawstorage123.dfs.core.windows.net/processed_sales_data/[39m[38;5;124m"[39m)
[0;32m----> 5[0m existing_data[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mexisting[39m[38;5;124m"[39m)[38;5;241m.[39mmerge(df[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mnew[39m[38;5;124m"[39m),[38;5;124m"[39m[38;5;124mexisting.OrderID=new.OrderID[39m[38;5;124m"[39m)[38;5;241m.[39mwhenMatchedUpdateAll()[38;5;241m.[39mwhenNotMatchedInsertAll()[38;5;241m.[39mexec