# SkewBalancer: Auto Fix Spark Skew
This notebook helps you detect **skewed data** and fix it automatically using a tool called `SkewBalancer`.

If you're new to programming, don't worry!
We’ll guide you step by step

# Step 1: Install Required Packages

This notebook uses **PySpark**, **Pandas**, **Matplotlib**, and **Seaborn**.
Run this cell once to make sure everything is installed:


In [None]:
# Install required Python packages (only if not already installed)
!pip install pyspark pandas matplotlib seaborn --quiet

# Step 2: Import Libraries

We now import the Python libraries we'll use, including Spark and the `ValueSkewBalancer` class.


In [None]:
from pyspark.sql import SparkSession
from skewbalancer import *

# Step 3: Start a Spark Session

This step launches a local Spark instance to process your data.

In [None]:
# ----------------------------
# Spark Session Configuration
# ----------------------------

# Basic Settings
spark_app_name: str = "SkewBalancer_Test"
spark_master: str = "local[*]"                      # Use all local cores
spark_driver_memory: str = "4g"                     # Driver memory limit
spark_executor_memory: str = "4g"                   # Executor memory (mostly ignored in local mode)
spark_log_level: str = "WARN"                       # Reduce log noise: INFO, WARN, ERROR

# Performance Tuners
spark_configs: dict = {
    "spark.sql.shuffle.partitions": "8",            # Default number of shuffle partitions
    "spark.default.parallelism": "8",                # Used in RDD operations
    "spark.sql.adaptive.enabled": "true",           # Enable Adaptive Query Execution (AQE)
    "spark.sql.adaptive.coalescePartitions.enabled": "true",  # Coalesce after AQE
    "spark.sql.broadcastTimeout": "120",            # Broadcast timeout in seconds
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",  # Fast serialization
    "spark.sql.codegen.wholeStage": "true",         # Enable full query optimization
}

# ----------------------------
# Build Spark Session
# ----------------------------
spark_builder = SparkSession.builder.appName(spark_app_name).master(spark_master)

# Apply memory settings
spark_builder = spark_builder.config("spark.driver.memory", spark_driver_memory)
spark_builder = spark_builder.config("spark.executor.memory", spark_executor_memory)

# Apply advanced configs
for k, v in spark_configs.items():
    spark_builder = spark_builder.config(k, v)

# Create Spark session
spark = spark_builder.getOrCreate()

# Set log level
spark.sparkContext.setLogLevel(spark_log_level)

# Confirm Spark details
print(f"Spark session started (v{spark.version}) | Driver Memory: {spark_driver_memory} | Executor Memor: {spark_executor_memory}")

# Step 4: Load Your Data (Any File Type)

`SkewBalancer` supports many file types:
 - `.csv` — comma-separated values
 - `.json` — structured JSON records
 - `.parquet` — fast compressed format
 - `.delta` — advanced format for big data (if Delta Lake is installed)
 
Set your file path and file type below.
We'll automatically load it into a Spark DataFrame!

In [27]:
# Replace this path with your own CSV
input_file = r"C:\Users\OmarAttia\IdeaProjects\Spark\SkewBalancer\data\realtime_garbage_100k.csv"
file_type = "csv"                 # Options: "csv", "json", "parquet", "delta"

# Step 5: Use Smart Schema Detection
Instead of relying on `inferSchema=True`, we now use `.schemaVisor()` — a smart schema detection method that:

- Samples a small portion of the data (~5K records)
- Analyzes string columns, numeric-like strings, and missing values
- Generates an optimized schema using Spark types (`IntegerType`, `DoubleType`, `StringType`)
- Prevents over-allocation of memory and speeds up Spark's I/O performance

This makes your pipeline more scalable, especially for large, messy, or semi-structured datasets.

In [28]:
# Step 5: Use schemaVisor to generate a schema from sampled data
df_sample = spark.read.option("header", "true").option("inferSchema", "false").csv(input_file)
schema = ValueSkewBalancer.schemaVisor(df_sample)

## (OPTIONAL) Step 5.1: Auto-Detect Primary or Composite Key
After determining the optimal schema with `schemaVisor()`, we now identify a suitable **Primary Key** (or **Composite Key**) using `.detectKey()`.

This helps:
- Enforce schema constraints
- Optimize `join` performance
- Enable `deduplication` and `identity logic`

The method:
- Analyzes `nulls` and `uniqueness` across all columns
- Picks the most confident column as a **primary key**
- If no strong single-column key exists, it tries **column combinations** (composite keys)
- Returns `type`, `columns`, and `confidence score` **(Remember that is all algorithm. You could skip this step)**

In [29]:
# Detect Primary or Composite Key
key_result = ValueSkewBalancer.detectKey(df_sample, max_composite=3, verbose=True)

if key_result:
    print(f"\nDetected {key_result['type']} key:")
    print(f"Columns: {key_result['columns']}")
    print(f"Confidence: {key_result['confidence'] * 100:.2f}%")
else:
    print("No reliable primary or composite key found.")

customer_id: distinct=100000, nulls=0, score=1.0000
event_type: distinct=4, nulls=0, score=0.0000
value: distinct=8940, nulls=0, score=0.0894
description: distinct=5, nulls=0, score=0.0001
event_time: distinct=100000, nulls=0, score=1.0000

Detected primary key:
Columns: ['customer_id']
Confidence: 100.00%


## Step 6: Read Data Using Smart Schema

We now use the schema returned by `schemaVisor()` to load your data.

`inferSchema` is now turned off — we trust our smarter logic


In [30]:
if file_type == "csv":
    df = spark.read.option("header", "true").schema(schema).csv(input_file)
elif file_type == "json":
    df = spark.read.option("multiline", "true").schema(schema).json(input_file)
elif file_type == "parquet":
    df = spark.read.schema(schema).parquet(input_file)
elif file_type == "delta":
    df = spark.read.format("delta").schema(schema).load(input_file)
else:
    raise ValueError(f"Unsupported file type: {file_type}")

> **TIP**
> Not sure which file type you have?
> - `.csv` is usually text with commas
> - `.json` is readable by web browsers
> - `.parquet` is often used in cloud warehouses (like AWS, Azure)
> - `.delta` is for Spark projects with **Delta Lake** enabled


## Step 7: Preview Your Data

Let’s check what columns are inside your file.

In [31]:
# Show schema and preview
df.printSchema()
df.show(5)

root
 |-- customer_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- value: double (nullable = true)
 |-- description: string (nullable = true)
 |-- event_time: string (nullable = true)

+--------------------+----------+-----+-----------+--------------------+
|         customer_id|event_type|value|description|          event_time|
+--------------------+----------+-----+-----------+--------------------+
|f2c41853-0d67-420...|      view| 9.39|    ignored|2025-06-28 21:07:...|
|        Y47MY2XGL79V|    signup| 60.2|  important|2025-06-19 12:22:...|
|c7384f76-021d-463...|      view|26.33|       spam|2025-06-15 02:28:...|
|    1660d54a24bc451f|  purchase|18.26|  important|2025-07-01 12:03:...|
|        3X92GM6KPU6H|     click| 3.39|     normal|2025-07-03 14:39:...|
+--------------------+----------+-----+-----------+--------------------+
only showing top 5 rows


## Step 8: Automatically Detect Skew and Balance Your Data

Now we let `SkewBalancer`:
- Detect the most skewed numeric column
- Pick the best group-by column (usually string column with low unique values)
- Apply salting to split skewed values into smaller "chunks"
- Repartition the data for faster Spark performance

You can change:
- `output_dir`: where charts and logs are saved
- `partitions`: number of output partitions (default = 8)
- `verbose`: if `True`, will print detailed info for learning


In [32]:
# Automatically detect skew and optimize the DataFrame
output_dir = "outputs"        # Directory to save plots and explain logs
num_partitions = 8            # Number of partitions after salting
verbose_mode = True           # Show detailed progress and stats

df_balanced = auto_balance_skew(
    df,
    output_dir=output_dir,
    partitions=num_partitions,
    verbose=verbose_mode
)

value: skewness = 0.26
[AUTO-DETECTED] Skewed column = value, GroupBy = event_type
 Thank you for using this tool. - Omar Attia+----------+-----+
|event_type|count|
+----------+-----+
|    signup|25099|
|  purchase|24809|
|      view|25151|
|     click|24941|
+----------+-----+

[Before_Salting] Time: 0.145 sec
+----------+-----+
|event_type|count|
+----------+-----+
|  purchase|24809|
|    signup|25099|
|      view|25151|
|     click|24941|
+----------+-----+

[After_Salting] Time: 0.367 sec

[Before_Salting] Partition sizes:
  Partition 0: 60461 records
  Partition 1: 39539 records

[After_Salting] Partition sizes:
  Partition 0: 9902 records
  Partition 1: 9762 records
  Partition 2: 9460 records
  Partition 3: 9952 records
  Partition 4: 10764 records
  Partition 5: 9336 records
  Partition 6: 10477 records
  Partition 7: 10927 records
  Partition 8: 9922 records
  Partition 9: 9498 records


## Step 9: Check the Output and Partitions

Now let’s preview:
- A few rows from the new column `salted_key` (which Spark uses to balance the data)
- The number of records inside each Spark partition

This helps you **verify** that the skew was reduced and data is now more balanced across workers.


In [33]:
# Detect the salted column name dynamically
salted_col = [col for col in df_balanced.columns if "salted" in col.lower()][0]

# Show some of the salted values
df_balanced.select(salted_col).show(5)

# Show partition distribution
ValueSkewBalancer.show_partition_sizes(df_balanced, label="Salted Result")

+----------+
|salted_key|
+----------+
|   18.26_4|
|     3.0_1|
|   30.76_6|
|   23.06_5|
|   13.67_3|
+----------+
only showing top 5 rows

[Salted Result] Partition sizes:
  Partition 0: 9902 records
  Partition 1: 9762 records
  Partition 2: 9460 records
  Partition 3: 9952 records
  Partition 4: 10764 records
  Partition 5: 9336 records
  Partition 6: 10477 records
  Partition 7: 10927 records
  Partition 8: 9922 records
  Partition 9: 9498 records


## Step 10: Compare Original vs Optimized Performance

Let’s now compare:
- The **original plan** before salting
- The **optimized plan** after salting

We:
- Group the data by a selected **categorical column** (usually a string like "type")
- Time how long `.groupBy().count()` takes
- Log the physical execution plan using `.explain()`

This shows how `SkewBalancer` changes how Spark works under the hood!


In [35]:
# Define or auto-detect the groupBy column (usually low-cardinality string)
if "groupby_col" not in locals():
    groupby_col = ValueSkewBalancer.detect_low_cardinality_categorical(df)

# Define output log paths
o_log_output_dir = f"outputs/logs/original-plan.txt"
s_log_output_dir = f"outputs/logs/salted-plan.txt"

# Compare groupBy performance before vs after
print(f"[Original Plan using '{groupby_col}']")
ValueSkewBalancer.timeit(lambda: df.groupBy(groupby_col).count().show(), label="Original Count")
ValueSkewBalancer.log_explain(df.groupBy(groupby_col).count(), o_log_output_dir)

print(f"[Salted Plan using '{groupby_col}']")
ValueSkewBalancer.timeit(lambda: df_balanced.groupBy(groupby_col).count().show(), label="Salted Count")
ValueSkewBalancer.log_explain(df_balanced.groupBy(groupby_col).count(), s_log_output_dir)


{"ts": "2025-07-07 13:48:13.072", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `type` cannot be resolved. Did you mean one of the following? [`value`, `event_type`, `event_time`, `customer_id`, `description`]. SQLSTATE: 42703", "context": {"file": "jdk.internal.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o1665.count.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `type` cannot be resolved. Did you mean one of the following? [`value`, `event_type`, `event_time`, `customer_id`, `description`]. SQLSTATE: 42703;\n'Aggregate ['type], ['type, count(1) AS count#1569L]\n+- Relation [customer_id#1334,event_type#1335,value#1336,de

[Original Plan using 'type']


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `type` cannot be resolved. Did you mean one of the following? [`value`, `event_type`, `event_time`, `customer_id`, `description`]. SQLSTATE: 42703;
'Aggregate ['type], ['type, count(1) AS count#1569L]
+- Relation [customer_id#1334,event_type#1335,value#1336,description#1337,event_time#1338] csv


## Step 11: Visualize the Skew Fix

Let’s look at the visual reports generated by SkewBalancer:

- **Z-Score Chart**: Shows how extreme the values are before vs. after salting
- **Box Plot**: Displays how data is spread (min, max, median, outliers)
- **Histogram**: Compares the distribution of values before and after fixing skew

These help you understand what *skew* actually looks like and how it was balanced!


In [None]:
from IPython.display import Image, display
import os

# Try to detect the skewed column used for salting
try:
    skewed_column = df_balanced.columns[-1].split("_")[0]
except Exception:
    skewed_column = "unknown"

# Display visualizations if they exist
plot_types = ["z_score_comparison", "box_plot", "histogram"]
for plot_type in plot_types:
    plot_path = f"outputs/{plot_type}_{skewed_column}.png"
    if os.path.exists(plot_path):
        display(Image(filename=plot_path))
    else:
        print(f"[Warning] Plot not found: {plot_path}")


> 🧠 **What does this mean?**
> The Z-Score chart helps you spot "extreme" values.
> If most bars stay within the blue/red lines, your data is less skewed now!


# Step 12: Stop Spark Session

Always clean up when done.


In [None]:
spark.stop()