In [None]:
from pyspark.sql.functions import col, lit, rand, expr, sum, avg, count, when, round
from pyspark.sql.window import Window
import random
from datetime import datetime, timedelta
from delta.tables import DeltaTable
import os

# Informational banner only. `spark` is never created in this notebook (it is used
# directly below), so it is expected to be pre-provided by the Databricks runtime.
READY_MESSAGE = "Databricks SparkSession is ready with Delta Lake support."
print(READY_MESSAGE)

In [None]:
# Sample Product Metadata
# Each row: (SKU, Category, Packaging, Manufacturer, FulfillmentMethod, BasePrice, CostMargin)
product_data = [
    ("SKU001", "Medical Device", "Box", "MfgA", "Warehouse", 1000.00, 0.40),
    ("SKU002", "Disposable Glove", "Bag", "MfgB", "Direct Ship", 5.00, 0.20),
    ("SKU003", "Surgical Mask", "Box", "MfgC", "Warehouse", 20.00, 0.35),
    ("SKU004", "Bandage Roll", "Roll", "MfgA", "Warehouse", 15.00, 0.25),
    ("SKU005", "Syringe", "Pack", "MfgB", "Direct Ship", 0.50, 0.15),
    ("SKU006", "Sterile Wipe", "Pack", "MfgC", "Warehouse", 10.00, 0.30),
    ("SKU007", "Diagnostic Kit", "Kit", "MfgA", "Warehouse", 500.00, 0.45),
    ("SKU008", "Cleaning Solution", "Bottle", "MfgD", "Direct Ship", 25.00, 0.22),
]
product_schema = ["SKU", "Category", "Packaging", "Manufacturer", "FulfillmentMethod", "BasePrice", "CostMargin"]
products_df = spark.createDataFrame(product_data, schema=product_schema)

# Sample Transactional Data
# `random`, `datetime` and `timedelta` come from the imports cell at the top.
RANDOM_SEED = 42         # seed the stdlib RNG so Restart & Run All regenerates the same data
N_TRANSACTIONS = 1000    # number of synthetic transactions to generate
random.seed(RANDOM_SEED)

start_date = datetime(2024, 1, 1)
skus = [p[0] for p in product_data]
categories = [p[1] for p in product_data]
customer_locations = ["NY", "CA", "TX", "FL", "IL"]

# Look base prices up in the in-memory catalog. The previous version ran two Spark
# jobs (filter + first) per generated transaction just to read these static values.
base_price_by_sku = {p[0]: p[5] for p in product_data}

transaction_data = []
for _ in range(N_TRANSACTIONS):
    sku = random.choice(skus)
    quantity = random.randint(1, 20)
    # Price paid is uniformly within +/-5% of the catalog base price.
    price_paid = base_price_by_sku[sku] * (1 + (random.random() * 0.1 - 0.05))
    timestamp = start_date + timedelta(days=random.randint(0, 180), hours=random.randint(0, 23))
    customer_location = random.choice(customer_locations)
    impressions = random.randint(100, 1000)
    add_to_cart = random.randint(int(impressions * 0.1), int(impressions * 0.5))
    # A transaction converts (1) with ~90% probability when anything was added to cart.
    conversions = 1 if add_to_cart > 0 and random.random() > 0.1 else 0

    transaction_data.append((sku, price_paid, quantity, timestamp, customer_location, impressions, add_to_cart, conversions))

transaction_schema = ["SKU", "PricePaid", "Quantity", "Timestamp", "CustomerLocation", "Impressions", "AddToCart", "Conversions"]
transactions_df = spark.createDataFrame(transaction_data, schema=transaction_schema)

products_df.display() # Use .display() in Databricks notebooks for rich output
transactions_df.display()

In [None]:
# Root folders of the bronze / silver / gold layers.
BRONZE_PATH = "/Volumes/workspace/default/myvol/data_lake/bronze"
SILVER_PATH = "/Volumes/workspace/default/myvol/data_lake/silver"
GOLD_PATH = "/Volumes/workspace/default/myvol/data_lake/gold"

# Bronze table locations
products_raw_path = f"{BRONZE_PATH}/products_raw"
transactions_raw_path = f"{BRONZE_PATH}/transactions_raw"

# Persist each raw DataFrame only when no Delta table exists at its path yet.
# An existing table is left as-is, so re-running this cell does not replace it.
bronze_targets = (
    (products_df, products_raw_path),
    (transactions_df, transactions_raw_path),
)
for raw_df, raw_path in bronze_targets:
    if not DeltaTable.isDeltaTable(spark, raw_path):
        raw_df.write.format("delta").mode("overwrite").save(raw_path)
        print(f"Created Delta table at {raw_path}")
    else:
        print(f"Delta table exists at {raw_path}")

print("Raw data saved to Databricks Delta Lake (Bronze layer)")

In [None]:
# Read the Bronze layer tables, derive per-SKU metrics and anchor flags, and write them to Silver.
print("Transactions raw path:", transactions_raw_path)

# Ensure the Bronze transactions table exists before reading it; re-create it from the
# in-memory `transactions_df` only when it is missing. (The previous check compared the
# bool returned by os.path.exists with the string "False" - never equal - so it overwrote
# the table on every run and always reported it as missing.)
if DeltaTable.isDeltaTable(spark, transactions_raw_path):
    print("Delta table exists at", transactions_raw_path)
else:
    print("No Delta table found at", transactions_raw_path)
    transactions_df.write.format("delta").mode("overwrite").save(transactions_raw_path)
    print(f"Re-created Delta table at {transactions_raw_path}")

# Read Bronze layer tables
products_bronze_df = spark.read.format("delta").load(products_raw_path)
transactions_bronze_df = spark.read.format("delta").load(transactions_raw_path)

# Aggregate transactional metrics per SKU.
# `sum` here is pyspark.sql.functions.sum (imported at the top, shadowing the builtin).
agg_txn_df = transactions_bronze_df.groupBy("SKU").agg(
    sum("Quantity").alias("TotalQuantitySold"),
    sum("Impressions").alias("TotalImpressions"),
    sum("AddToCart").alias("TotalAddToCart"),
    sum("Conversions").alias("TotalConversions"),
)

# Join catalog attributes onto the per-SKU aggregates (inner join: only SKUs present in both),
# then add ConversionRate = conversions per impression in percent (0 when there are no impressions).
# NOTE(review): the synthetic generator records at most 1 conversion per transaction against
# 100-1000 impressions, so this rate stays far below the 20% anchor threshold used below;
# confirm whether the intended denominator is AddToCart.
joined_df = agg_txn_df.join(
    products_bronze_df.select("SKU", "Category", "BasePrice", "CostMargin"),
    on="SKU",
    how="inner",
).withColumn(
    "ConversionRate",
    when(col("TotalImpressions") > 0, (col("TotalConversions") / col("TotalImpressions")) * 100).otherwise(0),
)

# Anchor thresholds: approximate 90th percentile of volume (1% relative error),
# plus fixed conversion-rate and margin cut-offs.
volume_quantiles = joined_df.approxQuantile("TotalQuantitySold", [0.9], 0.01)
if not volume_quantiles:
    raise ValueError(
        f"No SKUs matched between {products_raw_path} and {transactions_raw_path}; "
        "cannot compute the anchor volume threshold."
    )
anchor_threshold_volume_percentile = volume_quantiles[0]
anchor_threshold_conversion_rate = 20.0  # percent
anchor_threshold_margin = 0.40  # 40%

# A product is an anchor if it meets ANY of the three criteria.
processed_products_df = joined_df.withColumn(
    "IsAnchor",
    when(col("TotalQuantitySold") >= anchor_threshold_volume_percentile, True)
    .when(col("ConversionRate") >= anchor_threshold_conversion_rate, True)
    .when(col("CostMargin") >= anchor_threshold_margin, True)
    .otherwise(False),
)

print("\nAnchor products identified:")
display(processed_products_df.filter(col("IsAnchor"))) # Databricks-specific function

# Write processed products to Silver Layer Delta table
silver_output_path = f"{SILVER_PATH}/processed_products"
processed_products_df.write.format("delta").mode("overwrite").save(silver_output_path)
print(f"Processed products saved to Databricks Delta Lake (Silver layer) at {silver_output_path}")

In [None]:
# Suggest Pricing for Non-Anchor Products (Anchor-Aware Price Tiering)
silver_products_df = spark.read.format("delta").load(silver_output_path)

# Average anchor BasePrice per category, joined onto every product as AvgAnchorPrice.
# TODO: the pricing rule below does not use AvgAnchorPrice yet; it is carried for context only.
anchor_category_prices = (
    silver_products_df.filter(col("IsAnchor"))
    .groupBy("Category")
    .agg(avg("BasePrice").alias("AvgAnchorPrice"))
)

# Join back to all products to get anchor context
products_with_anchor_context_df = silver_products_df.join(anchor_category_prices, "Category", "left")

min_desired_margin = 0.18  # suggested prices must keep at least an 18% margin
PRICE_JITTER_SEED = 42     # seeds Spark's rand() so suggested prices are reproducible

# Non-anchor rule: jitter BasePrice into (0.95, 1.05] * BasePrice, but never below the
# price that still yields `min_desired_margin`. CostMargin is the margin at BasePrice, so
# unit cost = BasePrice * (1 - CostMargin) and the floor is cost / (1 - min_desired_margin).
# (The previous floor, BasePrice / (1 - CostMargin) * m + BasePrice, is always >= 1.18 * BasePrice,
# so GREATEST always chose it and the jitter had no effect.)
non_anchor_price = expr(
    f"GREATEST(BasePrice * (1 - rand({PRICE_JITTER_SEED}) * 0.1 + 0.05), "
    f"BasePrice * (1 - CostMargin) / (1 - {min_desired_margin}))"
)

# Anchors keep their base price for this MVP; `round` is pyspark.sql.functions.round.
final_pricing_df = products_with_anchor_context_df.withColumn(
    "SuggestedPrice",
    when(col("IsAnchor"), col("BasePrice")).otherwise(non_anchor_price),
).withColumn("SuggestedPrice", round(col("SuggestedPrice"), 2))

print("\nFinal Suggested Pricing:")
final_pricing_df.select("SKU", "Category", "BasePrice", "IsAnchor", "SuggestedPrice", "CostMargin", "ConversionRate").display()


In [None]:
# Write Results to Delta Lake (Gold Layer)
gold_output_columns = ["SKU", "Category", "BasePrice", "IsAnchor", "SuggestedPrice", "CostMargin", "ConversionRate"]
output_df = final_pricing_df.select(*gold_output_columns)
gold_output_path = f"{GOLD_PATH}/dynamic_pricing_results"
output_df.write.format("delta").mode("overwrite").save(gold_output_path)

print(f"\nDynamic pricing results written to Databricks Delta Lake (Gold layer) at: {gold_output_path}")

print("\nVerifying data by reading from Gold Delta Lake:")
read_gold_df = spark.read.format("delta").load(gold_output_path)
# Preview 5 rows. NOTE(review): the original `.display(5)` read like `show(5)`;
# limiting the frame explicitly makes the 5-row preview unambiguous.
read_gold_df.limit(5).display()