<a href="https://colab.research.google.com/github/bhuguvi26/Copy-of-Dmart-analysis-using-pyspark/blob/main/Copy_of_Dmart_analysis_using_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================
# DMART PySpark Project — FINAL STABLE VERSION
# ============================

# 1. Install Java & PySpark
# The leading "!" hands the line to the shell, so these two lines only work
# inside an IPython/Colab cell, not in a plain Python interpreter.
!apt-get install openjdk-11-jdk -qq > /dev/null
!pip install -q pyspark==3.5.1 findspark

import os, re, shutil
from pathlib import Path
# Expose the JDK to Spark: set JAVA_HOME and append its bin/ directory to PATH.
# NOTE(review): the path is hard-coded for the amd64 OpenJDK 11 package
# installed above — confirm it matches the runtime image.
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"]+= ":/usr/lib/jvm/java-11-openjdk-amd64/bin"

# findspark.init() is called before any pyspark import below.
import findspark
findspark.init()

from pyspark.sql import SparkSession, functions as F, types as T, Window
from google.colab import files

# Create (or reuse) a local Spark session named "DMart_PySpark_Final" and
# report its version as a quick sanity check.
spark = SparkSession.builder.master("local[*]").appName("DMart_PySpark_Final").getOrCreate()
print("‚úÖ Spark version:", spark.version)

# 2. File loader (search or upload)
data_dir = Path("/content/data")
# parents=True so the directory can be created even when /content is absent.
data_dir.mkdir(parents=True, exist_ok=True)


def load_or_upload(name):
    """Return the path of ``name``, asking for a Colab upload if it is missing.

    The file is searched for, in order, in ``/content/data``, ``/content``
    and ``/mnt/data``. When none of them contains it, ``files.upload()`` is
    called and every uploaded file is stored in ``data_dir``.

    Args:
        name: File name to look for, e.g. ``"Sales.csv"``.

    Returns:
        The path, as ``str``, of the file that was found or uploaded. An
        upload named exactly ``name`` is preferred; otherwise the first
        uploaded file is returned.

    Raises:
        FileNotFoundError: If the file is not on disk and the upload dialog
            returned no files.
    """
    for folder in ("/content/data", "/content", "/mnt/data"):
        candidate = Path(folder) / name
        if candidate.exists():
            print(f"‚úÖ Found: {candidate}")
            return str(candidate)

    print(f"üìÇ Upload {name}")
    uploaded = files.upload()
    if not uploaded:
        # Fail here with a clear message instead of returning None and
        # letting the CSV reader crash later on a missing path.
        raise FileNotFoundError(f"{name} was not found and nothing was uploaded")

    # Persist every uploaded file, so files supplied together in one dialog
    # are found on disk by later calls instead of being discarded.
    saved = {}
    for filename, payload in uploaded.items():
        target = data_dir / filename
        target.write_bytes(payload)  # opens, writes and closes the file
        saved[filename] = target

    chosen = saved[name] if name in saved else next(iter(saved.values()))
    return str(chosen)

# Resolve the three input files first (each lookup may open an upload
# prompt), in the order Product, Sales, Customer.
p_prod, p_sales, p_cust = [
    load_or_upload(csv_name)
    for csv_name in ("Product.csv", "Sales.csv", "Customer.csv")
]

# Read each CSV with a header row and let Spark infer the column types.
csv_options = {"header": True, "inferSchema": True}
df_prod = spark.read.csv(p_prod, **csv_options)
df_sales = spark.read.csv(p_sales, **csv_options)
df_cust = spark.read.csv(p_cust, **csv_options)

# 3. Normalize column names
def norm(df):
    """Return ``df`` with every column renamed to lowercase snake_case.

    Each name is lowercased, every run of characters other than ASCII
    letters and digits becomes a single underscore, and leading/trailing
    underscores are removed.

    Args:
        df: Object exposing ``columns`` and ``toDF(*names)``.

    Returns:
        The result of ``df.toDF`` called with the cleaned names, in the
        original column order.
    """
    cleaned = []
    for col_name in df.columns:
        snake = re.sub(r'[^A-Za-z0-9]+', '_', col_name.lower())
        cleaned.append(snake.strip('_'))
    return df.toDF(*cleaned)

# Apply the column-name normalisation to each table, then list the resulting
# columns so the schema can be checked by eye.
df_prod = norm(df_prod)
df_sales = norm(df_sales)
df_cust = norm(df_cust)

print("üìå Product Columns:", df_prod.columns)
print("üìå Sales Columns:", df_sales.columns)
print("üìå Customer Columns:", df_cust.columns)

# 4. Rename important keys safely
def rename(df, options, new):
    """Rename the first column of ``df`` found in ``options`` to ``new``.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed(old, new)``.
        options: Candidate column names, checked in the given order.
        new: Name the matching column should receive.

    Returns:
        ``df.withColumnRenamed(match, new)`` for the first candidate present
        in ``df.columns``, or ``df`` itself when no candidate is present.
    """
    matches = [candidate for candidate in options if candidate in df.columns]
    if not matches:
        return df
    return df.withColumnRenamed(matches[0], new)

# Bring the join keys to one spelling in every table that carries them.
product_key_aliases = ["product_id", "productid", "prod_id"]
customer_key_aliases = ["customer_id", "customerid"]

df_sales = rename(df_sales, product_key_aliases, "product_id")
df_prod = rename(df_prod, product_key_aliases, "product_id")

df_sales = rename(df_sales, customer_key_aliases, "customer_id")
df_cust = rename(df_cust, customer_key_aliases, "customer_id")

# 5. Add missing product_id if not present
if "product_id" not in df_prod.columns:
    print("‚ö†Ô∏è No product_id found ‚Äî generating surrogate key")
    # Number the rows 1..N in monotonically_increasing_id order and use that
    # number (as a string) as the product key.
    w = Window.orderBy(F.monotonically_increasing_id())
    surrogate_key = F.row_number().over(w).cast("string")
    df_prod = df_prod.withColumn("product_id", surrogate_key)

# Cast key fields to string so both sides of each join share one key type.
product_key = F.col("product_id").cast("string")
customer_key = F.col("customer_id").cast("string")

df_sales = df_sales.withColumn("product_id", product_key)
df_prod = df_prod.withColumn("product_id", product_key)
df_cust = df_cust.withColumn("customer_id", customer_key)
df_sales = df_sales.withColumn("customer_id", customer_key)

# 6. Cast numeric fields
# Only the measures actually present in the sales table are converted.
measure_columns = ("sales", "quantity", "profit", "discount")
for measure in measure_columns:
    if measure not in df_sales.columns:
        continue
    df_sales = df_sales.withColumn(measure, F.col(measure).cast("double"))

# Create unit_price if missing: sales / quantity for positive quantities,
# NULL otherwise (the `when` has no `otherwise` branch).
has_unit_price = "unit_price" in df_sales.columns
if not has_unit_price:
    positive_quantity = F.col("quantity") > 0
    price_per_unit = F.col("sales") / F.col("quantity")
    df_sales = df_sales.withColumn(
        "unit_price", F.when(positive_quantity, price_per_unit)
    )

# Convert order_date
# NOTE(review): to_date is called without an explicit format — confirm the
# source values parse with Spark's default pattern.
if "order_date" in df_sales.columns:
    parsed_order_date = F.to_date("order_date")
    df_sales = df_sales.withColumn("order_date", parsed_order_date)

# Replace null numeric
null_defaults = {"profit": 0, "discount": 0}
df_sales = df_sales.fillna(null_defaults)

# 7. Join tables
# Left joins keep every sales row even when its product or customer is
# missing from the lookup tables.
sales_with_products = df_sales.join(df_prod, "product_id", "left")
df = sales_with_products.join(df_cust, "customer_id", "left")

# Line revenue, then expose the joined data to Spark SQL as "dmart".
line_revenue = F.col("quantity") * F.col("unit_price")
df = df.withColumn("sales_amount", line_revenue)
df.createOrReplaceTempView("dmart")

# 8. Run queries & save
out = Path("/content/output")
out.mkdir(exist_ok=True)


def run(name, sql):
    """Execute one Spark SQL query, preview it and export it as a CSV file.

    Args:
        name: Report name; used in the printed banner, as the Spark output
            directory ``out/<name>`` and as the final file ``out/<name>.csv``.
        sql: Query text passed to ``spark.sql``.

    Raises:
        IndexError: If the Spark output directory contains no ``part*.csv``.
    """
    print(f"\nüìä ---- {name} ----")
    result = spark.sql(sql)
    # Preview up to 10 rows without truncating long values.
    result.show(10, False)

    # Spark writes a directory of part files; coalesce(1) keeps it to one.
    spark_dir = out / name
    writer = result.coalesce(1).write.mode("overwrite").option("header", True)
    writer.csv(str(spark_dir))

    # Copy the part file next to the directory under a predictable name.
    part_files = list(spark_dir.glob("part*.csv"))
    shutil.copy(part_files[0], out / f"{name}.csv")

# Report name -> Spark SQL text, all run against the "dmart" temp view.
# The dict order is the execution order.
queries = {
    # Total sales per product category.
    "sales_by_category": """
SELECT coalesce(category,'Unknown') category,
       round(sum(sales_amount),2) total_sales
FROM dmart GROUP BY category ORDER BY total_sales DESC""",

    # Ten customers with the most orders.
    # The sales table carries an `order_line` column, i.e. one row per order
    # line, so order_id repeats within an order. DISTINCT makes this count
    # orders rather than line items.
    "top_customer_orders": """
SELECT customer_id, customer_name, count(DISTINCT order_id) orders
FROM dmart GROUP BY customer_id, customer_name
ORDER BY orders DESC LIMIT 10""",

    # Average discount over all sales rows.
    "avg_discount": "SELECT round(avg(discount),4) avg_discount FROM dmart",

    # Number of different products sold in each region.
    "unique_products_region": """
SELECT region, count(distinct product_id) products
FROM dmart GROUP BY region ORDER BY products DESC""",

    # Total profit per state.
    "profit_by_state": """
SELECT state, round(sum(profit),2) total_profit
FROM dmart GROUP BY state ORDER BY total_profit DESC""",

    # Ten sub-categories with the highest sales.
    "top_subcategory_sales": """
SELECT coalesce(sub_category,'Unknown') sub_category,
       round(sum(sales_amount),2) total_sales
FROM dmart GROUP BY sub_category ORDER BY total_sales DESC LIMIT 10""",

    # Average `age` per customer segment, averaged over sales rows.
    "avg_age_segment": """
SELECT segment, round(avg(age),2) avg_age
FROM dmart GROUP BY segment""",

    # Orders per shipping mode (column `ship_mode`); DISTINCT for the same
    # order-line reason as in top_customer_orders.
    "orders_by_shipmode": """
SELECT coalesce(ship_mode,'Unknown') ship_mode,
       count(DISTINCT order_id) orders
FROM dmart
GROUP BY ship_mode
ORDER BY orders DESC""",

    # Ten cities with the highest quantity sold.
    "qty_by_city": """
SELECT city, sum(quantity) qty
FROM dmart GROUP BY city ORDER BY qty DESC LIMIT 10""",

    # Profit, sales and profit margin (profit / sales) per segment.
    "profit_margin_segment": """
SELECT segment,
round(sum(profit),2) profit,
round(sum(sales_amount),2) sales,
round(sum(profit)/sum(sales_amount),4) margin
FROM dmart GROUP BY segment ORDER BY margin DESC""",
}

# Execute every report in the order it was defined.
for report_name, report_sql in queries.items():
    run(report_name, report_sql)

print("\n‚úÖ ALL DONE ‚Äî Output saved in /content/output/")


✅ Spark version: 3.5.1
✅ Found: /content/Product.csv
✅ Found: /content/data/Sales.csv
✅ Found: /content/Customer.csv
📌 Product Columns: ['product_id', 'category', 'sub_category', 'product_name']
📌 Sales Columns: ['order_line', 'order_id', 'order_date', 'ship_date', 'ship_mode', 'customer_id', 'product_id', 'sales', 'quantity', 'discount', 'profit']
📌 Customer Columns: ['customer_id', 'customer_name', 'segment', 'age', 'country', 'city', 'state', 'postal_code', 'region']

📊 ---- sales_by_category ----
+---------------+-----------+
|category       |total_sales|
+---------------+-----------+
|Technology     |836154.03  |
|Furniture      |741999.8   |
|Office Supplies|719047.03  |
+---------------+-----------+


📊 ---- top_customer_orders ----
+-----------+-------------------+------+
|customer_id|customer_name      |orders|
+-----------+-------------------+------+
|WB-21850   |William Brown      |37    |
|JL-15835   |John Lee           |34    |
|MA-17560   |Matt Abel

# New Section