In [7]:


from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window
from pyspark import StorageLevel
import os

spark = (SparkSession.builder
         .appName("Orders-ETL-All-In-One")
         .config("spark.sql.shuffle.partitions", "200")    # tune per cluster
         .config("spark.sql.adaptive.enabled", "true")     # AQE helps at scale
         .getOrCreate())


def resolve_input_path(user_override=None):
    candidates = []
    if user_override:
        candidates.append(user_override)


    candidates += [
        "orders_raw.csv",
        "./orders_raw.csv",
        "/content/orders_raw.csv",
    ]


    candidates += [
        "/dbfs/FileStore/orders_raw.csv",
        "dbfs:/FileStore/orders_raw.csv",
    ]


    for p in candidates:
        if p.startswith("dbfs:"):
            continue
        if os.path.exists(p):
            return p


    try:
        dbutils
        try:
            _ = dbutils.fs.head("dbfs:/FileStore/orders_raw.csv", 1)  # type: ignore
            return "dbfs:/FileStore/orders_raw.csv"
        except Exception:
            pass
    except NameError:
        pass

    return None


USER_INPUT_PATH = None

input_path = resolve_input_path(USER_INPUT_PATH)
if not input_path:
    raise FileNotFoundError(
        "Could not find 'orders_raw.csv'. Upload it next to the notebook (./orders_raw.csv) "
        "or to DBFS at dbfs:/FileStore/orders_raw.csv, then re-run."
    )
print(f"Using input_path: {input_path}")


raw_schema = T.StructType([
    T.StructField("order_id",    T.StringType(), True),
    T.StructField("customer_id", T.StringType(), True),
    T.StructField("city",        T.StringType(), True),
    T.StructField("category",    T.StringType(), True),
    T.StructField("product",     T.StringType(), True),
    T.StructField("amount",      T.StringType(), True),
    T.StructField("order_date",  T.StringType(), True),
    T.StructField("status",      T.StringType(), True),
])
df_raw = (spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv(input_path))


print("=== Raw Schema ===")
df_raw.printSchema()
print("=== Raw Record Count ===", df_raw.count())


string_cols = ["order_id","customer_id","city","category","product","amount","order_date","status"]
def trim_all(df, cols):
    for c in cols:
        df = df.withColumn(c, F.when(F.col(c).isNotNull(), F.trim(F.col(c))).otherwise(F.col(c)))
    return df
df1 = trim_all(df_raw, string_cols)


city_keys = ["bangalore","bengaluru","mumbai","bombay","delhi","new delhi",
             "chennai","kolkata","calcutta","hyderabad","pune"]
city_vals = ["Bangalore","Bangalore","Mumbai","Mumbai","Delhi","Delhi",
             "Chennai","Kolkata","Kolkata","Hyderabad","Pune"]
city_map = F.map_from_arrays(F.array(*[F.lit(k) for k in city_keys]),
                             F.array(*[F.lit(v) for v in city_vals]))

df2 = (df1
    .withColumn("city_lc", F.lower(F.col("city")))
    .withColumn("city_std", F.coalesce(city_map[F.col("city_lc")], F.initcap(F.col("city_lc"))))
    .withColumn("category_lc", F.lower(F.col("category")))
    .withColumn("category_std",
                F.when(F.col("category_lc").isin("electronics","grocery","fashion","home"),
                       F.initcap(F.col("category_lc")))
                 .otherwise(F.initcap(F.col("category_lc"))))
    .withColumn("product_std",
                F.initcap(F.regexp_replace(F.regexp_replace(F.col("product"), r"\s+", " "),
                                           r",+$", "")))
)


df3 = (df2
    .withColumn("amount_digits", F.regexp_replace(F.col("amount"), r"[^0-9]", ""))
    .withColumn("amount_int", F.when(F.length(F.col("amount_digits")) > 0,
                                     F.col("amount_digits").cast("int"))
                               .otherwise(F.lit(None).cast("int")))
)


date_candidates = [
    F.try_to_timestamp(F.col("order_date"), F.lit("yyyy-MM-dd")),
    F.try_to_timestamp(F.col("order_date"), F.lit("dd/MM/yyyy")),
    F.try_to_timestamp(F.col("order_date"), F.lit("yyyy/MM/dd")),
]

df4 = (df3
    .withColumn("order_ts", F.coalesce(*date_candidates))
    .withColumn("order_date_std", F.to_date("order_ts"))
)



df5 = (df4
    .withColumn("invalid_amount", F.col("amount_int").isNull())
    .withColumn("invalid_date",   F.col("order_date_std").isNull())
    .withColumn("invalid_status", ~F.array_contains(F.array(F.lit("Completed"), F.lit("Cancelled")), F.col("status")))
)

if "_corrupt_record" in df5.columns:
    df_clean = df5.withColumn(
        "is_invalid_record",
        F.col("invalid_amount") | F.col("invalid_date") | F.col("invalid_status") | F.col("_corrupt_record").isNotNull()
    )
else:
    df_clean = df5.withColumn(
        "is_invalid_record",
        F.col("invalid_amount") | F.col("invalid_date") | F.col("invalid_status")
    )

print("=== Data Quality Snapshot ===")
dq = (df_clean
      .select(
          F.count("*").alias("total_rows"),
          F.sum(F.col("invalid_amount").cast("int")).alias("invalid_amount_rows"),
          F.sum(F.col("invalid_date").cast("int")).alias("invalid_date_rows"),
          F.sum(F.col("invalid_status").cast("int")).alias("invalid_status_rows"),
          F.sum(F.col("is_invalid_record").cast("int")).alias("invalid_any_rows"),
      ))
dq.show(truncate=False)


df_dedup = df_clean.dropDuplicates(["order_id"])


pre_cnt  = df_dedup.count()
df_valid = df_dedup.filter((F.col("status") == "Completed") & (~F.col("is_invalid_record")))
post_cnt = df_valid.count()

print("Count after de-dup:", pre_cnt)
print("Count after status='Completed' & validity:", post_cnt)


df_for_aggs = df_valid.repartition("city_std").persist(StorageLevel.MEMORY_AND_DISK)
print("Materializing cache ...", df_for_aggs.count())


revenue_by_city = (df_for_aggs
    .groupBy("city_std")
    .agg(F.sum("amount_int").alias("total_revenue"))
    .orderBy(F.desc("total_revenue")))
revenue_by_city.explain(True)
revenue_by_city.show(20, truncate=False)

revenue_by_category = (df_for_aggs
    .groupBy("category_std")
    .agg(F.sum("amount_int").alias("total_revenue"))
    .orderBy(F.desc("total_revenue")))
revenue_by_category.show(20, truncate=False)

top5_products = (df_for_aggs
    .groupBy("product_std")
    .agg(F.sum("amount_int").alias("product_revenue"))
    .orderBy(F.desc("product_revenue"))
    .limit(5))
top5_products.show(truncate=False)

aov_by_city = (df_for_aggs
    .groupBy("city_std")
    .agg(F.avg("amount_int").alias("avg_order_value"))
    .orderBy(F.desc("avg_order_value")))
aov_by_city.show(20, truncate=False)


city_rank_w = Window.orderBy(F.desc("total_revenue"))
ranked_cities = revenue_by_city.withColumn("rank_revenue", F.rank().over(city_rank_w))
ranked_cities.show(truncate=False)

prod_cat_rev = (df_for_aggs
    .groupBy("category_std", "product_std")
    .agg(F.sum("amount_int").alias("product_revenue")))
prod_rank_w = Window.partitionBy("category_std").orderBy(F.desc("product_revenue"))
ranked_products_in_category = prod_cat_rev.withColumn("rank_in_category", F.rank().over(prod_rank_w))
ranked_products_in_category.show(50, truncate=False)

top_product_per_category = ranked_products_in_category.filter(F.col("rank_in_category") == 1)
top_product_per_category.show(truncate=False)


curated_out   = "./curated_orders_parquet_by_city"
analytics_out = "./analytics_orc"


(df_valid
 .select("order_id","customer_id","city_std","category_std","product_std",
         "amount_int","order_date_std","status")
 .write
 .mode("overwrite")
 .partitionBy("city_std")
 .parquet(curated_out))

(revenue_by_city.write.mode("overwrite").format("orc").save(f"{analytics_out}/revenue_by_city"))
(revenue_by_category.write.mode("overwrite").format("orc").save(f"{analytics_out}/revenue_by_category"))
(top5_products.write.mode("overwrite").format("orc").save(f"{analytics_out}/top5_products"))
(aov_by_city.write.mode("overwrite").format("orc").save(f"{analytics_out}/aov_by_city"))
(top_product_per_category.write.mode("overwrite").format("orc").save(f"{analytics_out}/top_product_per_category"))

# ------------------- Section 8 â€” Debugging & Reasoning ------------------------
# Why this fails:
#   df = df.filter(df.amount > 50000).show()
# - 'amount' is StringType (ingested as string); numeric comparison is invalid.
# - show() returns None, so assigning it to df sets df=None.
# Correct demo:
df_debug = df_raw.withColumn(
    "amount_int_safe",
    F.when(F.length(F.regexp_replace("amount", r"[^0-9]", "")) > 0,
           F.regexp_replace("amount", r"[^0-9]", "").cast("int"))
)
df_debug_filtered = df_debug.filter(F.col("amount_int_safe") > 50000)
df_debug_filtered.show(5, truncate=False)

df_for_aggs.unpersist()
print("Pipeline finished successfully.")


Using input_path: orders_raw.csv
=== Raw Schema ===
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- category: string (nullable = true)
 |-- product: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- status: string (nullable = true)

=== Raw Record Count === 300000
=== Data Quality Snapshot ===
+----------+-------------------+-----------------+-------------------+----------------+
|total_rows|invalid_amount_rows|invalid_date_rows|invalid_status_rows|invalid_any_rows|
+----------+-------------------+-----------------+-------------------+----------------+
|300000    |25164              |2595             |0                  |27542           |
+----------+-------------------+-----------------+-------------------+----------------+

Count after de-dup: 300000
Count after status='Completed' & validity: 258834
Materializing cache ... 258834
== Parsed Logic