
In [None]:
pip install pyspark


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, row_number, try_to_timestamp
from pyspark.sql.window import Window


def deduplicate(df):
  """Keep one row per (CustomerID, Amount, timestamp) combination."""
  business_keys = ["CustomerID", "Amount", "timestamp"]

  # "timestamp" is itself a business key, so ordering by it cannot break ties:
  # which of several identical rows survives is arbitrary.
  window = Window.partitionBy(*business_keys).orderBy(col("timestamp").desc())

  ranked_df = df.withColumn("rank", row_number().over(window))

  # Return the DataFrame (not the None that .show() returns) so callers can reuse it.
  return ranked_df.filter(col("rank") == 1).drop("rank")


if __name__ == "__main__":
  spark = SparkSession.builder.appName("CSVToParquet").getOrCreate()

  # try_to_timestamp (PySpark 3.5+) yields NULL instead of failing on unparseable values.
  df = (
      spark.read.csv("/content/sample_spark_data.csv", header=True, inferSchema=True)
      .withColumn("timestamp", try_to_timestamp(col("timestamp_str"), lit("dd-MM-yyyy HH:mm")))
      .drop("timestamp_str")
  )

  df.printSchema()

  deduplicate(df).show()





root
 |-- ID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Amount: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+---+----------+------+-------------------+
| ID|CustomerID|Amount|          timestamp|
+---+----------+------+-------------------+
|  1|   CUST001| 120.5|2023-01-01 10:00:00|
|  3|   CUST001| 120.5|2023-01-01 11:00:00|
|  4|   CUST004| 45.25|2023-01-04 09:15:00|
|  5|   CUST005| 89.99|2023-01-05 16:20:00|
|  6|   CUST006| 230.1|2023-01-06 12:00:00|
|  7|   CUST007| 150.0|2023-01-07 13:10:00|
|  8|   CUST008|670.45|2023-01-08 15:40:00|
|  9|   CUST009| 310.3|2023-01-09 08:25:00|
| 10|   CUST010| 99.99|2023-01-10 17:55:00|
+---+----------+------+-------------------+



In [None]:
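from typing import List


def zero_stripe(matrix: List[List[int]]) -> List[List[int]]:
  """Zero out every row and column that contains a 0 (modifies matrix in place)."""
  if not matrix or not matrix[0]:
    return matrix

  m, n = len(matrix), len(matrix[0])
  zero_rows, zero_cols = set(), set()

  # 1st pass: record the row and column indexes of every zero in hash sets.
  # A hash set stores keys only; a hash map stores key -> value pairs.
  # Membership is all we need here, so sets are enough.
  for r in range(m):
    for c in range(n):
      if matrix[r][c] == 0:
        zero_rows.add(r)
        zero_cols.add(c)

  # 2nd pass: zero every cell that lies in a recorded row or column.
  for r in range(m):
    for c in range(n):
      if r in zero_rows or c in zero_cols:
        matrix[r][c] = 0

  return matrix


if __name__ == "__main__":
  matrix = [
      [1, 2, 3, 4],
      [5, 0, 7, 8],
      [9, 10, 11, 12],
      [0, 14, 15, 16],
  ]

  print(zero_stripe(matrix))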

[[0, 0, 3, 4], [0, 0, 0, 0], [0, 0, 11, 12], [0, 0, 0, 0]]






| Time Complexity | Name | Example in DSA | When It Happens | Real-Life Analogy |
|---|---|---|---|---|
| O(1) | Constant Time | Accessing array element, Hash map lookup, Stack push/pop | Operation doesn't depend on input size | Picking the first book from a shelf |
| O(log n) | Logarithmic Time | Binary Search, Balanced BST search, Heap operations | Input size reduces by half each step | Finding a word in a dictionary |
| O(n) | Linear Time | Traversing array, Linear Search, BFS/DFS in graph | Every element needs to be checked once | Taking attendance in a class |
| O(n log n) | Linearithmic Time | Merge Sort, Quick Sort (avg), Heap Sort, Tree Sort | Efficient divide-and-conquer algorithms | Sorting names in a phonebook |
| O(n²) | Quadratic Time | Bubble Sort, Selection Sort, Insertion Sort (worst case) | Nested loops over entire input | Comparing every student with every other student |
| O(n³) | Cubic Time | Matrix multiplication (naive) | Triple nested iterations | Checking all seat combinations in a theater |
| O(2^n) | Exponential Time | Naive recursive Fibonacci, Travelling Salesman (Held-Karp DP, O(n²·2^n)) | Branching doubles with each added element | Trying every possible subset of cards from a deck |
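
To make the growth rates above concrete, here is a small sketch that counts the steps a linear search and a binary search take on the same sorted list. The helper names `linear_search_steps` and `binary_search_steps` are made up for this illustration and do not come from any library.

```python
from typing import List, Tuple


def linear_search_steps(items: List[int], target: int) -> Tuple[int, int]:
    """Return (index, comparisons) for a left-to-right scan: O(n)."""
    steps = 0
    for i, value in enumerate(items):
        steps += 1
        if value == target:
            return i, steps
    return -1, steps


def binary_search_steps(items: List[int], target: int) -> Tuple[int, int]:
    """Return (index, loop iterations) on a sorted list: O(log n)."""
    lo, hi = 0, len(items) - 1
    steps = 0
    while lo <= hi:
        steps += 1
        mid = (lo + hi) // 2
        if items[mid] == target:
            return mid, steps
        if items[mid] < target:
            lo = mid + 1   # target can only be in the upper half
        else:
            hi = mid - 1   # target can only be in the lower half
    return -1, steps


data = list(range(1_000_000))          # sorted input, worst case for the linear scan
linear_result = linear_search_steps(data, 999_999)
binary_result = binary_search_steps(data, 999_999)
print("linear:", linear_result)
print("binary:", binary_result)
```

The linear scan needs one comparison per element (1,000,000 here), while the binary search finishes in at most 20 iterations, because 2^20 is already larger than 1,000,000.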




# The "Nested Array Explosion" Scenario

The nested array explosion is a silent killer of Spark jobs: the input data looks small (for example one row of 50KB), but during the `explode()` transformation that single row can turn into 500,000 rows inside a single partition, causing a memory pressure spike that garbage collection cannot handle.

Here is a **Simulation and Remediation Kit**. We will create the "Killer Data," consider what happens when it is processed blindly, and then implement an **Observability & Protection Layer**.

### The Code Artifacts

The code cell at the end of this section contains:

1. **The Data Generator:** Creates a dataset with a specific "Data Bomb" (a handful of users with 100,000 nested events each).
2. **The Observability Pattern:** A `PipelineGuard` class that profiles array sizes (maximum, average, and the number of rows over a threshold) and shows a sample of the offending rows *before* the explode runs.
3. **The Fix:** An isolate-and-repartition strategy (labelled the "Salted" fix in the code comments) that processes the oversized arrays separately from the rest of the data.

### The Senior Engineer's Analysis

Here is the breakdown of what makes this "Senior" level code versus "Junior" level code.

#### 1. The Pre-Flight Check (`PipelineGuard`)

* **Junior:** Just writes `df.select(explode("events"))`. If it fails, they check logs.
* **Senior:** Assumes the data is hostile. The `profile_array_column` method acts as a scanner.
* **Why it matters:** It adds a modest cost (one extra aggregation pass that computes `size()` for each row) to prevent a catastrophic crash. Finding `max(size)` allows you to fail fast or switch strategies dynamically.
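
As a rough sketch of what "fail fast or switch strategies dynamically" could look like, the function below maps the two profiling numbers to a decision. The function name, the strategy labels, and the `hard_limit` default are all hypothetical choices for illustration; the notebook's own pipeline only distinguishes between a plain explode and the mitigation path.

```python
def choose_explode_strategy(max_size: int, danger_rows: int,
                            hard_limit: int = 5_000_000) -> str:
    """Map profiling stats to an action. The limit is an assumed value, not a Spark setting."""
    if max_size > hard_limit:
        # An array this large is treated as a data-quality incident: stop the job.
        return "fail_fast"
    if danger_rows > 0:
        # Some rows are oversized but tolerable: process them in a separate lane.
        return "isolate_and_repartition"
    # Nothing suspicious: a standard explode is fine.
    return "plain_explode"


normal = choose_explode_strategy(max_size=10, danger_rows=0)
skewed = choose_explode_strategy(max_size=100_000, danger_rows=5)
hostile = choose_explode_strategy(max_size=50_000_000, danger_rows=1)
print(normal, skewed, hostile)
```

Keeping the decision in a pure function like this makes it easy to unit-test without a Spark session.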

#### 2. The Isolation Strategy

* **The Trap:** When you have 99% small arrays and 1% huge arrays, a standard `explode` might work *most* of the time, but occasionally a task will hang for hours or OOM.
* **The Fix:** The pipeline explicitly splits the DataFrame into `small_df` and `huge_df`.
* **Why:** This isolates the blast radius. If the huge data crashes, it doesn't take down the processing of the 99% good data.

#### 3. The `repartition(100)` Magic

This is the specific technical fix for "Executor Lost due to OOM" on explode.

* **The Physics of the Crash:**
  * Input: `Row(User="Killer", Array_Size=100,000)` fits on Partition 1 (Executor A).
  * Operation: `explode`.
  * Result: Executor A now has to hold 100,000 rows *and* the object overhead in memory. It hits the heap limit.
* **The Solution:**
  * `huge_df.repartition(100)`: Forces a shuffle.
  * `repartition` moves whole rows, so a single killer row is still exploded inside one task. When there are multiple killer rows, however, the shuffle distributes them to different executors *before* the explosion happens (and prepares the stage for downstream processing).
  * *Note:* If a single *output* row is too big, `repartition` won't help. But here, the *input* is one row and the *output* is many rows. The OOM often happens during the generation of those rows. By isolating the heavy rows, we ensure the executor doing this work isn't also burdened with the rest of the dataset.
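
The effect of the shuffle can be modelled in plain Python, without Spark. The sketch below is a simplified model: it deals rows to partitions strictly in turn, whereas Spark's actual placement may differ, and the `chunk_sizes` step (splitting each array into smaller pieces before exploding) is a hypothetical extension that the notebook's pipeline does not implement.

```python
from typing import List


def round_robin(sizes: List[int], num_partitions: int) -> List[int]:
    """Deal whole rows to partitions in turn; return exploded row count per partition."""
    output = [0] * num_partitions
    for i, array_size in enumerate(sizes):
        output[i % num_partitions] += array_size
    return output


def chunk_sizes(sizes: List[int], chunk: int) -> List[int]:
    """Split each array into pieces of at most `chunk` elements (one piece per row)."""
    pieces = []
    for array_size in sizes:
        full, rest = divmod(array_size, chunk)
        pieces.extend([chunk] * full)
        if rest:
            pieces.append(rest)
    return pieces


killers = [100_000] * 5  # five rows with 100,000 elements each, as in the generator

one_partition = round_robin(killers, 1)                        # no repartition
whole_rows = round_robin(killers, 100)                         # repartition(100) on whole rows
chunked = round_robin(chunk_sizes(killers, 1_000), 100)        # hypothetical: chunk first

print(max(one_partition))  # 500000
print(max(whole_rows))     # 100000
print(max(chunked))        # 5000
```

In this model, repartitioning whole rows lowers the worst partition from 500,000 to 100,000 output rows, which is exactly one killer row: the shuffle separates killers from each other but cannot split a single array. Only chunking the arrays first brings the worst case down further.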



### Databricks Monitoring Guide (How to verify this)

When you run this in Databricks, here is exactly where you look:

1. **Ganglia UI -> Cluster Memory** (on newer Databricks Runtime versions, the compute **Metrics** tab replaces Ganglia):
   * **Bad Pipeline:** You will see one node's memory spike to 100% while others are flat. Then a sudden drop (crash).
   * **Good Pipeline:** You will see a small spike during the `profile` stage, then a more uniform distribution of memory usage during the `repartition/explode` stage.

2. **Spark UI -> Stages -> Sort By "Duration":**
   * Look for the stage with the `explode`.
   * Click the link in the **"Description"** column to see task details.
   * Sort tasks by **"Shuffle Read Size"** or **"Output Size"**.
   * **The Smoking Gun:** If one task has 10GB output and others have 10MB, you have found your nested array explosion.

3. **Driver Logs (Standard Output):**
   * The code prints `WARNING: data skew detected! Potential OOM on explode` using `print()`, so the message appears in the driver's standard output rather than in Log4j.
   * In Databricks, use the "Driver Logs" tab and search for "WARNING". The sample printed right after the warning shows the specific `user_id` values causing the problem, allowing you to go back to the source system team and say "User X is sending 100k events per batch, please fix upstream."
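
The "smoking gun" check can be turned into a single number. The sketch below assumes you have copied per-task output sizes (in bytes) out of the Spark UI by hand; `skew_ratio` is a hypothetical helper, not a Spark or Databricks API.

```python
from statistics import median
from typing import List


def skew_ratio(task_output_bytes: List[int]) -> float:
    """Largest task output divided by the median task output."""
    mid = median(task_output_bytes)
    if mid == 0:
        # Avoid division by zero when most tasks wrote nothing.
        return float("inf") if max(task_output_bytes) > 0 else 1.0
    return max(task_output_bytes) / mid


MB = 1024 ** 2
GB = 1024 ** 3

# 99 ordinary tasks at 10MB and one task at 10GB, as in the example above.
sizes = [10 * MB] * 99 + [10 * GB]
ratio = skew_ratio(sizes)
print(f"max/median output ratio: {ratio:.0f}x")
```

A ratio near 1 means the tasks are balanced. A ratio in the hundreds or thousands, as here, points at one task doing almost all of the work.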

This strategy is **native for Batch processing** but requires a specific architectural pattern to work in **Streaming**.

Here is the breakdown of how, where, and when to apply this.

### 1. Batch vs. Streaming Compatibility

#### **Batch Processing (Native Fit)**

* **Verdict:** **Perfect fit.**
* **Why:** In batch jobs (e.g., Daily Ingestion), you have a finite dataset. Running an "Action" like `.collect()` (which the `profile_array_column` method does to fetch its one-row stats result) takes a few seconds but saves hours of failure time.
* **Usage:** You run this at the start of your job before the heavy transformations.

#### **Streaming Processing (The Catch)**

* **Verdict:** **Conditional fit (Must use `foreachBatch`).**
* **The Problem:** You cannot run `.collect()` or calculate `max()` directly on a Streaming DataFrame. Spark will throw an error: `Queries with streaming sources must be executed with writeStream.start()`.
* **The Solution:** You must wrap this logic inside a `foreachBatch` function.
  * Spark Structured Streaming processes data in small "micro-batches."
  * Inside `foreachBatch`, the micro-batch appears as a **static** (Batch) DataFrame. You can then run the profiling logic on that specific chunk of data.



**Streaming Implementation Snippet:**

```python
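def process_micro_batch(df, batch_id):
    # Inside here, 'df' is a static (batch) DataFrame holding one micro-batch,
    # so we can safely run our guard logic.
    clean_df = safe_explode_pipeline(spark, df)

    clean_df.write.format("delta").mode("append").save("/mnt/silver/events")

# The Stream Trigger
# Assumption: the raw files are JSON. Auto Loader requires "cloudFiles.format",
# and it also needs either an explicit schema or a "cloudFiles.schemaLocation".
# A production stream would normally set a checkpointLocation too (paths not shown here).
spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .load("/mnt/raw") \
    .writeStream \
    .foreachBatch(process_micro_batch) \
    .start()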

```

---

### 2. Where would this be used? (Architecture)

This fits explicitly in the **Bronze to Silver** transition layer of a Medallion Architecture.

* **The Input (Bronze):** Raw, nested JSON (e.g., from MongoDB, DynamoDB, or Application Logs).
* **The Operation:** Flattening/Exploding. This is the most dangerous operation in Spark because it changes row cardinality (1 row → 100k rows).
* **The Output (Silver):** Clean, flat tables ready for analysis.

**Real-World Scenarios:**

1. **Clickstream Data:** A user session is one JSON object with an array of 5,000 clicks. You need to explode it to analyze individual clicks.
2. **IoT Sensor Arrays:** A device sends a daily packet containing 86,400 temperature readings (one per second) in a single array.
3. **Scraping Pipelines:** You scrape a webpage and get a list of 10,000 comments in one nested field.

---

### 3. How frequently should you use it?

It depends on your **SLA (Service Level Agreement)** vs. **Cost**.

* **High Frequency (Every Run):**
  * **Recommended for:** Batch jobs and High-Latency Streams (Trigger: 1-5 minutes).
  * **Cost:** The `.collect()` profiling adds overhead (e.g., 5-10 seconds per batch). If your job runs once a day, 10 seconds is nothing. If your stream runs every 10 seconds, adding 10 seconds of latency doubles your processing time.
* **Blind Protection (Skip Profiling):**
  * **Recommended for:** Ultra-Low Latency Streams (< 1 second).
  * **Strategy:** Don't profile. Just **assume** the data is skewed and always apply the `repartition(100)` logic before exploding.
  * **Tradeoff:** You waste a little CPU shuffling small data, but you guarantee stability without the latency "pause" of checking stats first.
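
The cost argument above is simple arithmetic, sketched here for two cases. The one-hour duration for the daily batch is an assumed figure for illustration; the 10-second numbers come from the text.

```python
def profiling_overhead(profile_seconds: float, work_seconds: float) -> float:
    """Extra time added by profiling, as a fraction of the run's own work time."""
    return profile_seconds / work_seconds


# Hypothetical daily batch whose real work takes one hour (3600 seconds).
daily = profiling_overhead(profile_seconds=10, work_seconds=3600)

# Micro-batch that itself takes 10 seconds to process.
stream = profiling_overhead(profile_seconds=10, work_seconds=10)

print(f"daily batch:  +{daily:.2%}")
print(f"micro-batch: +{stream:.0%}")
```

An overhead of 1.0 (100%) means the profiled run takes twice as long, which is the "doubling" case described for a 10-second stream.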

In [None]:
import random
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, explode, size, max as _max, avg,
    when, count
)
from pyspark.sql.types import (
    StructType, ArrayType, IntegerType, StructField,
    StringType, LongType
)
# ==============================================================================
# 1. THE DATA GENERATOR (Creating the "Data Bomb")
# ==============================================================================
def generate_skewed_data(spark, num_normal=1000, num_killers=5):
    """
    Generates a dataset where most users have 1-10 events,
    but a few 'killers' have 100,000 events.
    """
    print(f"Generating {num_normal} normal users and {num_killers} DATA BOMBS...")

    data = []

    # Normal Users
    for i in range(num_normal):
        events = [{"event_id": k, "ts": int(time.time())} for k in range(random.randint(1, 10))]
        data.append((f"user_{i}", events))

    # The Killers (Simulating the deep nested JSON issue)
    for i in range(num_killers):
        # 100,000 events in ONE array.
        # In memory, this is small. After explode, it's massive.
        events = [{"event_id": k, "ts": int(time.time())} for k in range(100000)]
        data.append((f"KILLER_USER_{i}", events))

    schema = StructType([
        StructField("user_id", StringType(), True),
        StructField("events", ArrayType(
            StructType([
                StructField("event_id", IntegerType(), True),
                StructField("ts", LongType(), True)
            ])
        ), True)
    ])

    return spark.createDataFrame(data, schema)

# ==============================================================================
# 2. THE OBSERVABILITY LAYER (The "Senior" Approach)
# ==============================================================================
class PipelineGuard:
  def __init__ (self, spark):
    self.spark = spark
    self.metrics = {}

  def profile_array_column(self, df, col_name, threshold = 1000):
    """
        Scans a DataFrame BEFORE expensive operations to detect array skew.
        """
    print(f"\n----Profiling Column: {col_name}---\n")

    #add metadata
    df_profiled = df.withColumn("_array_size", size(col(col_name)))

    #calculate stats
    stats = df_profiled.select(
        _max("_array_size").alias("max_size"),
        avg("_array_size").alias("avg_size"),
        count(when(col("_array_size") > threshold, 1)).alias("danger_rows")
    ).collect()[0]

    self.metrics[col_name] = stats

    print(f"Max array length: {stats['max_size']}")
    print(f"Avg array length: {stats['avg_size']}")
    print(f"Rows exceeding threshold {threshold}: {stats['danger_rows']}")

    if stats['danger_rows'] > 0:
      print("WARNING: data skew detected! Potential OOM on explode")
      print("Sample of problematic: ")
      df_profiled.filter(col("_array_size") > threshold) \
                  .select("user_id", "_array_size").show(3)

    return df_profiled


def safe_explode_pipeline(spark, df):
  guard = PipelineGuard(spark)

  df_checked = guard.profile_array_column(df, "events", threshold=1000)
  danger_count = guard.metrics["events"]["danger_rows"]

  if danger_count == 0:
    return df.select("user_id", explode("events").alias("event"))

  else:
    print("\n--Applying Skew Mitigation Strategy--\n")
    # Strategy: ISOLATE -> REPARTITION -> EXPLODE -> UNION

    # 1. Split the Stream
    # Small rows go to the fast lane.
    # The cutoff matches the profiling threshold (1000) so every flagged row is isolated.
    small_df = df_checked.filter(col("_array_size") <= 1000)

    # Huge rows go to the heavy lifting lane.
    huge_df = df_checked.filter(col("_array_size") > 1000)

    # 2. Process Small Data (Standard Explode)
    small_exploded = small_df.select("user_id", explode("events").alias("event"))

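    # 3. Process Huge Data (labelled the "Salted" fix, though no salt key is added)
    # Why OOM happens: each huge row is exploded entirely inside ONE task.
    # Mitigation: shuffle the few huge rows across 100 partitions *before*
    # exploding, so they share a task neither with each other nor with the
    # rest of the dataset. A single row is still exploded by a single task.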

    # We use repartition(N) to force a shuffle.
    # Since we only have a few huge rows, we can just round-robin repartition.
    huge_exploded = (huge_df
                      .repartition(100) # <--- THE MAGIC FIX
                      .select("user_id", explode("events").alias("event")))

    # 4. Union back together
    final_df = small_exploded.union(huge_exploded)

    return final_df

# ==============================================================================
# EXECUTION
# ==============================================================================
if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .appName("NestedArrayOOMFix")
        .master("local[*]")
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("ERROR")

    # 1. Generate Dangerous Data
    raw_df = generate_skewed_data(spark)

    # 2. Run the Protected Pipeline
    result_df = safe_explode_pipeline(spark, raw_df)

    print("\n--- Final Pipeline Stats ---")
    print(f"Total Exploded Rows: {result_df.count()}")

    # Verify we processed the killer users
    killer_count = result_df.filter(col("user_id").contains("KILLER")).count()
    print(f"Killer User Events Processed: {killer_count}")

    spark.stop()


