In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, trim, initcap, lower, regexp_replace, when, coalesce,
    try_to_timestamp, to_date, countDistinct, count, sum as Fsum, avg as Favg, min as Fmin, max as Fmax,
    date_trunc, date_format, lit, broadcast, expr, lag, round, desc
)
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("CustomerBehaviorLoyalty").getOrCreate()



In [None]:
from google.colab import files
uploaded = files.upload()

Saving orders.csv to orders.csv


PHASE 1 — Ingestion & Cleaning

In [None]:

# 1) Read all as StringType
orders_raw_df = spark.read.csv("orders.csv", header=True, inferSchema=False)

# 2) Trim text fields
df = (orders_raw_df
      .withColumn("city", trim(col("city")))
      .withColumn("category", trim(col("category")))
      .withColumn("product", trim(col("product")))
      .withColumn("status", trim(col("status")))
)

# 3) Normalize case for city/category/product (proper case) — keep originals for audit
df = (df
      .withColumn("city_clean", initcap(col("city")))
      .withColumn("category_clean", initcap(col("category")))
      .withColumn("product_clean", initcap(col("product")))
)

# 4) Clean amount: remove commas, handle invalid, cast to int
df = df.withColumn("amount_nocomma", regexp_replace(col("amount"), ",", "")) \
       .withColumn(
            "amount_int",
            when(col("amount_nocomma").rlike(r"^[0-9]+$"), col("amount_nocomma").cast("int")).otherwise(None)
       )

# 5) Parse order_date into DateType (tolerant to invalids)
df = df.withColumn(
    "order_date_clean",
    coalesce(
        try_to_timestamp(col("order_date"), lit("yyyy-MM-dd")),
        try_to_timestamp(col("order_date"), lit("dd/MM/yyyy")),
        try_to_timestamp(col("order_date"), lit("yyyy/MM/dd"))
    ).cast("date")
)

# 6) Remove duplicate order_id
df = df.dropDuplicates(["order_id"])

# 7) Keep only Completed orders (case-insensitive, trims handled)
clean_orders_df = df.filter(lower(col("status")) == lit("completed"))

clean_orders_df.cache()
print("Clean records:", clean_orders_df.count())
clean_orders_df.printSchema()
clean_orders_df.show(10, truncate=False)


Clean records: 285000
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- category: string (nullable = true)
 |-- product: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- city_clean: string (nullable = true)
 |-- category_clean: string (nullable = true)
 |-- product_clean: string (nullable = true)
 |-- amount_nocomma: string (nullable = true)
 |-- amount_int: integer (nullable = true)
 |-- order_date_clean: date (nullable = true)

+-----------+-----------+---------+-----------+-------+------+----------+---------+----------+--------------+-------------+--------------+----------+----------------+
|order_id   |customer_id|city     |category   |product|amount|order_date|status   |city_clean|category_clean|product_clean|amount_nocomma|amount_int|order_date_clean|
+-----------+-----------+---------+-----------+-------

PHASE 2 — Customer Metrics

In [None]:

customer_profile_df = (clean_orders_df
    .groupBy("customer_id")
    .agg(
        count("*").alias("total_orders"),
        Fsum(col("amount_int")).alias("total_spend"),
        Favg(col("amount_int")).alias("avg_order_value"),
        Fmin(col("order_date_clean")).alias("first_purchase_date"),
        Fmax(col("order_date_clean")).alias("last_purchase_date"),
        countDistinct("city_clean").alias("distinct_cities"),
        countDistinct("category_clean").alias("distinct_categories")
    )
)

customer_profile_df.cache()
customer_profile_df.show(10, truncate=False)



+-----------+------------+-----------+------------------+-------------------+------------------+---------------+-------------------+
|customer_id|total_orders|total_spend|avg_order_value   |first_purchase_date|last_purchase_date|distinct_cities|distinct_categories|
+-----------+------------+-----------+------------------+-------------------+------------------+---------------+-------------------+
|C016502    |6           |318813     |53135.5           |2024-01-03         |2024-02-12        |3              |3                  |
|C007013    |6           |241427     |48285.4           |2024-01-14         |2024-02-23        |2              |2                  |
|C022166    |6           |266454     |44409.0           |2024-01-07         |2024-02-16        |4              |4                  |
|C036542    |6           |232053     |46410.6           |2024-01-03         |2024-02-12        |4              |4                  |
|C049309    |6           |272741     |54548.2           |2024-01-10  

PHASE 3 — Customer Segmentation

In [None]:

customer_segmented_df = (customer_profile_df
    .withColumn(
        "customer_segment",
        when((col("total_spend") >= 200000) & (col("total_orders") >= 5), "VIP")
        .when(col("total_spend") >= 100000, "Premium")
        .otherwise("Regular")
    )
)

segment_counts_df = customer_segmented_df.groupBy("customer_segment").count()
segment_counts_df.show()



+----------------+-----+
|customer_segment|count|
+----------------+-----+
|         Premium|12485|
|         Regular|  623|
|             VIP|34392|
+----------------+-----+



PHASE 4 — Window Functions

In [None]:

w_overall = Window.orderBy(desc("total_spend"))
ranked_overall_df = customer_segmented_df.withColumn("spend_rank_overall", expr("rank()").over(w_overall))
ranked_overall_df.select("customer_id", "total_spend", "spend_rank_overall").orderBy("spend_rank_overall").show(10)



+-----------+-----------+------------------+
|customer_id|total_spend|spend_rank_overall|
+-----------+-----------+------------------+
|    C043076|     493949|                 1|
|    C034689|     486879|                 2|
|    C039985|     484057|                 3|
|    C026691|     477147|                 4|
|    C038979|     477138|                 5|
|    C020762|     474717|                 6|
|    C044654|     471304|                 7|
|    C014292|     468617|                 8|
|    C019565|     467523|                 9|
|    C045487|     467050|                10|
+-----------+-----------+------------------+
only showing top 10 rows


In [None]:

city_customer_spend_df = (clean_orders_df
    .groupBy("city_clean", "customer_id")
    .agg(Fsum("amount_int").alias("city_customer_spend"))
)

w_city = Window.partitionBy("city_clean").orderBy(desc("city_customer_spend"))
ranked_in_city_df = city_customer_spend_df.withColumn("city_spend_rank", expr("dense_rank()").over(w_city))
ranked_in_city_df.orderBy("city_clean", "city_spend_rank").show(30, truncate=False)


+----------+-----------+-------------------+---------------+
|city_clean|customer_id|city_customer_spend|city_spend_rank|
+----------+-----------+-------------------+---------------+
|Bangalore |C011518    |332527             |1              |
|Bangalore |C024935    |315622             |2              |
|Bangalore |C025451    |303208             |3              |
|Bangalore |C008486    |300843             |4              |
|Bangalore |C039191    |294970             |5              |
|Bangalore |C006114    |290915             |6              |
|Bangalore |C029163    |286115             |7              |
|Bangalore |C028773    |285105             |8              |
|Bangalore |C045363    |283538             |9              |
|Bangalore |C043646    |272357             |10             |
|Bangalore |C020442    |271941             |11             |
|Bangalore |C032542    |262148             |12             |
|Bangalore |C012335    |260141             |13             |
|Bangalore |C018229    |

In [None]:

top3_per_city_df = ranked_in_city_df.filter(col("city_spend_rank") <= 3)
top3_per_city_df.show(50, truncate=False)


+----------+-----------+-------------------+---------------+
|city_clean|customer_id|city_customer_spend|city_spend_rank|
+----------+-----------+-------------------+---------------+
|Bangalore |C011518    |332527             |1              |
|Bangalore |C024935    |315622             |2              |
|Bangalore |C025451    |303208             |3              |
|Chennai   |C028121    |340890             |1              |
|Chennai   |C027841    |287392             |2              |
|Chennai   |C030712    |284466             |3              |
|Delhi     |C016309    |325001             |1              |
|Delhi     |C022599    |314625             |2              |
|Delhi     |C018688    |306692             |3              |
|Hyderabad |C032833    |318097             |1              |
|Hyderabad |C023269    |292791             |2              |
|Hyderabad |C013263    |291679             |3              |
|Kolkata   |C032246    |304480             |1              |
|Kolkata   |C022131    |

In [None]:

top10_customers_overall_df = ranked_overall_df.orderBy(col("spend_rank_overall")).limit(10)
top10_customers_overall_df.show(truncate=False)



+-----------+------------+-----------+-----------------+-------------------+------------------+---------------+-------------------+----------------+------------------+
|customer_id|total_orders|total_spend|avg_order_value  |first_purchase_date|last_purchase_date|distinct_cities|distinct_categories|customer_segment|spend_rank_overall|
+-----------+------------+-----------+-----------------+-------------------+------------------+---------------+-------------------+----------------+------------------+
|C043076    |6           |493949     |82324.83333333333|2024-01-17         |2024-02-26        |5              |4                  |VIP             |1                 |
|C034689    |6           |486879     |81146.5          |2024-01-10         |2024-02-19        |4              |3                  |VIP             |2                 |
|C039985    |6           |484057     |80676.16666666667|2024-01-06         |2024-02-15        |3              |4                  |VIP             |3           

PHASE 5 — Customer Loyalty Analysis

In [None]:

customer_loyalty_df = (clean_orders_df
    .groupBy("customer_id")
    .agg(
        countDistinct("order_date_clean").alias("distinct_purchase_dates"),
        countDistinct("category_clean").alias("distinct_categories_purchased"),
        Fsum("amount_int").alias("total_spend_from_orders")
    )
    .withColumn(
        "is_loyal",
        (col("distinct_purchase_dates") >= 3) & (col("distinct_categories_purchased") >= 2)
    )
)

# 1) Identify loyal customers
loyal_customers_df = customer_loyalty_df.filter(col("is_loyal") == True)
loyal_customers_df.select("customer_id").show(20, truncate=False)

# 2) Loyal customers per city:
# Interpretation: a loyal customer is counted for each city where they made at least one purchase.
loyal_per_city_df = (clean_orders_df.join(loyal_customers_df, "customer_id")
    .select("city_clean", "customer_id").distinct()
    .groupBy("city_clean").agg(countDistinct("customer_id").alias("loyal_customers"))
)
loyal_per_city_df.orderBy(desc("loyal_customers")).show(truncate=False)

# 3) Loyal vs non-loyal revenue contribution
loyal_revenue_df = (clean_orders_df
    .join(customer_loyalty_df.select("customer_id", "is_loyal"), "customer_id", "left")
    .groupBy("is_loyal")
    .agg(Fsum("amount_int").alias("revenue"))
    .withColumn("is_loyal_str", when(col("is_loyal") == True, "Loyal").otherwise("Non-Loyal"))
    .select("is_loyal_str", "revenue")
)
loyal_revenue_df.show()


+-----------+
|customer_id|
+-----------+
|C022166    |
|C016502    |
|C007013    |
|C049309    |
|C029712    |
|C036809    |
|C047477    |
|C017845    |
|C033643    |
|C036542    |
|C030046    |
|C033572    |
|C047815    |
|C005519    |
|C014129    |
|C038464    |
|C043610    |
|C029861    |
|C035074    |
|C030167    |
+-----------+
only showing top 20 rows
+----------+---------------+
|city_clean|loyal_customers|
+----------+---------------+
|Hyderabad |28827          |
|Delhi     |28761          |
|Pune      |28714          |
|Chennai   |28669          |
|Kolkata   |28581          |
|Mumbai    |28532          |
|Bangalore |28514          |
+----------+---------------+

+------------+-----------+
|is_loyal_str|    revenue|
+------------+-----------+
|       Loyal|11421590547|
|   Non-Loyal|   14900177|
+------------+-----------+



PHASE 6 — Time-Based Analysis

In [None]:

# Add month bucket
orders_with_month_df = clean_orders_df.withColumn("order_month", date_trunc("month", col("order_date_clean")))

# 1) Monthly revenue per city
monthly_revenue_city_df = (orders_with_month_df
    .groupBy("order_month", "city_clean")
    .agg(Fsum("amount_int").alias("monthly_revenue"))
)

# 2) Monthly order count per category
monthly_orders_category_df = (orders_with_month_df
    .groupBy("order_month", "category_clean")
    .agg(count("*").alias("monthly_order_count"))
)

# 3) Growth trends — month-over-month delta and % change by city
w_month_city = Window.partitionBy("city_clean").orderBy("order_month")
trend_city_df = (monthly_revenue_city_df
    .withColumn("prev_revenue", lag("monthly_revenue").over(w_month_city))
    .withColumn("mom_change", col("monthly_revenue") - col("prev_revenue"))
    .withColumn("mom_change_pct", round(col("mom_change") / col("prev_revenue") * 100, 2))
)

monthly_revenue_city_df.orderBy("order_month", "city_clean").show(50, truncate=False)
trend_city_df.orderBy("city_clean", "order_month").show(50, truncate=False)
monthly_orders_category_df.orderBy("order_month", desc("monthly_order_count")).show(50, truncate=False)


+-------------------+----------+---------------+
|order_month        |city_clean|monthly_revenue|
+-------------------+----------+---------------+
|NULL               |Bangalore |14024671       |
|NULL               |Chennai   |14936431       |
|NULL               |Delhi     |16430276       |
|NULL               |Hyderabad |13126928       |
|NULL               |Kolkata   |14283855       |
|NULL               |Mumbai    |13145711       |
|NULL               |Pune      |14909854       |
|2024-01-01 00:00:00|Bangalore |822339117      |
|2024-01-01 00:00:00|Chennai   |818567389      |
|2024-01-01 00:00:00|Delhi     |817332633      |
|2024-01-01 00:00:00|Hyderabad |833063605      |
|2024-01-01 00:00:00|Kolkata   |824920456      |
|2024-01-01 00:00:00|Mumbai    |816636150      |
|2024-01-01 00:00:00|Pune      |833507124      |
|2024-02-01 00:00:00|Bangalore |792163305      |
|2024-02-01 00:00:00|Chennai   |796361427      |
|2024-02-01 00:00:00|Delhi     |805877007      |
|2024-02-01 00:00:00

PHASE 7 — Performance Engineering

In [None]:

clean_orders_df.cache()
customer_profile_df.cache()
city_customer_spend_df.cache()


print("\n== Explain: Customer aggregation ==")
customer_profile_df.explain(True)

print("\n== Explain: Window ranking inside city ==")
ranked_in_city_df.explain(True)



== Explain: Customer aggregation ==
== Parsed Logical Plan ==
'Aggregate ['customer_id], ['customer_id, 'count(*) AS total_orders#971, 'sum('amount_int) AS total_spend#972, 'avg('amount_int) AS avg_order_value#973, 'min('order_date_clean) AS first_purchase_date#974, 'max('order_date_clean) AS last_purchase_date#975, 'count(distinct 'city_clean) AS distinct_cities#976, 'count(distinct 'category_clean) AS distinct_categories#977]
+- Filter (lower(status#65) = completed)
   +- Deduplicate [order_id#53]
      +- Project [order_id#53, customer_id#54, city#62, category#63, product#64, amount#58, order_date#59, status#65, city_clean#66, category_clean#67, product_clean#68, amount_nocomma#69, amount_int#70, cast(coalesce(try_to_timestamp(order_date#59, Some(yyyy-MM-dd), TimestampType, Some(Etc/UTC), false), try_to_timestamp(order_date#59, Some(dd/MM/yyyy), TimestampType, Some(Etc/UTC), false), try_to_timestamp(order_date#59, Some(yyyy/MM/dd), TimestampType, Some(Etc/UTC), false)) as date) AS 

* Reused DFs: clean_orders_df, customer_profile_df, city_customer_spend_df, monthly_revenue_city_df
* Cache the ones reused multiple times.

**Shuffle stages**

Occur on groupBy and distinct due to key-based re-partitioning.
Window with partitionBy + orderBy triggers shuffle on partition keys.

**Repartitioning justification**

In [None]:

# If most aggregations are by city, repartition by city to reduce shuffle cost:
repart_by_city_df = clean_orders_df.repartition(col("city_clean"))
repart_by_city_df.groupBy("city_clean").agg(Fsum("amount_int")).explain(True)


== Parsed Logical Plan ==
'Aggregate ['city_clean], ['city_clean, unresolvedalias('sum('amount_int))]
+- RepartitionByExpression [city_clean#66]
   +- Filter (lower(status#65) = completed)
      +- Deduplicate [order_id#53]
         +- Project [order_id#53, customer_id#54, city#62, category#63, product#64, amount#58, order_date#59, status#65, city_clean#66, category_clean#67, product_clean#68, amount_nocomma#69, amount_int#70, cast(coalesce(try_to_timestamp(order_date#59, Some(yyyy-MM-dd), TimestampType, Some(Etc/UTC), false), try_to_timestamp(order_date#59, Some(dd/MM/yyyy), TimestampType, Some(Etc/UTC), false), try_to_timestamp(order_date#59, Some(yyyy/MM/dd), TimestampType, Some(Etc/UTC), false)) as date) AS order_date_clean#71]
            +- Project [order_id#53, customer_id#54, city#62, category#63, product#64, amount#58, order_date#59, status#65, city_clean#66, category_clean#67, product_clean#68, amount_nocomma#69, CASE WHEN RLIKE(amount_nocomma#69, ^[0-9]+$) THEN cast(amount_n

PHASE 8 — Broadcast Join (Light Use)

In [None]:


segment_map = [("VIP", 1), ("Premium", 2), ("Regular", 3)]
segment_df = spark.createDataFrame(segment_map, ["segment_label", "segment_code"])

segmented_with_code_df = (customer_segmented_df
    .withColumnRenamed("customer_segment", "segment_label")
    .join(broadcast(segment_df), "segment_label", "left")
)

segmented_with_code_df.explain(True)
segmented_with_code_df.select("customer_id", "segment_label", "segment_code").show(10, truncate=False)


== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [segment_label])
:- Project [customer_id#54, total_orders#971L, total_spend#972L, avg_order_value#973, first_purchase_date#974, last_purchase_date#975, distinct_cities#976L, distinct_categories#977L, customer_segment#1698 AS segment_label#6883]
:  +- Project [customer_id#54, total_orders#971L, total_spend#972L, avg_order_value#973, first_purchase_date#974, last_purchase_date#975, distinct_cities#976L, distinct_categories#977L, CASE WHEN ((total_spend#972L >= cast(200000 as bigint)) AND (total_orders#971L >= cast(5 as bigint))) THEN VIP WHEN (total_spend#972L >= cast(100000 as bigint)) THEN Premium ELSE Regular END AS customer_segment#1698]
:     +- Aggregate [customer_id#54], [customer_id#54, count(1) AS total_orders#971L, sum(amount_int#70) AS total_spend#972L, avg(amount_int#70) AS avg_order_value#973, min(order_date_clean#71) AS first_purchase_date#974, max(order_date_clean#71) AS last_purchase_date#975, count(distinct city_clean#

The segment map is tiny; broadcasting it avoids shuffling the large customer dataset and yields a BroadcastHashJoin.

PHASE 9 — Sorting & Set Operations

In [None]:

# 1) Sort customers by total spend desc and order count desc
sorted_customers_df = (customer_profile_df
    .orderBy(desc("total_spend"), desc("total_orders"))
)
sorted_customers_df.show(20, truncate=False)

# 2) Create sets: Electronics vs Grocery buyers
electronics_customers_df = (clean_orders_df
    .filter(lower(col("category_clean")) == "electronics")
    .select("customer_id").distinct()
)
grocery_customers_df = (clean_orders_df
    .filter(lower(col("category_clean")) == "grocery")
    .select("customer_id").distinct()
)

# 3) Both sets (intersection)
both_sets_df = electronics_customers_df.join(grocery_customers_df, "customer_id", "inner")
both_sets_df.show(20, truncate=False)

# Only Electronics
only_electronics_df = electronics_customers_df.join(grocery_customers_df, "customer_id", "left_anti")
# Only Grocery
only_grocery_df = grocery_customers_df.join(electronics_customers_df, "customer_id", "left_anti")

print("Both sets:", both_sets_df.count())
print("Only Electronics:", only_electronics_df.count())
print("Only Grocery:", only_grocery_df.count())


+-----------+------------+-----------+-----------------+-------------------+------------------+---------------+-------------------+
|customer_id|total_orders|total_spend|avg_order_value  |first_purchase_date|last_purchase_date|distinct_cities|distinct_categories|
+-----------+------------+-----------+-----------------+-------------------+------------------+---------------+-------------------+
|C043076    |6           |493949     |82324.83333333333|2024-01-17         |2024-02-26        |5              |4                  |
|C034689    |6           |486879     |81146.5          |2024-01-10         |2024-02-19        |4              |3                  |
|C039985    |6           |484057     |80676.16666666667|2024-01-06         |2024-02-15        |3              |4                  |
|C026691    |6           |477147     |79524.5          |2024-01-12         |2024-02-21        |4              |3                  |
|C038979    |6           |477138     |79523.0          |2024-01-20         |

PHASE 10 — Storage Strategy

In [None]:

# 1) Customer master (Parquet)
customer_master_out = (customer_segmented_df
    .join(segmented_with_code_df.select("customer_id", "segment_code"), "customer_id", "left")
    .withColumnRenamed("segment_label", "customer_segment")
    .select("customer_id", "total_orders", "total_spend", "avg_order_value",
            "first_purchase_date", "last_purchase_date",
            "distinct_cities", "distinct_categories", "customer_segment", "segment_code")
)

customer_master_out.write.mode("overwrite").partitionBy("customer_segment").parquet("out/customer_master_parquet")

# 2) Monthly analytics (ORC)
monthly_revenue_city_df.write.mode("overwrite").orc("out/monthly_revenue_city_orc")
monthly_orders_category_df.write.mode("overwrite").orc("out/monthly_orders_category_orc")

# 3) Read back & validate
cm_read = spark.read.parquet("out/customer_master_parquet")
mrc_read = spark.read.orc("out/monthly_revenue_city_orc")
moc_read = spark.read.orc("out/monthly_orders_category_orc")

cm_read.printSchema(); print(cm_read.count())
mrc_read.printSchema(); print(mrc_read.count())
moc_read.printSchema(); print(moc_read.count())


root
 |-- customer_id: string (nullable = true)
 |-- total_orders: long (nullable = true)
 |-- total_spend: long (nullable = true)
 |-- avg_order_value: double (nullable = true)
 |-- first_purchase_date: date (nullable = true)
 |-- last_purchase_date: date (nullable = true)
 |-- distinct_cities: long (nullable = true)
 |-- distinct_categories: long (nullable = true)
 |-- segment_code: long (nullable = true)
 |-- customer_segment: string (nullable = true)

47500
root
 |-- order_month: timestamp (nullable = true)
 |-- city_clean: string (nullable = true)
 |-- monthly_revenue: long (nullable = true)

21
root
 |-- order_month: timestamp (nullable = true)
 |-- category_clean: string (nullable = true)
 |-- monthly_order_count: long (nullable = true)

12


PHASE 11 — Debugging

Why this is dangerous?

In [None]:
# df = df.groupBy("customer_id").sum("amount").show()

* .show() is an action that returns None.
* You assign None back to df, so df stops being a DataFrame.
* Any further transformations on df will fail.

Correct approach

In [None]:

agg_df = df.groupBy("customer_id").sum("amount_int")
agg_df.show()


+-----------+---------------+
|customer_id|sum(amount_int)|
+-----------+---------------+
|    C000142|         313288|
|    C000299|         228261|
|    C000433|         285507|
|    C001115|         163614|
|    C001875|         213381|
|    C002484|         145308|
|    C002512|         336838|
|    C002837|         225248|
|    C003080|         265277|
|    C003194|         227384|
|    C003484|         300712|
|    C004744|         182500|
|    C004804|         170880|
|    C005119|         197410|
|    C005781|         209156|
|    C006654|         130659|
|    C007013|         241427|
|    C008123|         232125|
|    C008343|         230184|
|    C008471|         214694|
+-----------+---------------+
only showing top 20 rows
