# 🧠 Business Context for PySpark Transformations in Retail

# Generate Inventory Dataset

In [0]:
!pip install faker

In [0]:
import pandas as pd
import numpy as np
import uuid
from faker import Faker
from datetime import datetime, timedelta

fake = Faker()
NUM_RECORDS = 50000
CATEGORIES = ["Electronics", "Apparel", "Groceries", "Toys", "Home", "Books", "Fitness"]

data = {
    "product_id": [str(uuid.uuid4()) for _ in range(NUM_RECORDS)],
    "product_name": [fake.word().capitalize() + " " + fake.word().capitalize() for _ in range(NUM_RECORDS)],
    "category": np.random.choice(CATEGORIES, NUM_RECORDS),
    "quantity": np.random.randint(0, 1000, NUM_RECORDS),
    "reorder_level": np.random.randint(50, 300, NUM_RECORDS),
    "unit_price": np.round(np.random.uniform(10.0, 10000.0, NUM_RECORDS), 2),
    "tags": [np.random.choice(["popular", "eco", "sale", "imported"], size=np.random.randint(1, 4), replace=False).tolist() for _ in range(NUM_RECORDS)],
    "store_id": np.random.randint(1, 10, NUM_RECORDS),
    "last_restock_date": [
        (datetime.today() - timedelta(days=np.random.randint(0, 90))).strftime("%Y-%m-%d")
        for _ in range(NUM_RECORDS)
    ]
}

df_gen = pd.DataFrame(data)
# Write once and report the same path, so the message matches the real location
output_path = "/Volumes/workspace/demo/retail_inventory/retail_inventory_faker.csv"
df_gen.to_csv(output_path, index=False)
print(f"✅ Dataset saved to {output_path}")

### 1. `filter()` – Low Stock Alert
SmartMart's Inventory Control Division is responsible for maintaining healthy stock levels. They’ve implemented a daily alerting system that flags any product where the available quantity is below the predefined reorder threshold. These alerts are used to trigger automated replenishment, especially for fast-moving items to prevent out-of-stock scenarios.

In [0]:
df_gena = spark.read.csv("/Volumes/workspace/demo/retail_inventory/retail_inventory_faker.csv", header=True, inferSchema=True)
df_gena.show()

In [0]:
from pyspark.sql import functions as F

# Keep only products whose stock has fallen below the reorder threshold
df_gena.filter(F.col('quantity') < F.col('reorder_level')).display()

### 2. `withColumn()` – Inventory Valuation
As part of its quarterly financial audit, SmartMart’s finance team calculates the total value of inventory held across all stores. This is critical for accurate working capital assessment and supports financial forecasting.

In [0]:
df_gena_withValue = df_gena.withColumn('held_value', F.col('quantity') * F.col('unit_price'))
df_gena_withValue.display()
df_gena_withValue.select('held_value').show()


### 3. `select()` – Product Reporting
To maintain consistent product data across systems, the IT Ops team prepares filtered exports of product IDs, names, and prices for integration with POS systems.

In [0]:
# Export only the fields the POS integration needs: IDs, names, and prices
df_gena_withValue.select('product_id', 'product_name', 'unit_price').display()

### 4. `groupBy() + agg()` – Average Price by Category
The procurement department needs insights on average prices per category to evaluate supplier performance and adjust procurement strategies during contract renewals.

In [0]:
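# Average unit price per category (not held value, which also depends on quantity)
df_gena_withValue.groupBy('category').agg(F.avg('unit_price').alias('avg_unit_price')).display()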

### 5. `drop()` – Remove Redundant Column
In preparation for archiving data to a cold storage lake, technical columns like `tags` are removed to streamline the schema and reduce storage costs.

In [0]:
df_gena_drop = df_gena_withValue.drop('tags')
df_gena_drop.display()


### 6. `distinct()` – Unique Categories
SmartMart's web team needs a clean, deduplicated list of product categories to update the e-commerce app's filter menus for improved customer experience.

Business Need:
SmartMart’s digital team updates the category filters in the mobile app weekly based on the product catalog.

Requirement:
Extract a deduplicated list of category values to refresh the frontend app filters used by customers during product search.

In [0]:
df_gena_drop.select("category").distinct().display()

### 7. `join()` – Geo-Enriched Inventory View
To generate regional inventory reports, store IDs from the inventory dataset are enriched with location metadata (city names) for business user-friendly dashboards.

Business Need:
Executives request regional inventory insights (e.g., "Top 5 stores in Mumbai with excess Electronics stock").

Requirement:
Join the inventory dataset on `store_id` with a separate `store_metadata` table (columns `store_id` and `loc_name`, the city name) to add human-readable location data.

#### Creating the `store_metadata` table

In [0]:
data = [
    (1, "Mumbai"),
    (2, "Delhi"),
    (3, "Bengaluru"),
    (4, "Hyderabad"),
    (5, "Chennai"),
    (6, "Kolkata"),
    (7, "Pune"),
    (8, "Ahmedabad"),
    (9, "Jaipur")
]

# Define the schema
columns = ["store_id", "loc_name"]

# Create the DataFrame
store_metadata_df = spark.createDataFrame(data, schema=columns)

# Show the DataFrame
store_metadata_df.show()

In [0]:
df_gena_drop.join(store_metadata_df, on='store_id').display()

### 8. `orderBy()` – High-Value Inventory Review
The CFO identifies products that hold the highest inventory value. These products often tie up the most capital and are candidates for tighter inventory control.

Business Need:
The finance and operations teams need to identify high-value stock items tying up capital and prioritize their movement.

Requirement:
Sort products by descending inventory value (the `held_value` column from section 2, calculated as quantity * unit_price) to review the top 10 highest holding SKUs.

### 9. `union()` – Combine Warehouse Snapshots
SmartMart's warehouses operate independently. Their datasets are unified into one view so analysts can generate national-level inventory reports.


Business Need:
Warehouse operations across regions report inventory separately. Management wants a consolidated view for national forecasting.

Requirement:
Use union() to combine multiple DataFrames from different warehouse snapshots into a unified DataFrame for aggregated analysis.

### 10. `explode()` – Tag Analysis for Marketing
Marketing uses product tags for campaign targeting. To evaluate tag performance individually, the nested tag array is exploded so that each tag can be analyzed separately.


Business Need:
Marketing teams run promotions based on product tags (e.g., "eco", "sale"). They need individual tag-level performance analytics.

Requirement:
Explode the tags array so that each tag becomes a separate row, enabling tag-specific reporting (e.g., products with "eco" tag sold 5% more).

### 11. `fillna()` – Handle Missing Prices
After legacy data migrations, some prices were missing. Instead of dropping rows, default values are set to ensure completeness for downstream calculations.


Business Need:
Legacy product data imported during mergers had missing price values, blocking certain analytics.

Requirement:
Replace missing values in the unit_price column with a default value (e.g., 0 or category average) to maintain data completeness.


### 12. `dropDuplicates()` – Remove Duplicate Products
Merging multiple regional systems caused duplicate product records. Deduplication ensures reporting accuracy and consistent SKU counts.

Business Need:
After integrating multiple systems, some products were duplicated, which caused double counting in reports.

Requirement:
Remove duplicate records based on a combination of product_id and store_id to maintain a clean, consistent master product dataset.