# Pareto Principle
- Roughly 80% of your sales come from 20% of your products

In [0]:
%sql
-- Switch to my Catalog
USE CATALOG workspace;

-- Create schema if not exists
CREATE SCHEMA IF NOT EXISTS sql_pyspark_practice;

-- Use this schema
USE sql_pyspark_practice;

In [0]:
%sql
-- Sanity check: 80% of total sales (the Pareto threshold)
-- select sum(sales) * 0.8 as threshold_80pct
-- from orders;


-- 80% --> 1837760.7771199604

with product_wise_sales as (
  select product_id, sum(sales) as product_sales
  from orders
  group by product_id
),
calc_sales as (
  select product_id,
         product_sales,
         -- no explicit frame: the default RANGE frame gives tied products the same running total
         sum(product_sales) over (order by product_sales desc) as running_sales,
         -- despite its name, total_sales holds 0.8 * grand total (the 80% threshold)
         0.8 * sum(product_sales) over () as total_sales
  from product_wise_sales
)

select * from calc_sales
where running_sales <= total_sales;

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

orders_df = spark.table("orders")

# Step 1: Compute total sales per product
product_wise_sales = (
    orders_df.groupBy("product_id")
             .agg(F.sum("sales").alias("product_sales"))
)

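# Step 2: Window for running cumulative sales (sorted by sales desc).
# The explicit ROWS frame differs from the SQL cell's default RANGE frame:
# tied products get separate running totals instead of a shared one.
running_window = Window.orderBy(F.desc("product_sales")) \
                       .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Step 3: Window for total sales (all rows; with no partition columns Spark
# processes it in a single partition, which is fine for one row per product)
total_window = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)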

calc_sales = (
    product_wise_sales
        .withColumn("running_sales", F.sum("product_sales").over(running_window))
        .withColumn("total_sales", 0.8 * F.sum("product_sales").over(total_window))
)

# Step 4: Filter products contributing to 80% of sales
result = calc_sales.filter(F.col("running_sales") <= F.col("total_sales"))

display(result)


# 1) Analysis of the SQL you provided (short summary)

Your SQL computes product-level sales, sorts products by sales (highest first), computes a running (cumulative) sum of those product sales, computes 80% of the total sales, and returns all products whose cumulative running sum is **≤ 80% of total sales**.

This is the classic “find top items that contribute to X% of total” (Pareto / 80/20 style) problem.

---

# 2) Reconstructed original problem (from the SQL logic)

**Reconstructed problem:**
“Given the `orders` table with `product_id` and `sales` columns (each order row contains a sale amount), find the products that together account for the top 80% of total sales. Order products by total sales descending and keep each product whose cumulative (running) sales stay at or below 80% of the grand total.”

---

# 3) Problem explained in plain English

You have many orders for different products. You want to know which products — when you take them from highest-selling to lower-selling — make up 80% of your total revenue. This helps you identify the small number of products that generate most of your revenue.
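A small worked example may help. The sketch below uses made-up product totals (hypothetical, not taken from the `orders` table) and computes each product's cumulative share of total sales, highest seller first:

```python
# Hypothetical product-level totals (not real data from the orders table)
product_sales = {"P1": 500.0, "P2": 300.0, "P3": 100.0, "P4": 60.0, "P5": 40.0}

grand_total = sum(product_sales.values())  # 1000.0

# Walk products from highest to lowest sales, tracking the cumulative share
running = 0.0
pareto_table = []
for product_id, sales in sorted(product_sales.items(), key=lambda kv: -kv[1]):
    running += sales
    pareto_table.append((product_id, sales, running / grand_total))

for product_id, sales, share in pareto_table:
    print(f"{product_id}: {sales:>6.1f}  cumulative share = {share:.0%}")
```

In this toy data, two of the five products (P1 and P2) already reach exactly 80% of sales, which is the kind of "vital few" the query is looking for.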

---

# 4) Why the problem matters & what concepts it tests

**Why it matters**

* Business: helps prioritize inventory, marketing, pricing, and account management on the products with highest impact.
* Data analysis: identifies “vital few” vs “trivial many” (Pareto principle).
* Operationally: helps focus resources on products that drive most revenue.

**Concepts it tests**

* Aggregation: grouping and summing.
* Window functions: cumulative (running) sums and global totals.
* Sorting and ranking.
* Filtering after window calculations.
* Edge-case handling (ties, a running total exactly equal to the threshold).

---

# 5) Logical thinking / general methodology to solve this kind of problem

1. Aggregate raw transactions to the unit you need (here: product-level total sales).
2. Sort those units from most important to least (descending by product sales).
3. Compute cumulative sum over that sorted order (running total).
4. Compute the threshold value you want to compare against (e.g., 0.8 * grand_total).
5. Select rows where cumulative sum ≤ threshold (or choose tie-handling rule).
6. Optionally, adjust for ties or include the item that crosses the threshold depending on business needs.

This pattern applies to any “top X% contributors” question (customers, SKUs, categories, channels).
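The six steps above can be sketched in plain Python as a reference model. This is only an illustration under assumptions: `orders` is a hypothetical list of `(product_id, sales)` pairs standing in for the table, ties are broken by `product_id`, and each product gets its own running total (like an explicit `ROWS` frame):

```python
from collections import defaultdict


def pareto_products(orders, share=0.8):
    """Return (product_id, product_sales, running_sales) for products whose
    cumulative sales stay <= share * grand total, highest sellers first.

    `orders` is a hypothetical iterable of (product_id, sales) pairs.
    """
    # 1) Aggregate raw rows to one total per product
    totals = defaultdict(float)
    for product_id, sales in orders:
        totals[product_id] += sales

    # 2) Sort by product sales descending; product_id breaks ties deterministically
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))

    # 4) Threshold = share of the grand total
    threshold = share * sum(totals.values())

    # 3) + 5) Running sum, keeping rows whose running total is within the threshold.
    # No early break, mirroring the SQL WHERE filter, which tests every row.
    result, running = [], 0.0
    for product_id, product_sales in ranked:
        running += product_sales
        if running <= threshold:
            result.append((product_id, product_sales, running))
    return result


orders = [("A", 300), ("B", 200), ("A", 200), ("C", 150), ("D", 100), ("E", 50)]
print(pareto_products(orders))
# [('A', 500.0, 500.0), ('B', 200.0, 700.0)]
```

Product C would push the running total to 850, above the 800 threshold, so it is excluded, exactly as the `<=` filter in the SQL does.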

---

# 6) Break the SQL into steps and explain each step in detail

Original SQL (condensed):

```sql
with product_wise_sales as (
  select product_id, sum(sales) as product_sales
  from orders
  group by product_id
),
calc_sales as (
  select product_id, product_sales,
         sum(product_sales) over(order by product_sales desc) as running_sales,
         0.8*sum(product_sales) over() as total_sales
  from product_wise_sales
)
select * from calc_sales
where running_sales <= total_sales;
```

Step-by-step explanation:

1. `product_wise_sales` CTE:

   * `select product_id, sum(sales) as product_sales from orders group by product_id`
   * Purpose: collapse `orders` to one row per `product_id` with its total sales across all orders.

2. `calc_sales` CTE:

   * `sum(product_sales) over(order by product_sales desc) as running_sales`

     * This is a window function computing a running (cumulative) sum of `product_sales` as we move down rows ordered by `product_sales` descending. Each row’s `running_sales` is the sum of product_sales from the highest-selling product down to that row.
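     * Important: without an explicit frame, a window with `ORDER BY` uses the default frame `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`. Products with the same `product_sales` are peers in that frame, so they all get the same `running_sales` (the total through the last tied product) and are kept or dropped together. With an explicit `ROWS` frame instead, each tied product gets its own running total, and the order among ties is arbitrary unless you add a tie-breaker column (e.g. `product_id`).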
   * `0.8*sum(product_sales) over() as total_sales`

     * `sum(product_sales) over()` with an empty `OVER()` computes the global sum (same value repeated on every row). Multiplying by 0.8 yields the 80% threshold. (Equivalent to: `0.8 * (select sum(product_sales) from product_wise_sales)`.)
   * The `from product_wise_sales` provides `product_id` and `product_sales`.

3. Final SELECT & filter:

   * `select * from calc_sales where running_sales <= total_sales;`
   * This returns every product whose cumulative running sum is less than or equal to the 80% threshold.

Note: This logic keeps a product only while the cumulative total up to and including that product stays ≤ 80%. The product that first pushes `running_sales` above 80% is excluded (because of `<=`), which may or may not match the business rule. If you need at least 80% coverage, also include the product that crosses the threshold.
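The difference between the default `RANGE` frame and an explicit `ROWS` frame only shows up with ties. The sketch below is a plain-Python model of the two standard frame definitions (an assumption for illustration, not Spark's implementation), run on made-up values where two products tie at 200:

```python
def running_sums(values, frame="rows"):
    """Running sum over values sorted descending, under two frame models.

    frame="rows":  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    frame="range": RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                   (the default frame when a window has ORDER BY)
    """
    ordered = sorted(values, reverse=True)
    running = []
    for v in ordered:
        if frame == "rows":
            # ROWS frame: previous running total plus this row only
            running.append((running[-1] if running else 0) + v)
        elif frame == "range":
            # RANGE frame: all rows with an ORDER BY value >= v, i.e. every
            # preceding row plus every peer tied with the current row
            running.append(sum(x for x in ordered if x >= v))
        else:
            raise ValueError(f"unknown frame: {frame!r}")
    return ordered, running


values = [500, 200, 200, 100]          # hypothetical product totals
threshold = 0.8 * sum(values)          # 800.0
for frame in ("rows", "range"):
    ordered, running = running_sums(values, frame)
    kept = [v for v, r in zip(ordered, running) if r <= threshold]
    print(frame, running, "kept:", kept)
# rows [500, 700, 900, 1000] kept: [500, 200]
# range [500, 900, 900, 1000] kept: [500]
```

With `RANGE`, both tied products share the running total 900 and are dropped together; with `ROWS`, one of them fits under 800 and which one is arbitrary without a tie-breaker.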

---

# 7) Rewritten SQL solution — clean, readable, commented

```sql
-- Find products that cumulatively make up 80% of total sales

WITH product_sales AS (
  -- 1) total sales per product
  SELECT
    product_id,
    SUM(sales) AS product_sales
  FROM orders
  GROUP BY product_id
),

sales_with_cumulative AS (
  -- 2) compute cumulative product_sales (highest -> lowest) and the 80% threshold
  SELECT
    product_id,
    product_sales,
    -- running (cumulative) sum ordered by product_sales descending
    SUM(product_sales) OVER (ORDER BY product_sales DESC
                             ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sales,
    -- total sales * 0.8 (same value repeated for every row)
    0.8 * SUM(product_sales) OVER () AS threshold_80pct
  FROM product_sales
)

-- 3) pick products until cumulative sales reach <= 80% of total sales
SELECT
  product_id,
  product_sales,
  running_sales,
  threshold_80pct
FROM sales_with_cumulative
WHERE running_sales <= threshold_80pct
ORDER BY product_sales DESC;
```

Key improvements:

* Named CTEs and columns clearly.
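* Explicit `ROWS BETWEEN` frame. This is not only cosmetic: the original query's default frame is `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, which gives tied products the same running total, while `ROWS` gives each tied product its own running total. Results can differ when tied products straddle the 80% threshold; add a tie-breaker such as `product_id` to the window's `ORDER BY` if you need a deterministic result.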
* `threshold_80pct` naming clarifies purpose.
* Final `ORDER BY product_sales DESC` for readability.

---

# 8) Convert the SQL into equivalent PySpark DataFrame code

Below is PySpark DataFrame code that reproduces the SQL logic. I assume you already have a Spark `DataFrame` named `orders_df` with at least `product_id` and `sales` columns.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1) Aggregate to product-level sales
product_sales_df = (
    orders_df
    .groupBy("product_id")
    .agg(F.sum("sales").alias("product_sales"))
)

# 2) Define windows:
#    - window_running: order by product_sales descending, cumulative from start to current row
#    - window_total: no partition, used to compute total across all rows
window_running = Window.orderBy(F.desc("product_sales")).rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_total = Window.partitionBy()  # partitionBy() with no columns = global window

# 3) Add running cumulative and 80% threshold
sales_with_cum_df = (
    product_sales_df
    .withColumn("running_sales", F.sum("product_sales").over(window_running))
    .withColumn("threshold_80pct", 0.8 * F.sum("product_sales").over(window_total))
)

# 4) Filter rows where running_sales <= threshold_80pct
top_80pct_products_df = (
    sales_with_cum_df
    .filter(F.col("running_sales") <= F.col("threshold_80pct"))
    .orderBy(F.desc("product_sales"))
)

# top_80pct_products_df now matches the rewritten SQL in section 7 (explicit ROWS frame)
```

---

# 9) Explain the PySpark code line-by-line

1. `from pyspark.sql import functions as F`

   * Short alias for commonly used Spark SQL functions.

2. `from pyspark.sql.window import Window`

   * Import to create window specifications for window functions.

3. `product_sales_df = (orders_df.groupBy("product_id").agg(F.sum("sales").alias("product_sales")))`

   * Groups `orders_df` by `product_id` and computes `product_sales` (sum of sales per product). Equivalent to the first SQL CTE.

4. `window_running = Window.orderBy(F.desc("product_sales")).rowsBetween(Window.unboundedPreceding, Window.currentRow)`

   * Defines a window that orders rows by `product_sales` descending and specifies the frame for cumulative sums: from the first row in the partition to the current row (running cumulative).

5. `window_total = Window.partitionBy()`

   * A global window (no partition columns) so that `sum(product_sales).over(window_total)` returns the grand total repeated for every row.

6. `.withColumn("running_sales", F.sum("product_sales").over(window_running))`

   * Adds a column `running_sales` containing cumulative sum of `product_sales` up to the current row in the descending order.

7. `.withColumn("threshold_80pct", 0.8 * F.sum("product_sales").over(window_total))`

   * Adds a column `threshold_80pct` that is 80% of the global sum of `product_sales`. Constant across rows.

8. `.filter(F.col("running_sales") <= F.col("threshold_80pct"))`

   * Keeps only products where the cumulative sum is ≤ threshold (same logic as SQL `WHERE`).

9. `.orderBy(F.desc("product_sales"))`

   * Orders the final output with highest-selling products first (matches how the cumulative was computed).

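**Note about collect()**: I avoided using `.collect()` to pull the total back to the driver; instead the grand total comes from a window (`Window.partitionBy()`), so the whole computation stays in one query plan, just like the SQL. Be aware that a window with no partition columns (both `window_total` and `window_running` here) makes Spark move all rows into a single partition, and Spark logs a warning about it. That is acceptable here because the data is already aggregated to one row per product, but it does not scale to very large row counts.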

---

# 10) How thinking/process differs between SQL and PySpark

* **Declarative vs. Imperative pipeline**

  * SQL: you describe *what* you want (aggregations, window computations) and the engine decides execution. You write queries and CTEs to express transformations.
  * PySpark: you build a transformation pipeline step-by-step, chaining DataFrame operations. It looks more imperative but still lazy (Spark optimizes later).

* **Windows & frames**

  * SQL window syntax is compact; PySpark requires constructing `Window` objects and using them with `over()`. Conceptually identical but syntactically different.

* **Intermediate results**

  * SQL uses CTEs for readability; PySpark uses intermediate DataFrames (variables). Both help readability, but in PySpark you explicitly name and reuse DataFrames.

* **Global totals**

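  * In SQL you can use `SUM(...) OVER()` or a subquery. In PySpark you can either compute the constant with `.agg(...).collect()` (one small value returned to the driver) or use `Window.partitionBy()` to get the global total without a driver round trip. Note that the window approach does not keep the work distributed: with no partition columns, Spark processes all rows in a single partition. That is fine for product-level data; for large inputs, an aggregate joined back with `crossJoin` is an alternative.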

* **Tie-breaking and deterministic ordering**

  * SQL ordering ties can be ambiguous without additional columns. Always think about tie-breakers when exact deterministic behavior is required. The same applies in PySpark: add more columns to the window's `orderBy()` if needed.

* **Execution & debugging**

  * A `%sql` notebook cell runs and displays its result as soon as you execute it. In PySpark, DataFrame transformations are lazy until an action triggers execution (e.g., `.show()`, `.collect()`, `display()`), so debugging usually means calling `.show()` on small intermediate DataFrames.

---

# 11) Hints to solve it without seeing the solution

### 3 Basic Hints

1. First aggregate by product to get the total sales per product — don’t try to do running sums on the raw `orders` table.
2. Sort products from largest `product_sales` to smallest — cumulative sums only make sense on an ordered list.
3. Compute the grand total once (or compute 80% of total) and compare the running cumulative sum against that threshold.

### 3 Advanced Hints

1. Use a window function to compute the running (cumulative) sum. The frame should be “from start to current row” so you accumulate as you go.
2. To compute the 80% threshold without a separate subquery, use a window with no partition (`OVER ()`) to get the global sum repeated on every row. Multiply it by 0.8.
3. Think about ties: if multiple products have equal `product_sales`, decide whether to: (a) include all tied rows that keep you ≤80%, (b) include the first tied row that crosses the threshold, or (c) include the whole group. Implement tie-breaker logic explicitly if determinism matters.

---

# 12) Final SQL and final PySpark solutions (again)

## Final SQL (clean, commented)

```sql
WITH product_sales AS (
  SELECT product_id,
         SUM(sales) AS product_sales
  FROM orders
  GROUP BY product_id
),

sales_with_cumulative AS (
  SELECT
    product_id,
    product_sales,
    SUM(product_sales) OVER (ORDER BY product_sales DESC
                             ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sales,
    0.8 * SUM(product_sales) OVER () AS threshold_80pct
  FROM product_sales
)

SELECT product_id,
       product_sales,
       running_sales,
       threshold_80pct
FROM sales_with_cumulative
WHERE running_sales <= threshold_80pct
ORDER BY product_sales DESC;
```

> If you prefer to **ensure at least 80% coverage** (i.e., also include the product whose running total first reaches or exceeds 80%), change the filter to:
> `WHERE running_sales - product_sales < threshold_80pct`
> With the `ROWS` frame above, `running_sales - product_sales` is the cumulative total of the products ranked before the current one, so this keeps every product that starts below the threshold, including the one that crosses it.
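To compare the two inclusion rules side by side, here is a plain-Python sketch (the helper `select_products` and the sales figures are hypothetical; it models a `ROWS` frame with `product_id` as tie-breaker):

```python
def select_products(product_sales, share=0.8, at_least=False):
    """Return product_ids from a {product_id: sales} dict, highest sellers first.

    at_least=False: keep rows with running_sales <= threshold
    at_least=True:  keep rows with running_sales - product_sales < threshold,
                    i.e. also keep the product that first reaches the threshold
    """
    ranked = sorted(product_sales.items(), key=lambda kv: (-kv[1], kv[0]))
    threshold = share * sum(product_sales.values())
    chosen, running = [], 0.0
    for product_id, sales in ranked:
        before = running   # cumulative total of the products ranked above this one
        running += sales   # ROWS-frame running_sales for this product
        keep = before < threshold if at_least else running <= threshold
        if keep:
            chosen.append(product_id)
    return chosen


sales = {"A": 500, "B": 200, "C": 150, "D": 100, "E": 50}
print(select_products(sales))                  # ['A', 'B'] covers 70% of sales
print(select_products(sales, at_least=True))   # ['A', 'B', 'C'] covers 85% of sales
```

The `<=` rule never overshoots 80% but may stop well short of it; the "at least" rule guarantees coverage at the cost of one extra product.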

---

## Final PySpark DataFrame code

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# assume orders_df exists with columns: product_id, sales

# 1) product level aggregation
product_sales_df = (
    orders_df
    .groupBy("product_id")
    .agg(F.sum("sales").alias("product_sales"))
)

# 2) windows for cumulative and global total
window_running = Window.orderBy(F.desc("product_sales")).rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_total   = Window.partitionBy()  # global window

# 3) add cumulative and threshold columns
sales_with_cum_df = (
    product_sales_df
    .withColumn("running_sales", F.sum("product_sales").over(window_running))
    .withColumn("threshold_80pct", 0.8 * F.sum("product_sales").over(window_total))
)

# 4) filter to top products that cumulatively add up to <= 80% of total sales
top_80pct_products_df = (
    sales_with_cum_df
    .filter(F.col("running_sales") <= F.col("threshold_80pct"))
    .orderBy(F.desc("product_sales"))
)

# Show result
top_80pct_products_df.show(truncate=False)
```

---

# 13) Final teaching notes — how to generalize and apply this approach yourself

* Pattern to remember: `aggregate -> sort -> cumulative -> threshold filter`.
* Window functions are your friend for running totals and global totals (`SUM(...) OVER (...)`).
* Be explicit about ordering and frames when computing cumulative sums to avoid accidental mistakes.
* Consider tie-handling: if two products have equal sales, your ordering might be non-deterministic — add secondary keys (e.g., product_id) to `ORDER BY` in windows if you need reproducible results.
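* In Spark, collecting a single scalar such as the grand total to the driver is cheap; the bigger cost to watch is that a global window (`Window.partitionBy()` with no columns, or an `orderBy` window without `partitionBy`) moves every row into one partition. On product-level aggregates this is usually fine; for very large inputs, compute the total with a separate aggregation and join it back (a one-row DataFrame is a good candidate for a broadcast join).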
* Always test with a small dataset that includes ties, single large product, many small products, and verify whether you want ≤80% or ≥80% semantics.
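A minimal set of such checks, written against a plain-Python model of the `<=` query (the helper `top_share` and all data are hypothetical; it models an explicit `ROWS` frame with `product_id` as tie-breaker), might look like this:

```python
def top_share(product_sales, share=0.8):
    """Products whose ROWS-frame running total stays <= share * grand total,
    highest sellers first; product_id breaks ties."""
    ranked = sorted(product_sales.items(), key=lambda kv: (-kv[1], kv[0]))
    threshold = share * sum(product_sales.values())
    chosen, running = [], 0.0
    for product_id, sales in ranked:
        running += sales
        if running <= threshold:
            chosen.append(product_id)
    return chosen


# Single large product: it alone exceeds 80%, so the <= rule returns nothing
print(top_share({"BIG": 900, "S1": 50, "S2": 50}))           # []

# Ties straddling the threshold (800): running totals 600, 750, 900, so only B fits
print(top_share({"A": 600, "B": 150, "C": 150, "D": 100}))   # ['A', 'B']

# Many small equal products: the 8th running total equals the threshold exactly
print(top_share({f"P{i:02d}": 10 for i in range(10)}))       # ['P00', ..., 'P07']
```

The first case is worth noticing: with `<=` semantics, a single dominant product yields an empty result, which is usually a sign that the "at least 80%" rule is the one the business actually wants.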

---
