In [0]:
# Fetch the service-principal credentials from the "team6scope" secret scope
# so that no secret value is written into the notebook itself.
clientid, tenantid, secretid = (
    dbutils.secrets.get(scope="team6scope", key=secret_key)
    for secret_key in ("clientid", "tenantid", "secretid")
)

# Spark settings that enable OAuth (client-credentials token provider) access
# to the team6geocartdata storage account. Each entry is (config key, value).
adls_oauth_settings = [
    ("fs.azure.account.auth.type.team6geocartdata.dfs.core.windows.net", "OAuth"),
    (
        "fs.azure.account.oauth.provider.type.team6geocartdata.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    ),
    ("fs.azure.account.oauth2.client.id.team6geocartdata.dfs.core.windows.net", clientid),
    ("fs.azure.account.oauth2.client.secret.team6geocartdata.dfs.core.windows.net", secretid),
    (
        "fs.azure.account.oauth2.client.endpoint.team6geocartdata.dfs.core.windows.net",
        f"https://login.microsoftonline.com/{tenantid}/oauth2/token",
    ),
]
for setting_key, setting_value in adls_oauth_settings:
    spark.conf.set(setting_key, setting_value)

In [0]:
# Read the five staging Delta tables from ADLS; they all sit under one folder.
def _read_staging_table(table_name):
    """Return the Delta table stored in the staging folder named `table_name`."""
    return spark.read.format("delta").load(
        "abfss://source@team6geocartdata.dfs.core.windows.net/geocartdata/staging/" + table_name + "/"
    )


customers_df = _read_staging_table("customers")
products_df = _read_staging_table("products")
transactions_df = _read_staging_table("transactions")
regions_df = _read_staging_table("regions")
support_tickets_df = _read_staging_table("support_tickets")


In [0]:
# Root folder of the curated layer. The write cells in this notebook append a
# table-specific sub-folder name to this path when saving their results.
gold_base_path = "abfss://source@team6geocartdata.dfs.core.windows.net/geocartdata/curated/"

In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *

## **Sales By Region**

In [0]:
from pyspark.sql.functions import col

# Attach the region attributes to each transaction; the inner join drops
# transactions whose RegionID has no match in regions_df.
txn_with_region = transactions_df.join(regions_df, on="RegionID", how="inner")

# Sum TotalAmount per region name (Spark names the result "sum(TotalAmount)").
region_totals = txn_with_region.groupBy("RegionName").agg({"TotalAmount": "sum"})

# Give the aggregate a friendly name and list the best-selling regions first.
sales_by_region = (
    region_totals
    .withColumnRenamed("sum(TotalAmount)", "TotalSales")
    .orderBy(col("TotalSales").desc())
)
sales_by_region.show(10, truncate=False)


+-----------+--------------------+
|RegionName |TotalSales          |
+-----------+--------------------+
|Region-3654|1.3638004575430667E7|
|Region-815 |1.3335064376667537E7|
|Region-7665|1.2334079234548334E7|
|Region-3604|1.1653421306940306E7|
|Region-1285|1.139569831539592E7 |
|Region-8170|1.083747387248129E7 |
|Region-3238|1.0653637940350227E7|
|Region-5614|1.0024679865985813E7|
|Region-4755|9989171.688394729   |
|Region-7849|9340276.778022025   |
+-----------+--------------------+
only showing top 10 rows


In [0]:
# Save the regional sales totals as a Delta table under the curated path,
# replacing the output of any previous run.
(
    sales_by_region.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "sales_by_region/")
)

## **Customer Lifetime Value (CLV)**

In [0]:
from pyspark.sql.functions import sum, col

# Lifetime value = sum of TotalAmount over all of a customer's transactions.
spend_per_customer = transactions_df.groupBy("CustomerID").agg(
    sum("TotalAmount").alias("LifetimeValue")
)

# Left join keeps customers that have transactions but no row in customers_df;
# their name/region columns are then null.
clv = (
    spend_per_customer
    .join(customers_df, on="CustomerID", how="left")
    .select("CustomerID", "FirstName", "LastName", "RegionID", "LifetimeValue")
    .orderBy(col("LifetimeValue").desc())
)
clv.show(10, truncate=False)


+----------+---------+--------+--------+--------------------+
|CustomerID|FirstName|LastName|RegionID|LifetimeValue       |
+----------+---------+--------+--------+--------------------+
|4047      |Ananya   |Brown   |8996    |1.3625221655430667E7|
|940       |Olivia   |Wilson  |6566    |1.3319647166667538E7|
|5502      |Arjun    |Jones   |7916    |1.2823285532700334E7|
|1156      |David    |Singh   |8174    |1.2316819534548335E7|
|5992      |Emma     |Das     |7163    |1.1653421306940306E7|
|3985      |Aarav    |Sharma  |6802    |1.1388883135395918E7|
|5293      |Sneha    |Kapoor  |1354    |1.083973817248129E7 |
|4566      |Riya     |Nair    |2673    |1.0660843670350228E7|
|5217      |Olivia   |Khan    |6412    |1.0016889325985812E7|
|9105      |Amelia   |Khan    |3521    |1.0001102218394728E7|
+----------+---------+--------+--------+--------------------+
only showing top 10 rows


In [0]:
# Save the customer lifetime value table as Delta under the curated path,
# replacing the output of any previous run.
(
    clv.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "customer_lifetime_value/")
)

## **Product Category Performance**

In [0]:
from pyspark.sql.functions import sum, avg, col

# Pair every transaction with its product so figures can be rolled up by category.
txn_with_product = transactions_df.join(products_df, on="ProductID", how="inner")

# Per-category metrics: total revenue and mean rating.
category_revenue = sum("TotalAmount").alias("CategoryRevenue")
category_rating = avg("Rating").alias("AvgRating")

product_perf = (
    txn_with_product
    .groupBy("Category")
    .agg(category_revenue, category_rating)
    .orderBy(col("CategoryRevenue").desc())  # highest-revenue category first
)
display(product_perf)

Category,CategoryRevenue,AvgRating
Beauty,79641464.88638175,3.536225997045787
Toys,73887413.42082849,3.4959206586826355
Sports,68214815.27685143,3.4777051792828746
Electronics,53500210.4288948,3.5265604575163385
Fashion,52622657.11560352,3.4822301024428746
Home & Kitchen,49340959.31936533,3.5163492063492097
Grocery,37847586.3175925,3.5066068642745725


In [0]:
# Save the per-category revenue/rating table as Delta under the curated path,
# replacing the output of any previous run.
(
    product_perf.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "product_category_performance/")
)


## **Complaint Resolution Efficiency**


In [0]:
from functools import reduce

from pyspark.sql.functions import coalesce, col, lit, round as spark_round, sum as spark_sum


def _null_safe_total(column_names):
    """Build a Column that adds the named columns, counting a null cell as 0.

    A pivot leaves a null wherever an agent has no tickets for a country, and
    in Spark `null + x` is null, so a plain `+` chain would blank out the whole
    total for that agent.

    Args:
        column_names: names of the numeric columns to add together.

    Returns:
        A Column expression; for an empty list it is the literal 0.
    """
    return reduce(
        lambda running, name: running + coalesce(col(name), lit(0)),
        column_names,
        lit(0),
    )


# Load Data
customers_df = spark.read.format("delta").load(
    "abfss://source@team6geocartdata.dfs.core.windows.net/geocartdata/staging/customers/"
)
support_tickets_df = spark.read.format("delta").load(
    "abfss://source@team6geocartdata.dfs.core.windows.net/geocartdata/staging/support_tickets/"
)

# Join to map Customer -> Country. The left join keeps tickets whose customer
# is missing from customers_df; those end up under a null Country.
tickets_with_country = (
    support_tickets_df.alias("t")
    .join(
        customers_df.select("CustomerID", "Country").alias("c"),
        col("t.CustomerID") == col("c.CustomerID"),
        "left"
    )
)

# Pivot Table: one row per agent, one column per country, each cell holding
# the summed ResolutionTimeMins rounded to 2 decimals.
pivot_df = (
    tickets_with_country
    .groupBy("AgentName")
    .pivot("Country")
    .agg(spark_round(spark_sum("ResolutionTimeMins"), 2))
)

# Every pivot output column except the grouping key is a country column.
country_cols = [c for c in pivot_df.columns if c != "AgentName"]

if country_cols:
    # Per-agent total across all countries (null cells count as 0).
    pivot_df = pivot_df.withColumn(
        "Total",
        spark_round(_null_safe_total(country_cols), 2)
    )

    # One-row frame with the column sums, their grand total, and the label
    # "Total" in the AgentName column, laid out in the same column order as
    # pivot_df so the two can be unioned.
    total_row = (
        pivot_df.agg(*[spark_sum(c).alias(c) for c in country_cols])
        .withColumn("Total", _null_safe_total(country_cols))
        .withColumn("AgentName", lit("Total"))
        .select(["AgentName"] + country_cols + ["Total"])
    )

    # Union pivot table + total row
    final_df = pivot_df.unionByName(total_row)
else:
    # No country columns were produced (e.g. no tickets), so there is nothing
    # to total; show the pivot as-is instead of failing on an empty reduce.
    final_df = pivot_df

display(final_df)

AgentName,null,Australia,Canada,France,Germany,India,Uk,Usa,Total
Harini,204311.0,5533658.0,16737872.69,1314257.13,5555473.0,807796.94,2697484.0,748827.0,33599679.76
Diana,1004610.75,8127129.5,629397.0,4278827.5,2268496.63,6140728.0,7556693.0,617340.0,30623222.38
Jia,3969726.25,3952486.78,7492312.75,5722839.0,6370104.63,679796.0,624123.0,4458000.5,33269388.91
Carlos,196866.0,3421760.23,669106.0,593257.0,882247.22,3655850.0,5346333.5,647443.0,15412862.95
Ethan,272396.0,762109.91,4073095.5,630607.39,902919.72,7936448.5,1820481.5,654515.0,17052573.52
Alex,311823.0,7294692.0,2185587.25,11267931.75,5632171.5,762727.67,2872454.81,10827003.63,41154391.61
Fatima,244662.0,681830.0,658331.0,7849622.81,542290.0,566664.0,614229.08,628524.0,11786152.89
George,199039.0,6388670.25,555155.0,519795.0,2806343.38,2445326.63,679460.0,3257336.75,16851126.01
Ivan,5600532.81,2811041.5,649607.0,2363274.38,10479546.75,1051038.13,1058102.66,1138596.69,25151739.92
Bhavna,224180.0,1435124.31,5678501.34,607175.0,1330383.38,740070.0,2077235.38,597697.0,12690366.41


In [0]:
# Save the per-agent pivot as a Delta table under the curated path, replacing
# the output of any previous run.
# NOTE(review): this persists pivot_df, not final_df, so the appended "Total"
# summary row is displayed above but not stored - confirm that is intended.
(
    pivot_df.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "Complaint_resolution")
)

## **Revenue Forecasting**

In [0]:
from pyspark.sql.functions import col, to_date, date_format, sum as spark_sum, month
from pyspark.sql import functions as F

# Reload the staging transactions and, in one chain:
#   1. parse TransactionDate into a date using the yyyy-MM-dd pattern,
#   2. add MonthNum (numeric month, used for ordering),
#   3. add MonthName (full month name, used for display).
# The result replaces transactions_df, so later cells see these extra columns.
transactions_df = (
    spark.read.format("delta").load(
        "abfss://source@team6geocartdata.dfs.core.windows.net/geocartdata/staging/transactions/"
    )
    .withColumn("TransactionDate", to_date(col("TransactionDate"), "yyyy-MM-dd"))
    .withColumn("MonthNum", month(col("TransactionDate")))
    .withColumn("MonthName", date_format(col("TransactionDate"), "MMMM"))
)

# Revenue per calendar month. Years are not part of the grouping key, so the
# same month of different years is added together.
revenue_per_month = transactions_df.groupBy("MonthNum", "MonthName").agg(
    spark_sum("TotalAmount").alias("TotalRevenue")
)

# Sort by month number first, then keep only the columns meant for display.
monthly_revenue = revenue_per_month.orderBy("MonthNum").select("MonthName", "TotalRevenue")

display(monthly_revenue)


MonthName,TotalRevenue
January,32569070.49891552
February,17042465.911469303
March,31167030.41342097
April,11260528.186468204
May,47671565.40467359
June,61210261.2978937
July,27857607.12487176
August,53846475.71333738
September,51830144.98869173
October,45582384.49744704


In [0]:
# Save the monthly revenue aggregate (monthly_revenue) as a Delta table under
# the curated path. The previous version of this cell wrote transactions_df,
# i.e. the full row-level transactions table, to "Revenue_monthly" instead of
# the aggregate that the cell above computes and displays.
# overwriteSchema is required because earlier runs stored the row-level schema
# at this path; without it Delta rejects the overwrite with a schema mismatch.
(
    monthly_revenue.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(gold_base_path + "Revenue_monthly")
)

## **Sales By Payment Method**

In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *
# Total sales per payment type, largest first.
payment_totals = transactions_df.groupBy("PaymentType").agg(
    sum("TotalAmount").alias("TotalSales")
)
sales_by_payment = payment_totals.orderBy(col("TotalSales").desc())
sales_by_payment.show()


+-----------+--------------------+
|PaymentType|          TotalSales|
+-----------+--------------------+
|     Wallet| 9.683050423926511E7|
|        COD| 9.420582474951614E7|
|        UPI| 8.036408676074165E7|
| Debit Card|6.1893573619122915E7|
|Net Banking|  5.53646818167624E7|
|Credit Card| 4.640363254808543E7|
|    Unknown|1.4047763485214021E7|
+-----------+--------------------+



In [0]:
# Save the per-payment-type sales totals as Delta under the curated path,
# replacing the output of any previous run.
(
    sales_by_payment.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "sales_by_payment_method/")
)

## **SLA (Service Level Agreement) Breach Rate**

In [0]:
from pyspark.sql.functions import avg, when, col

# 1 for a ticket flagged as breached, 0 for everything else (including a
# null SLA_Breached value, which fails the equality test).
breach_flag = when(col("SLA_Breached") == True, 1).otherwise(0)

# The mean of the 0/1 flag over all tickets is the breach rate.
tickets_flagged = support_tickets_df.withColumn("BreachFlag", breach_flag)
sla_breach_rate = tickets_flagged.agg(avg("BreachFlag").alias("SLA_Breach_Rate"))
sla_breach_rate.show()


+-------------------+
|    SLA_Breach_Rate|
+-------------------+
|0.24763804325005248|
+-------------------+



In [0]:
# Save the single-row SLA breach rate as Delta under the curated path,
# replacing the output of any previous run.
(
    sla_breach_rate.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "sla_breach_rate/")
)


## **Total Spending By Age**

In [0]:
# Import everything this cell uses explicitly (aliasing sum so the Python
# builtin is not rebound) instead of relying on an earlier star import.
from pyspark.sql.functions import col, sum as spark_sum, when

# Create AgeGroup.
# Buckets: Age < 25 -> Youth, 25 <= Age < 45 -> Middle Age, Age >= 45 -> Senior.
# "Senior" is matched explicitly: every comparison against a null Age is null,
# so a bare otherwise("Senior") would label customers of unknown age as Senior
# and inflate that group's sales. Those customers are reported as "Unknown".
customers_segmented = customers_df.withColumn(
    "AgeGroup",
    when(col("Age") < 25, "Youth")
    .when((col("Age") >= 25) & (col("Age") < 45), "Middle Age")
    .when(col("Age") >= 45, "Senior")
    .otherwise("Unknown")
)

# Join with transactions and compute total sales per age group. The inner
# join drops transactions whose CustomerID is not in customers_df.
sales_by_age = (
    transactions_df.join(customers_segmented, "CustomerID", "inner")
    .groupBy("AgeGroup")
    .agg(spark_sum("TotalAmount").alias("TotalSales"))
    .orderBy("AgeGroup")
)
sales_by_age.show()


+----------+--------------------+
|  AgeGroup|          TotalSales|
+----------+--------------------+
|Middle Age|1.1858197269626315E8|
|    Senior|2.6941899252541524E8|
|     Youth| 5.587697114301355E7|
+----------+--------------------+



In [0]:
# Save the per-age-group sales totals as Delta under the curated path,
# replacing the output of any previous run.
(
    sales_by_age.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "sales_by_age_group/")
)


## **Total Spend By Customer ID**

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

# Reload the staging transactions Delta table, replacing the month-enriched
# frame that an earlier cell stored under the same name.
transactions_df = spark.read.format("delta").load(
    "abfss://source@team6geocartdata.dfs.core.windows.net/geocartdata/staging/transactions/"
)

# Total spend per customer: sum of TotalAmount, biggest spenders first.
spend_total = spark_sum(col("TotalAmount")).alias("TotalSpend")
total_spend_df = (
    transactions_df
    .groupBy("CustomerID")
    .agg(spend_total)
    .orderBy(col("TotalSpend").desc())
)

display(total_spend_df)




CustomerID,TotalSpend
4047,13625221.655430667
940,13319647.166667538
5502,12823285.532700334
1156,12316819.534548337
5992,11653421.306940306
3985,11388883.135395918
5293,10839738.17248129
4566,10660843.670350228
5217,10016889.325985812
9105,10001102.218394728


In [0]:
# Expose the per-customer totals to Spark SQL under the view name
# "total_spend_df" (replacing any existing temp view with that name).
spend_view_name = "total_spend_df"
total_spend_df.createOrReplaceTempView(spend_view_name)

# Spot check: look up the total spend of a single customer (ID 1554).
result = spark.sql("SELECT * FROM total_spend_df WHERE CustomerID = 1554")
result.show()


+----------+-----------------+
|CustomerID|       TotalSpend|
+----------+-----------------+
|      1554|2775493.820748108|
+----------+-----------------+



In [0]:
# Save the per-customer total spend as Delta under the curated path,
# replacing the output of any previous run.
(
    total_spend_df.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "total_spend/")
)

## **Average Revenue Per Customer (ARPC) By Transaction Date**

In [0]:
from pyspark.sql.functions import col, sum as spark_sum, countDistinct, round as spark_round

# Reload the staging transactions table.
transactions_df = spark.read.format("delta").load(
    "abfss://source@team6geocartdata.dfs.core.windows.net/geocartdata/staging/transactions/"
)

# ARPC for a date = that date's summed TotalAmount divided by the number of
# distinct customers who transacted on it, rounded to 2 decimals.
revenue_per_customer = spark_sum("TotalAmount") / countDistinct("CustomerID")
daily_arpc = transactions_df.groupBy("TransactionDate").agg(
    spark_round(revenue_per_customer, 2).alias("ARPC")
)

# Render the result table.
display(daily_arpc)

TransactionDate,ARPC
2023-05-18,4162.02
2024-09-15,6043.67
2023-01-21,4106.5
2023-05-01,3997.19
2024-08-20,6082.46
2024-10-24,5756.72
2024-07-14,3682.88
2024-01-19,4284.75
2024-10-22,4881.4
2025-02-25,3570.02


In [0]:
# Save the per-date ARPC table as Delta under the curated path, replacing the
# output of any previous run.
(
    daily_arpc.write
    .format("delta")
    .mode("overwrite")
    .save(gold_base_path + "daily_arpc/")
)