In [1]:
# --- 1) Point PySpark to the exact Python you're running right now ---
import os, sys

py = sys.executable  # e.g., C:\Users\PX\anaconda3\envs\music-chatbot\python.exe
os.environ["PYSPARK_DRIVER_PYTHON"] = py
os.environ["PYSPARK_PYTHON"] = py

# --- 2) Stop any existing Spark session cleanly (if already created) ---
try:
    spark.stop()
except Exception:
    pass

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PySpark-Windows-Fix")
    # Ensures executors pick the same Python as the driver
    .config("spark.pyspark.driver.python", py)
    .config("spark.pyspark.python", py)
    # Optional: Arrow speeds up Pandas ↔ Spark conversions if available
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# 🔥 What is Apache Spark?

Apache **Spark** is an **open-source, distributed computing framework** designed for **big data processing** and **analytics**.  
It lets you process **large datasets** across a **cluster of machines** (or locally) using **parallel computing**, often much faster than older tools like Hadoop MapReduce.

---

## ⚡ Key Features
1. **Speed**
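   - Uses in-memory computation (pipelining steps and caching data instead of writing every intermediate result to disk) → much faster than Hadoop MapReduce (up to 100x for some iterative workloads).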
2. **Ease of Use**
   - APIs available in Python (PySpark), Scala, Java, and SQL.
3. **Versatility**
   - Handles **batch processing**, **streaming**, **machine learning** (MLlib), and **graph processing** (GraphX).
4. **Scalability**
   - Runs on a laptop, a cluster of machines, or in the cloud (AWS EMR, Databricks, etc.).

---

## 🧠 High-Level Architecture
Spark has **two main parts**:

- **Driver Program**
  - The "brain" of Spark.
  - Creates the **SparkSession**, builds the DAG (Directed Acyclic Graph), and coordinates tasks.

- **Cluster Manager + Executors**
  - Cluster Manager assigns resources (CPU, memory).
  - Executors run the actual tasks on worker nodes.

---

In [2]:
import pyspark

data = [("Alice", 29), ("Bob", 34), ("Catherine", 25)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

print("\nInterpreter:", py)
print("Spark version:", spark.version)
print("PySpark version:", pyspark.__version__)


+---------+---+
|     Name|Age|
+---------+---+
|    Alice| 29|
|      Bob| 34|
|Catherine| 25|
+---------+---+


Interpreter: c:\Users\PX\anaconda3\envs\music-chatbot\python.exe
Spark version: 4.0.1
PySpark version: 4.0.1


# 🏗️ Spark Architecture (Driver, Executors, DAG)

Spark follows a **master/worker architecture**.  
Understanding this is crucial for interviews — it shows you know **how Spark runs your code under the hood**.

---

## 🧠 Key Components

### 1️⃣ **Driver Program**
- The "brain" of a Spark application.
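- In client mode (the default deploy mode, used here), it runs in your own process, where you call `SparkSession.builder`; in cluster mode it runs on a node inside the cluster.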
- Responsibilities:
  - Creates the `SparkSession`, which wraps a `SparkContext` (older code creates the `SparkContext` directly).
  - Converts your code into a **DAG** (Directed Acyclic Graph).
  - Splits the DAG into **stages**.
  - Schedules tasks to run on **executors**.
  - Collects results and sends them back to you.
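The driver's first job, turning your code into a DAG, is lazy: transformations are only recorded, and work starts when an action asks for a result. The sketch below is a hypothetical pure-Python toy (`LazyDataset` is not a Spark class) that mimics this behavior: `map` and `filter` only extend a plan, while `collect` and `count` execute it.

```python
# Toy model of lazy evaluation (NOT Spark's implementation): transformations
# only record steps in a plan; an action runs the whole plan.
from typing import Callable, Iterable, List, Tuple


class LazyDataset:
    """Hypothetical stand-in for a DataFrame/RDD that builds its plan lazily."""

    def __init__(self, source: Iterable[int], plan: Tuple[Tuple[str, Callable], ...] = ()):
        self._source = source  # must be re-iterable (e.g. a range or list)
        self.plan = plan       # recorded (kind, function) steps; nothing has run yet

    def map(self, fn: Callable[[int], int]) -> "LazyDataset":
        # Transformation: return a NEW dataset with one more step in its plan
        return LazyDataset(self._source, self.plan + (("map", fn),))

    def filter(self, pred: Callable[[int], bool]) -> "LazyDataset":
        return LazyDataset(self._source, self.plan + (("filter", pred),))

    def collect(self) -> List[int]:
        # Action: only now is the recorded plan executed, record by record
        out = []
        for x in self._source:
            keep = True
            for kind, fn in self.plan:
                if kind == "map":
                    x = fn(x)
                elif not fn(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out

    def count(self) -> int:
        return len(self.collect())


ds = LazyDataset(range(10)).map(lambda x: x * 2).filter(lambda x: x % 3 == 0)
print([kind for kind, _ in ds.plan])  # ['map', 'filter']  (nothing computed yet)
print(ds.collect())                   # [0, 6, 12, 18]
```

In PySpark itself, `df.explain()` prints the plan Spark has built for a DataFrame without running the query.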

---

### 2️⃣ **Cluster Manager**
- Allocates resources (CPU, memory) to Spark.
- Can be:
  - **Standalone** (built-in Spark manager)
  - **YARN** (Hadoop cluster manager)
  - **Kubernetes**
  - **Mesos** (deprecated since Spark 3.2 and removed in Spark 4.0)
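The cluster manager is selected by the master URL passed to `SparkSession.builder.master(...)` or `spark-submit --master`, such as the `local[*]` this notebook runs with. The helper below is a hypothetical sketch (not a Spark API) that classifies the common URL forms: `local`, `local[K]`, `local[*]`, `spark://HOST:PORT`, `yarn`, and `k8s://...`.

```python
import os
import re
from typing import Optional, Tuple


def describe_master(master: str) -> Tuple[str, Optional[int]]:
    """Return (cluster manager, local worker threads or None) for a master URL.

    Hypothetical helper for illustration only; Spark parses master URLs itself.
    """
    m = re.fullmatch(r"local(?:\[(\*|\d+)(?:,\d+)?\])?", master)
    if m:  # local, local[K], local[*], optionally with a ",F" max-failures suffix
        threads = m.group(1)
        if threads is None:
            return ("local", 1)  # plain "local" runs one worker thread
        if threads == "*":
            return ("local", os.cpu_count() or 1)  # one thread per logical core
        return ("local", int(threads))
    if master.startswith("spark://"):
        return ("standalone", None)
    if master == "yarn":
        return ("yarn", None)
    if master.startswith("k8s://"):
        return ("kubernetes", None)
    raise ValueError(f"Unrecognized master URL: {master!r}")


for url in ["local[*]", "local[4]", "spark://host:7077", "yarn", "k8s://https://host:443"]:
    print(url, "->", describe_master(url))
```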

---

### 3️⃣ **Executors**
- Worker processes running on cluster nodes.
- Each executor:
  - Runs **tasks** assigned by the driver.
  - Stores **cached data** in memory.
- By default, executors live for the lifetime of your application; with dynamic allocation enabled, Spark can add or remove them as the workload changes.
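Each executor runs several tasks at once, one per core by default (`spark.task.cpus` defaults to 1). A back-of-the-envelope sketch of how many "waves" of tasks a stage needs, assuming all tasks take roughly the same time; `task_waves` is a hypothetical helper, not a Spark API.

```python
import math


def task_waves(num_partitions: int, num_executors: int,
               cores_per_executor: int, cpus_per_task: int = 1) -> int:
    """Number of task 'waves' a stage needs if all task slots stay busy.

    Hypothetical helper (not a Spark API). cpus_per_task plays the role of
    spark.task.cpus; assumes tasks take similar time.
    """
    slots = num_executors * (cores_per_executor // cpus_per_task)
    if slots <= 0:
        raise ValueError("need at least one task slot")
    return math.ceil(num_partitions / slots)


# 10 executors x 4 cores = 40 tasks at once; 200 partitions -> 5 waves
print(task_waves(200, 10, 4))  # 5
```

One extra partition (201) would add a sixth, mostly idle wave, which is one reason partition counts are often chosen as a multiple of the total core count.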

---

## 🗺️ DAG, Stages, and Tasks

1. **DAG (Directed Acyclic Graph)**
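   - Graph of the transformations you wrote (`map`, `filter`, `select`), built lazily: nothing runs until an action such as `count()` or `collect()` asks for a result.
   - Shows *what to do*, not how; the scheduler decides how to execute it.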

2. **Stages**
   - DAG is split into **stages** at shuffle boundaries (when data needs to move across the cluster).
   - Each stage is a set of parallel tasks, one per partition.

3. **Tasks**
   - Smallest unit of work in Spark.
   - Each task is sent to an executor core.
   - Runs on a **partition** of the data.
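The stage-splitting rule above can be sketched with a hypothetical toy (not Spark's scheduler): narrow operations are pipelined into the current stage, and each operation that needs a shuffle starts a new one. It simplifies real Spark, which works from RDD dependencies (a `join` has two parents) and runs the map side of a shuffle at the end of the previous stage.

```python
from typing import List

# Hypothetical set of "wide" operations that need a shuffle; everything else
# (map, filter, select, ...) is treated as narrow and pipelined.
WIDE_OPS = {"groupBy", "join", "distinct", "repartition", "orderBy"}


def split_into_stages(ops: List[str]) -> List[List[str]]:
    """Cut a linear chain of operations into stages at shuffle boundaries."""
    stages: List[List[str]] = [[]]
    for op in ops:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])  # shuffle boundary: data must move between executors
        stages[-1].append(op)
    return stages


plan = ["read", "filter", "map", "groupBy", "map", "join", "select"]
for n, stage in enumerate(split_into_stages(plan)):
    print(f"Stage {n}: {stage}")
# Stage 0: ['read', 'filter', 'map']
# Stage 1: ['groupBy', 'map']
# Stage 2: ['join', 'select']
```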

---

## 🖼️ Visual Overview

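Driver ➜ Cluster Manager (allocates resources) ➜ Executors (launched on worker nodes)

Once the executors are up, the driver sends tasks to them directly and collects the results.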

In [5]:
# Inspect Spark environment to see driver, executor configs, and cluster manager info
print("=== Spark Configuration ===")
for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k} = {v}")

print("\nSpark UI URL (served by the driver):", spark.sparkContext.uiWebUrl or "No UI available")
print("Master:", spark.sparkContext.master)
print("Application ID:", spark.sparkContext.applicationId)
print("Default parallelism (default # of RDD partitions):", spark.sparkContext.defaultParallelism)


=== Spark Configuration ===
spark.rdd.compress = True
spark.hadoop.fs.s3a.vectored.read.min.seek.size = 128K
spark.pyspark.python = c:\Users\PX\anaconda3\envs\music-chatbot\python.exe
spark.pyspark.driver.python = c:\Users\PX\anaconda3\envs\music-chatbot\python.exe
spark.executor.extraJavaOptions = -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.b

# ⚙️ Spark Configuration (Explained)

Below is a quick explanation of the key properties you saw:

- **spark.rdd.compress** → Compresses serialized RDD partitions (saves memory at the cost of some extra CPU time).
- **spark.hadoop.fs.s3a.vectored.read.min.seek.size** → For vectored reads through the S3A connector, byte ranges closer together than this are merged into one request instead of seeking separately.
- **spark.pyspark.python** → Path to Python executable used by executors.
- **spark.pyspark.driver.python** → Path to Python executable used by driver program.
- **spark.executor.extraJavaOptions** → Extra JVM options passed to executor processes (e.g., module opens, system properties).
- **spark.sql.artifact.isolation.enabled** → Whether to isolate SQL artifacts per session (false = shared globally).
- **spark.master** → Cluster master URL (`local[*]` means run locally using all available CPU cores).
- **spark.sql.execution.arrow.pyspark.enabled** → Enables Apache Arrow for faster Spark ↔ Pandas conversion.
- **spark.sql.warehouse.dir** → Default directory for Spark SQL managed tables (warehouse).
- **spark.driver.host** → Hostname of the machine running the driver program.
- **spark.executor.id** → Executor ID of the current process; the driver always reports `driver`. In local mode (`local[*]`), the driver process also runs all the tasks, so there are no separate executor processes.
- **spark.submit.pyFiles** → List of `.zip`/`.egg`/`.py` files sent with job (empty here).
- **spark.driver.extraJavaOptions** → Extra JVM options passed to the driver process.
- **spark.hadoop.fs.s3a.vectored.read.max.merged.size** → Upper limit on the size of a single merged S3 read when nearby ranges are combined.
- **spark.app.startTime** → Application start timestamp (epoch ms).
- **spark.submit.deployMode** → Deployment mode (`client` = driver runs locally, `cluster` = driver runs on cluster).
- **spark.app.id** → Unique ID of this Spark application (one application can run many jobs).
- **spark.app.submitTime** → Timestamp when the app was submitted.
- **spark.serializer.objectStreamReset** → With Java serialization, resets the object stream every N objects so its cache of already-written objects can be garbage-collected (avoids a memory leak).
- **spark.driver.port** → Port used by the driver to communicate with executors.
- **spark.app.name** → Name of the application (set in `SparkSession.builder.appName()`).
- **spark.ui.showConsoleProgress** → Whether to show progress bars in console for running stages.
- **Spark UI URL (`uiWebUrl`)** → URL of the Spark Web UI served by the driver (shows jobs, stages, storage, executors).
- **Master** → The cluster master Spark is connected to (`local[*]` means no cluster, run locally).
- **Application ID** → Unique identifier of this running application (useful for logs).
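- **Default parallelism** → Default number of partitions for RDD operations such as `parallelize` when none is given (in `local[*]`, the number of cores). DataFrame shuffles use `spark.sql.shuffle.partitions` (default 200) instead.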
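The two S3A vectored-read settings work together: nearby byte ranges are coalesced into one GET, but only up to a size cap. A simplified sketch of that range-merging idea follows; it is not Hadoop's actual code, `merge_ranges` is hypothetical, `128K` is read as 128 × 1024 bytes, and the 2 MB cap is an illustrative value.

```python
from typing import List, Tuple

Range = Tuple[int, int]  # (offset, length) in bytes


def merge_ranges(ranges: List[Range], min_seek: int, max_merged: int) -> List[Range]:
    """Coalesce byte ranges whose gap is below min_seek, capping merged size.

    Simplified sketch of the idea behind fs.s3a.vectored.read.min.seek.size and
    fs.s3a.vectored.read.max.merged.size; not Hadoop's implementation.
    """
    merged: List[Range] = []
    for off, length in sorted(ranges):
        if merged:
            m_off, m_len = merged[-1]
            gap = off - (m_off + m_len)  # bytes we would read and discard
            new_len = max(m_off + m_len, off + length) - m_off
            if gap < min_seek and new_len <= max_merged:
                merged[-1] = (m_off, new_len)  # one request covers both ranges
                continue
        merged.append((off, length))
    return merged


KB, MB = 1024, 1024 * 1024
reads = [(0, 100), (150, 100), (1_000_000, 100)]
print(merge_ranges(reads, min_seek=128 * KB, max_merged=2 * MB))
# [(0, 250), (1000000, 100)]
```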


# 🚀 Benefits of Apache Spark

Apache Spark became the most popular big-data engine because of its **speed**, **ease of use**, and **versatility**.  
Here are the key benefits you should know for interviews:

---

## ⚡ 1. Speed
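- **In-memory computation**: Keeps intermediate results in memory where possible instead of writing them to disk between steps, as chained Hadoop MapReduce jobs do (Spark still writes shuffle data to local disk).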
- Optimized query engine with DAG execution → up to **100x faster** than MapReduce for iterative workloads.

---

## 💻 2. Ease of Use
- High-level APIs available in **Python (PySpark)**, Scala, Java, R, and SQL.
- Provides a **unified interface** for batch processing, streaming, ML, and graph processing.

---

## 🔀 3. Unified Engine
- Same framework handles:
  - **Batch processing** (ETL, transformations)
  - **Streaming** (near real-time data with Structured Streaming)
  - **Machine Learning** (MLlib library)
  - **Graph processing** (GraphX, which has no Python API; from PySpark, the separate GraphFrames package is commonly used)

---

## 📈 4. Scalability
- Runs on a laptop, a single server, or a large cluster.
- Works with on-prem clusters or cloud platforms (AWS EMR, Databricks, GCP Dataproc, Azure Synapse).

---

## 🛠️ 5. Fault Tolerance
- Uses **RDD lineage** to recompute lost partitions automatically.
- No manual recovery is needed when executors fail; Spark reschedules the affected tasks.

---

## 💾 6. Multiple Data Source Support
- Reads from **HDFS, S3, Azure Blob, GCS, Cassandra, Kafka, JDBC, JSON, Parquet, ORC**, etc.
- Makes it easy to build pipelines across heterogeneous data systems.

---


In [8]:
import time
import pandas as pd

# --- PySpark Version ---
# The SparkSession already exists, so JVM/session startup is not timed here.
# spark.range is lazy: its rows are generated inside the timed count() action.
df_spark = spark.range(0, 1_000_000)

start_spark = time.perf_counter()  # monotonic, high-resolution timer
spark_result = df_spark.filter(df_spark.id % 7 == 0).count()
end_spark = time.perf_counter()

print(f"✅ PySpark result: {spark_result}")
print(f"⏱️ PySpark execution time: {end_spark - start_spark:.4f} seconds\n")

# --- Pandas Version ---
# df_pandas is built before the timer starts, so only the filter + sum is timed.
df_pandas = pd.DataFrame({"id": range(0, 1_000_000)})

start_pandas = time.perf_counter()
pandas_result = (df_pandas["id"] % 7 == 0).sum()
end_pandas = time.perf_counter()

print(f"✅ Pandas result: {pandas_result}")
print(f"⏱️ Pandas execution time: {end_pandas - start_pandas:.4f} seconds\n")

✅ PySpark result: 142858
⏱️ PySpark execution time: 0.1981 seconds

✅ Pandas result: 142858
⏱️ Pandas execution time: 0.0190 seconds
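A single timed run, as in the cell above, is noisy: the first call can pay one-off costs, and background activity skews individual measurements. A hypothetical helper (not a PySpark API) that discards warm-up calls and keeps every timed run, so you can report the best or the median; the standard library's `timeit.repeat` does much the same.

```python
import statistics
import time
from typing import Callable, List


def time_runs(fn: Callable[[], object], repeat: int = 5, warmup: int = 1) -> List[float]:
    """Call fn warmup + repeat times; return durations (seconds) of the timed calls.

    Hypothetical helper. Warm-up calls are discarded so one-off costs
    (first-call caching, first job setup) do not skew the result.
    """
    for _ in range(warmup):
        fn()
    timings = []
    for _ in range(repeat):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return timings


def multiples_of_7(n: int = 1_000_000) -> int:
    # Pure-Python stand-in workload; in the notebook you might pass
    # lambda: df_spark.filter(df_spark.id % 7 == 0).count() instead.
    return sum(1 for i in range(n) if i % 7 == 0)


runs = time_runs(multiples_of_7, repeat=3)
print(f"best {min(runs):.4f} s, median {statistics.median(runs):.4f} s")
```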



# 🐼 Pandas vs 🔥 Spark: Why Pandas Can Be Faster Here

In our test, **Pandas was about 10x faster** (0.019 s vs 0.198 s), and this is normal for small data.  
Here’s why:

---

## ⚠️ Spark Overhead
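- Even with the SparkSession already running, every Spark action pays fixed costs:
  - Py4J calls from Python into the JVM (Java Virtual Machine) that runs Spark
  - Query planning and optimization (Catalyst)
  - Scheduling a job, its stages, and its tasks on the executors
- Starting the JVM and the SparkSession takes longer still, but that happened once in the first cell and is not part of this timing.
- Moving data between Python and the JVM (e.g. Python UDFs, `toPandas()`) adds serialization cost; this benchmark avoids it because `filter` and `count` run entirely in the JVM.
- These fixed costs, on the order of **hundreds of milliseconds**, dominate the runtime for small datasets.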
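The overhead argument can be made concrete with a linear cost model, T(n) = fixed + per_row · n: Spark pays a larger fixed cost per action but, running on several cores, a smaller cost per row, so there is a crossover size. The function below is a hypothetical sketch, and the constants are illustrative assumptions, not measurements from this notebook.

```python
def break_even_rows(fixed_a: float, per_row_a: float,
                    fixed_b: float, per_row_b: float) -> float:
    """Row count n where fixed_a + per_row_a * n == fixed_b + per_row_b * n.

    Assumes a linear cost model T(n) = fixed + per_row * n, with engine A
    (think Spark) paying the larger fixed cost. Returns inf if A never wins.
    """
    if fixed_a < fixed_b:
        raise ValueError("engine A is assumed to have the larger fixed cost")
    if per_row_a >= per_row_b:
        return float("inf")  # A's per-row cost never makes up for its overhead
    return (fixed_a - fixed_b) / (per_row_b - per_row_a)


# Hypothetical constants, NOT measured: Spark 0.2 s fixed + 1 ns/row (parallel),
# pandas 0.001 s fixed + 20 ns/row (single core).
n_star = break_even_rows(0.2, 1e-9, 0.001, 20e-9)
print(f"Spark becomes cheaper above roughly {n_star / 1e6:.1f} million rows")
```

Real costs are not perfectly linear (memory pressure, spilling, caching), so treat the crossover as a rough intuition rather than a rule.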

---

## 🐼 Pandas Strength
- Pandas runs in a **single Python process**, no cluster overhead.
- For data that **fits in memory** (e.g., < 1–2 GB), Pandas is usually faster for simple operations.

---

## 🔥 When Spark Shines
- When the dataset is **too big for memory** on one machine (often tens of GB or more, depending on its RAM).
- When you need to **distribute processing** across multiple cores/machines.
- When you need to integrate with **distributed storage** (HDFS, S3, Azure Blob, etc.).
- When running **complex pipelines** where Spark can optimize stages and cache intermediate results.

---

## 🎯 Interview Insight
- **Do not say Spark is "always faster."**  
  A strong candidate will say:
  > "Spark is designed for distributed, large-scale data. For small, local datasets, Pandas is often faster due to lower overhead."

This shows you understand **trade-offs** and choose the right tool for the job.


# 🔥 When PySpark is Faster (and Why)

PySpark usually beats Pandas when:
- Data is **large** (10M+ rows) or **doesn’t fit in RAM** on one machine.
- Workloads involve **wide shuffles** (e.g., `groupBy`, `join`, `distinct`) that benefit from parallelism.
- You can leverage **multiple cores** (or a cluster) to process partitions in parallel.

Below, we benchmark a large **join + groupBy** on 100 million rows.


In [11]:
import time, os, sys, numpy as np, pandas as pd
from pyspark.sql import SparkSession, functions as F

# Ensure Spark exists and uses this Python
try:
    spark
except NameError:
    py = sys.executable
    os.environ["PYSPARK_DRIVER_PYTHON"] = py
    os.environ["PYSPARK_PYTHON"] = py
    spark = (
        SparkSession.builder
        .appName("Deterministic-Groupby")
        .config("spark.pyspark.driver.python", py)
        .config("spark.pyspark.python", py)
        .getOrCreate()
    )

# Params
N = 100_000_000
K = 1_000
spark.conf.set("spark.sql.shuffle.partitions", 16)

# ---------- Spark: compute using integer sums, then convert ----------
t0 = time.perf_counter()

df_s = (
    spark.range(N)  # id: bigint
    .withColumn("key", (F.col("id") % K).cast("int"))
    # keep integer id for exact sums; we'll divide by 2.0 later
    .withColumn("id_int", F.col("id").cast("long"))
)

lookup_s = spark.range(K).withColumnRenamed("id", "key") \
    .withColumn("weight", (F.col("key") % 5).cast("int"))

joined_s = df_s.join(lookup_s, "key", "inner")

agg_s_int = (
    joined_s.groupBy("key")
    .agg(
        F.count("*").alias("cnt"),
        F.sum("id_int").alias("sum_id"),
        F.avg("weight").alias("avg_weight")
    )
    # Deterministic ordering: first by cnt desc, then by key asc
    .orderBy(F.desc("cnt"), F.asc("key"))
)

# convert to final metrics
agg_s = (
    agg_s_int
    .withColumn("sum_val", F.col("sum_id") * F.lit(0.5))     # exact then scaled
    .withColumn("avg_val", (F.col("sum_id") * F.lit(0.5)) / F.col("cnt"))
    .select("key", "cnt", "sum_val", "avg_val", "avg_weight")
)

spark_top5 = agg_s.limit(5).toPandas()
t1 = time.perf_counter()

print("✅ Spark (deterministic, stable) top-5:")
print(spark_top5)
print(f"⏱️ Spark time: {t1 - t0:.2f} sec\n")

# ---------- Pandas: same approach (integer sums, deterministic sort) ----------
t2 = time.perf_counter()

ids = np.arange(N, dtype=np.int64)
keys = (ids % K).astype(np.int32)
id_int = ids  # exact

df_p = pd.DataFrame({"key": keys, "id_int": id_int})
lookup_p = pd.DataFrame({"key": np.arange(K, dtype=np.int32), "weight": np.arange(K, dtype=np.int32) % 5})

joined_p = df_p.merge(lookup_p, on="key", how="inner")

agg_p_int = (
    joined_p.groupby("key", sort=False)
    .agg(cnt=("key", "size"), sum_id=("id_int", "sum"), avg_weight=("weight", "mean"))
    .reset_index()
)

agg_p_int["sum_val"] = agg_p_int["sum_id"] * 0.5
agg_p_int["avg_val"] = agg_p_int["sum_val"] / agg_p_int["cnt"]

agg_p = agg_p_int.sort_values(["cnt", "key"], ascending=[False, True])[["key","cnt","sum_val","avg_val","avg_weight"]]
pandas_top5 = agg_p.head(5).reset_index(drop=True)

t3 = time.perf_counter()

print("✅ Pandas (deterministic, stable) top-5:")
print(pandas_top5)
print(f"⏱️ Pandas time: {t3 - t2:.2f} sec\n")

# Quick equality check on the exact integer columns (key, cnt)
print("Columns equal (keys, cnt):", 
      np.array_equal(spark_top5["key"].to_numpy(), pandas_top5["key"].to_numpy()) and 
      np.array_equal(spark_top5["cnt"].to_numpy(), pandas_top5["cnt"].to_numpy()))

✅ Spark (deterministic, stable) top-5:
   key     cnt       sum_val     avg_val  avg_weight
0    0  100000  2.499975e+12  24999750.0         0.0
1    1  100000  2.499975e+12  24999750.5         1.0
2    2  100000  2.499975e+12  24999751.0         2.0
3    3  100000  2.499975e+12  24999751.5         3.0
4    4  100000  2.499975e+12  24999752.0         4.0
⏱️ Spark time: 1.67 sec

✅ Pandas (deterministic, stable) top-5:
   key     cnt       sum_val     avg_val  avg_weight
0    0  100000  2.499975e+12  24999750.0         0.0
1    1  100000  2.499975e+12  24999750.5         1.0
2    2  100000  2.499975e+12  24999751.0         2.0
3    3  100000  2.499975e+12  24999751.5         3.0
4    4  100000  2.499975e+12  24999752.0         4.0
⏱️ Pandas time: 17.67 sec

Columns equal (keys, cnt): True
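Part of the gap is that `local[*]` lets Spark use every core while Pandas runs on one. Memory is another plausible factor: Pandas materializes every column in RAM, whereas Spark typically generates `spark.range` rows on the fly and aggregates them per partition. A rough, hypothetical estimate of the raw column data the Pandas cell creates, assuming each object stays alive (the cell keeps them all referenced) and ignoring index and temporary arrays:

```python
# Raw byte widths of the NumPy dtypes used in the benchmark cell
ITEMSIZE = {"int64": 8, "int32": 4}
N = 100_000_000  # rows in the benchmark


def column_bytes(n_rows: int, dtypes: list) -> int:
    """Raw bytes for n_rows values of each dtype (ignores index and overhead)."""
    return n_rows * sum(ITEMSIZE[dt] for dt in dtypes)


arrays = column_bytes(N, ["int64", "int32"])                 # ids + keys arrays
df_p_bytes = column_bytes(N, ["int32", "int64"])             # df_p: key, id_int
joined_bytes = column_bytes(N, ["int32", "int64", "int32"])  # joined_p adds weight
total = arrays + df_p_bytes + joined_bytes
print(f"arrays {arrays / 1e9:.1f} GB, df_p {df_p_bytes / 1e9:.1f} GB, "
      f"joined_p {joined_bytes / 1e9:.1f} GB, total {total / 1e9:.1f} GB")
# arrays 1.2 GB, df_p 1.2 GB, joined_p 1.6 GB, total 4.0 GB
```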


# 🎯 Key Spark Interview Questions & Answers

---

## ❓ Why is Spark faster than Hadoop MapReduce?

- **In-memory processing**: Spark keeps intermediate results in memory (RAM) where possible, whereas a multi-step MapReduce pipeline writes each job's output to HDFS and reads it back in the next job.
- **DAG Execution**: Spark uses a Directed Acyclic Graph (DAG) to optimize the entire job before execution, minimizing unnecessary data shuffles and recomputations.
- **Lower per-task overhead**: executors are long-lived processes that run tasks as threads, while MapReduce typically starts a separate JVM for each task. For DataFrames, the Catalyst optimizer and Tungsten code generation add further speedups.

**Result:** Spark can be **10x–100x faster** than MapReduce for iterative or interactive workloads.

---

## ❓ What does "in-memory computation" mean?

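- Within a stage, Spark pipelines narrow transformations (like `map` and `filter`) record by record in memory instead of writing each step's output to disk. Wide transformations such as `join` still write shuffle data to local disk.
- This avoids most of the expensive disk reads and writes between steps.
- Results are not kept after an action finishes unless you ask: explicit **caching/persisting** (`df.cache()`, `df.persist()`) keeps a dataset in memory, which speeds up repeated operations on it.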
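Why caching matters can be sketched with a hypothetical toy (`ToyPipeline` is not a Spark API): without a cache, every action reruns the whole pipeline; after `cache()`, the first action stores the rows and later actions reuse them. As with `df.cache()`, calling `cache()` itself computes nothing.

```python
from typing import Callable, List, Optional


class ToyPipeline:
    """Hypothetical stand-in for a DataFrame (not a Spark API)."""

    def __init__(self, compute: Callable[[], List[int]]):
        self._compute = compute
        self._use_cache = False
        self._cached: Optional[List[int]] = None
        self.computations = 0  # how many times the pipeline actually ran

    def cache(self) -> "ToyPipeline":
        self._use_cache = True  # lazy, like df.cache(): nothing is computed yet
        return self

    def _rows(self) -> List[int]:
        if self._cached is not None:
            return self._cached
        self.computations += 1
        rows = self._compute()
        if self._use_cache:
            self._cached = rows  # first action after cache() fills the cache
        return rows

    # Two "actions"
    def count(self) -> int:
        return len(self._rows())

    def total(self) -> int:
        return sum(self._rows())


def expensive_pipeline() -> List[int]:
    return [x * x for x in range(1_000) if x % 3 == 0]


plain = ToyPipeline(expensive_pipeline)
plain.count()
plain.total()
print(plain.computations)   # 2: each action reran the pipeline

cached = ToyPipeline(expensive_pipeline).cache()
cached.count()
cached.total()
print(cached.computations)  # 1: the second action reused the cached rows
```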

---

## ❓ What are the main benefits of using Spark in a big data pipeline?

1. **Speed** – In-memory processing + DAG optimization = fast analytics.
2. **Ease of Use** – High-level APIs in Python, Scala, Java, SQL.
3. **Unified Engine** – Batch, streaming, ML, graph processing all in one framework.
4. **Scalability** – Runs locally or on clusters with thousands of nodes.
5. **Fault Tolerance** – Automatic recovery from failures using RDD lineage.
6. **Wide Data Source Support** – Reads/writes from HDFS, S3, JDBC, Kafka, Parquet, etc.

---

## ❓ How does Spark achieve fault tolerance?

- Spark's core abstraction, **RDD (Resilient Distributed Dataset)**, keeps track of the **lineage** of transformations used to create it.
- If a partition is lost (e.g., executor crashes), Spark **recomputes only the lost partitions** from their lineage rather than restarting the entire job.
- For cached/persisted data, you can optionally replicate partitions across nodes with a replicated storage level (e.g. `StorageLevel.MEMORY_AND_DISK_2`) to avoid recomputation.

**In short:** Spark uses **lineage + recomputation** (and optionally replication) to handle node failures gracefully.
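Lineage-based recovery can be illustrated with a hypothetical pure-Python toy (not Spark internals): each partition is rebuilt from its source by replaying the recorded steps, so losing one partition means recomputing only that one.

```python
from typing import Callable, Dict, List

# Hypothetical toy: the "dataset" is a partitioned source plus a lineage,
# i.e. the ordered steps applied to every partition.
source_partitions: List[List[int]] = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
lineage: List[Callable[[List[int]], List[int]]] = [
    lambda part: [x * 10 for x in part],            # like a map
    lambda part: [x for x in part if x % 20 != 0],  # like a filter
]
recomputed: List[int] = []  # which partitions had to be rebuilt from lineage


def compute_partition(i: int) -> List[int]:
    """Rebuild partition i from its source by replaying the lineage."""
    recomputed.append(i)
    part = source_partitions[i]
    for step in lineage:
        part = step(part)
    return part


# Materialize all partitions (think: held in executor memory)...
partitions: Dict[int, List[int]] = {i: compute_partition(i) for i in range(3)}
recomputed.clear()

# ...then simulate an executor failure that loses partition 1 only.
del partitions[1]


def get_partition(i: int) -> List[int]:
    if i not in partitions:
        partitions[i] = compute_partition(i)  # replay lineage for the lost piece
    return partitions[i]


result = [get_partition(i) for i in range(3)]
print(result)      # [[10, 30], [50], [70, 90]]
print(recomputed)  # [1]: only the lost partition was recomputed
```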
