In [0]:
CREATE OR REPLACE TEMP VIEW sales_demo AS
SELECT * FROM VALUES
  ('2025-07-01', 'Apples', 10, 2.5, 'North'),
  ('2025-07-01', 'Oranges', 5, 3.0, 'North'),
  ('2025-07-02', 'Apples', 8, 2.5, 'South'),
  ('2025-07-02', 'Bananas', 15, 1.2, NULL),
  ('2025-07-03', 'Oranges', 7, 3.0, 'East')
AS sales(date, product, quantity, price_per_unit, region);

## 🚀 Using Python to Parameterize Complex Spark SQL Queries

In Databricks, combining Python with Spark SQL lets you build dynamic, reusable queries with parameters — perfect for complex analytics or pipeline workflows.

### Why parameterize?

- Reuse SQL code without hardcoding values
- Easily pass variables from Python logic (dates, IDs, filters)
- Build complex queries programmatically and debug more easily

---

### Example: Parameterizing a Complex Query


In [0]:
%python
# Define parameters in Python
start_date = '2025-07-01'
end_date = '2025-07-03'
region_filter = 'North'

# Use triple quotes for multiline SQL and an f-string to inject the variables.
# Note: f-strings do no escaping, so only pass trusted, validated values this way.
query = f"""
SELECT
  date,
  product,
  quantity,
  price_per_unit,
  region,
  CASE 
    WHEN quantity > 10 THEN 'High'
    WHEN quantity > 5 THEN 'Medium'
    ELSE 'Low'
  END AS demand_level
FROM sales_demo
WHERE date BETWEEN '{start_date}' AND '{end_date}'
  AND region = '{region_filter}'
"""

# Run the query via spark.sql()
df = spark.sql(query)
display(df)
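
An f-string splices values straight into the SQL text, so a value containing a quote can break or change the query. The following is a minimal sketch of one alternative, assuming Spark 3.5+ (or a recent Databricks Runtime) where `spark.sql()` accepts named parameter markers such as `:region` together with an `args` dictionary of Python values. The helper names `build_sales_query` and `run_sales_query` are hypothetical and only illustrate the idea.

```python
from datetime import date

# Named parameter markers (:name) keep the values out of the SQL text itself.
SALES_QUERY = """
SELECT date, product, quantity, price_per_unit, region
FROM sales_demo
WHERE date BETWEEN :start_date AND :end_date
  AND region = :region
"""


def build_sales_query(start_date: str, end_date: str, region: str):
    """Validate the inputs and return (sql_text, args) for spark.sql()."""
    # fromisoformat raises ValueError for strings that are not valid ISO dates
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    if start > end:
        raise ValueError("start_date must not be after end_date")
    args = {"start_date": start_date, "end_date": end_date, "region": region}
    return SALES_QUERY, args


def run_sales_query(spark, start_date: str, end_date: str, region: str):
    """Assumes `spark` is a SparkSession whose sql() accepts the `args` argument."""
    sql_text, args = build_sales_query(start_date, end_date, region)
    return spark.sql(sql_text, args=args)


# Building the query needs no cluster, which makes it easy to inspect and debug.
sql_text, args = build_sales_query("2025-07-01", "2025-07-03", "North")
print(sql_text)
print(args)
```

In a notebook, `display(run_sales_query(spark, "2025-07-01", "2025-07-03", "North"))` would then run the query with the values bound as parameters rather than pasted into the string.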

### 💡 Important!

When building Delta Live Tables (DLT) pipelines or any scalable data workflow in Databricks, the ability to parameterize Spark SQL queries is **crucial**. We set up parameters in the JSON configuration file used to build the DLT pipeline, and those parameters are then used when creating tables and pipelines.
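
To illustrate how such parameters might flow from configuration into Python, here is a minimal sketch. The settings fragment, the `sales.` key prefix, and the helper `get_param` are all hypothetical. Inside a running pipeline, values under `configuration` are typically read with `spark.conf.get(...)` rather than parsed by hand; the JSON is parsed directly here only so the example runs anywhere.

```python
import json

# Hypothetical fragment of a pipeline settings file; only "configuration" is used.
PIPELINE_SETTINGS = """
{
  "name": "sales_pipeline",
  "configuration": {
    "sales.start_date": "2025-07-01",
    "sales.region": "North"
  }
}
"""


def get_param(settings: dict, key: str, default=None) -> str:
    """Look up a pipeline parameter, falling back to a default when one is given."""
    config = settings.get("configuration", {})
    if key in config:
        return config[key]
    if default is None:
        raise KeyError(f"Missing pipeline parameter: {key}")
    return default


settings = json.loads(PIPELINE_SETTINGS)
start_date = get_param(settings, "sales.start_date")
end_date = get_param(settings, "sales.end_date", default="2025-07-03")  # not in the file
region = get_param(settings, "sales.region")
print(start_date, end_date, region)
```

Failing loudly on a missing key without a default is a deliberate choice in this sketch: a pipeline that silently runs with an empty filter is usually harder to debug than one that stops at startup.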

### Key reasons:

- **Centralized logic management:**  
  Instead of duplicating SQL queries for each variation (e.g., different sites or regions), you write one parameterized query. A change made in that one place propagates to every use case.

- **Dynamic pipeline building:**  
  Each DLT table is created using Spark SQL logic, but behind the scenes, this logic is often enriched with audit columns, business keys, or hashed surrogate keys (like MD5 of concatenated keys). Being able to inject parameters programmatically lets you apply these transformations consistently and dynamically.

- **Standardization and reusability:**  
  Standardization functions (e.g., for data cleansing or harmonization) can be applied dynamically within parameterized queries or UDFs. This avoids manual, repetitive updates and helps maintain data quality.

- **Simplified maintenance:**  
  When business requirements change (e.g., adding a new region or modifying audit logic), you only update the parameterized SQL once — no need to hunt down multiple hardcoded queries.
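
The points above can be sketched as a single query template that is reused for every region and always adds the same surrogate key and audit column. This is an illustration under stated assumptions, not the pipeline's actual code: the function `build_table_query`, the column names `surrogate_key` and `audit_loaded_at`, the `||` separator, and the `ALLOWED_REGIONS` set are hypothetical.

```python
# Hypothetical allow-list; values outside it never reach the SQL text.
ALLOWED_REGIONS = {"North", "South", "East"}


def build_table_query(source: str, business_keys: list, region: str) -> str:
    """Return a SELECT that adds a hashed surrogate key and an audit column."""
    # Accept only simple identifiers so a table or column name cannot carry SQL.
    for name in [source, *business_keys]:
        if not name.isidentifier():
            raise ValueError(f"Invalid identifier: {name!r}")
    if region not in ALLOWED_REGIONS:
        raise ValueError(f"Unknown region: {region!r}")
    # Cast each key to STRING so concat_ws() receives consistent types.
    key_list = ", ".join(f"CAST({key} AS STRING)" for key in business_keys)
    return (
        "SELECT *,\n"
        f"  md5(concat_ws('||', {key_list})) AS surrogate_key,\n"
        "  current_timestamp() AS audit_loaded_at\n"
        f"FROM {source}\n"
        f"WHERE region = '{region}'"
    )


# One template, one query per region; changing the template updates all of them.
queries = {
    region: build_table_query("sales_demo", ["date", "product"], region)
    for region in sorted(ALLOWED_REGIONS)
}
print(queries["North"])
```

Each generated string could be passed to `spark.sql()` or used inside a table definition. Adding a new region then means extending the allow-list (or the configuration that feeds it), not copying the query.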


## 📦 Collecting Scalar Values from DataFrames Safely in PySpark
In this section, we demonstrate how to **safely extract a single value from a PySpark DataFrame** and use it in another operation, without calling `.collect()` on the full DataFrame, which can overwhelm the driver when the data is large.

### 🧮 Goal:
Find the product with the **highest quantity sold** from a DataFrame.

---

### ✅ Why Avoid `.collect()`?
- `df.collect()` loads **all rows** into driver memory, which is inefficient and can fail on large datasets.
- Instead, reduce the data in Spark first (for example with `.select()` and an aggregate), then call `.first()` to bring just **one row** to the driver and read the scalar value from it.

---

### 🔍 Step-by-Step Logic:

1. **Select the maximum value** using the Spark SQL function `max()`.
2. **Show and inspect** the intermediate result (optional debugging).
3. **Extract the max value** into a Python variable using `.first()`.
4. **Access the scalar** either by column name or index.
5. **Use the scalar in a filter**, then extract the product with the highest quantity.

---

### 💡 Use Case in Real Projects:
This is a key pattern when building **DLT pipelines** or **parameterized logic**:
- You often want to calculate a metric in Spark (e.g., the latest date or the highest score).
- Then use that scalar in further logic, conditions, or joins.
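
One detail worth handling when a scalar feeds later logic is the empty case: `.first()` returns `None` when a DataFrame has no rows, and an aggregate such as `max()` over zero rows returns a row whose value is `None`. A minimal sketch of a guard follows; the helper name `scalar_from_row` is hypothetical, and plain tuples stand in for PySpark `Row` objects (which also behave like tuples) so the example runs without a cluster.

```python
def scalar_from_row(row, default=None, column=0):
    """Return one value from the result of DataFrame.first(), or a default.

    `row` is whatever .first() returned: a Row (indexable like a tuple)
    or None when the DataFrame had no rows.
    """
    if row is None:
        return default
    value = row[column]
    # An aggregate over zero rows yields a row that holds None.
    return default if value is None else value


# Plain tuples stand in for Row objects here.
print(scalar_from_row((15,)))               # normal case: a one-column row
print(scalar_from_row(None, default=0))     # .first() on an empty DataFrame
print(scalar_from_row((None,), default=0))  # max() over zero rows
```

With a real DataFrame this might be used as `scalar_from_row(df.agg(F.max("date")).first(), default="1900-01-01")`, assuming `F` is `pyspark.sql.functions`; the default date is only a placeholder.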

---

### 🛡️ Summary Table

| ✅ Best Practice                  | ❌ Risky Practice              |
|----------------------------------|-------------------------------|
| `df.select(...).first()`         | `df.collect()`                |
| Pulls one row (scalar value)     | Pulls **all** rows to driver  |
| Safe for production workflows    | Risk of memory overload       |

---


In [0]:
%python
from pyspark.sql import functions as F  # namespaced import avoids shadowing Python's built-in max()

# 🔍 Step 1: Calculate the max quantity as a new one-row DataFrame
max_quantity_dataframe = df.select(F.max("quantity"))

# 📺 Step 2: Debug by showing and displaying the intermediate max result
print("🔎 DataFrame containing max quantity:")
print(max_quantity_dataframe)  # <-- prints only the schema summary, not the data
max_quantity_dataframe.show()  # <-- tabular display
display(max_quantity_dataframe)  # <-- for notebook UI

# 🧱 Step 3: Extract the max value as a Row object (reuses the DataFrame from Step 1)
max_quantity = max_quantity_dataframe.first()
print("📦 Row object from which we'll extract the scalar value:")
print(max_quantity)

# 🎯 Step 4: Extract the scalar value by name and by index
max_quantity_by_name = max_quantity["max(quantity)"]  # default name of the aggregate column
max_quantity_by_index = max_quantity[0]
print(f"🔢 Extracted max quantity (by column name): {max_quantity_by_name}")
print(f"🔢 Extracted max quantity (by index): {max_quantity_by_index}")

# 🏆 Step 5: Use the extracted value to find the product with the highest quantity
top_row = (
    df.filter(df["quantity"] == max_quantity_by_index)
    .select("product")
    .first()
)
# .first() returns None when no row matches (for example, if df is empty)
highest_quantity_product = top_row[0] if top_row is not None else None
print(f"🏆 Product with the highest quantity sold: {highest_quantity_product}")
