In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

## Initiating SparkSession

In [0]:
spark = SparkSession.builder.appName("Retaildata_transformation").getOrCreate()

In [0]:
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": "<CLIENT-ID>",
  "fs.azure.account.oauth2.client.secret": "<CLIENT-SECRET>",
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<TENANT-ID>/oauth2/token"
}

# Mount point
dbutils.fs.mount(
  source = "wasbs://mycontainer@mystorageaccount.blob.core.windows.net/",
  mount_point = "/mnt/newblobpipeline3",
  extra_configs = {
    "fs.azure.account.key.mystorageaccount.blob.core.windows.net": "your-actual-storage-account-key-here"
  }
)



In [0]:

df = spark.read.option("header", "true") \
               .format("csv") \
               .load("/mnt/newpipeline3/bronze/sample_retail_orders.csv")

display(df)


# df = spark.read.json("/mnt/newblobpipeline2/<your-file-name>.json")


OrderID,CustomerID,ProductName,OrderDate,Quantity,PricePerUnit,Country,DiscountPercent,TaxPercent,StockAvailable
ORD1001,CUST005,Hard Drive,12/29/2023,2,300.0,Australia,5.0,12.0,41
ORD1002,CUST003,Hard Drive,10/30/2023,4,,Germany,15.0,,29
ORD1003,CUST005,Tablet,10/8/2023,4,300.0,UK,10.0,12.0,1
ORD1004,CUST001,Speaker,10/22/2023,1,0.0,UK,0.0,5.0,38
ORD1005,CUST002,Webcam,11/29/2023,,75.5,UK,10.0,5.0,20
ORD1006,CUST004,Hard Drive,11/28/2023,4,50.0,USA,,5.0,48
ORD1007,,Webcam,10/30/2023,2,75.5,Spain,0.0,10.0,15
ORD1008,CUST004,Speaker,NotADate,1,300.0,Spain,10.0,,40
ORD1009,CUST005,,12/14/2023,,75.5,Australia,5.0,18.0,26
ORD1010,CUST001,Tablet,11/14/2023,,75.5,India,15.0,18.0,23


In [0]:
df1 = df.dropna(subset=["CustomerID","ProductName","OrderDate"])
display(df1)

OrderID,CustomerID,ProductName,OrderDate,Quantity,PricePerUnit,Country,DiscountPercent,TaxPercent,StockAvailable
ORD1001,CUST005,Hard Drive,12/29/2023,2,300.0,Australia,5.0,12.0,41
ORD1002,CUST003,Hard Drive,10/30/2023,4,,Germany,15.0,,29
ORD1003,CUST005,Tablet,10/8/2023,4,300.0,UK,10.0,12.0,1
ORD1004,CUST001,Speaker,10/22/2023,1,0.0,UK,0.0,5.0,38
ORD1005,CUST002,Webcam,11/29/2023,,75.5,UK,10.0,5.0,20
ORD1006,CUST004,Hard Drive,11/28/2023,4,50.0,USA,,5.0,48
ORD1008,CUST004,Speaker,NotADate,1,300.0,Spain,10.0,,40
ORD1010,CUST001,Tablet,11/14/2023,,75.5,India,15.0,18.0,23
ORD1011,CUST001,Charger,10/13/2023,five,50.0,Spain,0.0,18.0,39
ORD1012,CUST005,Webcam,10/15/2023,1,50.0,Canada,,10.0,15


In [0]:
df2 = df.fillna({"Quantity" :0, "PricePerUnit":0.0,"DiscountPercent":0,"TaxPercent":0 })
display(df2)

OrderID,CustomerID,ProductName,OrderDate,Quantity,PricePerUnit,Country,DiscountPercent,TaxPercent,StockAvailable
ORD1001,CUST005,Hard Drive,12/29/2023,2,300.0,Australia,5,12,41
ORD1002,CUST003,Hard Drive,10/30/2023,4,0.0,Germany,15,0,29
ORD1003,CUST005,Tablet,10/8/2023,4,300.0,UK,10,12,1
ORD1004,CUST001,Speaker,10/22/2023,1,0.0,UK,0,5,38
ORD1005,CUST002,Webcam,11/29/2023,0,75.5,UK,10,5,20
ORD1006,CUST004,Hard Drive,11/28/2023,4,50.0,USA,0,5,48
ORD1007,,Webcam,10/30/2023,2,75.5,Spain,0,10,15
ORD1008,CUST004,Speaker,NotADate,1,300.0,Spain,10,0,40
ORD1009,CUST005,,12/14/2023,0,75.5,Australia,5,18,26
ORD1010,CUST001,Tablet,11/14/2023,0,75.5,India,15,18,23


In [0]:
df4 = df2.withColumn("Quanrity",col("Quantity").cast("int"))\
         .withColumn("DiscountPercent", col("DiscountPercent").cast("double"))\
         .withColumn("OrderDate", to_date(col("OrderDate"), "yyyy-MM-dd"))\
         .withColumn("TotalAmount", expr("Quantity* PricePerUnit"))\
         .withColumn("Country", lower("Country"))
display(df4)

OrderID,CustomerID,ProductName,OrderDate,Quantity,PricePerUnit,Country,DiscountPercent,TaxPercent,StockAvailable,Quanrity,TotalAmount
ORD1001,CUST005,Hard Drive,,2,300.0,australia,5.0,12,41,2.0,600.0
ORD1002,CUST003,Hard Drive,,4,0.0,germany,15.0,0,29,4.0,0.0
ORD1003,CUST005,Tablet,,4,300.0,uk,10.0,12,1,4.0,1200.0
ORD1004,CUST001,Speaker,,1,0.0,uk,0.0,5,38,1.0,0.0
ORD1005,CUST002,Webcam,,0,75.5,uk,10.0,5,20,0.0,0.0
ORD1006,CUST004,Hard Drive,,4,50.0,usa,0.0,5,48,4.0,200.0
ORD1007,,Webcam,,2,75.5,spain,0.0,10,15,2.0,151.0
ORD1008,CUST004,Speaker,,1,300.0,spain,10.0,0,40,1.0,300.0
ORD1009,CUST005,,,0,75.5,australia,5.0,18,26,0.0,0.0
ORD1010,CUST001,Tablet,,0,75.5,india,15.0,18,23,0.0,0.0


In [0]:
df5 = df4.drop_duplicates(["OrderID","ProductName"])\
        .filter(col("Quantity") > 0)
display(df5)

OrderID,CustomerID,ProductName,OrderDate,Quantity,PricePerUnit,Country,DiscountPercent,TaxPercent,StockAvailable,Quanrity,TotalAmount
ORD1001,CUST005,Hard Drive,,2,300.0,australia,5.0,12,41,2,600.0
ORD1002,CUST003,Hard Drive,,4,0.0,germany,15.0,0,29,4,0.0
ORD1003,CUST005,Tablet,,4,300.0,uk,10.0,12,1,4,1200.0
ORD1004,CUST001,Speaker,,1,0.0,uk,0.0,5,38,1,0.0
ORD1006,CUST004,Hard Drive,,4,50.0,usa,0.0,5,48,4,200.0
ORD1007,,Webcam,,2,75.5,spain,0.0,10,15,2,151.0
ORD1008,CUST004,Speaker,,1,300.0,spain,10.0,0,40,1,300.0
ORD1012,CUST005,Webcam,,1,50.0,canada,0.0,10,15,1,50.0
ORD1014,,USB Drive,,4,0.0,australia,5.0,5,32,4,0.0
ORD1017,CUST002,Speaker,,4,50.0,spain,0.0,12,39,4,200.0


In [0]:
windowSpec = Window.partitionBy("CustomerID").orderBy(col("TotalAmount").desc())
df5 = df4.withColumn("Rank", rank().over(windowSpec))

In [0]:
df6=df5.write.mode("overwrite").option("header","true").save("/mnt/newpipeline3/silver/order_detail.csv")

In [0]:
df6=df5.write.format("delta")\
    .mode("overwrite")\
    .option("header","true")\
    .save("/mnt/newpipeline3/silver/order_detail.csv")