READING CSV AND FILTERING NEEDED COLUMNS

In [0]:
# Load the sales export from `location` (first row is the header) and keep only
# the five fields used by the pricing analysis. No schema/inferSchema is given,
# so every column arrives as a string; numeric casts happen in the next cell.
df = (
    spark.read
    .option("header", "true")
    .csv(location)
    .select("SKU", "Category", "Qty", "Amount", "Date")
)
display(df)

CASTING DATATYPES

In [0]:
from pyspark.sql.functions import col

# The CSV was read as all-string columns: give the quantity and amount numeric
# types so they can be summed and averaged. withColumn on an existing name
# replaces that column in place, so column order is unchanged.
df = (
    df
    .withColumn("Qty", col("Qty").cast("int"))
    .withColumn("Amount", col("Amount").cast("double"))
)

df.printSchema()


SIMULATING THE BASE COST, I.E. THE COST FROM THE SUPPLIER (RANDOMLY 10–20% BELOW THE SALE AMOUNT)

In [0]:
from pyspark.sql.functions import rand

# Simulated supplier cost: each row's BaseCost is its Amount reduced by a random
# discount of 10-20% (rand() is uniform in [0, 1), so the discount is in [0.1, 0.2)).
# The seed makes the simulated costs -- and therefore every margin, rank, anchor
# and suggested price computed downstream -- reproducible across notebook re-runs.
# Spark documents rand() as non-deterministic in general: the sequence also depends
# on the input partitioning, which is stable for an unchanged source file.
BASE_COST_SEED = 42

df = df.withColumn(
    "BaseCost",
    col("Amount") * (1 - (rand(seed=BASE_COST_SEED) * 0.1 + 0.1)),  # Random 10–20% less
)



Group by SKU & Category to get the sales summary, then compute gross margin value and margin %

In [0]:
# `when` is imported here too: this cell runs before the ranking cell that
# originally imported it, and the MarginPct guard below needs it.
from pyspark.sql.functions import sum as spark_sum, avg, when

# One row per (SKU, Category) with volume, revenue, simulated cost and averages.
sales_summary = (
    df.groupBy("SKU", "Category")
      .agg(
          spark_sum("Qty").alias("TotalUnitsSold"),
          spark_sum("Amount").alias("TotalRevenue"),
          spark_sum("BaseCost").alias("TotalCost"),
          # NOTE(review): these are means over rows, not TotalRevenue / TotalUnitsSold.
          # If Amount is a line total (Qty x unit price) rather than a unit price,
          # AvgPrice is not a per-unit price -- confirm against the source schema.
          avg("Amount").alias("AvgPrice"),
          avg("BaseCost").alias("AvgBaseCost"),
      )
)

sales_summary = (
    sales_summary
    .withColumn("GrossMarginValue", col("TotalRevenue") - col("TotalCost"))
    # Margin as a fraction of the average price. The denominator is guarded:
    # a zero AvgPrice gives null with ANSI mode off but raises DIVIDE_BY_ZERO
    # with ANSI mode on; `when` without `otherwise` yields null in both modes
    # (and a null AvgPrice still yields null, as before).
    .withColumn(
        "MarginPct",
        when(
            col("AvgPrice") != 0,
            (col("AvgPrice") - col("AvgBaseCost")) / col("AvgPrice"),
        ),
    )
)

sales_summary.show(5)


Rank SKUs within each category by gross margin value; the top-ranked SKU becomes the category's anchor

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, when

# Rank SKUs inside each category by gross margin value, highest first.
# SKU is a tie-breaker: (SKU, Category) is unique after the groupBy, so the
# ordering is total and row_number() -- and hence the anchor pick -- no longer
# depends on partition order when two SKUs tie on GrossMarginValue.
windowSpec = (
    Window.partitionBy("Category")
    .orderBy(col("GrossMarginValue").desc(), col("SKU").asc())
)

# NOTE: despite its name, SalesRank orders by GrossMarginValue, not by units or revenue.
sales_summary = (
    sales_summary
    .withColumn("SalesRank", row_number().over(windowSpec))
    # The top-ranked SKU of each category is that category's price anchor.
    .withColumn("IsAnchor", col("SalesRank") == 1)
)

sales_summary.show(5)


Get anchor price for each category & join back

In [0]:
# One row per category: the anchor SKU's average price, renamed so it can sit
# next to each SKU's own AvgPrice after the join.
anchor_prices = (
    sales_summary
    .where(col("IsAnchor"))
    .select("Category", col("AvgPrice").alias("AnchorAvgPrice"))
)

# Left join keeps every SKU and attaches its category's anchor price.
# NOTE(review): an equi-join on Category never matches null keys, so SKUs with a
# null Category would get a null AnchorAvgPrice -- confirm Category is always set.
pricing_df = sales_summary.join(anchor_prices, "Category", "left")

pricing_df.show(5)


Apply final rule: 5% uplift for anchors; non-anchors get the anchor's uplifted price minus 5%

In [0]:
# Pricing rule, with the multipliers named so the rule can be tuned in one place:
#   * anchor SKU      -> its own AvgPrice * ANCHOR_UPLIFT (+5%)
#   * every other SKU -> the anchor's uplifted price (AnchorAvgPrice * ANCHOR_UPLIFT)
#                        * NON_ANCHOR_FACTOR (-5%), i.e. 0.9975 x AnchorAvgPrice.
# A non-anchor SKU whose AnchorAvgPrice is null gets a null SuggestedPrice.
ANCHOR_UPLIFT = 1.05
NON_ANCHOR_FACTOR = 0.95

pricing_df = pricing_df.withColumn(
    "SuggestedPrice",
    when(
        col("IsAnchor"),
        col("AvgPrice") * ANCHOR_UPLIFT,  # Anchor uplift
    ).otherwise(
        col("AnchorAvgPrice") * ANCHOR_UPLIFT * NON_ANCHOR_FACTOR  # Non-anchor: anchor uplift -5%
    ),
)

pricing_df.select(
    "SKU", "Category", "IsAnchor", "AvgPrice", "SuggestedPrice"
).show(5)


In [0]:
# Persist the pricing results as a Delta table stored at the external location
# `path`, replacing whatever the table held before.
# NOTE(review): pricing_df has one row per (SKU, Category), not row-level sales --
# confirm this is the intended content for this table.
(
    pricing_df.write
    .format("delta")
    .mode("overwrite")
    .option("path", path)
    .saveAsTable("brz_ghq_sales_flatfile_gcsi.sales")
)