<a href="https://colab.research.google.com/github/ankitarm/Data_Engineer_Scenario/blob/main/Real_World_Scenario_Questions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Q1 - You notice your PySpark job is writing corrupted Parquet files. What do you do?
 **Step 1: Identify the Issue**

* **Check error logs**: Look for `ParquetDecodingException`, `EOFException`, or messages such as `is not a Parquet file`.
* **Verify output**: Try reading the Parquet files with `spark.read.parquet(path)` or an external tool such as `parquet-tools` or AWS Athena.
* **Check sample files**: Validate file sizes—zero-byte or unusually small files may signal corruption.
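
The file-size check can be sketched in plain Python for a local or mounted directory. This is a minimal sketch, not a full validator: it relies only on the fact that an unencrypted Parquet file starts and ends with the 4-byte magic `PAR1`. The function name `find_suspect_parquet_files` and the `min_bytes` threshold are illustrative assumptions.

```python
import os

PARQUET_MAGIC = b"PAR1"  # unencrypted Parquet files begin and end with these 4 bytes


def find_suspect_parquet_files(directory, min_bytes=12):
    """Return (path, reason) pairs for .parquet files that look empty or truncated.

    min_bytes is an assumed lower bound: 4-byte header magic, 4-byte footer
    length, and 4-byte trailing magic.
    """
    suspects = []
    for root, _dirs, files in os.walk(directory):
        for name in sorted(files):
            if not name.endswith(".parquet"):
                continue
            path = os.path.join(root, name)
            size = os.path.getsize(path)
            if size < min_bytes:
                suspects.append((path, f"too small ({size} bytes)"))
                continue
            with open(path, "rb") as f:
                head = f.read(4)
                f.seek(-4, os.SEEK_END)
                tail = f.read(4)
            if head != PARQUET_MAGIC or tail != PARQUET_MAGIC:
                suspects.append((path, "missing PAR1 magic bytes"))
    return suspects
```

A file that passes this check can still contain damaged row groups, so treat it as a cheap first filter before reading the data with Spark.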

---

  **Step 2: Investigate Common Causes**

1. **Schema evolution issues**

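   * Files written with incompatible schemas (for example, the same column as `int` in one write and `string` in another) are individually valid but fail when read together, which can look like corruption.
   * ✔️ Fix: Keep column types stable across writes, and enable schema merging on read only if needed:

     ```python
     df = spark.read.option("mergeSchema", "true").parquet(path)
     ```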

2. **Partial writes or task failures**

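   * If tasks fail midway, incomplete files may be left behind.
   * ✔️ Fix: Rely on task retries (`spark.task.maxFailures`) and use speculative execution cautiously. Speculation is off by default and is a launch-time setting, so configure it when the session is created or via `spark-submit --conf`:

     ```python
     from pyspark.sql import SparkSession
     spark = SparkSession.builder.config("spark.speculation", "false").getOrCreate()  # keeps speculative execution off
     ```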

3. **Concurrent writes**

   * Multiple jobs writing to the same location without coordination can corrupt data.
   * ✔️ Fix: Ensure only one job writes to a target directory at a time.

4. **Incompatible or custom UDFs**

   * UDFs whose return values do not match the declared return type can silently write nulls or malformed values.
   * ✔️ Fix: Validate data types and sanitize values before writing.

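5. **File system issues (e.g., non-atomic commits on S3)**

   * Object stores such as S3 have no atomic rename, so rename-based commits can leave partial or duplicate output when a job fails. (S3 itself has been strongly consistent since December 2020, and HDFS always was.)
   * ✔️ Fix: Be careful with **`spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2`**: it is faster, but task output becomes visible before the job commits.
     Prefer version 1, an S3A committer, or a transactional table format such as Delta Lake.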

---

### 🧪 **Step 3: Isolate the Bad Files**

* Use `parquet-tools`:

  ```bash
  parquet-tools schema part-*.parquet
  ```
* Delete or quarantine corrupted files and rerun the failed part of the job if possible.

---

### ✅ **Step 4: Apply Best Practices**

* Write to a temp directory and move to the final location only if the job succeeds.
* Use `.coalesce(n)` to reduce the number of small files.
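* Validate the output after writing by reading it back. (`parquet.enable.summary-metadata` only controls whether `_metadata` summary files are written; it does not validate data.)

  ```python
  written = spark.read.parquet(output_path)  # output_path: wherever df was just written
  assert written.count() == df.count(), "Row count changed during write"
  ```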
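
The "write to a temp directory, then move" practice can be sketched for a local or HDFS-like filesystem where a directory rename is a single operation. This is a minimal sketch under that assumption; on object stores such as S3 a rename is a copy, so a committer or table format is the better tool there. `publish_atomically` and `write_fn` are hypothetical names.

```python
import os
import shutil
import tempfile


def publish_atomically(write_fn, final_dir):
    """Run write_fn(staging_dir) and move the result to final_dir only on success."""
    parent = os.path.dirname(os.path.abspath(final_dir))
    # Stage next to the target so the rename stays on one filesystem
    staging_dir = tempfile.mkdtemp(prefix=".staging_", dir=parent)
    try:
        write_fn(staging_dir)
        if os.path.exists(final_dir):
            raise FileExistsError(f"{final_dir} already exists")
        os.rename(staging_dir, final_dir)
    except BaseException:
        # Any failure (including interruption) discards the partial output
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise
```

With Spark, `write_fn` could be something like `lambda path: df.write.mode("overwrite").parquet(path)`. Readers of `final_dir` then see either the complete output or nothing.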



##  Q2 - Your Spark job fails when reading JSON files with inconsistent schema. What’s your fix?

 The problem is that different JSON files can have different structures: missing fields, new fields, or different data types for the same field.
 When reading JSON without an explicit schema, Spark infers one by scanning the input (all of it by default, or a fraction if `samplingRatio` is set). Fields that appear in only some files become nullable columns, and a field with conflicting types is typically widened to `string`. Failures usually come from `FAILFAST` mode, from a sampled schema that misses some fields or types, or from downstream code that assumes a type the data does not have.
 There are several ways to handle this:
 1. **Rely on inference for added or missing fields**: JSON has no `mergeSchema` option (that is a Parquet/ORC feature). Inference already unions the fields it sees across files, but it does not resolve type conflicts in a useful way.
 2. **Define a Fixed Schema**: If we know the expected schema, we can pass it with `.schema(...)`. Fields not present in the schema are ignored, and missing fields are set to null. With a type conflict (e.g., a field is an integer in one file and a string in another), the outcome depends on the parse mode described below.
 3. **Use `PERMISSIVE` mode with a column for corrupt records**: `PERMISSIVE` is the default mode. Records that cannot be parsed against the schema get nulls in the affected fields, and the raw text is stored in the column named by `columnNameOfCorruptRecord` (default `_corrupt_record`). When you supply your own schema, declare that column as a string field. The bad rows can then be filtered out and handled separately.
 4. **Use a stricter `mode`**: `FAILFAST` aborts on the first malformed record (likely the failure we are already seeing), and `DROPMALFORMED` silently drops every record that does not match the schema. Note that `DROPMALFORMED` drops the entire record even if only one field is inconsistent.
 5. **Preprocess the JSON files**: We can standardize the records before the main job, using a script or a preliminary Spark job that coerces every record to one schema.
 6. **Land the data in a typed format**: If the inconsistency is too high, read each source with its own schema, cast to a common schema, and store the result as Parquet or ORC so downstream jobs read consistent types. This requires converting the data.
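
The preprocessing option can be sketched in plain Python. It mirrors the idea behind `PERMISSIVE` mode with a corrupt-record column: records that fit the target schema are coerced, and the rest are quarantined as raw text. This is a minimal sketch; `TARGET_SCHEMA`, its field names, and `normalize_records` are hypothetical, and the simple converters shown would silently truncate a float passed to `int`.

```python
import json

# Hypothetical target schema: field name -> converter for that field
TARGET_SCHEMA = {"id": int, "name": str, "amount": float}


def normalize_records(lines, schema=TARGET_SCHEMA):
    """Split JSON lines into (coerced rows, raw lines that could not be coerced)."""
    good, corrupt = [], []
    for line in lines:
        try:
            raw = json.loads(line)
            if not isinstance(raw, dict):
                raise ValueError("expected a JSON object")
            row = {}
            for field, convert in schema.items():
                value = raw.get(field)
                # Missing fields become None; extra fields are ignored
                row[field] = None if value is None else convert(value)
            good.append(row)
        except (ValueError, TypeError):
            # Covers invalid JSON and values that cannot be converted
            corrupt.append(line)
    return good, corrupt
```

Writing `good` back out as JSON lines gives Spark a consistent input, while `corrupt` can be stored for later inspection.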

 ## Q3 - You have multiple joins in your ETL. How do you debug performance?


1. **Inspect Spark UI** for slow stages/shuffles.
  - **Shuffle Metrics**: High shuffle read/write sizes and spill (memory/disk) on a stage indicate costly joins.
  - In the SQL tab or the physical plan, look for `CartesianProduct` or `BroadcastNestedLoopJoin` nodes, which usually mean a missing or non-equality join condition.
2. **Broadcast small tables** where possible.
3. **Handle skew** (salting, split processing).
- **Skewed keys** cause uneven task workloads (some tasks take much longer).
- Check skew in join keys:
  ```scala
  df.groupBy("join_key").count().orderBy(desc("count")).show()
  ```
- **Mitigate Skew**:
  ```scala
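  // Option 1: Salting (append a random suffix to skewed keys; the other side of the
  // join must be replicated once per salt value 0-9 so the salted keys still match)
  val saltedDf = df.withColumn("salted_key", concat($"key", lit("_"), (rand() * 10).cast("int")))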
  
  // Option 2: Split skewed keys into separate processing
  val skewedKeys = Seq("key1", "key2")  // Manually identified
  val skewedData = df1.filter($"key".isin(skewedKeys: _*))
  val nonSkewedData = df1.filter(!$"key".isin(skewedKeys: _*))
  ```
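
Why salting helps can be illustrated without a cluster. The sketch below is a plain-Python simulation, not Spark code: it assigns keys to partitions with a CRC32 hash (an assumption standing in for Spark's own partitioner) and compares partition sizes with and without a random salt. All names and numbers are illustrative.

```python
import random
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    """Deterministic stand-in for a hash partitioner."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions, salt_buckets=1, seed=0):
    """Count rows per partition, optionally salting each key with a random suffix."""
    rng = random.Random(seed)
    sizes = Counter()
    for key in keys:
        if salt_buckets > 1:
            key = f"{key}_{rng.randrange(salt_buckets)}"
        sizes[partition_of(key, num_partitions)] += 1
    return sizes


# One hot key with 9,000 rows plus 1,000 rows spread over distinct keys
keys = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]
unsalted = partition_sizes(keys, num_partitions=8)
salted = partition_sizes(keys, num_partitions=8, salt_buckets=10)
print("largest partition without salt:", max(unsalted.values()))
print("largest partition with salt:   ", max(salted.values()))
```

Without salt, every row of the hot key lands in one partition. With salt, the hot key is split across several partitions, at the cost of replicating the other join side once per salt value.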

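4. **Reorder joins** and enable CBO (`spark.sql.cbo.enabled` and `spark.sql.cbo.joinReorder.enabled`, with statistics collected via `ANALYZE TABLE`).
- Without CBO, Spark executes joins in the order they are written. **Reorder joins** to: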
  - Filter early (reduce data volume).
  - Place smaller tables first to enable broadcast joins.

5. **Tune shuffle partitions** and memory.

6. **Debug with EXPLAIN**
- Use `explain` to inspect the query plans:
  ```scala
  df.explain("extended")  // Show parsed, analyzed, optimized, and physical plans
  ```

7. **Cache** reused DataFrames.
- Cache DataFrames that are used more than once to avoid recomputation:
  ```scala
  df.cache().count()  // Materialize cache
  ```
- **Monitor Cache Usage** in Spark UI’s "Storage" tab.

8. **Common Anti-Patterns to Fix**

| Issue | Solution |
|-------|----------|
| Cartesian joins | Add explicit join conditions, or keep implicit cross joins disabled (`spark.sql.crossJoin.enabled=false`) |
| Joining on mismatched types | Cast keys to the same type (e.g., `df.withColumn("key", $"key".cast("int"))`) |
| Unpartitioned data | Repartition before join: `df.repartition(100, $"key")` |


## Q4 - Data duplication is found after a union. What’s wrong?


 **Root Causes**
1. **`union` vs. `unionAll` Misunderstanding**:
   - In Spark, `union` (and `unionAll`, which is an alias) **does not deduplicate data**. It simply stacks datasets on top of each other, including duplicates.
   - If you expected deduplication (like SQL’s `UNION DISTINCT`), you used the wrong operation.

2. **Overlapping Data in Source Datasets**:
   - The input DataFrames/RDDs being unioned contain overlapping records (e.g., identical rows across files or partitions).

3. **Inconsistent Data Generation**:
   - Upstream ETL logic (e.g., joins, aggregations) is producing duplicate rows before the `union`.

---

 **How to Debug**
1. **Check for Duplicates in Inputs**:
   ```python
   # Count distinct rows in each DataFrame before union
   df1.distinct().count() == df1.count()  # False means duplicates exist
   df2.distinct().count() == df2.count()
   ```

2. **Inspect the Union Logic**:
   ```python
   # Show duplicates introduced by union
   df_union = df1.union(df2)
   df_union.groupBy(df_union.columns).count().filter("count > 1").show()
   ```

---

 **Solutions**
 **1. Use `union` + `distinct` (If Deduplication Is Needed)**
   ```python
   df_union_distinct = df1.union(df2).distinct()  # Removes duplicates globally
   ```
   - **Tradeoff**: Expensive (requires full data shuffle).

 **2. Use `unionByName` (If Schemas Differ Slightly)**
   ```python
   df_union = df1.unionByName(df2)  # Matches columns by name, not position
   ```
   - Prevents values from landing in the wrong column when column order differs, which would make otherwise identical rows look different and survive deduplication.

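 **3. Deduplicate Before Union (Only When Inputs Cannot Overlap)**
   ```python
   df_union = df1.distinct().union(df2.distinct())  # Dedupes within each input only
   ```
   - **Best for**: Inputs that are known to be disjoint. Rows present in both `df1` and `df2` still appear twice, so this does not replace a post-union `distinct`.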
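
The difference between deduplicating the inputs and deduplicating the result can be shown with plain Python lists standing in for DataFrames. This is an illustrative sketch of the semantics only, and the sample rows are made up.

```python
# Rows as tuples; df1_rows contains an internal duplicate, and (2, "b") is in both inputs
df1_rows = [(1, "a"), (1, "a"), (2, "b")]
df2_rows = [(2, "b"), (3, "c")]

# union / UNION ALL: simply stacks the inputs
union_all = df1_rows + df2_rows

# distinct() on each input, then union: the cross-input duplicate survives
dedupe_inputs_first = list(dict.fromkeys(df1_rows)) + list(dict.fromkeys(df2_rows))

# union followed by distinct(): every duplicate is removed
union_distinct = list(dict.fromkeys(union_all))

print(len(union_all), len(dedupe_inputs_first), len(union_distinct))
```

Only the last variant removes the row that both inputs share.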

 **4. Use `intersect` to Find Overlaps (Debugging)**
   ```python
   df1.intersect(df2).show()  # Show overlapping rows
   ```

 **5. Fix Upstream Logic**
   - If duplicates come from source data (e.g., double-reading files), adjust the ETL:
     ```python
     # Example: Use .dropDuplicates() after reading
     df1 = spark.read.parquet("path").dropDuplicates()
     ```

---

 **Key Notes**
- **`union` ≠ `UNION DISTINCT`**: Spark’s `union` behaves like SQL’s `UNION ALL`.
- **Performance**: Post-union `distinct` is expensive; deduping inputs first shrinks the shuffle, but keep the final `distinct` if the inputs can overlap.
- **Schema Safety**: Use `unionByName` if columns are in different orders.

---

 **Example Workflow**
```python
# Read and deduplicate inputs
df1 = spark.read.parquet("path1").dropDuplicates(["id", "timestamp"])
df2 = spark.read.parquet("path2").dropDuplicates(["id", "timestamp"])

# Union with distinct (if needed)
df_final = df1.unionByName(df2).distinct()
```

By addressing the root cause (usually upstream logic or misunderstanding of `union`), you can eliminate duplicates efficiently.


## Q5 - You’re required to load only new data each day. How do you handle incremental loads?

 **1. Track Incremental Data Sources**
Identify how to detect new data:
- **Timestamp-based**: Use a column like `last_updated_at` or `event_time`.
- **ID-based**: Incrementing IDs (e.g., `auto_increment` keys).
- **File-based**: New files arrive in a directory (e.g., `s3://bucket/day=2024-01-01/`).
- **CDC (Change Data Capture)**: Log-based tools (Debezium, AWS DMS) or database triggers.

---

 **2. Store the "High-Water Mark"**
Persist the last processed record to avoid reprocessing:
- **Options**:
  - Database table (e.g., `metadata.load_history`).
  - File (e.g., `_last_processed.json` in cloud storage).
  - Spark’s `checkpoint` (for streaming jobs).

**Example (JSON marker file)**:
```python
# Read last processed timestamp
last_run = spark.read.json("dbfs:/mnt/last_processed.json").first()["last_timestamp"]
```

---

 **3. Query Only New Data**
Filter data at read time using the high-water mark:
 **For Timestamp-Based Increments**
```python
new_data = spark.read.table("source_table") \
  .filter(f"event_time > '{last_run}'")  # Or use >= for inclusivity
```

 **For File-Based Increments (e.g., Hive-Style Partitioning)**
```python
new_data = spark.read.parquet("s3://bucket/") \
  .filter("day > '2024-01-01'")  # Partition pruning: only matching day= directories are read
```

 **For Database CDC**
```python
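# Using Debezium output in a Kafka topic (broker and topic names are placeholders)
cdc_data = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "cdc_topic")
    .option("startingOffsets", last_kafka_offset)  # JSON string of per-partition offsets
    .load()
)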
```

---

 **4. Merge or Append New Data**
 **Option A: Append (Immutable Data)**
```python
new_data.write.mode("append").saveAsTable("target_table")
```

 **Option B: Merge (Updates + Inserts)**
Use Delta Lake/Iceberg for ACID operations:
```python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/path/to/target")
delta_table.alias("target").merge(
  new_data.alias("source"),
  "target.id = source.id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
```

---

 **5. Update the High-Water Mark**
After successful processing:
```python
# Update timestamp (e.g., max(new_data.event_time))
new_max = new_data.selectExpr("max(event_time)").first()[0]

# Persist for next run; skip when the batch was empty so the mark is not reset to null
if new_max is not None:
    spark.createDataFrame([(new_max,)], ["last_timestamp"]) \
      .write.mode("overwrite").json("dbfs:/mnt/last_processed.json")
```
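
For a marker file on a local or mounted filesystem, the read/update cycle can be sketched in plain Python. This is a minimal sketch under assumptions: `read_watermark` and `write_watermark` are hypothetical helpers, the default timestamp is arbitrary, and the rename is only atomic on a local POSIX filesystem. It covers two cases that are easy to miss: the first run (no marker yet) and an empty batch (nothing to advance).

```python
import json
import os
import tempfile


def read_watermark(path, default="1970-01-01T00:00:00"):
    """Return the stored high-water mark, or a default on the first run."""
    try:
        with open(path) as f:
            return json.load(f)["last_timestamp"]
    except FileNotFoundError:
        return default


def write_watermark(path, new_max):
    """Persist new_max via write-then-rename; return False if the batch was empty."""
    if new_max is None:
        return False
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"last_timestamp": new_max}, f)
    os.replace(tmp_path, path)  # readers never see a half-written marker
    return True
```

Updating the marker only after the load has committed keeps a failed run from skipping data on the next attempt.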

---

 **6. Handle Failures Idempotently**
- **Idempotent writes**: Use transactional formats (Delta Lake, Iceberg) to avoid partial loads.
- **Checkpointing**: In streaming jobs, set a checkpoint location on the writer:
  ```python
  stream_df.writeStream.option("checkpointLocation", "/checkpoint/path")
  ```

---

 **7. Optimize Performance**
- **Partitioning**: Ensure target tables are partitioned by date/ID for fast filtering.
- **Z-Ordering** (Delta Lake): Cluster by key columns to speed up merges:
  ```python
  delta_table.optimize().executeZOrderBy("id")
  ```

---

 **Example End-to-End Pipeline**
```python
from delta.tables import DeltaTable

# 1. Fetch last processed timestamp
last_run = spark.read.json("last_processed.json").first()["timestamp"]

# 2. Load new data
new_data = spark.read.table("source").filter(f"timestamp > '{last_run}'")

# 3. Merge into target
delta_table = DeltaTable.forPath(spark, "/data/target")
delta_table.alias("t").merge(
  new_data.alias("s"),
  "t.id = s.id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

# 4. Update high-water mark
new_max = new_data.selectExpr("max(timestamp)").first()[0]
spark.createDataFrame([(new_max,)], ["timestamp"]) \
  .write.mode("overwrite").json("last_processed.json")
```

---

### **Key Considerations**
| Approach | Best For | Tools |
|----------|----------|-------|
| **Timestamp/ID** | Append-only data | Delta Lake, plain Parquet |
| **File-Based** | Object storage (S3, HDFS) | Hive partitions, AutoLoader |
| **CDC** | Database updates | Debezium, Kafka, Delta Lake CDC |


## Q6 - Your PySpark notebook is stuck at an action like `collect()`. What’s your approach?


 **Summary Checklist**
1. **Check Spark UI** for stuck stages/skew.
2. **Avoid `collect()`**—use `take()`, `show()`, or writes.
3. **Repartition/salt data** if skewed.
4. **Increase memory** (driver/executors) if OOM occurs.
5. **Optimize queries** (filter early, checkpoint).
6. **Tune Spark configs** (shuffle partitions, timeouts).

 **1. Diagnose the Problem**
 **Check the Spark UI**
- Access the Spark UI (`http://<driver-node>:4040`) while the job is stuck:
  - **Stages Tab**: Identify which stage/task is hanging (look for incomplete/failed tasks).
  - **Executors Tab**: Check if executors are alive, memory usage, and GC (garbage collection) time.
  - **Storage Tab**: Verify if excessive caching is causing memory pressure.

 **Look for Warnings/Errors**
- **Driver/Executor logs** (accessible via Spark UI or cluster logs):
  - OutOfMemory (OOM) errors.
  - Long garbage collection pauses (`GC overhead limit exceeded`).
  - Network timeouts (`TimeoutException`).

---

 **2. Immediate Mitigations**
 **Cancel and Reprocess Safely**
- **Keyboard interrupt** (if in notebook) or stop the Spark job via UI.
- **Avoid `collect()`** for large datasets:
  ```python
  # Replace:
  df.collect()  # Brings all data to driver (risky for big data)
  
  # With safer actions:
  df.take(100)  # Fetch a sample
  df.write.parquet("output.parquet")  # Write to disk instead
  ```

 **Increase Resources (If Possible)**
- **Driver Memory**: Set higher memory for the driver (e.g., `--driver-memory 8G`).
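- **Executor Memory/Instances**: These are launch-time settings; `spark.conf.set` on a running session does not change them. Set them when the session is created or via `spark-submit`:
  ```python
  spark = SparkSession.builder \
      .config("spark.executor.memory", "4G") \
      .config("spark.executor.instances", "10") \
      .getOrCreate()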
  ```

---

 **3. Debug Common Causes**
 **Cause 1: Data Skew**
- **Symptoms**: A few tasks run much longer than others.
- **Fix**:
  - **Salting**: Redistribute skewed keys:
    ```python
    from pyspark.sql.functions import concat, lit, rand
    df = df.withColumn("salted_key", concat("key", lit("_"), (rand() * 10).cast("int")))
    ```
  - **Repartition**: Balance partitions:
    ```python
    df = df.repartition(200, "salted_key")  # Repartitioning on the raw key would keep a hot key in one partition
    ```

 **Cause 2: Memory Issues**
- **Symptoms**: `java.lang.OutOfMemoryError` or high spill-to-disk.
- **Fix**:
  - **Reduce partition size** by raising the shuffle partition count:
    ```python
    spark.conf.set("spark.sql.shuffle.partitions", "400")  # Default is 200; more partitions means smaller ones
    ```
  - **Cache wisely**:
    ```python
    df.cache().count()  # Materialize cache
    df.unpersist()  # Release if unused
    ```

 **Cause 3: Expensive Transformations Before `collect()`**
- **Symptoms**: Slow stages before the action.
- **Fix**:
  - **Optimize queries**:
    ```python
    # Replace:
    df.filter("col1 > 100").collect()
    
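    # With column pruning, so only the needed column reaches the driver:
    df.select("col1").filter("col1 > 100").collect()
    ```
  - **Checkpoint intermediate results**:
    ```python
    spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  # Required once; the path is a placeholder
    df = df.checkpoint()  # Returns a new DataFrame with truncated lineage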
    ```

 **Cause 4: Network/Dependency Issues**
- **Symptoms**: Tasks hanging at "shuffle" or external calls.
- **Fix**:
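  - **Increase timeouts** (a launch-time setting, e.g. `--conf spark.network.timeout=600s`):
    ```python
    spark = SparkSession.builder.config("spark.network.timeout", "600s").getOrCreate()
    ```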

---

 **4. Alternative Actions to `collect()`**
- **For inspection**:
  ```python
  df.show(10)        # Prints rows to console
  df.take(100)       # Returns a list of rows (limited size)
  df.limit(100).collect()  # Safer than full collect
  ```
- **For writing**:
  ```python
  df.write.csv("output.csv")  # Avoids driver memory issues
  ```

---

 **5. Long-Term Prevention**
- **Monitor**: Use Spark UI regularly to catch skew/memory issues early.
- **Tune configurations**:
  ```python
  spark.conf.set("spark.sql.adaptive.enabled", "true")  # Auto-optimize shuffle (can be set at runtime)
  # Off-heap memory is a launch-time setting: --conf spark.executor.memoryOverhead=2G
  ```
- **Use structured streaming** for incremental processing instead of batch `collect()`.

---



## Q7 - You're asked to validate the data pipeline output against the source. How?

 **1. Define Validation Criteria**
Start by identifying key metrics to compare between source and target:
- **Row Count**: Record counts should match (or meet expected differences, like deduplication).
- **Schema**: Column names, data types, and nullability.
- **Data Integrity**: Key fields (e.g., IDs, timestamps) should have no unexpected changes.
- **Business Logic**: Aggregations, transformations, and derived fields must adhere to rules.
- **Freshness**: Data should be up-to-date (e.g., latest timestamps match SLAs).

---

 **2. Automated Validation Checks**
Implement programmatic checks using PySpark/SQL:

 **A. Row Count Validation**
```python
source_count = spark.table("source_table").count()
target_count = spark.table("target_table").count()

assert source_count == target_count, \
    f"Count mismatch: Source={source_count}, Target={target_count}"
```

 **B. Schema Validation**
```python
source_schema = spark.table("source_table").schema
target_schema = spark.table("target_table").schema

assert source_schema == target_schema, \
    f"Schema mismatch!\nSource:\n{source_schema}\nTarget:\n{target_schema}"
```

 **C. Data Diff (Sample or Full)**
Compare a sample of records:
```python
from pyspark.sql.functions import md5, concat_ws

# Compare checksums of key columns
source_hashes = spark.table("source_table") \
    .select(md5(concat_ws("|", "id", "timestamp")).alias("hash"))
target_hashes = spark.table("target_table") \
    .select(md5(concat_ws("|", "id", "timestamp")).alias("hash"))

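# Find source rows with no matching hash in the target
mismatches = source_hashes.join(target_hashes, "hash", "left_anti")
mismatch_count = mismatches.count()  # Count once; each count() triggers a job
assert mismatch_count == 0, f"{mismatch_count} mismatched records found"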
```

 **D. Statistical Validation**
Check distributions/aggregates:
```python
source_stats = spark.table("source_table").selectExpr(
    "avg(revenue) as avg_revenue",
    "count(distinct user_id) as distinct_users"
).first()

target_stats = spark.table("target_table").selectExpr(
    "avg(revenue) as avg_revenue",
    "count(distinct user_id) as distinct_users"
).first()

assert round(source_stats.avg_revenue, 2) == round(target_stats.avg_revenue, 2), \
    "Revenue averages differ"
```

---

 **3. Handle Expected Deltas**
If the pipeline intentionally modifies data (e.g., filtering, deduplication), adjust validations:
```python
# Example: Expect 5% fewer records due to deduplication
expected_count = source_count * 0.95
assert abs(target_count - expected_count) < 100, \
    "Count outside expected tolerance"
```
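
A fixed absolute tolerance such as 100 rows is too loose for small tables and too strict for large ones. A relative tolerance scales with the data; the sketch below shows one way to express it. The helper name `count_within_tolerance` and the default 1% tolerance are illustrative assumptions.

```python
def count_within_tolerance(source_count, target_count, expected_ratio=1.0, rel_tol=0.01):
    """Check that target_count is within rel_tol of source_count * expected_ratio."""
    expected = source_count * expected_ratio
    if expected == 0:
        # Nothing expected: only an empty target passes
        return target_count == 0
    return abs(target_count - expected) / expected <= rel_tol
```

For example, `count_within_tolerance(source_count, target_count, expected_ratio=0.95)` accepts a target that keeps about 95% of the source rows, give or take 1%.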

---

 **4. Logging and Alerting**
- **Log discrepancies** for triage:
  ```python
  if mismatches.count() > 0:
      mismatches.write.mode("overwrite").parquet("path/to/mismatches")
  ```
- **Set up alerts** (e.g., via Airflow, Databricks Jobs, or custom monitors) for failures.

---

 **5. Source-to-Target Tools (For Large Pipelines)**
For complex pipelines, consider specialized tools:
- **Great Expectations**: Framework for data validation.
  ```python
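  # Legacy Dataset API from older Great Expectations releases; newer versions use a different entry point
  import great_expectations as ge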
  df = ge.dataset.SparkDFDataset(spark.table("target_table"))
  df.expect_column_values_to_not_be_null("user_id")
  ```
- **Delta Lake constraints / dbt tests**: Built-in assertions for schema/data quality.
  ```yaml
  # In a dbt schema.yml
  models:
    - name: target_table
      columns:
        - name: user_id
          tests:
            - not_null
        - name: id
          tests:
            - relationships:
                to: ref('source_table')
                field: id
  ```

---

 **6. Edge Cases to Validate**
- **Null Handling**: Ensure `NULL` values are preserved or transformed correctly.
- **Data Type Conversions**: Verify no loss of precision (e.g., `timestamp` → `string`).
- **Partitioning**: Check if partition columns match source predicates.
- **Incremental Loads**: Validate only new/updated data was processed.

---

 **7. Example Validation Workflow**
```python
def validate_pipeline():
    source = spark.table("source").filter("date = '2023-01-01'")
    target = spark.table("target").filter("date = '2023-01-01'")
    
    # Row count
    assert source.count() == target.count(), "Row count mismatch"
    
    # Schema
    assert set(source.columns) == set(target.columns), "Column mismatch"
    
    # Key field consistency
    source_ids = source.select("id").distinct()
    target_ids = target.select("id").distinct()
    assert source_ids.subtract(target_ids).count() == 0, "ID mismatch"
    
    print("Validation passed!")

validate_pipeline()
```

---
 **Key Takeaways**

| **Check**               | **Method**                                                                 |
|-------------------------|----------------------------------------------------------------------------|
| **Row Count**           | Compare `.count()` between source/target.                                  |
| **Schema**              | Validate column names, types, and nullability.                             |
| **Data Accuracy**       | Use checksums (e.g., `md5`) or join anti-checks.                           |
| **Business Logic**      | Test aggregations (e.g., `sum(revenue)`).                                  |
| **Automation**          | Integrate with CI/CD (e.g., GitHub Actions) or orchestration (Airflow).    |

## Q8 - You're asked to convert a nested JSON into a flat table. What steps do you take?


Converting nested JSON into a flat table in PySpark involves **flattening nested structures** (arrays, structs) into columns. Here’s a step-by-step guide:

---

 **1. Read the Nested JSON**
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("path/to/nested_data.json")  # Or use `option("multiline", "true")` for multi-line JSON
df.printSchema()
```
**Sample Nested Schema**:
```json
{
  "id": 1,
  "name": "Alice",
  "address": {
    "city": "NY",
    "zip": 10001
  },
  "orders": [
    {"order_id": 101, "amount": 50.0},
    {"order_id": 102, "amount": 30.0}
  ]
}
```

---

 **2. Flatten Structs (Single Nested Fields)**
Use dot notation or `selectExpr` to unnest structs:
```python
from pyspark.sql.functions import col

# Method 1: Dot notation
flattened_df = df.select(
    "id",
    "name",
    col("address.city").alias("city"),  # Unnest struct
    col("address.zip").alias("zip")
)

# Method 2: selectExpr (SQL-like syntax)
flattened_df = df.selectExpr(
    "id",
    "name",
    "address.city as city",
    "address.zip as zip"
)
```

---

 **3. Explode Arrays (List of Objects)**
For arrays, use `explode()` or `explode_outer()` (to keep nulls):
```python
from pyspark.sql.functions import col, explode

# Explode the "orders" array
exploded_df = df.select(
    "id",
    "name",
    explode("orders").alias("order")  # Each array element becomes a row
)

# Then flatten the exploded struct
final_df = exploded_df.select(
    "id",
    "name",
    col("order.order_id").alias("order_id"),
    col("order.amount").alias("amount")
)
```
**Output**:
```
+---+-----+--------+------+
|id |name |order_id|amount|
+---+-----+--------+------+
|1  |Alice|101     |50.0  |
|1  |Alice|102     |30.0  |
+---+-----+--------+------+
```

---

 **4. Handle Nested Arrays (Advanced)**
For multi-level nesting, explode one array level at a time, then flatten the resulting structs:
```python
from pyspark.sql.functions import col, explode

# Example: each element of "orders" has its own "items" array
df = spark.read.json("path/to/deeply_nested.json")

# Step 1: Explode the outer array, then the inner one
df_orders = df.withColumn("order", explode("orders"))
df_exploded = df_orders.withColumn("order_item", explode("order.items"))

# Step 2: Flatten structs
df_flat = df_exploded.select(
    "id",
    col("order_item.item_id").alias("item_id"),
    col("order_item.price").alias("price")
)
```

---

 **5. Dynamic Flattening (For Unknown Schemas)**
If the schema is unknown or highly dynamic:
```python
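from pyspark.sql.functions import col
from pyspark.sql.types import StructType

def flatten_df(nested_df):
    """Expand struct columns into top-level columns until none remain."""
    while any(isinstance(f.dataType, StructType) for f in nested_df.schema.fields):
        flat_cols = []
        for field in nested_df.schema.fields:
            if isinstance(field.dataType, StructType):
                # Promote each sub-field to a column named parent_child
                for sub_field in field.dataType.fields:
                    flat_cols.append(
                        col(f"{field.name}.{sub_field.name}")
                        .alias(f"{field.name}_{sub_field.name}")
                    )
            else:
                flat_cols.append(col(field.name))
        nested_df = nested_df.select(flat_cols)
    return nested_df

flattened_df = flatten_df(df)  # Arrays are left as-is; explode them separately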
```
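
The same recursion is easy to see on a single record in plain Python, which can help when reasoning about the column names a flattening step will produce. This is an illustrative sketch on dictionaries, not Spark code; `flatten_record` is a hypothetical helper, and arrays are deliberately left untouched, as in the DataFrame version.

```python
def flatten_record(record, parent_key="", sep="_"):
    """Flatten nested dicts into one level, joining key names with sep."""
    flat = {}
    for key, value in record.items():
        name = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects (the analogue of a struct column)
            flat.update(flatten_record(value, name, sep))
        else:
            # Scalars and lists are kept as they are
            flat[name] = value
    return flat
```

For example, a record with `{"address": {"city": "NY"}}` yields the key `address_city`.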

---

 **6. Write the Flat Table**
```python
flattened_df.write.parquet("path/to/flattened_table")  # Or CSV/Delta
```

---

 **Key Functions & Tips**
| **Function**           | **Purpose**                                                                 |
|------------------------|----------------------------------------------------------------------------|
| `explode()`            | Converts array elements into rows (skips nulls/empty arrays).              |
| `explode_outer()`      | Includes null/empty arrays in output.                                      |
| `selectExpr()`         | SQL-like syntax for complex column operations.                             |
| `struct.field`         | Dot notation to access nested struct fields.                               |
| `from_json()`          | Parse JSON strings into structs before flattening.                         |

**Performance Tips**:
- **Filter early**: Use `select()` to prune unused columns before flattening.
- **Limit explosions**: Exploding large arrays can cause row multiplication (use `size()` to check).
- **Schema inference**: Use `spark.read.json` with sampling for large files:
  ```python
  spark.read.option("samplingRatio", "0.1").json("path")
  ```

---

 **Example: End-to-End Flattening**
```python
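from pyspark.sql.functions import col, explode_outer

# Input records with nested fields and arrays
data = [
    {"id": 1, "name": "Alice", "address": {"city": "NY", "zip": 10001}, "orders": [{"id": 101, "amount": 50.0}]},
    {"id": 2, "name": "Bob", "address": {"city": "SF", "zip": 94105}, "orders": []}
]
# An explicit schema is needed: nested dicts would otherwise be inferred as maps,
# and Bob's empty list gives no element type to infer
schema = "id INT, name STRING, address STRUCT<city: STRING, zip: INT>, orders ARRAY<STRUCT<id: INT, amount: DOUBLE>>"
df = spark.createDataFrame(data, schema)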

# Step 1: Flatten structs
df_flat = df.select(
    "id",
    "name",
    col("address.city").alias("city"),
    col("address.zip").alias("zip"),
    "orders"
)

# Step 2: Explode arrays (with explode_outer to keep Bob's empty orders)
df_final = df_flat.select(
    "id", "name", "city", "zip",
    explode_outer("orders").alias("order")
).select(
    "*",
    col("order.id").alias("order_id"),
    col("order.amount").alias("amount")
).drop("order")

df_final.show()
```
**Output**:
```
+---+-----+----+-----+--------+------+
| id| name|city|  zip|order_id|amount|
+---+-----+----+-----+--------+------+
|  1|Alice|  NY|10001|     101|  50.0|
|  2|  Bob|  SF|94105|    null|  null|
+---+-----+----+-----+--------+------+
```


## Q9 - The job fails with 'Files were not closed properly'. What might be the issue?
The error **"Files were not closed properly"** in Spark typically indicates issues with file handles or I/O operations during job execution. Here are the common causes and solutions:

---

### **1. Resource Leaks (Most Common)**
#### **Cause**:
- **Unclosed File Handles**: Spark couldn’t close output files (e.g., Parquet, ORC) due to:
  - Executor crashes or JVM failures.
  - Improper shutdown (e.g., `kill -9` or cluster preemption).
  - Code not calling `.close()` on `OutputStream`s in UDFs.

#### **Fix**:
- **Graceful Shutdown**: Ensure jobs exit cleanly (avoid forced termination).
- **Use `try-finally`** for resource management:
  ```python
  try:
      df.write.parquet("output_path")
  finally:
      spark.stop()  # Ensures SparkContext cleanup
  ```

---

### **2. Disk/Network Issues**
#### **Cause**:
- **Storage Disconnects**: Network timeouts or disk failures during writes (e.g., S3, HDFS).
- **Permission Denied**: Filesystem permissions changed mid-job.

#### **Fix**:
- **Check Storage Health**:
  ```bash
  hdfs fsck /output_path  # For HDFS
  aws s3 ls s3://bucket/output_path/  # For S3
  ```
- **Retry Logic**: Use `spark.hadoop.fs.s3a.retry.limit` (S3) or similar for HDFS.
- **Validate Permissions**:
  ```bash
  hdfs dfs -ls /output_path  # Verify write permissions
  ```

---

### **3. Spark Configuration Gaps**
#### **Cause**:
- Missing settings for fault-tolerant writes (especially in cloud storage).

#### **Fix**:
- **Enable Committer Algorithms** (for S3/S3A) when the session is created:
  ```python
  spark = SparkSession.builder \
      .config("spark.hadoop.fs.s3a.committer.name", "magic") \
      .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
              "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory") \
      .getOrCreate()
  ```
- **Set Hadoop Configs**:
  ```python
  spark.conf.set("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
  ```

---

### **4. File System Consistency**
#### **Cause**:
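- **Non-Atomic Commits**: Object stores such as S3 have no atomic rename, so a failed commit can leave partial output. (S3 has been strongly consistent since December 2020, so delayed visibility of written files mainly affects older setups or other stores.)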
- **Partial Writes**: Job crashes mid-write, leaving incomplete files.

#### **Fix**:
- **Use Transactional Formats**:
  ```python
  df.write.format("delta").save("output_path")  # Delta Lake
  df.write.format("iceberg").save("output_path")  # Iceberg
  ```
- **Check for `_SUCCESS` Files**:
  ```bash
  hdfs dfs -ls /output_path/_SUCCESS  # Should exist if job succeeded
  ```
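
For output on a local or mounted path, the marker check can be scripted. The sketch below is a heuristic only: it assumes the job used a Hadoop-style committer that writes `_SUCCESS` on job commit and stages work under `_temporary`. The function name `output_looks_complete` is hypothetical.

```python
import os


def output_looks_complete(output_dir):
    """Return True if _SUCCESS exists and no _temporary staging directory remains."""
    if not os.path.isfile(os.path.join(output_dir, "_SUCCESS")):
        return False
    # Leftover staging data suggests a task or job did not finish committing
    return not any(name.startswith("_temporary") for name in os.listdir(output_dir))
```

A downstream job can call this before reading and skip or alert on directories that fail the check.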

---

### **5. Code-Level Issues**
#### **Cause**:
- **Custom Sinks/UDFs**: Manual file operations that don’t close resources.
- **Shuffle Spill**: Disk spills during shuffles not cleaned up.

#### **Fix**:
- **Audit Custom Code**:
  ```python
  # Bad: the file is never closed if write() raises
  f = open("local_file.txt", "w")
  f.write("data")
  f.close()

  # Good: the context manager closes the file even on error
  with open("local_file.txt", "w") as f:
      f.write("data")
  ```
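- **Monitor GC and Spill**: High GC time often accompanies heavy shuffle spill. Enable GC logging at launch (these flags are for JDK 8; newer JVMs use `-Xlog:gc*`):
  ```python
  spark = SparkSession.builder \
      .config("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps") \
      .getOrCreate()
  ```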

---

### **6. Recovery Steps**
If the job already failed:
1. **Delete Incomplete Output**:
   ```bash
   hdfs dfs -rm -r /output_path  # HDFS
   aws s3 rm --recursive s3://bucket/output_path/  # S3
   ```
2. **Re-run with Checks**:
   - Enable Spark UI (`4040` port) to monitor file writes.
   - Check executor logs for `IOException` or `FileNotFoundException`.

---

### **Preventive Best Practices**
| **Issue**               | **Prevention**                                                                 |
|-------------------------|-------------------------------------------------------------------------------|
| Resource Leaks          | Use `try-finally`, avoid `kill -9`.                                           |
| Cloud Storage Writes    | Enable S3A committers or use Delta Lake/Iceberg.                              |
| Shuffle Stability       | Tune `spark.sql.shuffle.partitions` and executor memory to limit spill.       |
| File Permissions        | Pre-create output paths with correct permissions.                             |

---

### **Example Configuration for S3**
```python
spark = SparkSession.builder \
    .config("spark.hadoop.fs.s3a.committer.name", "magic") \
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true") \
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory") \
    .getOrCreate()
```


## Q10 - Your Spark job shows too many tasks for small data. What do you check?

When a Spark job generates **too many tasks for small data**, it leads to unnecessary overhead and slower performance due to task scheduling and coordination costs. Here’s how to diagnose and fix the issue:

---

 **1. Check Partition Count**
 **Why?**  
Tasks correspond to partitions. Small data split into too many partitions causes inefficiency.  
 **How?**  
```python
df.rdd.getNumPartitions()  # Check current partition count
```
- **Expected**: For small data (e.g., <1GB), partitions should ideally match core count (e.g., 2–4x executors × cores).  
- **Fix**:  
  ```python
  df = df.coalesce(4)  # Reduce partitions (no shuffle)
  # Or repartition (with shuffle if needed):
  df = df.repartition(8)  
  ```

---

 **2. Review Default Parallelism**
 **Why?**  
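`spark.default.parallelism` controls the default partition count for RDD operations (`parallelize`, `reduceByKey`, `join`). DataFrame shuffles use `spark.sql.shuffle.partitions` instead (default: 200).  
 **How?**  
```python
spark.sparkContext.defaultParallelism  # Usually the total number of executor cores
```
- **Fix**: This setting cannot be changed on a running session, so set it when the session is created:  
  ```python
  # Set to ~2-4x total cores
  spark = SparkSession.builder \
      .config("spark.default.parallelism", "16") \
      .getOrCreate()
  ```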

---

 **3. Inspect Input File Splitting**
 **Why?**  
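Many small files (e.g., 100 × 1 MB files) can produce far more partitions than the data needs. RDD readers create at least one partition per file; DataFrame readers pack small files together but charge each file an open cost (`spark.sql.files.openCostInBytes`, default 4 MB).  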
 **How?**  
```python
df.inputFiles()  # List input files (check count/size)
```
- **Fix**:  
  - **Merge files** upstream (e.g., `df.repartition(4).write.parquet()`).  
  - Use **`spark.sql.files.maxPartitionBytes`** to control split size:  
    ```python
    spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")  # Default: 128MB
    ```

---

 **4. Examine Shuffle Partitions**
 **Why?**  
Shuffles (joins/aggregations) use `spark.sql.shuffle.partitions` (default: 200).  
 **How?**  
```python
spark.conf.get("spark.sql.shuffle.partitions")
```
- **Fix**:  
  ```python
  spark.conf.set("spark.sql.shuffle.partitions", "16")  # Match data size
  ```

---

 **5. Check Adaptive Query Execution (AQE)**
 **Why?**  
AQE can coalesce small partitions post-shuffle (Spark 3.0+).  
 **How?**  
```python
spark.conf.get("spark.sql.adaptive.enabled")  # Should be 'true'
spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled")  # Should be 'true'
```
- **Fix**: Ensure AQE is enabled:  
  ```python
  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "16MB")
  ```

---

 **6. Look for Skewed Operations**
 **Why?**  
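Skewed keys produce a few oversized partitions next to many tiny ones, so raising the partition count to cope with the hot keys leaves most tasks nearly empty.  
 **How?**  
```python
df.groupBy("key").count().orderBy("count", ascending=False).show()  # Largest keys first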
```
- **Fix**:  
  - **Salting**: Add random prefixes to skewed keys.  
  - **Repartition**: Balance data manually.  
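
To make the salting idea concrete, here is a minimal sketch in plain Python rather than Spark. It assumes a two-stage aggregation: count on a salted key first, then strip the salt and combine. `salt_key` and `NUM_SALTS` are hypothetical names chosen for illustration; in a real job the same pattern would be applied with a salted column before the shuffle.

```python
import random
from collections import Counter

NUM_SALTS = 4  # hypothetical: number of sub-keys each key is split into


def salt_key(key, rng, num_salts=NUM_SALTS):
    """Append a random salt so one hot key becomes several sub-keys."""
    return f"{key}_{rng.randrange(num_salts)}"


rng = random.Random(42)  # fixed seed so the sketch is repeatable
rows = ["hot"] * 1000 + ["cold"] * 10

# Stage 1: aggregate on the salted key (this is the part that gets shuffled)
partial = Counter(salt_key(k, rng) for k in rows)

# Stage 2: strip the salt and combine the partial results
final = Counter()
for salted, count in partial.items():
    original_key = salted.rsplit("_", 1)[0]
    final[original_key] += count

print(dict(final))
```

The largest partial count is well below 1000, which is the point: no single task has to process the whole hot key, yet the final totals are unchanged.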

---

 **7. Review Code for Unnecessary Splits**
 **Why?**  
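Operations like `explode()` or `flatMap` multiply rows, and the extra volume carries through every downstream shuffle.  
 **Fix**:  
- Filter early to reduce data volume:  
  ```python
  from pyspark.sql.functions import explode

  # Filter first, then explode (instead of exploding first)
  df.filter("col > 100").select("*", explode("array_col").alias("item"))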
  ```

---

 **Summary of Key Configs**
| **Config**                                  | **Purpose**                                  | **Recommended Value for Small Data** |
|---------------------------------------------|---------------------------------------------|--------------------------------------|
| `spark.default.parallelism`                 | Default partition count for RDDs.           | 2–4x total cores (e.g., 16)          |
| `spark.sql.shuffle.partitions`              | Partitions during shuffles.                 | 16–32                                |
| `spark.sql.files.maxPartitionBytes`         | Max split size for input files.             | 64–128MB                             |
| `spark.sql.adaptive.enabled`                | Enable AQE to coalesce partitions.          | `true`                               |

---

 **Example Workflow**
```python
# 1. Read data and check partitions
df = spark.read.parquet("small_data/")
print("Partitions before:", df.rdd.getNumPartitions())  # e.g., 200

# 2. Reduce partitions and tune configs
df = df.coalesce(4)
spark.conf.set("spark.sql.shuffle.partitions", "8")

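# 3. Verify after operation (check the shuffled result, not the input DataFrame)
result = df.groupBy("key").count()
result.show()
print("Partitions after:", result.rdd.getNumPartitions())  # At most 8; AQE may coalesce further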
```

**Key Takeaway**: The goal is to **align partition count with data size and cluster resources**. Fewer partitions reduce overhead, but too few can cause underutilization.
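
One way to turn that takeaway into a number is sketched below. It is a heuristic, not a Spark API: `suggest_partitions` is a hypothetical helper, and the 128 MB target and 16 MB minimum partition sizes are assumptions borrowed from the config values discussed above.

```python
import math

MB = 1024 * 1024


def suggest_partitions(data_bytes, total_cores,
                       target_bytes=128 * MB, min_bytes=16 * MB):
    """Suggest a partition count from data size and available cores."""
    by_target = math.ceil(data_bytes / target_bytes)
    if by_target >= total_cores:
        # Large data: partition size drives the count
        return by_target
    # Small data: use up to one partition per core,
    # but never make partitions smaller than min_bytes
    by_min = math.ceil(data_bytes / min_bytes)
    return max(1, min(total_cores, by_min))


print(suggest_partitions(500 * MB, total_cores=8))   # small data, all cores used
print(suggest_partitions(10 * MB, total_cores=8))    # tiny data, one partition
print(suggest_partitions(1024 ** 4, total_cores=8))  # 1 TiB, size-driven
```

The result can be passed to `coalesce()` or used as the value for `spark.sql.shuffle.partitions`.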

## Q11. You're integrating Spark with a JDBC source. How do you optimize?

Optimizing Spark's integration with a JDBC source requires balancing **performance**, **database load**, and **resource efficiency**. Here’s a step-by-step guide to maximize throughput while minimizing strain on the source database:

---

 **1. Partitioning for Parallel Reads**
 **Why?**  
Avoid single-threaded reads by splitting the query into parallel tasks.
 **How?**  
Use **partitioning columns**, **predicates**, or **custom bounds**:
```python
# Method 1: Partition by column (numeric/date)
df = spark.read.jdbc(
    url="jdbc:postgresql://host/db",
    table="orders",
    column="order_id",  # Partition column
    lowerBound=1,
    upperBound=100000,
    numPartitions=10,  # Parallel tasks
    properties={"user": "user", "password": "pass"}
)

# Method 2: Custom predicates (e.g., date ranges), one partition per predicate
predicates = ["date >= '2023-01-01' AND date < '2023-02-01'",
              "date >= '2023-02-01' AND date < '2023-03-01'"]
df = spark.read.jdbc(url, table, predicates=predicates)
```
**Key Settings**:
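- `numPartitions`: Match to executor cores (e.g., 10–100), and keep it within what the database can serve concurrently, since each partition opens its own connection.
- `lowerBound`/`upperBound`: These only determine the partition stride; rows outside the range are still read by the first and last partitions.
- Choose an **evenly distributed column** (e.g., IDs, dates) to avoid skew.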
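
Writing predicates by hand gets tedious for long date ranges. The sketch below generates one half-open monthly predicate per partition; `month_predicates` is a hypothetical helper, and it assumes the column holds dates that compare correctly against ISO-formatted string literals in your database.

```python
from datetime import date
from typing import List


def month_predicates(column: str, start: date, end: date) -> List[str]:
    """Build one [lower, upper) predicate per calendar month in [start, end)."""
    predicates = []
    current = date(start.year, start.month, 1)
    while current < end:
        # First day of the following month
        nxt = date(current.year + (current.month == 12), current.month % 12 + 1, 1)
        lower = max(current, start)
        upper = min(nxt, end)
        predicates.append(
            f"{column} >= '{lower.isoformat()}' AND {column} < '{upper.isoformat()}'"
        )
        current = nxt
    return predicates


preds = month_predicates("date", date(2023, 1, 1), date(2023, 3, 1))
for p in preds:
    print(p)
```

The resulting list can be passed as `predicates=` to `spark.read.jdbc`; because the ranges are half-open and contiguous, no row is read twice or skipped.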

---

 **2. Tuning Fetch Size**
 **Why?**  
Control how many rows are fetched per round-trip (reduces network calls).
 **How?**  
```python
properties = {
    "user": "user",
    "password": "pass",
    "fetchSize": "10000"  # Default: 0, i.e., the JDBC driver's own default (e.g., 10 rows for Oracle)
}
df = spark.read.jdbc(url, table, properties=properties)
```
- **Ideal `fetchSize`**: 1,000–50,000 (tradeoff between memory and latency).

---

 **3. Pushdown Optimization**
 **Why?**  
Filter data at the database level (not in Spark).
 **How?**  
```python
# Pushdown filters via SQL query (instead of table name)
query = "(SELECT * FROM orders WHERE status = 'shipped') AS tmp"
df = spark.read.jdbc(url, query, properties=properties)

# Or use Spark's filter pushdown:
df.filter("status = 'shipped'").explain()  # Check if filter is pushed to JDBC
```
- **Verify**: The `explain()` output should list the filter under `PushedFilters` in the JDBC scan node.

---

 **4. Write Optimization**
 **Batching Inserts**  
```python
df.write.jdbc(
    url=url,
    table="target_table",
    mode="append",
    properties={
        "user": "user",
        "password": "pass",
        "batchsize": "50000"  # Default: 1000
    }
)
```
- **`batchsize`**: Increase (e.g., 10k–100k) to reduce round-trips.

 **Disable Constraints Temporarily**  
For bulk writes, disable indexes/constraints in the database before writing.

---

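 **5. Connection Pooling**
 **Why?**  
Each read or write partition opens its own database connection. Spark's JDBC data source has no connection pool option, so the partition count is what controls the number of concurrent connections.
 **How?**  
```python
properties = {
    "user": "user",
    "password": "pass",
    "driver": "org.postgresql.Driver"
}
# Cap concurrent connections via the partition count
df.coalesce(10).write.jdbc(url, "target_table", mode="append", properties=properties)
```
If many jobs hit the same database, put a server-side pooler (e.g., PgBouncer for PostgreSQL) in front of it.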

---

 **6. Avoid OOMs**
 **Driver Memory**  
- Large metadata/result sets can crash the driver.  
- **Fix**: Set driver memory at launch; it cannot be changed on a running session:  
  ```bash
  spark-submit --driver-memory 4g ...
  ```

 **Executor Memory**  
- Partition data to fit in executor memory:  
  ```python
  df.repartition(100).write.jdbc(...)  # More partitions = less data per task
  ```

---

 **7. Database-Specific Tweaks**
| **Database** | **Optimization**                                  |
|--------------|--------------------------------------------------|
| PostgreSQL   | `SET work_mem = '256MB';` (per-connection memory) |
| MySQL        | `rewriteBatchedStatements=true` (for batch writes) |
| Oracle       | `oracle.jdbc.ReadTimeout=300000` (timeout adjust) |

---

 **8. Monitoring & Validation**
- **Check Spark UI**: Look for skewed partitions in JDBC read stages.
- **Database Metrics**: Monitor CPU/query latency during extraction.
- **Logs**: Verify pushdown filters in Spark query plans (`df.explain()`).

---

 **Example Configurations**
 **Optimal Read**
```python
df = spark.read.jdbc(
    url=url,
    table="large_table",
    column="id",
    lowerBound=1,
    upperBound=1000000,
    numPartitions=50,
    properties={
        "user": "user",
        "password": "pass",
        "fetchSize": "50000",
        "driver": "org.postgresql.Driver"
    }
)
```

 **Optimal Write**
```python
df.repartition(20).write.jdbc(
    url=url,
    table="target_table",
    mode="overwrite",
    properties={
        "user": "user",
        "password": "pass",
        "batchsize": "50000",
        "truncate": "true"  # Faster overwrites (database-dependent)
    }
)
```

---

 **Key Takeaways**
1. **Partition Smartly**: Use `numPartitions` + bounds/predicates.
2. **Minimize Round-Trips**: Tune `fetchSize` and `batchsize`.
3. **Offload Work**: Push filters/aggregations to the database.
4. **Monitor Resources**: Avoid overwhelming the DB or Spark.
5. **Database Tuning**: Adjust DB-side settings (timeouts, memory).

By aligning Spark's parallelism with the database's capabilities, you can achieve high throughput without degrading source performance.


## Q12 - You're working with streaming data. How to handle late-arriving events?
Handling late-arriving events in Spark Streaming or Structured Streaming requires a combination of **time-based buffering**, **state management**, and **grace period policies**. Here’s a comprehensive approach:

---

 **1. Define Watermarks**
**What it does**:  
Tracks event-time progress and bounds how late data can arrive before being ignored.  

**How to implement**:  
```python
from pyspark.sql.functions import window, col

stream = spark.readStream \
    .schema(schema) \
    .json("path/to/stream") \
    .withWatermark("event_time", "10 minutes")  # Late threshold = 10 mins
```
- **Key**: The watermark trails the maximum event time seen so far by `10 minutes`; events with timestamps older than the watermark may be dropped from stateful operations.  
- **Tradeoff**: Larger values increase state size but accommodate more delays.
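
The watermark rule can be modeled in a few lines of plain Python. This is a toy sketch, not Spark's implementation: `WatermarkTracker` is a hypothetical class, and it updates the watermark after every event, whereas Spark advances it once per micro-batch.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Toy model: watermark = max event time seen so far - allowed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None

    def observe(self, event_time):
        """Return True if the event is accepted, False if it is too late."""
        if self.max_event_time is not None:
            watermark = self.max_event_time - self.delay
            if event_time < watermark:
                return False  # older than the watermark: dropped
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


tracker = WatermarkTracker(timedelta(minutes=10))
t0 = datetime(2023, 1, 1, 12, 0)
results = [
    tracker.observe(t0),                          # first event
    tracker.observe(t0 + timedelta(minutes=15)),  # moves watermark to 12:05
    tracker.observe(t0 + timedelta(minutes=6)),   # late, but within 10 minutes
    tracker.observe(t0 + timedelta(minutes=4)),   # older than the watermark
]
print(results)
```

Only the last event is rejected: it arrives after the watermark has moved past its timestamp.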

---

 **2. Use Windowed Aggregations with Grace Period**
**What it does**:  
Allows late data to update windows within a grace period.  

**Example**:  
```python
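windowed_counts = stream \
    .groupBy(
        window(col("event_time"), "5 minutes", "1 minute"),  # 5-min window, sliding every 1 min
        col("user_id")
    ) \
    .count()
# The watermark must be set before the aggregation, on the same event-time
# column used in window(); `stream` already has it applied in step 1.
```
- **Grace Period**: Late data can update a window until the watermark passes the window’s end time, i.e., roughly `10 minutes` of event time after the window closes.  
- **Output Mode**: Use `update` to emit revised counts as late data arrives; `append` emits each window only once, after the watermark has passed its end.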
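
With a 5-minute window sliding every minute, each event lands in five overlapping windows, which is why late data can touch several result rows at once. The sketch below computes those windows in plain Python. `sliding_windows` is a hypothetical helper; it assumes naive UTC timestamps and window starts aligned to the Unix epoch, with half-open `[start, end)` intervals.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window starts


def sliding_windows(event_time, window, slide):
    """Return every half-open [start, end) window that contains event_time."""
    # Most recent slide boundary at or before the event
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    while start > event_time - window:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


event = datetime(2023, 1, 1, 12, 7, 30)
windows = sliding_windows(event, timedelta(minutes=5), timedelta(minutes=1))
for start, end in windows:
    print(start.time(), "->", end.time())
```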

---

 **3. Handle Late Events in Stateful Operations**
**For custom state (e.g., sessionization)**:  
Use `mapGroupsWithState` or `flatMapGroupsWithState` to:  
- **Timeout stale state**: Discard state after inactivity.  
- **Merge late updates**: Re-process if within watermark bounds.  

**Example (Scala API)**:  
```scala
val query = stream
  .groupByKey(_.userId)
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout) {
    case (userId, events, state) =>
      // Custom logic to handle late events
      if (state.hasTimedOut) cleanup()
      else updateState()
  }
```

---

 **4. Output Modes for Late Data**
| **Mode**     | **Behavior**                                  | **Use Case**                     |
|--------------|----------------------------------------------|----------------------------------|
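| `append`     | Emits each window once, after the watermark passes its end; later data for it is dropped. | Immutable sinks (files, Kafka).  |
| `update`     | Re-emits rows whose aggregates changed because of new or late data. | Real-time dashboards.            |
| `complete`   | Re-emits the full result table every trigger (expensive). | Batch-like correctness.          |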

**Example**:  
```python
query = windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
```

---

 **5. Reprocessing with Sinks**
For critical late data:  
- **Write raw late events** to a dead-letter queue (e.g., Kafka, Delta Lake).  
- **Re-process periodically** using batch jobs.  

**Example**:  
```python
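from pyspark.sql.functions import col, current_timestamp, expr

# The watermark is not exposed as a column, so approximate lateness by
# comparing event time with processing time (10 minutes, matching the watermark)
late_events = stream.filter(
    col("event_time") < current_timestamp() - expr("INTERVAL 10 MINUTES")
)
late_events.writeStream \
    .format("delta") \
    .option("path", "/path/late_events") \
    .option("checkpointLocation", "/path/late_events_checkpoint") \
    .start()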
```

---

 **6. Tune Checkpointing**
**Why**:  
Ensures state recovery after failures.  
**How**:  
```python
query = stream.writeStream \
    .option("checkpointLocation", "/path/checkpoint") \
    .start()
```
- **Required** for stateful operations (watermarks, aggregations).

---

 **7. Database Integration (CDC)**
For Change Data Capture (CDC) pipelines:  
- Use **Debezium** or **AWS DMS** to capture late database updates.  
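- Merge with streaming data using `MERGE` in Delta Lake/Iceberg, typically from inside `foreachBatch`, where each micro-batch is registered as a view (`late_events_stream` in the example).  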

**Example**:  
```sql
MERGE INTO target_table t
USING late_events_stream s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
```

---

 **Key Configurations**
| **Parameter**                     | **Purpose**                                  | **Recommended Value**          |
|-----------------------------------|---------------------------------------------|--------------------------------|
| `spark.sql.streaming.stateStore.maintenanceInterval` | How often state store snapshots and cleanup run. | `60s` (default)       |
| `spark.sql.shuffle.partitions`    | Parallelism for stateful ops.               | 2–4x cores                     |
| `spark.sql.streaming.noDataMicroBatches.enabled` | Run empty micro-batches so the watermark can advance and finalize state. | `true` (default) |

---

 **Summary of Strategies**
1. **Watermarks**: Set thresholds for late data (`withWatermark`).  
2. **Grace Periods**: Allow window updates (`groupBy(window)`).  
3. **State Timeouts**: Clean up stale state (`mapGroupsWithState`).  
4. **Dead-Letter Queues**: Re-process late events offline.  
5. **Checkpointing**: Ensure fault tolerance.  

**Tradeoffs**:  
- **Low Latency**: Smaller watermarks/grace periods (risk dropping data).  
- **Correctness**: Larger buffers (higher state size).  

By combining these techniques, you can balance real-time responsiveness with data completeness.


## Q13 - Spark job fails with 'Stage canceled' error. How do you debug?
When a Spark job fails with a **"Stage canceled"** error, it typically indicates that a stage was intentionally terminated due to an external issue or internal Spark logic. Here’s a systematic approach to debug and resolve the problem:

---

 **1. Check Spark UI and Logs**
 **Step 1: Access Spark UI**
- Navigate to `http://<driver-node>:4040` (or the configured port) to inspect:
  - **Stages Tab**: Look for the canceled stage (marked in red) and check its details.
  - **Executors Tab**: Verify if executors are alive or were lost (`ExecutorLostFailure`).
  - **Environment Tab**: Review misconfigured settings.

 **Step 2: Examine Logs**
- **Driver Logs**: Search for `Stage cancelled` and preceding warnings/errors.
  ```bash
  grep -i "Stage cancelled" /path/to/spark-driver.log
  ```
- **Executor Logs**: Check for OOM, network timeouts, or task failures.
  ```bash
  grep -i "ERROR" /path/to/executor.log
  ```
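
When the logs are large, a small script can tally the failure signatures named in this guide before you read anything in detail. This is a sketch: `classify_log` and `SIGNATURES` are hypothetical names, the patterns are a starting point rather than a complete list, and the sample lines are made up for illustration, not real Spark output.

```python
import re
from collections import Counter

# Signatures mentioned in this guide; extend the list for your environment
SIGNATURES = {
    "executor_lost": re.compile(r"ExecutorLostFailure|Lost executor", re.I),
    "out_of_memory": re.compile(r"OutOfMemoryError", re.I),
    "stage_cancelled": re.compile(r"Stage cancell?ed", re.I),
    "task_retries": re.compile(r"failed \d+ times", re.I),
}


def classify_log(lines):
    """Count how many lines match each known failure signature."""
    counts = Counter()
    for line in lines:
        for name, pattern in SIGNATURES.items():
            if pattern.search(line):
                counts[name] += 1
    return counts


# Made-up sample lines, for illustration only
sample = [
    "ERROR Task 3 in stage 2.0 failed 4 times; aborting job",
    "ERROR Lost executor 7 on node-12",
    "INFO Stage cancelled",
    "WARN java.lang.OutOfMemoryError: Java heap space",
    "INFO Job finished",
]
counts = classify_log(sample)
print(dict(counts))
```

To run it on a real file, pass an open file handle instead of `sample`, since the function only needs an iterable of lines.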

---

 **2. Common Causes and Fixes**
 **A. Executor Failures**
- **Symptoms**: `ExecutorLostFailure`, `Lost executor X on Y`.
- **Root Causes**:
  - **OOM Errors**: Executors ran out of memory.
  - **Node Crashes**: Cluster resource issues (e.g., preemption, hardware failure).
- **Fixes**:
  - Increase executor memory:
    ```bash
    spark-submit --executor-memory 8G ...
    ```
  - Tune garbage collection (JVM options must be set at launch):
    ```bash
    spark-submit --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=200" ...
    ```

 **B. Task Failures Exceeding Threshold**
- **Symptoms**: `Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: ...`.
- **Root Cause**: Repeated task failures trigger stage cancellation (default: 4 attempts per task).
- **Fixes**:
  - Increase retry limit (set at launch):
    ```bash
    spark-submit --conf spark.task.maxFailures=8 ...
    ```
  - Debug task errors (e.g., data skew, UDF bugs):
    ```python
    df.sample(0.1).collect()  # Test on a sample
    ```

 **C. Manual Cancellation**
- **Symptoms**: `Cancelled because of attempt failure on another executor`.
- **Root Cause**:
  - User-initiated cancellation (e.g., `sparkContext.cancelJob()`).
  - Upstream stage failed, forcing dependent stages to cancel.
- **Fix**: Check job submission scripts/notebooks for accidental cancellations.

 **D. Dynamic Resource Scaling**
- **Symptoms**: `Stage cancelled because BarrierJob failed`.
- **Root Cause**: Cluster managers (YARN/K8s) reclaimed resources.
- **Fixes**:
  - Disable dynamic allocation if unstable (set at launch):
    ```bash
    spark-submit --conf spark.dynamicAllocation.enabled=false ...
    ```
  - Allocate fixed resources:
    ```bash
    spark-submit --num-executors 10 ...
    ```

 **E. Shuffle Service Issues**
- **Symptoms**: `ShuffleBlockFetcherIterator: Failed to get shuffle data`.
- **Root Cause**: Shuffle service crashes or network partitions.
- **Fixes**:
  - Restart shuffle service on worker nodes.
  - Increase shuffle retries and wait time (set at launch):
    ```bash
    spark-submit --conf spark.shuffle.io.retryWait=60s --conf spark.shuffle.io.maxRetries=10 ...
    ```

---

 **3. Advanced Debugging**
 **Step 1: Reproduce with Smaller Data**
- Test on a subset to isolate the issue:
  ```python
  df.limit(1000).write.parquet("...")  # Check if cancellation persists
  ```

 **Step 2: Enable Debug Logging**
- Add verbose logging to pinpoint failures:
  ```python
  spark.sparkContext.setLogLevel("DEBUG")
  ```

 **Step 3: Check Data Skew**
- Skewed partitions cause long-running tasks that may time out:
  ```python
  df.groupBy("key").count().orderBy("count", ascending=False).show()  # Largest keys first
  ```
  - **Fix**: Salt skewed keys or repartition.

---

 **4. Key Configurations to Review**
| **Config**                          | **Purpose**                                  | **Recommended Value**          |
|-------------------------------------|---------------------------------------------|--------------------------------|
| `spark.task.maxFailures`            | Max task retries before stage cancel.       | `8` (default: `4`)             |
| `spark.executor.memory`             | Executor heap size.                         | `8G` (adjust per workload)     |
| `spark.shuffle.service.enabled`     | External shuffle service.                   | `true` (on YARN with dynamic allocation) |
| `spark.dynamicAllocation.enabled`   | Dynamic executor scaling.                   | `false` (if unstable)          |

---

 **5. Example Fix for OOM-Induced Cancellation**
```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.executor.memory", "8G")
    .config("spark.executor.memoryOverhead", "2G")  # For off-heap
    # More partitions = smaller partitions; 200 is the default, raise it if tasks still OOM
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)
```

---

 **Summary**
1. **Inspect Logs/UI**: Identify executor failures, task errors, or manual cancellations.
2. **Tune Resources**: Increase memory, retries, or disable dynamic allocation.
3. **Check Data**: Address skew or corrupt records.
4. **Review Configs**: Adjust shuffle/timeout settings.

**Pro Tip**: If the issue persists, enable **Spark event logging** (`spark.eventLog.enabled=true`) and analyze the history server post-failure.


## Q14 - Output records have strange characters. What might be wrong?
When your Spark job outputs records with **strange characters** (e.g., `�`, `Ã©`, `\x00`, or garbled text), it typically indicates an **encoding, serialization, or data corruption issue**. Here’s how to diagnose and fix the problem:

---

 **1. Check Source Data Encoding**
 **Issue**:  
The input file/database uses an encoding (e.g., `UTF-8`, `ISO-8859-1`) that wasn’t correctly interpreted.  
 **Fix**:  
Explicitly specify the encoding when reading:  
```python
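# For text files: spark.read.text() assumes UTF-8, so for other encodings
# read the raw bytes and decode them explicitly (one row per file)
from pyspark.sql.functions import col, decode
df = spark.read.format("binaryFile").load("path/to/file.txt") \
    .select(decode(col("content"), "ISO-8859-1").alias("value"))

# For CSV/JSON:
df = spark.read.option("encoding", "UTF-8").csv("path/to/file.csv")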
```
- **Common Encodings**: `UTF-8` (default), `ISO-8859-1`, `Windows-1252`.  
- **Check File Encoding**:  
  ```bash
  file -i input.txt  # Linux (use `file -I` on macOS)
  ```

---

 **2. Binary/Corrupt Data in Text Files**
 **Issue**:  
The file contains binary data (e.g., BOM headers, non-text bytes) or is corrupted.  
 **Fix**:  
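- **Strip BOM Characters** (for UTF-8 with BOM): If the BOM survives parsing, it appears as `\ufeff` at the start of the first column name or value:  
  ```python
  df = spark.read.option("encoding", "UTF-8").csv("file.csv", header=True)
  df = df.toDF(*[c.lstrip("\ufeff") for c in df.columns])
  ```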
- **Filter Out Null Bytes**:  
  ```python
  from pyspark.sql.functions import col
  df = df.filter(~col("value").contains("\x00"))
  ```

---

 **3. Serialization Issues**
 **Issue**:  
Data was serialized/deserialized incorrectly (e.g., Parquet/ORC schema mismatch).  
 **Fix**:  
- **For Parquet/ORC**:  
  ```python
  df = spark.read.parquet("path.parquet")   Ensure schema matches
  ```
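- **For Kafka/Avro**: Spark always reads Kafka keys and values as binary (Kafka deserializer options are not supported), so cast or decode explicitly (`topic` is a placeholder):  
  ```python
  df = spark.read.format("kafka") \
       .option("kafka.bootstrap.servers", "host:9092") \
       .option("subscribe", "topic") \
       .load() \
       .selectExpr("CAST(value AS STRING) AS value")
  ```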

---

 **4. Character Escaping in CSV/JSON**
 **Issue**:  
Special characters (e.g., `"`, `\n`) are improperly escaped.  
 **Fix**:  
- **CSV**:  
  ```python
  df = spark.read.option("escape", "\\").option("quote", "\"").csv("file.csv")
  ```
- **JSON**:  
  ```python
  df = spark.read.option("multiLine", "true").json("file.json")
  ```

---

 **5. Database Source Issues**
 **Issue**:  
Database collation/encoding (e.g., `latin1` vs. `UTF-8`) mismatches Spark’s expectations.  
 **Fix**:  
- **JDBC Read**:  
  ```python
  df = spark.read.jdbc(
      url="jdbc:mysql://host/db",
      table="table",
      properties={"useUnicode": "true", "characterEncoding": "UTF-8"}
  )
  ```

---

 **6. Debugging Steps**
1. **Inspect Raw Data**:  
   ```python
   df.select("problem_column").show(truncate=False)  # Show full content
   ```
2. **Check Hex Values**:  
   ```python
   from pyspark.sql.functions import hex
   df.select(hex("problem_column")).show()
   ```
3. **Reproduce Locally**:  
   Read a sample file in Python to isolate Spark:  
   ```python
   with open("file.txt", "r", encoding="utf-8") as f:
       print(f.read())
   ```

---

 **7. Common Scenarios & Fixes**
| **Symptom**               | **Likely Cause**               | **Solution**                              |
|---------------------------|--------------------------------|-------------------------------------------|
| `Ã©` instead of `é`       | UTF-8 misinterpreted as Latin1 | Specify `encoding="UTF-8"`                |
| `�` (replacement chars)   | Invalid UTF-8 bytes            | Use `encoding="ISO-8859-1"` or clean data |
| `\x00`                    | Null bytes in text             | Filter or replace nulls                   |
| Garbled binary            | Wrong file format              | Use binary readers (e.g., `spark.read.format("binaryFile")`) |
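
The first two rows of the table can be reproduced in plain Python, which is a quick way to confirm which kind of mismatch you are looking at. The sample string `café` is an arbitrary choice for illustration.

```python
original = "café"

# UTF-8 bytes decoded with the wrong codec (Latin-1) produce mojibake
garbled = original.encode("utf-8").decode("latin-1")
print(garbled)

# Reversing the wrong step recovers the text, as long as no bytes were lost
repaired = garbled.encode("latin-1").decode("utf-8")
print(repaired)

# Latin-1 bytes decoded as UTF-8 contain invalid sequences,
# which become the replacement character U+FFFD
replaced = original.encode("latin-1").decode("utf-8", errors="replace")
print(replaced)
```

The first case is reversible; the second is not, because the original byte is gone once it has been replaced, so the fix has to happen at read time.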

---

 **Example Fix for UTF-8 Misinterpretation**
```python
# Read CSV with explicit UTF-8 encoding and escape rules
df = spark.read.option("encoding", "UTF-8") \
     .option("escape", "\\") \
     .csv("input.csv")

# Parquet always stores strings as UTF-8, so no encoding option is needed on write
df.write.parquet("output.parquet")
```

---

 **Key Takeaways**
1. **Always specify encoding** when reading text files.  
2. **Validate source data** for corruption/binary content.  
3. **Match serialization formats** (e.g., Parquet schemas, Kafka deserializers).  
4. **Debug column-wise** using `show()` or `hex()`.  

**Pro Tip**: If the issue persists, use `spark.read.format("binaryFile")` to inspect raw bytes before parsing.


## Q15 - Stakeholders want data lineage for each pipeline. What’s your approach?
To provide **data lineage** for each pipeline, you need to track the **origin, transformations, and dependencies** of data across its lifecycle. Here’s a structured approach to implement lineage tracking in Spark-based pipelines:

---

 **1. Automated Lineage Capture**
 **A. Built-in Spark Lineage (Logical Plans)**
Spark’s query plans inherently capture lineage at the transformation level. Extract it programmatically:
```python
# Get logical plan as JSON
df = spark.table("source").filter("id > 100").groupBy("category").count()
lineage_json = df._jdf.queryExecution().logical().toJSON()
```
- **Pros**: No extra tools needed.
- **Cons**: Low-level; requires parsing JSON for business context.

 **B. OpenLineage Integration**
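Use OpenLineage (open standard) by registering its Spark listener when the session is created. This requires the `openlineage-spark` package on the classpath; where events are sent is configured separately through the `spark.openlineage.transport.*` settings:
```python
builder = SparkSession.builder.config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
spark = builder.getOrCreate()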
```
- **Tools**: Marquez, Amundsen, or Egeria.
- **Output**: Tracks inputs → transformations → outputs as a DAG.

---

 **2. Metadata Tagging**
Embed lineage metadata directly in output files/databases:
```python
import json
from pyspark.sql.functions import lit

# lit() accepts scalars only, so serialize the lineage record as a JSON string
df_with_lineage = df.withColumn("_lineage", lit(json.dumps({
    "source": "s3://source-data/2023/01/01",
    "transformations": ["filter_id>100", "groupBy_category"],
    "generated_at": "2023-01-01T12:00:00Z"
})))

df_with_lineage.write.parquet("s3://output-data/")
```

---

 **3. Pipeline Orchestration Tools**
Integrate with orchestration platforms that natively track lineage:
- **Airflow** (2.4+): Declare `Dataset` outlets on tasks.
  ```python
  from airflow.datasets import Dataset
  from airflow.decorators import task

  @task(outlets=[Dataset("s3://output-data")])
  def run_spark_job():
      spark_job()
  ```
- **Databricks**: Unity Catalog tracks lineage across notebooks/jobs.
- **Azure Purview/AWS Glue**: Automatically scans Spark jobs for lineage.

---

 **4. Custom Lineage Tracking**
For granular control, log lineage to a metadata store (e.g., PostgreSQL, Neo4j):
```python
from datetime import datetime, timezone

def log_lineage(spark, job_id, inputs, outputs, transformations):
    lineage_data = {
        "job_id": job_id,
        "inputs": inputs,  # e.g., ["s3://raw/users", "s3://raw/orders"]
        "transformations": transformations,  # e.g., ["join", "aggregation"]
        "outputs": outputs,  # e.g., "s3://processed/reports"
        "timestamp": datetime.now(timezone.utc)
    }
    # Write to metadata DB
    spark.createDataFrame([lineage_data]).write.jdbc(...)
```

---

 **5. Visualization & Governance**
- **Visualization Tools**:  
  - **Amundsen**: Lyft’s open-source tool for data discovery + lineage.  
  - **DataHub**: LinkedIn’s metadata platform with lineage graphs.  
- **Governance**:  
  - Tag sensitive columns (e.g., PII) in lineage metadata.  
  - Enforce retention policies based on lineage.

---

 **6. Example End-to-End Workflow**
1. **Capture**: Use Spark listeners to log lineage during job execution.  
2. **Store**: Write lineage metadata to a searchable store (e.g., Elasticsearch).  
3. **Visualize**: Render dependencies in tools like Tableau/PowerBI.  
4. **Alert**: Notify stakeholders of upstream changes (e.g., schema drift).  

```mermaid
graph LR
    A[Source: s3://raw/users] --> B[Spark Job: Clean & Join]
    C[Source: s3://raw/orders] --> B
    B --> D[Output: s3://processed/reports]
    B --> E[Lineage DB]
    E --> F[Amundsen UI]
```

---

 **Key Considerations**
| **Requirement**       | **Solution**                          | **Tools**                          |
|-----------------------|---------------------------------------|------------------------------------|
| Lightweight           | Spark logical plans + metadata tags   | OpenLineage, Custom JSON           |
| Enterprise-grade      | Integrated governance platforms       | Databricks Unity, Azure Purview    |
| Visualization         | Graph databases + UI                  | Amundsen, DataHub, Neo4j           |
| Compliance            | PII tagging + audit logs              | Collibra, Alation                  |

---

 **Best Practices**
1. **Standardize Metadata**: Use common fields (e.g., `source`, `owner`, `last_updated`).  
2. **Automate**: Hook lineage capture into CI/CD pipelines.  
3. **Validate**: Regularly check lineage accuracy with sample audits.  

**Example Stakeholder Report**:
```
Data Asset: s3://processed/reports
- Sources: s3://raw/users, s3://raw/orders
- Transformations: join(user_id), agg(sum(revenue))
- Last Updated: 2023-01-01
- Owner: analytics-team@company.com
```
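
A report like this can be rendered straight from the lineage record instead of being written by hand. The sketch below assumes the record is available as plain Python values; `render_lineage_report` and its parameter names are hypothetical and simply mirror the fields in the example report.

```python
def render_lineage_report(asset, sources, transformations, last_updated, owner):
    """Format one lineage record as a plain-text stakeholder report."""
    lines = [
        f"Data Asset: {asset}",
        f"- Sources: {', '.join(sources)}",
        f"- Transformations: {', '.join(transformations)}",
        f"- Last Updated: {last_updated}",
        f"- Owner: {owner}",
    ]
    return "\n".join(lines)


report = render_lineage_report(
    asset="s3://processed/reports",
    sources=["s3://raw/users", "s3://raw/orders"],
    transformations=["join(user_id)", "agg(sum(revenue))"],
    last_updated="2023-01-01",
    owner="analytics-team@company.com",
)
print(report)
```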

By combining automation, metadata management, and visualization, you can provide transparent, auditable lineage tailored to stakeholder needs.


## Q16 - A pipeline failed due to special characters in column names. How do you handle?
When a pipeline fails due to **special characters in column names** (e.g., spaces, hyphens, emojis, or symbols like `@`, `#`, `()`), follow this systematic approach to handle and prevent the issue:

---

 **1. Identify Problematic Columns**
First, detect which columns contain special characters:
```python
# Flag column names that are not valid identifiers (spaces, hyphens, symbols)
problematic_columns = [c for c in df.columns if not c.isidentifier()]
print("Invalid columns:", problematic_columns)
```
**Example Output**:  
`['user-id', 'first name', 'amount($)']`

---

 **2. Clean Column Names Automatically**
Replace or remove special characters to comply with Spark's naming rules (alphanumeric + underscore):  
```python
from pyspark.sql.functions import col
import re

# Clean all column names
def clean_column(name):
    return re.sub(r'[^a-zA-Z0-9_]', '_', name)  # Replace each special char with _

# Apply to DataFrame (backticks keep names containing dots from being parsed as struct paths)
df_clean = df.select([col(f"`{c}`").alias(clean_column(c)) for c in df.columns])
```
**Result**:  
- `user-id` → `user_id`  
- `first name` → `first_name`  
- `amount($)` → `amount___` (each of the three special characters becomes its own `_`)  

**Alternative**: Rename a single column with `withColumnRenamed`:
```python
df = df.withColumnRenamed("first name", "first_name")
```

---

 **3. Handle Reserved Keywords**
Some column names (e.g., `order`, `timestamp`) conflict with SQL reserved keywords. Escape them with backticks:  
```python
# For querying reserved keywords
df.createOrReplaceTempView("temp_table")
clean_df = spark.sql("SELECT `order`, `timestamp` FROM temp_table")
```

---

 **4. Prevent Future Issues**
 **A. Schema Enforcement**
Validate column names when reading data:
```python
# Read data and validate columns (header=True so the real column names are checked)
df = spark.read.csv("path/to/file.csv", header=True)
invalid_cols = [c for c in df.columns if not c.isidentifier()]
if invalid_cols:
    raise ValueError(f"Invalid columns: {invalid_cols}")
```

 **B. Preprocessing Script**
Add a preprocessing step to clean column names before pipeline execution:
```python
def sanitize_columns(df):
    return df.toDF(*[clean_column(c) for c in df.columns])

df = spark.read.csv("path/to/file.csv")
df = sanitize_columns(df)
```

 **C. Use Delta Lake/Iceberg**
These table formats enforce stricter schema rules and can reject invalid names upfront.

---

 **5. Special Cases**
 **Unicode/Emoji Characters**
Replace non-ASCII characters (e.g., `ü`, `→`, `😊`) if they cause issues:
```python
def remove_unicode(name):
    return name.encode('ascii', 'ignore').decode('ascii')  # Drops non-ASCII
```

 **Case Sensitivity**
Standardize to lowercase/uppercase to avoid conflicts:
```python
df = df.toDF(*[c.lower() for c in df.columns])
```

---

 **6. Example End-to-End Fix**
```python
from pyspark.sql import SparkSession
import re

spark = SparkSession.builder.getOrCreate()

# Step 1: Read raw data
df = spark.read.csv("data_with_special_chars.csv", header=True)

# Step 2: Clean column names
df_clean = df.toDF(*[re.sub(r'[^a-zA-Z0-9_]', '_', c) for c in df.columns])

# Step 3: Write sanitized data
df_clean.write.parquet("cleaned_data.parquet")
```

**Output Schema Before/After**:
```
Before: user-id, first name, amount($)
After:  user_id, first_name, amount_
```

---

 **Key Takeaways**
| **Scenario**               | **Solution**                              | **Tools/Methods**                     |
|----------------------------|------------------------------------------|---------------------------------------|
| Spaces/symbols             | Replace with `_`                         | `re.sub` + `toDF`, `withColumnRenamed` |
| Reserved keywords          | Escape with backticks (` `` `)           | Spark SQL                             |
| Unicode/emoji              | Remove or transliterate                  | `encode('ascii', 'ignore')`           |
| Prevention                 | Schema validation + preprocessing        | Delta Lake, sanitize_columns()        |

**Best Practices**:
1. **Standardize early**: Clean column names at ingestion.
2. **Log changes**: Track column name mappings for auditing.
3. **Test rigorously**: Validate with unit tests for edge cases (e.g., `null`, empty strings).

By automating column name sanitization and enforcing naming conventions, you can avoid pipeline failures and improve maintainability.
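
The "log changes" practice can be combined with collision handling, since two different raw names may sanitize to the same result. The following is a minimal sketch under stated assumptions: `sanitize_names` is a hypothetical helper, names are lowercased, and collisions get a numeric suffix.

```python
import re


def sanitize_names(columns):
    """Return (new_names, mapping) where mapping is a list of (original, new) pairs."""
    seen = {}
    new_names = []
    mapping = []
    for original in columns:
        # Replace each run of invalid characters with one underscore; fall back to "col"
        base = re.sub(r"[^a-zA-Z0-9_]+", "_", original.strip()).lower() or "col"
        count = seen.get(base, 0)
        seen[base] = count + 1
        name = base if count == 0 else f"{base}_{count + 1}"
        new_names.append(name)
        mapping.append((original, name))
    return new_names, mapping


new_names, mapping = sanitize_names(["user-id", "first name", "amount($)", "User ID"])
for original, new in mapping:
    print(f"{original!r} -> {new!r}")  # keep this output in the pipeline log for auditing
```

With a DataFrame, the result would be applied as `df.toDF(*new_names)`. The suffix scheme is deliberately simple and does not guard against a raw column that is already named like a suffixed one.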


## Q17 - You want to validate data freshness in S3. How do you check?

Validating data freshness in S3 ensures your pipeline processes up-to-date data. Here’s how to check it programmatically and automate monitoring:

---

 **1. Check File Timestamps in S3**
 **A. Using AWS CLI**
```bash
# Get last modified time of a specific file
aws s3 ls s3://your-bucket/path/to/file.csv --recursive | awk '{print $1, $2, $4}'

# Check latest file in a prefix
aws s3 ls s3://your-bucket/path/ --recursive | sort | tail -n 1
```
**Output**:  
`2023-10-01 12:30:00 data/2023-10-01/file.csv`

 **B. Using Python (boto3)**
```python
import boto3
from datetime import datetime, timezone

s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket='your-bucket', Prefix='path/to/data/')

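# Get latest file timestamp (a single list_objects_v2 call returns at most 1,000 keys,
# and 'Contents' is missing when the prefix is empty)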
latest_file = max(response['Contents'], key=lambda x: x['LastModified'])
freshness = (datetime.now(timezone.utc) - latest_file['LastModified']).total_seconds() / 3600
print(f"Data is {freshness:.2f} hours old")
```
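
For prefixes that may hold more than 1,000 objects or be empty, a paginated variant is more robust. This is a sketch rather than a drop-in replacement: `latest_modified`, `hours_since`, and `check_prefix` are hypothetical helper names, and the boto3 call is isolated in one function so the date logic can be tested without AWS access.

```python
from datetime import datetime, timezone


def latest_modified(pages):
    """Return the newest LastModified across list_objects_v2 pages, or None if no objects."""
    latest = None
    for page in pages:
        for obj in page.get("Contents", []):  # "Contents" is absent for empty pages
            if latest is None or obj["LastModified"] > latest:
                latest = obj["LastModified"]
    return latest


def hours_since(ts, now=None):
    """Age of a timezone-aware timestamp in hours."""
    now = now or datetime.now(timezone.utc)
    return (now - ts).total_seconds() / 3600


def check_prefix(bucket, prefix):
    """Page through a prefix with boto3 and return the data age in hours (None if empty)."""
    import boto3  # imported lazily so the helpers above work without boto3 installed

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    newest = latest_modified(paginator.paginate(Bucket=bucket, Prefix=prefix))
    return None if newest is None else hours_since(newest)
```

A call such as `check_prefix("your-bucket", "path/to/data/")` returns `None` for an empty prefix, which the caller can treat as "no data delivered yet" instead of crashing on a missing key.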

---

 **2. Validate Metadata in Spark**
Read S3 file metadata directly in PySpark:
```python
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

spark = SparkSession.builder.getOrCreate()

# Read data and collect the distinct source file paths to the driver
df = spark.read.parquet("s3a://your-bucket/path/")
file_paths = [
    row.file_path
    for row in df.select(input_file_name().alias("file_path")).distinct().collect()
]

# Look up modification times on the driver: JVM (py4j) handles cannot be used inside RDD lambdas
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path

def modification_time(file_path):
    path = Path(file_path)
    fs = path.getFileSystem(hadoop_conf)  # Resolves the filesystem that matches the path scheme
    return fs.getFileStatus(path).getModificationTime() / 1000  # Convert ms to seconds

latest_timestamp = max(modification_time(p) for p in file_paths)

print(f"Latest data timestamp: {datetime.fromtimestamp(latest_timestamp)}")
```

---

 **3. Automate Freshness Checks**
 **A. Airflow Sensor**
```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Waits until the key exists; it does not compare timestamps
s3_sensor = S3KeySensor(
    task_id="check_data_freshness",
    bucket_key="path/to/latest_file.parquet",
    bucket_name="your-bucket",
    aws_conn_id="aws_default",
    timeout=60 * 60 * 24,  # Timeout after 1 day
    poke_interval=60 * 60,  # Check hourly
    mode="reschedule",
)
```

 **B. Lambda + CloudWatch Alerts**
1. **Lambda Function** (checks freshness every hour):
```python
import boto3
from datetime import datetime, timezone

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(Bucket='your-bucket', Prefix='data/')
    
    latest_file = max(response['Contents'], key=lambda x: x['LastModified'])
    freshness_hours = (datetime.now(timezone.utc) - latest_file['LastModified']).total_seconds() / 3600
    
    if freshness_hours > 24:
        sns = boto3.client('sns')
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:1234567890:alerts',
            Message=f"Data in s3://your-bucket/data/ is stale ({freshness_hours:.1f} hours old)"
        )
```
2. **CloudWatch Trigger**: Schedule the function to run hourly with a CloudWatch Events (EventBridge) rule.

---

 **4. Data Freshness Dashboard**
Use AWS Athena + QuickSight to visualize freshness:
```sql
-- Athena query to track file timestamps
-- Assumes the hidden "$file_modified_time" timestamp column is available (depends on the Athena engine version)
SELECT
    element_at(split("$path", '/'), -1) as file_name,
    date_parse(
        substr("$path", position('dt=' IN "$path") + 3, 10),
        '%Y-%m-%d'
    ) as partition_date,
    max("$file_modified_time") as latest_update
FROM
    your_table
GROUP BY
    1, 2
```

---

 **5. Key Checks for Data Freshness**
| **Check**                | **Method**                                                                 | **Alert Threshold**          |
|--------------------------|---------------------------------------------------------------------------|------------------------------|
| File timestamp           | `aws s3 ls` or `boto3.list_objects_v2()`                                 | >1 hour (for real-time)      |
| Partition age            | Hive-style paths (`s3://bucket/dt=2023-10-01/`)                          | >24 hours (for daily batches)|
| _SUCCESS file            | Check for `_SUCCESS` marker in S3 prefix                                  | Missing after expected time  |
| Metadata column          | Extract `last_updated` from data itself (if available)                    | Custom logic                 |
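
The partition-age check from the table can be sketched in plain Python. This is an illustration under stated assumptions: keys follow the `dt=YYYY-MM-DD/` layout shown above, and `latest_partition_date` and `partition_age_days` are hypothetical helper names that operate on a list of object keys (for example, the keys returned by an S3 listing).

```python
import re
from datetime import date

PARTITION_RE = re.compile(r"dt=(\d{4}-\d{2}-\d{2})/")


def latest_partition_date(keys):
    """Return the newest dt= partition date found in the keys, or None."""
    dates = []
    for key in keys:
        match = PARTITION_RE.search(key)
        if match:
            dates.append(date.fromisoformat(match.group(1)))
    return max(dates) if dates else None


def partition_age_days(keys, today):
    """Days between today and the newest partition (None if no partition was found)."""
    latest = latest_partition_date(keys)
    return None if latest is None else (today - latest).days


keys = [
    "data/dt=2023-09-30/part-0.parquet",
    "data/dt=2023-10-01/part-0.parquet",
    "data/_SUCCESS",
]
print(partition_age_days(keys, today=date(2023, 10, 3)))
```

For a daily batch, an age above 1 day would match the ">24 hours" threshold in the table.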

---

 **6. Example Validation in PySpark**
```python
from pyspark.sql.functions import max as spark_max

# Assuming a 'timestamp' column exists in the data
max_timestamp = spark.read.parquet("s3a://your-bucket/path/") \
    .select(spark_max("timestamp")).first()[0]

print(f"Latest record timestamp: {max_timestamp}")
```

---

 **Best Practices**
1. **Monitor Partitioned Data**:  
   - Validate Hive-style paths (e.g., `s3://bucket/dt=2023-10-01/`).  
   - Use AWS Glue Crawlers to keep metastore updated.  
2. **Alert on Staleness**:  
   - Trigger alerts if no new files arrive within expected intervals.  
3. **Combine Metrics**:  
   - Cross-check S3 timestamps with data-level timestamps to catch cases where a recently written file contains old records (e.g., a re-upload of stale data).  

**Pro Tip**: For critical pipelines, implement **two-phase validation**:  
1. Check S3 file timestamps (quick).  
2. Verify max timestamp in the data itself (accurate but slower).  

This ensures both file delivery and content freshness.
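
A minimal sketch of the two-phase decision, assuming both timestamps are timezone-aware `datetime` values (for example, `LastModified` from boto3 and the maximum record timestamp from Spark). The function name, status strings, and thresholds are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def validate_freshness(file_ts, data_ts, now, max_file_age, max_data_age):
    """Phase 1 checks file delivery; phase 2 checks the content itself."""
    if now - file_ts > max_file_age:
        return "stale_files"  # nothing new was delivered
    if now - data_ts > max_data_age:
        return "stale_content"  # a file arrived, but its records are old
    return "fresh"


now = datetime(2023, 10, 2, 12, tzinfo=timezone.utc)
status = validate_freshness(
    file_ts=datetime(2023, 10, 2, 11, tzinfo=timezone.utc),
    data_ts=datetime(2023, 9, 30, 0, tzinfo=timezone.utc),
    now=now,
    max_file_age=timedelta(hours=24),
    max_data_age=timedelta(hours=24),
)
print(status)
```

Running the cheap file check first means the slower data scan is only needed when delivery looks healthy.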


## Q18 - You're seeing 'Too many open files' error. What could be wrong?
The **"Too many open files"** error in Spark indicates that the system’s file descriptor limit has been exceeded. This is a common issue in data-intensive workloads. Here’s how to diagnose and resolve it:

---

 **1. Root Causes**
 **A. System-Level Limits**
- **Default limits** are too low (e.g., 1024 descriptors per process on Linux).
- **Spark jobs** open many files (especially with thousands of partitions or small files).

 **B. Resource Leaks**
- Unclosed file handles in:
  - **Spark operations**: Shuffle files, broadcast variables.
  - **Custom code**: UDFs/Java code not closing `InputStream`s/`OutputStream`s.

 **C. Small File Problem**
- Reading/writing **thousands of small files** (e.g., 10,000x 1MB files) strains descriptors.

---

 **2. Immediate Fixes**
 **Increase File Descriptor Limits**
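**Linux** (`ulimit` also works on macOS, but `/proc` and `limits.conf` are Linux-specific):
```bash
# Check current limits
ulimit -n                   # Per-process (user-level) soft limit
cat /proc/sys/fs/file-max   # System-wide limit

# Temporarily increase (for current session)
ulimit -n 65536

# Permanently increase: add these two lines to /etc/security/limits.conf
# * soft nofile 65536
# * hard nofile 65536
```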
**Spark Config** (if running as a service):
```bash
# Add to spark-env.sh, which the Spark daemon scripts source before starting the JVM
ulimit -n 65536
```

 **Reduce Open Files in Spark**
1. **Consolidate small files**:
   ```python
   df.repartition(100).write.parquet("s3a://bucket/output")  # Fewer partitions = fewer files
   ```
2. **Tune shuffle partitions**:
   ```python
   spark.conf.set("spark.sql.shuffle.partitions", "200")  # Default is 200; lower it if shuffles produce too many small files
   ```
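
Rather than hard-coding `100`, the partition count can be derived from the input size. The sketch below assumes a target output file size of 128 MiB, which is a common rule of thumb and not a value taken from this guide; `target_partitions` is a hypothetical helper.

```python
import math


def target_partitions(total_bytes, target_file_bytes=128 * 1024 * 1024):
    """Number of output partitions so that each file is roughly target_file_bytes."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


# 10,000 small files of 1 MiB each
total = 10_000 * 1024 * 1024
print(target_partitions(total))
```

The result can be passed to `df.repartition(n)` or `df.coalesce(n)` before writing, so 10,000 one-megabyte inputs collapse into a few dozen larger files.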

---

 **3. Debug Open Files**
 **Identify Leaky Processes**
```bash
# Find Spark processes
ps aux | grep spark

# Check open files count for a PID (Linux)
ls /proc/<PID>/fd | wc -l

# Count open files with lsof (the output includes one header line)
lsof -p <PID> | wc -l
```
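
The same check can be done from Python, which is handy for logging descriptor usage from inside a job. This is a sketch for Unix-like systems only: `open_fd_count` and `fd_usage` are hypothetical helpers, the `/dev/fd` fallback only describes the current process, and the limit read by `resource.getrlimit` is the limit of the process running the code, not of another PID.

```python
import os
import resource


def open_fd_count(pid="self"):
    """Count open descriptors via /proc (Linux); fall back to /dev/fd for the current process."""
    proc_dir = f"/proc/{pid}/fd"
    if os.path.isdir(proc_dir):
        return len(os.listdir(proc_dir))
    return len(os.listdir("/dev/fd"))


def fd_usage():
    """Return (open descriptors, soft limit, usage ratio) for the current process."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    used = open_fd_count()
    ratio = None if soft == resource.RLIM_INFINITY else used / soft
    return used, soft, ratio


used, soft, ratio = fd_usage()
print(f"{used} descriptors open, soft limit {soft}")
```

Logging the ratio periodically shows whether usage climbs steadily (a leak) or spikes only during wide shuffles.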

 **Check Spark UI**
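- **Environment Tab**: Verify the settings that drive file counts, such as `spark.sql.shuffle.partitions` and `spark.sql.files.maxPartitionBytes`.
- **Executors Tab**: Look for executor failures whose stack traces mention `Too many open files` (often wrapped in an `IOException` or `FileNotFoundException`).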

---

 **4. Fix Resource Leaks**
 **In Custom Code**
- **Close resources explicitly**:
  ```python
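  # Bad: the handle leaks if write() raises or close() is never called
  f = open("/tmp/file.txt", "w")
  f.write("data")

  # Good: the context manager closes the file even on error
  with open("/tmp/file.txt", "w") as f:
      f.write("data")

  # Also fine: explicit close in a finally block
  f = open("/tmp/file.txt", "w")
  try:
      f.write("data")
  finally:
      f.close()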
  ```
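
The same rule applies to code that runs per partition. A possible pattern, sketched here with hypothetical names (`process_partition`, `open_resource`), is to open one resource per partition and let `contextlib.closing` release it even when a row fails:

```python
from contextlib import closing


def process_partition(rows, open_resource):
    """Send every row through one resource per partition and always close it."""
    with closing(open_resource()) as resource:
        for row in rows:
            resource.write(row)
            yield row


class FakeSink:
    """Stand-in for a file, socket, or database connection."""

    def __init__(self):
        self.rows = []
        self.closed = False

    def write(self, row):
        self.rows.append(row)

    def close(self):
        self.closed = True


sink = FakeSink()
result = list(process_partition([1, 2, 3], lambda: sink))
print(result, sink.closed)
```

In a Spark job this could be used as `rdd.mapPartitions(lambda rows: process_partition(rows, open_connection))`, where `open_connection` is whatever factory the job already has. Opening per partition instead of per row also keeps the number of simultaneously open handles bounded by the number of running tasks.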

 **In Spark Jobs**
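- **Clean up checkpoint files once they are no longer referenced** (a launch-time setting, so pass it through the session builder or `spark-submit --conf`):
  ```python
  spark = SparkSession.builder.config("spark.cleaner.referenceTracking.cleanCheckpoints", "true").getOrCreate()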
  ```

---

 **5. Configuration Tuning**
| **Parameter**                          | **Purpose**                                  | **Recommended Value**       |
|----------------------------------------|---------------------------------------------|----------------------------|
| `spark.sql.files.maxPartitionBytes`    | Max bytes per read partition; larger values mean fewer partitions. | `128MB` (default)          |
| `spark.sql.sources.bucketing.enabled`  | Bucketing reduces small files.              | `true`                     |
| `spark.shuffle.service.enabled`        | Offloads shuffle files to external service. | `true` (YARN/K8s)          |
| `spark.executor.extraJavaOptions`      | Passes JVM flags to executors. The example caps direct memory; descriptor limits still come from the OS `ulimit`. | `-XX:MaxDirectMemorySize=1G` |

---

 **6. Advanced: OS-Level Tuning**
**Kernel Parameters** (Linux):
```bash
# Increase system-wide file limit
echo "fs.file-max = 100000" >> /etc/sysctl.conf
sysctl -p

# Increase ephemeral ports (for S3/HDFS connections)
echo "net.ipv4.ip_local_port_range = 10000 65000" >> /etc/sysctl.conf
```

---

 **7. Example Workflow**
1. **Check Limits**:
   ```bash
   ulimit -n
   ```
2. **Consolidate Files**:
   ```python
   df = spark.read.parquet("s3a://bucket/input/")
   df.coalesce(100).write.parquet("s3a://bucket/output/")
   ```
3. **Monitor**:
   ```bash
   watch -n 1 "lsof -p <Spark_PID> | wc -l"
   ```

---

 **Key Takeaways**
1. **Increase Limits**: Set `ulimit -n 65536` and tune OS settings.
2. **Reduce Files**: Consolidate small files, decrease partitions.
3. **Fix Leaks**: Audit custom code/UDFs for unclosed resources.
4. **Monitor**: Use `lsof` and Spark UI to track file usage.

**Pro Tip**: For object storage such as S3, every open connection holds a socket, and sockets count against the descriptor limit. Bounding the S3A thread pool keeps that number in check:
```python
spark.conf.set("spark.hadoop.fs.s3a.threads.max", "20")
```


## Q19 - Data has duplicates due to appending Parquet without dedup. How to fix?

When duplicates arise from **appending to Parquet files without deduplication**, follow this step-by-step approach to clean existing data and prevent future issues:

---

 **1. Identify Duplicates**
First, detect duplicates using a **unique key** (e.g., `id`, `timestamp`, or composite columns):
```python
from pyspark.sql.functions import count

# Check duplicate counts
dupe_counts = df.groupBy("id", "timestamp").agg(count("*").alias("count")) \
                .filter("count > 1")
dupe_counts.show()
```

---

 **2. Deduplicate Existing Data**
 **Option A: Overwrite with Distinct Records**
```python
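# Read all data and deduplicate
deduped_df = spark.read.parquet("s3://your-bucket/data/") \
                 .dropDuplicates(["id", "timestamp"])  # Or use primary key(s)

# Write to a separate location first: Spark cannot safely overwrite a path it is still reading from.
# ("data_deduped/" is a placeholder; swap it in for the original after validating the output.)
deduped_df.write.mode("overwrite").parquet("s3://your-bucket/data_deduped/")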
```

 **Option B: Use Delta Lake for ACID Transactions**
```python
from delta.tables import DeltaTable

# Convert the Parquet directory to a Delta table in place
DeltaTable.convertToDelta(spark, "parquet.`s3://your-bucket/data/`")

# Delta reads a consistent snapshot, so the table can be deduplicated and overwritten in place
deduped_df = spark.read.format("delta").load("s3://your-bucket/data/") \
                  .dropDuplicates(["id", "timestamp"])
deduped_df.write.format("delta").mode("overwrite").save("s3://your-bucket/data/")
```

---

 **3. Prevent Future Duplicates**
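 **A. Use an Insert-Only `MERGE` with Delta Lake**
```python
from delta.tables import DeltaTable

# Insert only rows whose key is not already in the target table
target = DeltaTable.forPath(spark, "s3://your-bucket/data/")
target.alias("target").merge(
    df.dropDuplicates(["id", "timestamp"]).alias("source"),
    "target.id = source.id AND target.timestamp = source.timestamp"
).whenNotMatchedInsertAll().execute()
```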

 **B. Pre-Deduplicate Before Append**
```python
# Deduplicate within the batch before writing (this does not check rows already in the target)
df.distinct().write.mode("append").parquet("s3://your-bucket/data/")

# Or use window functions for complex logic (keep the latest row per id)
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
window = Window.partitionBy("id").orderBy(col("timestamp").desc())
deduped_df = df.withColumn("rn", row_number().over(window)) \
               .filter("rn = 1") \
               .drop("rn")
```

 **C. Leverage Partitioning**
Partition by a column (e.g., date) so that duplicate checks and reprocessing only need to touch the affected partitions. Partitioning alone does not remove duplicates:
```python
df.write.partitionBy("date").mode("append").parquet("s3://your-bucket/data/")
```

---

 **4. Optimize Deduplication Performance**
- **Filter New Data First**:  
  ```python
  new_data = spark.read.parquet("s3://new-data/") \
                   .join(deduped_df, ["id", "timestamp"], "left_anti")  # Only new records
  ```
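- **Use Bloom Filters** (for large datasets): write Parquet bloom filters on the key column so that lookups by `id` can skip row groups (`data_deduped/` is a placeholder path):  
  ```python
  deduped_df.write.option("parquet.bloom.filter.enabled#id", "true").mode("overwrite").parquet("s3://your-bucket/data_deduped/")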
  ```

---

 **5. Example End-to-End Fix**
```python
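from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Step 1: Read all data (including duplicates)
df = spark.read.parquet("s3://your-bucket/data/")

# Step 2: Deduplicate (keep latest record per ID)
window = Window.partitionBy("id").orderBy(col("timestamp").desc())
deduped_df = df.withColumn("rn", row_number().over(window)) \
               .filter("rn = 1") \
               .drop("rn")

# Step 3: Write clean data to a new Delta location (never overwrite the path being read).
# "data_clean/" is a placeholder path.
deduped_df.write.format("delta").mode("overwrite").save("s3://your-bucket/data_clean/")

# Step 4: Future writes merge on the key so existing IDs are not inserted again
# (new_df stands for the incoming batch)
DeltaTable.forPath(spark, "s3://your-bucket/data_clean/").alias("target").merge(
    new_df.dropDuplicates(["id"]).alias("source"),
    "target.id = source.id"
).whenNotMatchedInsertAll().execute()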
```

---

 **Key Considerations**
| **Approach**              | **Pros**                                    | **Cons**                                  |
|---------------------------|---------------------------------------------|------------------------------------------|
| **`dropDuplicates()`**    | Simple, no new dependencies.                | Slow for large datasets (full scan).     |
| **Delta Lake Merge**      | ACID compliant, incremental updates.        | Requires Delta Lake setup.               |
| **Pre-filter New Data**   | Fast for incremental loads.                 | Needs business logic for uniqueness.     |

**Best Practices**:
1. **Add Checksums**: Store hash keys (e.g., `md5(concat_ws('|', id, timestamp))`) to detect dupes quickly. The separator prevents different key pairs from concatenating to the same string.
2. **Monitor**: Log duplicate counts during pipeline runs.
3. **Backup**: Snapshot data before overwriting.

By combining **immediate cleanup** with **preventive measures**, you ensure data integrity while maintaining pipeline efficiency.
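
The checksum idea from the best practices can be illustrated in plain Python. This is a sketch with hypothetical helper names (`row_key`, `find_duplicate_keys`); it joins key fields with a separator before hashing so that values like `1` + `23` and `12` + `3` do not produce the same key.

```python
import hashlib


def row_key(*values, sep="|"):
    """Hash the key fields of a row; None is treated as an empty string."""
    joined = sep.join("" if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def find_duplicate_keys(rows, key_fields):
    """Return the set of hash keys that occur more than once."""
    seen, dupes = set(), set()
    for row in rows:
        key = row_key(*(row[f] for f in key_fields))
        (dupes if key in seen else seen).add(key)
    return dupes


rows = [
    {"id": 1, "timestamp": "2023-10-01"},
    {"id": 1, "timestamp": "2023-10-01"},
    {"id": 2, "timestamp": "2023-10-01"},
]
print(len(find_duplicate_keys(rows, ["id", "timestamp"])))
```

In PySpark the equivalent key column could be built with `md5(concat_ws("|", col("id"), col("timestamp")))`, and the duplicate count logged on every run to cover the monitoring practice as well.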




 **1. Check Spark UI for Immediate Clues**
Access the Spark UI (`http://<driver-node>:4040`) during/after the job and look for:
- **Skewed Stages**: Tasks with significantly longer durations than others.
  - *Fix*: Use `df.groupBy("key").count()` to check key distribution; apply salting or repartitioning.
- **Garbage Collection (GC) Time**: High GC pauses (>10% of task time).
  - *Fix*: Tune JVM settings:
    ```python
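    # Launch-time setting: pass it via the builder or spark-submit --conf, not spark.conf.set
    SparkSession.builder.config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")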
    ```
- **Shuffle Spill**: Excessive disk spills (the spill memory/disk metrics on the stage page).
  - *Fix*: Increase executor memory or raise `spark.sql.shuffle.partitions` so that each task handles less data.

---

 **2. Review Cluster Metrics**
- **Resource Saturation**:
  - Check CPU, memory, and network usage on worker nodes (using `htop`, `dstat`, or cloud monitoring tools).
  - *Fix*: Allocate more resources or reduce parallelism (`spark.executor.instances`).
- **Node Failures**:
  - Look for `ExecutorLost` errors in logs.
  - *Fix*: Stabilize the cluster, or if you run on spot instances, make the job fault tolerant so that a lost executor only costs a retry.

---

 **3. Identify Data/Code Issues**
 **A. Data Skew**
- **Diagnose**:
  ```python
  df.groupBy("key").count().orderBy("count", ascending=False).show()
  ```
- **Fix**:
  - **Salting**: Add random prefixes to skewed keys.
    ```python
    df = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int")))
    ```
  - **Repartition**: Balance data manually:
    ```python
    df = df.repartition(100, "salted_key")  # Repartition on the salted key; the raw key would keep all hot rows together
    ```
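
Salting changes the grouping key, so an aggregation on a salted key needs a second pass that removes the salt. The plain-Python sketch below illustrates the two stages on a list of `(key, value)` pairs; `salted_sum` is a hypothetical name, and a round-robin salt stands in for `rand()` to keep the example deterministic.

```python
from collections import defaultdict


def salted_sum(records, num_salts=10):
    """Two-stage sum: partial sums per (key, salt), then combined per key."""
    # Stage 1: the hot key is split into up to num_salts smaller groups
    partial = defaultdict(int)
    for i, (key, value) in enumerate(records):
        salt = i % num_salts  # deterministic stand-in for a random salt
        partial[(key, salt)] += value

    # Stage 2: drop the salt and merge the partial results
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal
    return dict(totals), len(partial)


records = [("hot", 1)] * 1000 + [("cold", 5)]
totals, num_groups = salted_sum(records)
print(totals, num_groups)
```

The final totals match an unsalted aggregation, but the 1,000 rows of the hot key are spread over ten stage-1 groups instead of landing in a single task.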

 **B. Inefficient Queries**
- **Diagnose**: Use `df.explain()` to check for:
  - Cartesian products.
  - Full scans due to missing filters.
- **Fix**:
  - Push filters upstream:
    ```python
    df.filter("date > '2023-01-01'").join(...)  # Instead of joining first
    ```
  - Use broadcast joins for small tables:
    ```python
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
    ```

---

 **4. Check External Dependencies**
- **Slow Data Sources**: JDBC, S3, or HDFS latency.
  - *Fix*:
    - For S3: Enable fast upload and multipart:
      ```python
      spark.conf.set("spark.hadoop.fs.s3a.fast.upload", "true")
      ```
    - For JDBC: Increase fetch size:
      ```python
      df = spark.read.jdbc(url, table, properties={"fetchSize": "5000"})
      ```
- **Network Issues**: Latency between executors/driver.
  - *Fix*: Use placement groups (AWS/Azure) or monitor network throughput.

---

 **5. Tune Spark Configs**
Adjust these key parameters based on workload:
| **Config**                              | **Purpose**                                  | **Recommended Value**          |
|-----------------------------------------|---------------------------------------------|--------------------------------|
| `spark.sql.shuffle.partitions`          | Reduce excessive shuffle partitions.        | `2-4x cores` (e.g., 200)      |
| `spark.executor.memoryOverhead`         | Prevent OOMs in off-heap.                  | `1-2GB` (scale with heap)      |
| `spark.dynamicAllocation.enabled`       | Scale executors dynamically.               | `true` (for variable loads)    |
| `spark.speculation`                     | Mitigate slow tasks.                       | `true` (for spot instances)    |

---

 **6. Log and Profile**
- **Enable Detailed Logging**:
  ```python
  spark.sparkContext.setLogLevel("DEBUG")  # Or "INFO" for less noise
  ```
- **Profile with JVM Tools**:
  - Attach `jstack` or `VisualVM` to executors to detect thread contention.

---

 **7. Example Debug Workflow**
```python
# Step 1: Check skew (largest keys first)
df.groupBy("user_id").count().orderBy("count", ascending=False).show()

# Step 2: Tune shuffle
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Step 3: Enable speculation at launch (it cannot be changed on a running session):
#   spark-submit --conf spark.speculation=true ...

# Step 4: Monitor with Spark UI
df.join(...).write.parquet("output_path")
```

---

 **Key Takeaways**
1. **Start with Spark UI**: Identify skewed stages, spills, or GC issues.
2. **Verify Cluster Health**: Check CPU/memory/network saturation.
3. **Audit Data**: Detect skew or inefficient queries.
4. **Tune Configs**: Adjust shuffle, memory, and parallelism.
5. **Isolate Variables**: Test with smaller data to reproduce intermittency.

**Pro Tip**: For recurring jobs, log metrics (e.g., task duration distributions) to correlate slowness with external factors (e.g., peak cluster usage times).
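
One way to turn task durations into a loggable metric is a simple max-to-median ratio. This is a sketch: `duration_summary` and `is_skewed` are hypothetical helpers, the durations are assumed to be collected elsewhere (for example, copied from the stage page of the Spark UI), and the threshold of 3.0 is an arbitrary starting point.

```python
import statistics


def duration_summary(durations_s):
    """Summarize task durations (in seconds) for one stage."""
    ordered = sorted(durations_s)
    median = statistics.median(ordered)
    return {
        "median_s": median,
        "max_s": ordered[-1],
        "skew_ratio": ordered[-1] / median if median else float("inf"),
    }


def is_skewed(durations_s, threshold=3.0):
    """Flag a stage whose slowest task is far slower than the typical task."""
    return duration_summary(durations_s)["skew_ratio"] > threshold


slow_run = [10, 11, 9, 10, 95]
print(duration_summary(slow_run), is_skewed(slow_run))
```

Storing the summary with a run timestamp makes it possible to compare slow and fast runs of the same job and see whether slowness tracks skew or an external factor.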
