# Retail Sales Analytics Pipeline (Databricks / Local Jupyter)

This notebook includes:
- Instructions to use the sample data
- PySpark pipeline (Bronze → Silver → Gold)
- Aggregations and visualizations (Plotly)

This notebook is ready to run in **Databricks Community Edition** (import the `.ipynb` file).

In [0]:
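The data is uploaded to a Unity Catalog volume at `/Volumes/workspace/default/mydata/`, which is where the pipeline cells read from.

If you use DBFS instead (for example, in Databricks Community Edition), upload the CSVs via **Data -> Add Data -> DBFS FileStore** to `/FileStore/raw/` and update the file paths in the pipeline cell.
Local sample data path: `./mock_data/sample_data`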
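To avoid editing paths by hand when switching between Databricks and a local run, a small helper can pick the base folder. This is a minimal sketch, not part of the original notebook: `resolve_data_path` is hypothetical, and it assumes the `DATABRICKS_RUNTIME_VERSION` environment variable (set on Databricks runtimes) is a good enough signal that the code is running on Databricks.

```python
import os
from typing import Mapping, Optional

# Volume path used by the pipeline cells, and the local sample folder from the note above.
DATABRICKS_DATA_DIR = 'dbfs:/Volumes/workspace/default/mydata'
LOCAL_DATA_DIR = './mock_data/sample_data'


def resolve_data_path(filename: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return the CSV path for `filename` on Databricks or on a local machine.

    Hypothetical helper: treats the presence of DATABRICKS_RUNTIME_VERSION in the
    environment as "running on Databricks" (an assumption, not a documented contract).
    """
    env = os.environ if env is None else env
    base = DATABRICKS_DATA_DIR if 'DATABRICKS_RUNTIME_VERSION' in env else LOCAL_DATA_DIR
    return f'{base}/{filename}'


# Example: sales_path = resolve_data_path('sales_transactions.csv')
```

Passing `env` explicitly makes the helper easy to test without touching the real environment.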


## 1) If you haven't generated sample data yet
Run the provided generator script `mock_data/generate_sales_data.py` (it produces ~100,000 sales rows):

```
python mock_data/generate_sales_data.py --rows 100000
```
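If the generator script is not at hand, a stand-in with the same column layout as the sample output can be sketched with the standard library. This is hypothetical and is not the contents of `generate_sales_data.py`: the ID formats and value ranges below are guesses based on the sample rows shown later in this notebook, and it only produces the sales file (not customers or products).

```python
import csv
import random
from datetime import date, timedelta

HEADER = ['transaction_id', 'date', 'product_id', 'customer_id', 'quantity', 'price']


def generate_sales_rows(n: int, seed: int = 42) -> list[list[str]]:
    """Produce n fake sales rows in the column layout of sales_transactions.csv.

    Hypothetical: ID formats and value ranges are guesses from the sample rows.
    """
    rng = random.Random(seed)  # seeded so repeated runs give identical data
    start = date(2024, 1, 1)
    rows = []
    for i in range(1, n + 1):
        day = start + timedelta(days=rng.randrange(730))  # about two years of dates
        rows.append([
            f'T{i:07d}',                           # T0000001, T0000002, ...
            day.isoformat(),                       # YYYY-MM-DD
            f'P{rng.randint(2000, 2999)}',         # e.g. P2842
            f'C{rng.randint(100000, 119999)}',     # e.g. C108225
            str(rng.randint(1, 4)),                # quantity
            f'{rng.uniform(5, 500):.2f}',          # price with two decimals
        ])
    return rows


def write_sales_csv(path: str, n: int, seed: int = 42) -> None:
    """Write a header row plus n generated rows to `path`."""
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(HEADER)
        writer.writerows(generate_sales_rows(n, seed))
```

For example, `write_sales_csv('mock_data/sample_data/sales_transactions.csv', 100_000)` would write a file of the same shape as the one the pipeline reads.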


In [0]:
# PySpark pipeline (works in Databricks notebooks; running locally requires pyspark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, round as _round

# On Databricks a session already exists, and getOrCreate() returns it
spark = SparkSession.builder.appName('RetailSalesPipeline').getOrCreate()

# The CSVs live in a Unity Catalog volume. If you uploaded them to DBFS FileStore,
# use paths like 'dbfs:/FileStore/raw/sales_transactions.csv' instead; when running
# locally, use './mock_data/sample_data/sales_transactions.csv' (and likewise below).
sales_path = 'dbfs:/Volumes/workspace/default/mydata/sales_transactions.csv'
customers_path = 'dbfs:/Volumes/workspace/default/mydata/customers.csv'
products_path = 'dbfs:/Volumes/workspace/default/mydata/products.csv'

# header=True takes column names from the first row; without a schema, every column is read as a string
sales_df = spark.read.option('header', True).csv(sales_path)
customers_df = spark.read.option('header', True).csv(customers_path)
products_df = spark.read.option('header', True).csv(products_path)

# display() is provided by Databricks; in local Jupyter use sales_df.show(5)
display(sales_df.limit(5))


| transaction_id | date | product_id | customer_id | quantity | price |
|---|---|---|---|---|---|
| T0000001 | 2025-07-22 | P2842 | C108225 | 2 | 70.12 |
| T0000002 | 2025-08-11 | P2222 | C115674 | 1 | 228.73 |
| T0000003 | 2024-09-10 | P2566 | C112749 | 4 | 466.19 |
| T0000004 | 2025-01-04 | P2728 | C110241 | 1 | 172.29 |
| T0000005 | 2024-12-21 | P2688 | C110983 | 1 | 77.85 |


In [0]:
# Bronze -> Silver: cast to numeric types first, then drop rows whose price or quantity
# is missing or failed to parse (with ANSI mode off, an unparseable string casts to null),
# and keep only positive prices
clean_sales_df = (
    sales_df
    .withColumn('price', col('price').cast('double'))
    .withColumn('quantity', col('quantity').cast('int'))
    .dropna(subset=['price', 'quantity'])
    .filter(col('price') > 0)
)

display(clean_sales_df.limit(15))

# Persist the Silver table as Parquet; where Delta is available, .format('delta').save(...) can be used instead
clean_sales_df.write.mode('overwrite').parquet('dbfs:/Volumes/workspace/default/mydata/processed/clean_sales')


| transaction_id | date | product_id | customer_id | quantity | price |
|---|---|---|---|---|---|
| T0000001 | 2025-07-22 | P2842 | C108225 | 2 | 70.12 |
| T0000002 | 2025-08-11 | P2222 | C115674 | 1 | 228.73 |
| T0000003 | 2024-09-10 | P2566 | C112749 | 4 | 466.19 |
| T0000004 | 2025-01-04 | P2728 | C110241 | 1 | 172.29 |
| T0000005 | 2024-12-21 | P2688 | C110983 | 1 | 77.85 |
| T0000006 | 2025-03-11 | P2300 | C106385 | 1 | 84.31 |
| T0000007 | 2024-03-17 | P2291 | C106829 | 4 | 41.0 |
| T0000008 | 2025-07-11 | P2725 | C117498 | 1 | 435.86 |
| T0000009 | 2024-09-21 | P2834 | C113671 | 1 | 67.16 |
| T0000010 | 2025-02-15 | P2584 | C115925 | 1 | 353.95 |
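The cleaning rules can be checked on a handful of rows without a cluster. The sketch below is a plain-Python approximation of the same three steps (cast, drop rows that are missing or fail to parse, keep positive prices); `clean_rows` and `_to_number` are hypothetical helpers, and they only approximate Spark's non-ANSI casting (for example, Spark may truncate `'2.5'` to an int, while Python's `int()` rejects it here).

```python
from typing import Callable, Optional


def _to_number(value: Optional[str], cast: Callable[[str], object]) -> Optional[object]:
    """Cast a CSV string, returning None when it is missing or unparseable."""
    if value is None or value.strip() == '':
        return None
    try:
        return cast(value)
    except ValueError:
        return None


def clean_rows(rows: list[dict]) -> list[dict]:
    """Apply the Bronze -> Silver rules to rows whose values are all strings."""
    cleaned = []
    for row in rows:
        price = _to_number(row.get('price'), float)
        quantity = _to_number(row.get('quantity'), int)
        if price is None or quantity is None:  # dropna(subset=['price', 'quantity'])
            continue
        if price <= 0:                          # filter(col('price') > 0)
            continue
        cleaned.append({**row, 'price': price, 'quantity': quantity})
    return cleaned
```

Feeding it a mix of valid rows, blank prices, non-numeric prices, and negative prices shows which rows would reach the Silver table.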


In [0]:
# Rename the 'name' columns so the joined table has no duplicate column names
products_df_renamed = products_df.withColumnRenamed('name', 'product_name')
customers_df_renamed = customers_df.withColumnRenamed('name', 'customer_name')

# Enrich (join)
enriched = (
    clean_sales_df
    .join(products_df_renamed, 'product_id', 'left')
    .join(customers_df_renamed, 'customer_id', 'left')
)

display(enriched.limit(5))
enriched.write.mode('overwrite').parquet('dbfs:/Volumes/workspace/default/mydata/processed/enriched_sales')


| customer_id | product_id | transaction_id | date | quantity | price | product_name | category | brand | customer_name | city | region |
|---|---|---|---|---|---|---|---|---|---|---|---|
| C108225 | P2842 | T0000001 | 2025-07-22 | 2 | 70.12 | Product_D2HE | Groceries | BrandB | Wadtt | Puebla | North |
| C115674 | P2222 | T0000002 | 2025-08-11 | 1 | 228.73 | Product_I5O1 | Home | BrandF | Twcbsjwfzw | León | North |
| C112749 | P2566 | T0000003 | 2024-09-10 | 4 | 466.19 | Product_WBZE | Toys | BrandD | Hwfpwd | Cancún | East |
| C110241 | P2728 | T0000004 | 2025-01-04 | 1 | 172.29 | Product_IPQ7 | Clothing | BrandD | Roram | Mexico City | West |
| C110983 | P2688 | T0000005 | 2024-12-21 | 1 | 77.85 | Product_1COQ | Home | BrandF | Jxhftshuld | Puebla | North |
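Because both joins are `left` joins, every cleaned sale survives even when its `product_id` or `customer_id` has no match; the lookup columns are simply null. A plain-Python sketch of that behaviour follows. `left_join` is a hypothetical helper that assumes the lookup keys are unique, as they should be in a product or customer table. It also hints at why the `name` columns are renamed first: with dicts a clash would silently overwrite a value, whereas Spark keeps both columns and a later reference to `name` becomes ambiguous.

```python
from typing import Iterable


def left_join(left: Iterable[dict], right: Iterable[dict], key: str) -> list[dict]:
    """Left-join two lists of dicts on `key`, assuming unique keys on the right.

    Unmatched left rows keep all their own fields; the right-hand columns are
    filled with None (shown as null in Spark).
    """
    right = list(right)
    right_cols = {c for r in right for c in r if c != key}
    index = {r[key]: r for r in right}  # lookup table: key -> row
    joined = []
    for row in left:
        match = index.get(row[key])
        extra = {c: (match.get(c) if match else None) for c in right_cols}
        joined.append({**row, **extra})
    return joined
```

A sale whose customer is missing from `customers.csv` therefore still appears in the enriched table, but with a null `region`.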


In [0]:
# Aggregations (Gold)
revenue_by_product = enriched.groupBy('product_id', 'product_name', 'category') \
    .agg(_round(_sum(col('price') * col('quantity')), 2).alias('total_revenue'))

revenue_by_region = enriched.groupBy('region') \
    .agg(_round(_sum(col('price') * col('quantity')), 2).alias('region_revenue'))

display(revenue_by_product.orderBy(col('total_revenue').desc()).limit(10))


| product_id | product_name | category | total_revenue |
|---|---|---|---|
| P2990 | Product_H4E8 | Beauty | 57348.73 |
| P2856 | Product_0VMV | Sports | 57047.97 |
| P2616 | Product_P1B7 | Stationery | 54702.57 |
| P2185 | Product_0K94 | Sports | 54432.25 |
| P2658 | Product_QS2V | Sports | 53672.53 |
| P2238 | Product_PTV9 | Clothing | 52791.24 |
| P2675 | Product_LJLT | Electronics | 52733.12 |
| P2418 | Product_IDJ1 | Beauty | 52482.98 |
| P2536 | Product_5JAO | Groceries | 52281.54 |
| P2701 | Product_TJVB | Electronics | 51874.08 |
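Each Gold metric is `round(sum(price * quantity), 2)` over a grouping key. The plain-Python sketch below performs the same computation on the five enriched sample rows shown in the join output; `revenue_by` is a hypothetical helper. Rows whose key is missing (for example, a sale with no matching customer after the left join) fall into a `None` group, much as Spark's `groupBy` produces a null group.

```python
from collections import defaultdict
from typing import Hashable, Iterable


def revenue_by(rows: Iterable[dict], key: str) -> dict[Hashable, float]:
    """Sum price * quantity per value of `key`, rounded to 2 decimals."""
    totals: dict[Hashable, float] = defaultdict(float)
    for row in rows:
        totals[row.get(key)] += row['price'] * row['quantity']
    return {k: round(v, 2) for k, v in totals.items()}


# The five enriched sample rows (region, price, quantity) from the join output
sample = [
    {'region': 'North', 'price': 70.12, 'quantity': 2},
    {'region': 'North', 'price': 228.73, 'quantity': 1},
    {'region': 'East', 'price': 466.19, 'quantity': 4},
    {'region': 'West', 'price': 172.29, 'quantity': 1},
    {'region': 'North', 'price': 77.85, 'quantity': 1},
]
print(revenue_by(sample, 'region'))
```

For North, the total is 70.12 × 2 + 228.73 + 77.85 = 446.82, which is the kind of hand check that helps validate the Spark aggregation on a small slice.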


In [0]:
# Export a summary CSV for external visualization
revenue_by_region.coalesce(1).write.mode('overwrite').csv('dbfs:/Volumes/workspace/default/mydata/processed/gold/revenue_by_region', header=True)
print('Saved revenue_by_region to dbfs:/Volumes/workspace/default/mydata/processed/gold/revenue_by_region')


Saved revenue_by_region to dbfs:/Volumes/workspace/default/mydata/processed/gold/revenue_by_region
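`coalesce(1)` makes Spark write a single data file, but the output path is still a directory: it holds one `part-*.csv` file plus marker files such as `_SUCCESS` (Databricks may add `_committed_*` and `_started_*` files too). If an external tool needs one standalone CSV, the part file has to be located first. The following is a minimal local-filesystem sketch; `find_single_part_file` is hypothetical and assumes the directory is reachable through ordinary file APIs.

```python
import glob
import os


def find_single_part_file(output_dir: str, extension: str = '.csv') -> str:
    """Return the one Spark part file inside `output_dir`.

    Assumes the DataFrame was written with coalesce(1), so exactly one
    part-*<extension> file exists; raises FileNotFoundError otherwise.
    """
    matches = sorted(glob.glob(os.path.join(output_dir, f'part-*{extension}')))
    if len(matches) != 1:
        raise FileNotFoundError(
            f'expected exactly one part file in {output_dir}, found {len(matches)}'
        )
    return matches[0]
```

After locating it, `shutil.copy(find_single_part_file(out_dir), 'revenue_by_region.csv')` would give a single file to hand to a spreadsheet or BI tool.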


## Visualization (Plotly) - works in local Jupyter and Databricks (with HTML output)


In [0]:
import plotly.express as px
pdf = revenue_by_region.toPandas()
fig = px.bar(pdf, x='region', y='region_revenue', title='Revenue by Region')
fig.show()


## Notes
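- Databricks Community Edition can import this `.ipynb` directly.
- To run the notebook in Databricks, upload the sample CSVs to the location the pipeline reads from: the Unity Catalog volume `/Volumes/workspace/default/mydata/`, or **Data -> Add Data -> DBFS FileStore -> /FileStore/raw/** (in that case, update `sales_path`, `customers_path`, and `products_path`). Then run the cells in order.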
