In [0]:
# Configure OAuth client-credentials access to the ADLS Gen2 account
# `streamingstoragelake1234` for this Spark session.
# The client secret is fetched from the Databricks secret scope so that it
# never appears in the notebook source.
service_credential = dbutils.secrets.get(scope="adlsconnect", key="client-app-secret")

# Each entry is one Hadoop ABFS setting scoped to the storage account endpoint.
adls_oauth_settings = {
    "fs.azure.account.auth.type.streamingstoragelake1234.dfs.core.windows.net":
        "OAuth",
    "fs.azure.account.oauth.provider.type.streamingstoragelake1234.dfs.core.windows.net":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.streamingstoragelake1234.dfs.core.windows.net":
        "39add582-5553-41f7-bd5f-4f455edc1b05",
    "fs.azure.account.oauth2.client.secret.streamingstoragelake1234.dfs.core.windows.net":
        service_credential,
    "fs.azure.account.oauth2.client.endpoint.streamingstoragelake1234.dfs.core.windows.net":
        "https://login.microsoftonline.com/a3881e6f-f92f-449c-b7e6-e9df56edaf30/oauth2/token",
}

# Apply the settings in the order they are declared above.
for setting_key, setting_value in adls_oauth_settings.items():
    spark.conf.set(setting_key, setting_value)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType,DateType,IntegerType
from pyspark.sql.functions import sum,col,broadcast,round

# Explicit schema applied when reading the order events stored as Parquet.
# NOTE(review): the header printed by an earlier run of this notebook shows the
# first column as "Order ID", so that spelling is used here; Spark matches
# column names case-insensitively by default, so the default-mode result is
# unchanged. Confirm against the files if spark.sql.caseSensitive is enabled.
schema = StructType([
    StructField("Order ID", StringType(), False),
    StructField("Customer Name", StringType(), True),
    StructField("Product ID", LongType(), True),
    StructField("Quantity", LongType(), True),
    StructField("Price", DoubleType(), True),
    StructField("Order Date", StringType(), False),
    StructField("Order Delivery Address", StringType(), False),
    StructField("EventProcessedUtcTime", TimestampType(), True),
    StructField("PartitionId", LongType(), True),
    StructField("EventEnqueuedUtcTime", TimestampType(), True),
])

# Read the Parquet files under stream_data using the schema above.
df = (
    spark.read.format("parquet")
    .schema(schema)
    .load("abfss://stgfolder@streamingstoragelake1234.dfs.core.windows.net/stream_data")
)

# Keep only the order columns used downstream and give them join-friendly
# names. Source columns are referenced with exactly the casing declared in
# `schema` (previously "Product Id" / "price", which only resolved because of
# case-insensitive matching); aliases keep the output names unchanged.
# "Order Date" is read as a string and cast to a date here.
renamed_df = df.select(
    col("Order ID").alias("orderid"),
    col("Customer Name").alias("customername"),
    col("Product ID").alias("productId"),
    col("Quantity"),
    col("Price").alias("price"),
    col("Order Date").cast(DateType()).alias("orderDate"),
    col("Order Delivery Address").alias("address"),
)

+--------------------+------------------+----------+--------+------+----------+----------------------+---------------------+-----------+--------------------+
|            Order ID|     Customer Name|Product ID|Quantity| Price|Order Date|Order Delivery Address|EventProcessedUtcTime|PartitionId|EventEnqueuedUtcTime|
+--------------------+------------------+----------+--------+------+----------+----------------------+---------------------+-----------+--------------------+
|32f06e7d-71ac-440...|       Robert Dean|         5|      18|941.35|2024-05-12|  USNS Reyes\nFPO A...| 2024-12-02 15:59:...|          3|2024-12-02 15:59:...|
|f51991a3-00c0-444...|     Brian Bridges|        13|       1|610.07|2024-08-24|  25797 Timothy Cor...| 2024-12-02 15:59:...|          3|2024-12-02 15:59:...|
|c8d01c31-3864-440...| Elizabeth Rose MD|        19|      12|268.63|2024-08-10|  743 Schaefer Junc...| 2024-12-02 15:59:...|          3|2024-12-02 15:59:...|
|dde77aca-3d14-43f...|  Frederick Stokes|        12|

In [0]:
# Date dimension: (column name, Spark type) pairs; every column is nullable.
date_dimension_fields = [
    ("DateKey", IntegerType()),
    ("Date", DateType()),
    ("Year", IntegerType()),
    ("Quarter", IntegerType()),
    ("Month", IntegerType()),
    ("Day", IntegerType()),
    ("Weekday", IntegerType()),
    ("WeekOfYear", IntegerType()),
    ("DayName", StringType()),
    ("IsHoliday", IntegerType()),
]
date_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in date_dimension_fields]
)

# Load the date dimension CSV (first row is a header) with the explicit schema.
date_df = (
    spark.read.format("csv")
    .option("header", True)
    .schema(date_schema)
    .load("abfss://stgfolder@streamingstoragelake1234.dfs.core.windows.net/Date_Dimension/date_dimension.csv")
)

In [0]:
# Product dimension: maps a numeric product id to its display name.
product_schema = StructType(
    [
        StructField("productId", IntegerType(), True),
        StructField("productName", StringType(), True),
    ]
)

# Load the product details CSV (first row is a header) with the explicit schema.
product_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(product_schema)
    .load("abfss://stgfolder@streamingstoragelake1234.dfs.core.windows.net/Product_Dimension/product_details.csv")
)

# Disabled: persisting the product dimension as a Delta table named "products".
#product_df.write.format('delta').saveAsTable("products")

In [0]:
# Enrich each order with its calendar attributes and product name.
# Both joins use the default (inner) join type, so orders without a matching
# date or product row are dropped. The product dimension is broadcast.
order_date_match = renamed_df.orderDate == date_df.Date
product_match = renamed_df.productId == product_df.productId

master_df = (
    renamed_df
    .join(date_df, order_date_match)
    .join(broadcast(product_df), product_match)
    .select(
        renamed_df.orderid,
        renamed_df.customername,
        renamed_df.productId,
        renamed_df.Quantity,
        renamed_df.price,
        renamed_df.orderDate,
        renamed_df.address,
        date_df.Date,
        date_df.Year,
        date_df.Month,
        date_df.Quarter,
        product_df.productName,
    )
)

In [0]:
# Total sales to date, one row per product.
# `sum` and `round` here are the pyspark.sql.functions versions imported at the
# top of the notebook, not the Python builtins.
# NOTE(review): total_amount adds up `price` without multiplying by `Quantity`;
# confirm whether `price` is a per-unit price or already a line total.
units_sold = sum("Quantity").alias("Total_Sold")
amount_sold = round(sum("price"), 2).alias("total_amount")

Total_Sales = (
    master_df
    .groupBy("productName")
    .agg(units_sold, amount_sold)
    .select("productName", "Total_Sold", "total_amount")
)
Total_Sales.show()

# Replace the previously stored aggregate with the freshly computed one.
(
    Total_Sales.write
    .mode("overwrite")
    .format('delta')
    .save("abfss://aggdata@streamingstoragelake1234.dfs.core.windows.net/total_sales")
)

In [0]:
# Month-wise sales: units sold and summed price per product for every
# (Year, Month) pair.
# `sum` and `round` here are the pyspark.sql.functions versions imported at the
# top of the notebook, not the Python builtins.
# NOTE(review): total_amount adds up `price` without multiplying by `Quantity`;
# confirm whether `price` is a per-unit price or already a line total.

month_wise_sales_df = (
    master_df
    .groupBy("Year", "Month", "productName")
    .agg(
        sum("Quantity").alias("Total_Sold"),
        round(sum("price"), 2).alias("total_amount"),
    )
    .select("productName", "Year", "Month", "Total_Sold", "total_amount")
    # Sort chronologically and then by product. Sorting on Month alone left
    # rows from different years interleaved in an arbitrary order.
    .orderBy("Year", "Month", "productName")
)
month_wise_sales_df.show()

# Overwrite the stored aggregate, laid out in Year/Month partition folders.
(
    month_wise_sales_df.write
    .mode("overwrite")
    .format('delta')
    .partitionBy('Year', 'Month')
    .save("abfss://aggdata@streamingstoragelake1234.dfs.core.windows.net/month_wise_sales")
)