### YEAR PRICE TREND

In [0]:
from pyspark.sql import functions as F

silver_lineitem_path ="abfss://silver@retailkiran.dfs.core.windows.net/retail/lineitem_cleaned"
silver_orders_path = "abfss://silver@retailkiran.dfs.core.windows.net/retail/orders_cleaned"

lineitem = spark.read.format("delta").load(silver_lineitem_path)
orders = spark.read.format("delta").load(silver_orders_path)

In [0]:
year_price_trend = lineitem.join(orders, "order_id") \
    .withColumn("order_year", F.year("order_date")) \
    .groupBy("order_year") \
    .agg(
        F.avg("base_price").alias("avg_retail_price"),
        F.min("base_price").alias("min_price"),
        F.max("base_price").alias("max_price"),
        (F.max("base_price") - F.min("base_price")).alias("price_volatility")
    ) \
    .orderBy("order_year")

In [0]:
display(year_price_trend)

order_year,avg_retail_price,min_price,max_price,price_volatility
1992,35580.354365,904.0,94799.5,93895.5
1993,35920.360799,913.01,93848.5,92935.49
1994,35833.564762,907.0,94749.5,93842.5
1995,36084.82579,906.0,94399.0,93493.0
1996,35758.957663,904.0,94949.5,94045.5
1997,35485.554991,909.0,94899.5,93990.5
1998,35654.054356,915.01,94499.0,93583.99


In [0]:
year_price_trend.write.format("delta")\
             .mode("overwrite")\
             .saveAsTable("kirandb.retail.year_price_trend")
        

In [0]:
%sql
select * from kirandb.retail.year_price_trend;

order_year,avg_retail_price,min_price,max_price,price_volatility
1992,35580.354365,904.0,94799.5,93895.5
1993,35920.360799,913.01,93848.5,92935.49
1994,35833.564762,907.0,94749.5,93842.5
1995,36084.82579,906.0,94399.0,93493.0
1996,35758.957663,904.0,94949.5,94045.5
1997,35485.554991,909.0,94899.5,93990.5
1998,35654.054356,915.01,94499.0,93583.99


## CUSTOMER SUMMARY

In [0]:
df_gold = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/customer_cleaned")

df_gold_segmented = df_gold.withColumn(
    "customer_priority",
    F.when(F.col("account_balance") > 7000, "Platinum")
     .when(F.col("account_balance") > 3000, "Gold")
     .otherwise("Standard")
)

display(df_gold_segmented)

customer_id,customer_name_id,address,nation_id,phone_number,country_code,account_balance,market_segment,comment,customer_priority
148,148,BhSPlEWGvIJyT9swk vCWE,11,21-562-498-6636,21,2135.6,household,ing to the carefully ironic requests. carefully regular dependencies about the theodolites wake furious,Standard
463,463,"LV7MN7Tkm2NSo4Q3lwvjxGQyRJjRZRf,M",8,18-167-214-5805,18,-654.5,household,quickly along the final ideas. slyly regular accounts are iro,Standard
471,471,tGr0DtrK 91IgzfeZrSPpPIia3,4,14-574-118-1005,14,5716.9,furniture,es. unusual accounts try to solve ca,Gold
496,496,Y8oYLlHme6Z4fEzkTu,12,22-173-644-7922,22,8174.82,machinery,quickly bold packages. decoys among the blithely pending accounts lose according to the deposits.,Platinum
833,833,"t3qDCo,Yh MZcJFV6PibeY,MUunz",6,16-624-307-4875,16,-526.14,furniture,uickly final orbits across the blithely express accounts integrate furiously among the final sheaves. blithe,Standard
1088,1088,"YjXQtOJoM0nhClEy0,WFdNxvJ1g6xpn kL2ommEv",22,32-324-225-2635,32,2098.62,building,ly special ideas. slyly unusual requests haggle,Standard
1238,1238,"HGCJI27,RIIQcS20,DcJbMQuUmN3Vhzdm",6,16-302-171-7578,16,4299.22,building,"ly special requests. unusual, special asymptotes according to the blithely express pinto beans wake en",Gold
1342,1342,FD6UNqfsYMKkf3ZFZdI4EaYMZ,16,26-340-733-2096,26,1520.34,machinery,"y around the final, special foxes.",Standard
243,243,te2FOn8xJzJinZc,7,17-297-684-7972,17,620.73,automobile,nic deposits. evenly pending deposits boost fluffily careful,Standard
392,392,H7M6JObndO,17,27-601-793-2507,27,8492.33,building,efully bold ideas. bold requests sleep carefully blithe instructions. carefully final accounts are blithely quickly,Platinum


In [0]:
df_customer_summary = df_gold_segmented.groupBy("market_segment") \
    .agg(
        F.count("customer_id").alias("total_customers"),
        F.sum("account_balance").alias("total_customer_value"),
        F.avg("account_balance").alias("avg_customer_balance")
    )

display(df_customer_summary)

market_segment,total_customers,total_customer_value,avg_customer_balance
building,337,1444587.8,4286.610682
machinery,288,1296958.61,4503.328507
furniture,279,1265282.8,4535.063799
household,294,1279340.66,4351.498844
automobile,302,1395695.72,4621.509007


In [0]:
df_customer_summary.write.format("delta")\
             .mode("overwrite")\
             .saveAsTable("kirandb.retail.customer_summary") 

In [0]:
%sql
select * from kirandb.retail.customer_summary;

market_segment,total_customers,total_customer_value,avg_customer_balance
building,337,1444587.8,4286.610682
machinery,288,1296958.61,4503.328507
furniture,279,1265282.8,4535.063799
household,294,1279340.66,4351.498844
automobile,302,1395695.72,4621.509007


## GEO DATA (NATION & REGION)

In [0]:

nations = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/nation_cleaned")
regions = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/region_cleaned")

df_geo = nations.join(
    regions, 
    nations.region_id == regions.region_id, 
    "inner"
).select(
    nations["nation_id"],
    nations["nation_name"],
    regions["region_name"].alias("parent_region")
)
display(df_geo)

nation_id,nation_name,parent_region
12,JAPAN,ASIA
22,RUSSIA,EUROPE
1,ARGENTINA,AMERICA
13,JORDAN,MIDDLE EAST
6,FRANCE,EUROPE
16,MOZAMBIQUE,AFRICA
3,CANADA,AMERICA
20,SAUDI ARABIA,MIDDLE EAST
5,ETHIOPIA,AFRICA
19,ROMANIA,EUROPE


In [0]:
df_geo.write.format("delta")\
             .mode("overwrite")\
             .saveAsTable("kirandb.retail.geo")

### LINE ITEM PERFORMANCE COMPARISON

In [0]:
from pyspark.sql import functions as F

customers = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/customer_cleaned")
nations = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/nation_cleaned")
regions = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/region_cleaned")
orders = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/orders_cleaned")
lineitem = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/lineitem_cleaned")

line_item_comparison = lineitem \
    .join(orders, "order_id") \
    .join(customers, "customer_id") \
    .join(nations, "nation_id") \
    .join(regions, "region_id") \
    .groupBy("region_name", "market_segment", "shipping_mode") \
    .agg(
        F.sum("quantity").alias("total_qty"),
        F.avg(F.datediff("receipt_date", "ship_date")).alias("avg_shipping_days")
    )

display(line_item_comparison)

region_name,market_segment,shipping_mode,total_qty,avg_shipping_days
AMERICA,automobile,RAIL,7684.0,15.942372881355931
MIDDLE EAST,automobile,MAIL,9226.0,16.333333333333332
AFRICA,automobile,TRUCK,9357.0,14.870689655172416
AFRICA,building,SHIP,12564.0,15.816733067729084
EUROPE,household,AIR,7128.0,14.4468085106383
MIDDLE EAST,furniture,REG AIR,11424.0,15.53579676674365
MIDDLE EAST,automobile,SHIP,8794.0,15.882183908045976
ASIA,household,RAIL,7929.0,15.286184210526317
AMERICA,household,REG AIR,7451.0,15.379661016949152
EUROPE,household,REG AIR,7461.0,15.206779661016949


In [0]:
line_item_comparison.write.format("delta")\
             .mode("overwrite")\
             .saveAsTable("kirandb.retail.line_item_comparison")

In [0]:
%sql
select * from kirandb.retail.line_item_comparison

region_name,market_segment,shipping_mode,total_qty,avg_shipping_days
AMERICA,automobile,RAIL,7684.0,15.942372881355931
MIDDLE EAST,automobile,MAIL,9226.0,16.333333333333332
AFRICA,automobile,TRUCK,9357.0,14.870689655172416
AFRICA,building,SHIP,12564.0,15.816733067729084
EUROPE,household,AIR,7128.0,14.4468085106383
MIDDLE EAST,furniture,REG AIR,11424.0,15.53579676674365
MIDDLE EAST,automobile,SHIP,8794.0,15.882183908045976
ASIA,household,RAIL,7929.0,15.286184210526317
AMERICA,household,REG AIR,7451.0,15.379661016949152
EUROPE,household,REG AIR,7461.0,15.206779661016949


### MONTHLY AND QUARTERLY PRICING ANALYSIS

In [0]:
from pyspark.sql import functions as F

lineitem = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/lineitem_cleaned")
orders = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/orders_cleaned")

monthly_pricing_summary = lineitem.join(orders, "order_id") \
    .withColumn("order_month", F.trunc("order_date", "MM")) \
    .groupBy("order_month") \
    .agg(
        F.sum("total_price").alias("gross_revenue"),
        F.avg("base_price").alias("avg_base_price"),
        F.sum(F.col("base_price") * F.col("discount_percentage")).alias("total_discount_value"),
        F.countDistinct("order_id").alias("total_orders")
    ) \
    .orderBy("order_month")

display(monthly_pricing_summary)

order_month,gross_revenue,avg_base_price,total_discount_value,total_orders
1992-01-01,154064495.19,36892.157515,1625346.1773,203
1992-02-01,122379374.89,34943.734497,1312118.6187,185
1992-03-01,155082347.97,35300.401756,1465261.8955,202
1992-04-01,134746014.56,34632.800556,1389367.8725,207
1992-05-01,137414466.74,34733.406442,1390662.2564,202
1992-06-01,126448979.15,36405.891544,1201468.4748,170
1992-07-01,132912401.52,36364.572306,1310238.3225,174
1992-08-01,149846768.53,36565.4627,1390334.3782,197
1992-09-01,143524755.84,35368.652828,1388784.5183,177
1992-10-01,123387679.27,34074.454425,1382999.0783,212


In [0]:
monthly_pricing_summary.write.format("delta")\
             .mode("overwrite")\
             .saveAsTable("kirandb.retail.monthly_pricing_summary")

In [0]:
quarterly_pricing_analysis = monthly_pricing_summary \
    .withColumn("order_quarter", F.concat(F.year("order_month"), F.lit("-Q"), F.quarter("order_month"))) \
    .groupBy("order_quarter") \
    .agg(
        F.sum("gross_revenue").alias("quarterly_revenue"),
        F.avg("avg_base_price").alias("avg_quarterly_price"),
        F.sum("total_orders").alias("total_orders")
    ) \
    .orderBy("order_quarter")

display(quarterly_pricing_analysis)

order_quarter,quarterly_revenue,avg_quarterly_price,total_orders
1992-Q1,431526218.05,35712.0979226667,590
1992-Q2,398609460.45,35257.3661806667,579
1992-Q3,426283925.89,36099.5626113333,548
1992-Q4,355496752.29,35346.5764546667,539
1993-Q1,378781767.6,36165.2701813333,539
1993-Q2,398314285.82,35878.549179,575
1993-Q3,419815759.86,35324.6538716667,582
1993-Q4,454881876.71,36351.6033336667,611
1994-Q1,429637319.4,35839.904333,579
1994-Q2,402857590.1,35468.4416136667,593


In [0]:
quarterly_pricing_analysis.write.format("delta")\
             .mode("overwrite")\
             .saveAsTable("kirandb.retail.quarterly_pricing_analysis")

### PRODUCT RETURN ANALYSIS

In [0]:
parts = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/part_cleaned")
lineitem = spark.read.format("delta").load("abfss://silver@retailkiran.dfs.core.windows.net/retail/lineitem_cleaned")

product_return_analysis = lineitem.join(parts, "part_id") \
    .groupBy("brand", "part_type") \
    .agg(
        F.count("*").alias("total_items"),
        F.sum(F.when(F.col("return_flag") == "R", 1).otherwise(0)).alias("returned_items_count")
    ) \
    .withColumn("return_rate", F.col("returned_items_count") / F.col("total_items")) \
    .filter(F.col("total_items") > 10) \
    .orderBy(F.desc("return_rate"))
display(product_return_analysis)

brand,part_type,total_items,returned_items_count,return_rate
Brand#31,STANDARD BURNISHED TIN,28,15,0.5357142857142857
Brand#51,PROMO ANODIZED COPPER,34,18,0.5294117647058824
Brand#21,SMALL BRUSHED NICKEL,22,11,0.5
Brand#15,STANDARD BRUSHED COPPER,27,13,0.4814814814814814
Brand#14,SMALL BRUSHED NICKEL,17,8,0.4705882352941176
Brand#32,SMALL ANODIZED TIN,32,15,0.46875
Brand#51,SMALL POLISHED STEEL,28,13,0.4642857142857143
Brand#33,PROMO PLATED COPPER,26,12,0.4615384615384615
Brand#14,SMALL BURNISHED TIN,24,11,0.4583333333333333
Brand#55,LARGE BRUSHED COPPER,31,14,0.4516129032258064


In [0]:
product_return_analysis.write.format("delta")\
             .mode("overwrite")\
             .saveAsTable("kirandb.retail.product_return_analysis")

In [0]:
%sql
select * from kirandb.retail.product_return_analysis;

brand,part_type,total_items,returned_items_count,return_rate
Brand#31,STANDARD BURNISHED TIN,28,15,0.5357142857142857
Brand#51,PROMO ANODIZED COPPER,34,18,0.5294117647058824
Brand#21,SMALL BRUSHED NICKEL,22,11,0.5
Brand#15,STANDARD BRUSHED COPPER,27,13,0.4814814814814814
Brand#14,SMALL BRUSHED NICKEL,17,8,0.4705882352941176
Brand#32,SMALL ANODIZED TIN,32,15,0.46875
Brand#51,SMALL POLISHED STEEL,28,13,0.4642857142857143
Brand#33,PROMO PLATED COPPER,26,12,0.4615384615384615
Brand#14,SMALL BURNISHED TIN,24,11,0.4583333333333333
Brand#55,LARGE BRUSHED COPPER,31,14,0.4516129032258064
