In [0]:

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, DateType
)
from pyspark.sql.functions import col, sum as _sum, avg, when, to_date
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os

# --- Configuration ---
# csv_path: where sales_orders.csv is read from. Change it if the file was
# uploaded somewhere other than the usual /FileStore/tables/ location.
csv_path = "/FileStore/tables/sales_orders.csv"
# output_dir: directory for the generated files; created below if missing.
output_dir = "/dbfs/FileStore/tables/sales_viz/"

# Get a Spark session from the builder (getOrCreate).
spark = SparkSession.builder.getOrCreate()

os.makedirs(output_dir, exist_ok=True)

# --- 1. Load dataset into a PySpark DataFrame and display first 10 rows ---
# Explicit schema so column types do not depend on inference. order_date is
# read as a string and converted to DateType once the data is loaded.
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("region", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("order_date", StringType(), True),
])

try:
    # Read the CSV once, with the explicit schema. The one-row collect()
    # forces Spark to actually open the file, so a missing or unreadable path
    # fails here rather than at the first plot. A separate inferSchema read
    # just to probe for existence is not needed.
    df = spark.read.csv(csv_path, header=True, schema=schema)
    df.limit(1).collect()
except Exception as exc:
    # Deliberate best-effort fallback to inline sample data, but say so:
    # otherwise every chart below would silently describe the sample rows
    # instead of the real file.
    reason = (str(exc).splitlines() or [""])[0]
    print(
        f"WARNING: could not read '{csv_path}' "
        f"({type(exc).__name__}: {reason}); using inline sample data instead."
    )
    data = [
        ("O001", "South", "Electronics", 3, 15000.0, "2025-08-01"),
        ("O002", "North", "Clothing", 5, 2000.0, "2025-08-02"),
        ("O003", "East", "Furniture", 2, 12000.0, "2025-08-03"),
        ("O004", "West", "Electronics", 1, 18000.0, "2025-08-04"),
        ("O005", "North", "Furniture", 4, 9000.0, "2025-08-05"),
        ("O006", "South", "Clothing", 6, 2500.0, "2025-08-06"),
        ("O007", "East", "Electronics", 2, 17000.0, "2025-08-07"),
    ]
    df = spark.createDataFrame(data, schema)

# Convert order_date from "yyyy-MM-dd" strings to DateType.
df = df.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))

# Show first 10 rows (Task 1)
print("First 10 rows:")
display(df.limit(10))

# --- 2. Create revenue column = quantity * price ---
df = df.withColumn("revenue", df["quantity"] * df["price"])

# --- Prepare summary DataFrames needed for multiple tasks ---
# Shared building blocks: the summed-revenue aggregate and the
# "largest total first" sort key used by the category and region summaries.
total_revenue_expr = _sum("revenue").alias("total_revenue")
highest_total_first = col("total_revenue").desc()

# Total revenue per category, largest first.
category_summary = (
    df.groupBy("category")
    .agg(total_revenue_expr)
    .orderBy(highest_total_first)
)

# Total revenue per region, largest first.
region_summary = (
    df.groupBy("region")
    .agg(total_revenue_expr)
    .orderBy(highest_total_first)
)

# Total revenue per order_date, in date order (for the trend line).
date_trend = (
    df.groupBy("order_date")
    .agg(total_revenue_expr)
    .orderBy("order_date")
)

# Total quantity per (region, category) pair, used by the stacked bar chart.
qty_by_region_category = (
    df.groupBy("region", "category")
    .agg(_sum("quantity").alias("total_qty"))
    .orderBy("region")
)

# --- 3. Bar chart: total revenue by category ---
# Bring the per-category totals into pandas so matplotlib can plot them.
cat_pdf = category_summary.toPandas()
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(cat_pdf["category"], cat_pdf["total_revenue"])
ax.set_xlabel("Category")
ax.set_ylabel("Total Revenue")
ax.set_title("Total Revenue by Category")
fig.tight_layout()
fig.savefig(f"{output_dir}bar_revenue_by_category.png")
plt.close(fig)
# Same data through the Databricks display widget (pick "Bar chart" in the UI).
display(category_summary)

# --- 4. Line chart: revenue trend by order_date (sorted) ---
# date_trend is already ordered by order_date, so the line runs left to right.
date_pdf = date_trend.toPandas()
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(
    date_pdf["order_date"],
    date_pdf["total_revenue"],
    marker="o",
    linestyle="-",
)
ax.set_xlabel("Order Date")
ax.set_ylabel("Total Revenue")
ax.set_title("Revenue Trend by Order Date")
# Rotate the date tick labels.
plt.xticks(rotation=45)
fig.tight_layout()
fig.savefig(f"{output_dir}line_revenue_trend.png")
plt.close(fig)
display(date_trend)

# --- 5. Pie chart: percentage contribution of each region to total revenue ---
reg_pdf = region_summary.toPandas()
fig, ax = plt.subplots(figsize=(6, 6))
# autopct prints each wedge's share with one decimal place.
ax.pie(
    reg_pdf["total_revenue"],
    labels=reg_pdf["region"],
    autopct="%1.1f%%",
    startangle=90,
)
ax.set_title("Revenue Share by Region")
fig.tight_layout()
fig.savefig(f"{output_dir}pie_revenue_by_region.png")
plt.close(fig)
# Same data through the Databricks display widget (pick "Pie chart" in the UI).
display(region_summary)

# --- 6. Stacked bar: total quantity sold per category by region ---
# Reshape to one row per region and one column per category; region/category
# pairs with no rows are filled with 0.
qty_long_pdf = qty_by_region_category.toPandas()
pivot_qty = qty_long_pdf.pivot(
    index="region", columns="category", values="total_qty"
).fillna(0)

# pandas creates the figure itself and hands back the Axes it drew on.
ax = pivot_qty.plot(kind="bar", stacked=True, figsize=(9, 6))
ax.set_xlabel("Region")
ax.set_ylabel("Total Quantity")
ax.set_title("Quantity Sold per Category by Region (Stacked)")
# Anchor the legend just outside the right edge of the axes.
ax.legend(title="Category", bbox_to_anchor=(1.02, 1), loc="upper left")
ax.figure.tight_layout()
ax.figure.savefig(f"{output_dir}stacked_qty_by_category_region.png")
plt.close(ax.figure)

# Show the pivot table itself in the Databricks UI as well.
display(spark.createDataFrame(pivot_qty.reset_index()))

# --- 7. Filter and visualize top 5 products by total revenue (horizontal bar) ---
# NOTE: dataset may not have a product column. If a 'product' or
# 'product_name' column exists (matched case-insensitively), use it;
# otherwise use 'category' as a proxy.
cols = [c.lower() for c in df.columns]
# Map each lower-cased name back to the real column name so the groupBy uses
# the column exactly as it is spelled in the DataFrame. Referencing the
# lower-cased name only resolves while spark.sql.caseSensitive is off.
actual_col_name = dict(zip(cols, df.columns))
if "product" in cols or "product_name" in cols:
    prod_col = actual_col_name["product" if "product" in cols else "product_name"]
    prod_summary = (
        df.groupBy(prod_col)
        .agg(_sum("revenue").alias("total_revenue"))
        .orderBy(col("total_revenue").desc())
        .limit(5)
    )
else:
    # fallback: category_summary is already sorted by total_revenue descending
    prod_summary = category_summary.limit(5)

prod_pdf = prod_summary.toPandas()
plt.figure(figsize=(8,5))
# First column holds the product (or category) label.
plt.barh(prod_pdf.iloc[:,0].astype(str), prod_pdf['total_revenue'])
# barh places the first row at the bottom; flip the axis so the
# highest-revenue item is at the top of the chart.
plt.gca().invert_yaxis()
plt.xlabel("Total Revenue")
plt.title("Top 5 Products (or Categories) by Revenue")
plt.tight_layout()
plt.savefig(output_dir + "top5_products_horizontal.png")
plt.close()
display(prod_summary)

# --- 8. Scatter plot: quantity vs price colored by category ---
pdf_scatter = df.select("quantity", "price", "category").toPandas()
plt.figure(figsize=(8,6))
categories = pdf_scatter['category'].unique()
# One tab10 colour per category. tab10 has 10 entries, so the index wraps
# with % 10 instead of repeating the last colour for every extra category.
colors = plt.cm.tab10(np.arange(len(categories)) % 10)
for cat, color in zip(categories, colors):
    sub = pdf_scatter[pdf_scatter['category'] == cat]
    # Pass the colour explicitly so the category -> colour mapping does not
    # depend on the ambient matplotlib colour cycle.
    plt.scatter(sub['quantity'], sub['price'], label=cat, alpha=0.8, s=60, color=color)
plt.xlabel("Quantity")
plt.ylabel("Price")
plt.title("Quantity vs Price (colored by Category)")
plt.legend(title="Category")
plt.tight_layout()
plt.savefig(output_dir + "scatter_qty_vs_price.png")
plt.close()

# --- 9. Average order value by region and plot as bar chart ---
# Mean of the per-row revenue within each region, listed in region order.
avg_order_region = (
    df.groupBy("region")
    .agg(avg("revenue").alias("avg_order_value"))
    .orderBy("region")
)
avg_pdf = avg_order_region.toPandas()

fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(avg_pdf["region"], avg_pdf["avg_order_value"])
ax.set_xlabel("Region")
ax.set_ylabel("Average Order Value")
ax.set_title("Average Order Value by Region")
fig.tight_layout()
fig.savefig(f"{output_dir}avg_order_value_by_region.png")
plt.close(fig)
display(avg_order_region)

# --- 10. Heatmap (pivot) showing revenue by region and category ---
# Aggregate in Spark, then pivot in pandas to a region x category grid;
# region/category pairs with no rows are filled with 0.
heat_df = (
    df.groupBy("region", "category")
    .agg(_sum("revenue").alias("total_revenue"))
    .toPandas()
)
pivot_heat = heat_df.pivot(
    index="region", columns="category", values="total_revenue"
).fillna(0)

fig, ax = plt.subplots(figsize=(8, 6))
im = ax.imshow(pivot_heat.values, aspect="auto", cmap="YlGn")
fig.colorbar(im, label="Revenue")
# One tick per grid column/row, labelled with the category/region name.
ax.set_xticks(range(len(pivot_heat.columns)))
ax.set_xticklabels(pivot_heat.columns, rotation=45)
ax.set_yticks(range(len(pivot_heat.index)))
ax.set_yticklabels(pivot_heat.index)
ax.set_title("Revenue by Region and Category (Heatmap)")
# Write each cell's value (truncated to an int) at the cell centre;
# imshow puts columns on x and rows on y.
for (row_idx, col_idx), cell_value in np.ndenumerate(pivot_heat.values):
    ax.text(
        col_idx,
        row_idx,
        int(cell_value),
        ha="center",
        va="center",
        color="black",
        fontsize=9,
    )
fig.tight_layout()
fig.savefig(f"{output_dir}heatmap_revenue_region_category.png")
plt.close(fig)
# Show the pivot table itself in the Databricks UI as well.
display(spark.createDataFrame(pivot_heat.reset_index()))

# --- 11. Bonus: Apply discounts and plot discounted vs original revenue per category (grouped bar) ---
# Discount rate per category; any category not listed here gets 0.0.
discount_rates = {"Electronics": 0.10, "Clothing": 0.05, "Furniture": 0.08}
discount_expr = None
for category_name, rate in discount_rates.items():
    is_category = col("category") == category_name
    if discount_expr is None:
        discount_expr = when(is_category, rate)
    else:
        discount_expr = discount_expr.when(is_category, rate)
df = df.withColumn("discount", discount_expr.otherwise(0.0))
df = df.withColumn("discounted_revenue", col("revenue") * (1 - col("discount")))

# Original and discounted totals per category, largest original total first.
disc_df = (
    df.groupBy("category")
    .agg(
        _sum("revenue").alias("original_revenue"),
        _sum("discounted_revenue").alias("discounted_revenue"),
    )
    .orderBy(col("original_revenue").desc())
    .toPandas()
)

# Grouped bars: each pair is centred on an integer x position, with the two
# bars shifted half a bar width to either side.
x = np.arange(len(disc_df))
width = 0.35
half_width = width / 2
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(x - half_width, disc_df["original_revenue"], width, label="Original")
ax.bar(x + half_width, disc_df["discounted_revenue"], width, label="Discounted")
ax.set_xticks(x)
ax.set_xticklabels(disc_df["category"])
ax.set_xlabel("Category")
ax.set_ylabel("Revenue")
ax.set_title("Original vs Discounted Revenue by Category")
ax.legend()
fig.tight_layout()
fig.savefig(f"{output_dir}discounted_vs_original_by_category.png")
plt.close(fig)

# --- 12. Save all visualizations as PNG files to output_dir (done above) ---
# List the contents of output_dir for verification. os.listdir returns
# entries in arbitrary order, so sort them to get a stable listing.
saved_files = sorted(os.listdir(output_dir))
print("Saved visualization files:")
for f in saved_files:
    # os.path.join gives a correct path whether or not output_dir ends in "/".
    print(f"- {os.path.join(output_dir, f)}")

# Report the configured directory rather than a hard-coded copy of it, so the
# message stays true if output_dir is changed.
print(f"\nAll tasks completed. PNG files are available in {output_dir}")


First 10 rows:


order_id,region,category,quantity,price,order_date
O001,South,Electronics,3,15000.0,2025-08-01
O002,North,Clothing,5,2000.0,2025-08-02
O003,East,Furniture,2,12000.0,2025-08-03
O004,West,Electronics,1,18000.0,2025-08-04
O005,North,Furniture,4,9000.0,2025-08-05
O006,South,Clothing,6,2500.0,2025-08-06
O007,East,Electronics,2,17000.0,2025-08-07


category,total_revenue
Electronics,97000.0
Furniture,60000.0
Clothing,25000.0


order_date,total_revenue
2025-08-01,45000.0
2025-08-02,10000.0
2025-08-03,24000.0
2025-08-04,18000.0
2025-08-05,36000.0
2025-08-06,15000.0
2025-08-07,34000.0


region,total_revenue
South,60000.0
East,58000.0
North,46000.0
West,18000.0


region,Clothing,Electronics,Furniture
East,0.0,2.0,2.0
North,5.0,0.0,4.0
South,6.0,3.0,0.0
West,0.0,1.0,0.0


category,total_revenue
Electronics,97000.0
Furniture,60000.0
Clothing,25000.0


region,avg_order_value
East,29000.0
North,23000.0
South,30000.0
West,18000.0


region,Clothing,Electronics,Furniture
East,0.0,34000.0,24000.0
North,10000.0,0.0,36000.0
South,15000.0,45000.0,0.0
West,0.0,18000.0,0.0


Saved visualization files:
- /dbfs/FileStore/tables/sales_viz/avg_order_value_by_region.png
- /dbfs/FileStore/tables/sales_viz/bar_revenue_by_category.png
- /dbfs/FileStore/tables/sales_viz/discounted_vs_original_by_category.png
- /dbfs/FileStore/tables/sales_viz/heatmap_revenue_region_category.png
- /dbfs/FileStore/tables/sales_viz/line_revenue_trend.png
- /dbfs/FileStore/tables/sales_viz/pie_revenue_by_region.png
- /dbfs/FileStore/tables/sales_viz/scatter_qty_vs_price.png
- /dbfs/FileStore/tables/sales_viz/stacked_qty_by_category_region.png
- /dbfs/FileStore/tables/sales_viz/top5_products_horizontal.png

All tasks completed. PNG files are available in /dbfs/FileStore/tables/sales_viz/
