# 1. SparkSession: The Entry Point to Spark

SparkSession is the unified entry point to Spark functionality in Spark 2.0 and later. It replaces the separate SQLContext and HiveContext of Spark 1.x, holds the underlying SparkContext, and exposes Structured Streaming through `spark.readStream`, so most applications need only this one object. (The older DStream-based StreamingContext is still a separate class.)

In PySpark, **`SparkSession`** belongs to the **Structured API** (DataFrames and SQL-style operations), so it lives in the `pyspark.sql` module even when you never write a line of SQL.

```
pyspark/
├── context.py          <-- defines SparkContext
├── rdd.py              <-- lower-level RDD API
└── sql/
    ├── session.py      <-- defines the SparkSession class
    ├── dataframe.py    <-- DataFrame class and its methods (withColumn, filter, ...)
    └── functions.py    <-- column functions (col, lit, when, ...)
```

The PySpark API is organized into key modules:
* `pyspark.sql`: includes all higher-level DataFrame and SQL APIs
* `pyspark.rdd`: older, low-level RDD API

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SafeBind")
    .master("local[*]")
    # Bind and advertise on loopback only
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)
```

```
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/07 18:37:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```


# 2. SparkSession Architecture

## 2.1 Component Hierarchy

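SparkSession sits at the top of the object hierarchy a Spark application works with. It holds a SparkContext plus two layers of SQL state: `SharedState`, shared by every session on the same SparkContext, and `SessionState`, private to each session:

```
SparkSession
    ├── SparkContext
    │   └── Schedulers
    │       ├── DAGScheduler
    │       └── TaskScheduler
    ├── SharedState (shared across sessions)
    │   ├── External catalog (in-memory or Hive metastore)
    │   └── Cache manager (cached tables and DataFrames)
    └── SessionState (per session)
        ├── SQLConf (runtime SQL configuration)
        ├── Session catalog (temporary views, current database)
        ├── Analyzer and Catalyst optimizer
        └── UDF registry
```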


## 2.2 Core Components

### 2.2.1 What's Inside a SparkSession?

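Internally, SparkSession holds:
* A SparkContext (the original core interface to the cluster)
* The SQL machinery that SQLContext used to provide (DataFrames, SQL parsing, Catalyst optimization)
* Hive metastore and HiveQL support, but only when the builder calls `enableHiveSupport()` (this replaces the old HiveContext)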

```python
# Access the underlying SparkContext
sc = spark.sparkContext

# View SparkContext details
print(f"Master: {sc.master}")
print(f"App Name: {sc.appName}")
print(f"Spark Version: {sc.version}")
```

```
Master: local[*]
App Name: SafeBind
Spark Version: 3.5.5
```

```python
# In a notebook, evaluating the object displays a summary with a link to the Spark UI
spark.sparkContext
```

### 2.2.2 SQLContext

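In Spark 1.x, SQLContext was the entry point for SQL and DataFrame operations. Since Spark 2.0 that functionality is available directly on SparkSession (`spark.createDataFrame`, `spark.sql`, `spark.read`), and SQLContext remains only for backward compatibility: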

```python
# Create a sample DataFrame
data = [("John", 30), ("Alice", 25), ("Bob", 45)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)

# Register as temporary view to use with SQL
df.createOrReplaceTempView("my_table")

# Now we can use SQL
result = spark.sql("SELECT * FROM my_table WHERE age > 25")
result.show()
```

```
+----+---+
|name|age|
+----+---+
|John| 30|
| Bob| 45|
+----+---+
```


### 2.2.3 Catalog
The Catalog interface provides access to tables, databases, and functions:

```python
# Register as a temporary view
df.createOrReplaceTempView("people")

# List tables: shows both temporary views, "my_table" and "people"
tables = spark.catalog.listTables()
print("Tables:")
for table in tables:
    print(f"  - {table.name} (isTemporary: {table.isTemporary})")

# List all databases (at minimum "default")
databases = spark.catalog.listDatabases()
print("\nDatabases:")
for db in databases:
    print(f"  - {db.name}")

# List all functions (built-in and user-defined). Built-in functions belong to
# no database, so the catalog reports them with isTemporary=True.
functions = spark.catalog.listFunctions()
print("\nFunctions (first 5):")
for function in functions[:5]:
    print(f"  - {function.name} (isTemporary: {function.isTemporary})")
```

```
Tables:
  - my_table (isTemporary: True)
  - people (isTemporary: True)

Databases:
  - default

Functions (first 5):
  - ! (isTemporary: True)
  - != (isTemporary: True)
  - % (isTemporary: True)
  - & (isTemporary: True)
  - * (isTemporary: True)
```


# 3. SparkSession Creation & Configuration
SparkSession is configured with the builder pattern: chain settings on `SparkSession.builder`, then call `getOrCreate()`.

## 3.1 Builder Pattern

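```python
# Stop the existing session so the next getOrCreate() builds a new one
spark.stop()

# Create a new SparkSession.
# In local mode tasks run inside the driver JVM, so spark.executor.memory has no practical effect here.
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()
```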

### 3.1.1 CPU Parallelism Configuration

The master URL `local[n]` runs Spark in a single JVM with `n` task threads, so `n` sets how many tasks can run in parallel:

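```python
# Caution: a session is already running, so each getOrCreate() below returns it
# unchanged and the new master URL is ignored (see the warnings). The master URL
# only applies when a new SparkContext starts, e.g. after spark.stop().

# Use 4 threads
spark = SparkSession.builder.master("local[4]").getOrCreate()

# Use one thread per available logical core
spark = SparkSession.builder.master("local[*]").getOrCreate()
```

```
25/05/07 18:38:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/05/07 18:38:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
```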


```python
# getExecutorMemoryStatus() maps each block manager address to its memory stats.
# It is reached through the internal _jsc handle, so treat it as a debugging aid.
# In local mode the only entry is the driver, which also acts as the executor.
scala_keys = spark.sparkContext._jsc.sc().getExecutorMemoryStatus().keySet()

# The keys are a py4j proxy for a Scala Set, so walk them with its iterator
hosts_iter = scala_keys.iterator()
hosts = []

while hosts_iter.hasNext():
    hosts.append(str(hosts_iter.next()))

cores = spark.sparkContext.defaultParallelism

# Display results
print("Executors:", len(hosts))
print("Executor Hosts:", hosts)
print("Default Parallelism (cores):", cores)
```

```
Executors: 1
Executor Hosts: ['127.0.0.1:39725']
Default Parallelism (cores): 2
```


```python
spark.stop()

# local[*] starts one task thread per logical core the JVM reports
spark = SparkSession.builder \
    .appName("ParallelTest") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .master("local[*]") \
    .getOrCreate()
```

Spark does not check the requested thread count against the hardware. If you ask for more threads than you have cores, for example `local[256]` on an 8-core machine, the operating system time-slices those 256 task threads across the 8 cores. Throughput rarely improves beyond the real core count, and the extra threads add scheduling and memory overhead.

```python
# Re-run the executor check against the new session.
# getExecutorMemoryStatus() is reached through the internal _jsc handle;
# in local mode its only key is the driver's block manager address.
scala_keys = spark.sparkContext._jsc.sc().getExecutorMemoryStatus().keySet()

# The keys are a py4j proxy for a Scala Set, so walk them with its iterator
hosts_iter = scala_keys.iterator()
hosts = []

while hosts_iter.hasNext():
    hosts.append(str(hosts_iter.next()))

cores = spark.sparkContext.defaultParallelism

# Display results
print("Executors:", len(hosts))
print("Executor Hosts:", hosts)
print("Default Parallelism (cores):", cores)
```

```
Executors: 1
Executor Hosts: ['127.0.0.1:38601']
Default Parallelism (cores): 2
```


**Increasing CPU Parallelism in Spark**

Pros:
- **Faster execution**: more tasks run simultaneously, up to the number of available cores
- **Better hardware use**: keeps every core busy
- **Shorter runtimes on bigger workloads**: jobs with many partitions finish sooner
- **Closer to real clusters**: mimics the concurrent task execution of a cluster during development

Cons:
- **Higher memory use**: in local mode every task thread shares the single driver JVM heap
- **Wasteful for small jobs**: extra scheduling overhead with little gain
- **Diminishing returns**: no added benefit beyond the number of available cores
- **Can affect other apps**: competes with everything else on the machine for CPU

Use `local[*]` to automatically match the number of available cores.
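The local master URL forms map to a thread count roughly as follows. Below is a small illustrative parser; the `local_threads` helper is hypothetical, not part of PySpark, and it uses `os.cpu_count()` as a stand-in for the JVM's available-processor count, which can differ inside containers. The optional second number in `local[N,F]` is the task retry limit, not a thread count.

```python
import os
import re


def local_threads(master):
    """Return the task-thread count implied by a local master URL (illustrative helper)."""
    if master == "local":
        return 1  # plain "local" means a single worker thread
    # Accept local[N], local[*], and the retry-limit forms local[N,F] / local[*,F]
    match = re.fullmatch(r"local\[(\*|\d+)(?:\s*,\s*\d+)?\]", master)
    if match is None:
        raise ValueError(f"not a local master URL: {master!r}")
    value = match.group(1)
    threads = (os.cpu_count() or 1) if value == "*" else int(value)
    if threads <= 0:
        raise ValueError("a local master needs at least one thread")
    return threads


print(local_threads("local"))       # 1
print(local_threads("local[4]"))    # 4
print(local_threads("local[4,3]"))  # 4 (3 is the retry limit)
print(local_threads("local[*]"))    # depends on the machine
```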

## 3.2 Configuration Sources

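Spark properties can come from several sources, listed from highest to lowest precedence:
1. Programmatic configuration in code (`SparkConf` or `SparkSession.builder.config(...)`)
2. Command-line flags passed to `spark-submit` or `pyspark` (e.g. `--conf key=value`, `--driver-memory`)
3. The properties file `conf/spark-defaults.conf`

Environment variables, read from `conf/spark-env.sh`, are a separate mechanism: they set per-machine values such as `SPARK_LOCAL_IP` or `PYSPARK_PYTHON` rather than overriding Spark properties.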
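Conceptually, the property sources layer like a lookup that tries the highest-precedence source first. A minimal sketch with `collections.ChainMap`, assuming the in-code settings, `spark-submit` flags, and `spark-defaults.conf` have each already been parsed into a dict (the variable names and values are made up for illustration; Spark does its own merging internally):

```python
from collections import ChainMap

# Illustrative sources, highest precedence first (values are made up)
in_code = {"spark.executor.memory": "4g"}
submit_flags = {"spark.executor.memory": "2g", "spark.app.name": "FromSubmit"}
defaults_conf = {"spark.executor.memory": "1g", "spark.sql.shuffle.partitions": "200"}

# ChainMap returns the value from the first mapping that contains the key
effective = ChainMap(in_code, submit_flags, defaults_conf)

print(effective["spark.executor.memory"])         # 4g (code wins)
print(effective["spark.app.name"])                # FromSubmit
print(effective["spark.sql.shuffle.partitions"])  # 200 (only in the defaults file)
```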

## 3.3 Hive Integration

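Enable Hive support to read and write persistent tables through a Hive metastore and to use Hive features such as HiveQL syntax, Hive SerDes, and Hive UDFs. Without a `hive-site.xml`, Spark creates a local Derby metastore (`metastore_db`) and a `spark-warehouse` directory in the current working directory: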

```python
spark.stop()

spark = SparkSession.builder \
    .appName("Hive Integration Example") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .enableHiveSupport() \
    .getOrCreate()
```

# 4. SparkSession Deep Internals


## 4.1 Session State Management

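SparkSession maintains per-session state. Sessions created with `spark.newSession()` get their own copy of this state and share only the SparkContext and the table cache:
- Temporary views
- UDFs registered in the session
- Session catalog state (such as the current database)
- Session-level (runtime SQL) configuration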

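```python
# Runtime SQL settings like this one apply to the current session only
spark.conf.set("spark.sql.shuffle.partitions", 100)
current_value = spark.conf.get("spark.sql.shuffle.partitions")
print(current_value)  # spark.conf.get returns values as strings
```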

## 4.2 Lazy Evaluation

SparkSession operations follow Spark's lazy evaluation pattern:

```python
# Create a DataFrame directly without needing a file
data = [("Alice", 25), ("Bob", 35), ("Charlie", 45), ("Diana", 28), ("Edward", 50)]
columns = ["name", "age"]

# This builds a DataFrame from a local list; no Spark job runs yet
df = spark.createDataFrame(data, columns)
print("DataFrame created - no computation yet")

# This adds to the logical plan, still no execution
df_filtered = df.filter(df.age > 30)
print("Filter applied - still no computation")

# Let's add another transformation
df_names = df_filtered.select("name")
print("Selection applied - still no computation")

# Action triggers execution of the entire plan
result_count = df_filtered.count()
print(f"Action triggered - Count result: {result_count}")

# Another action executes its own plan from the start
names_list = df_names.collect()
print(f"Second action triggered - Names collected: {names_list}")
```

```
DataFrame created - no computation yet
Filter applied - still no computation
Selection applied - still no computation
Action triggered - Count result: 3
Second action triggered - Names collected: [Row(name='Bob'), Row(name='Charlie'), Row(name='Edward')]
```

This demonstrates how Spark records transformations (`filter`, `select`) in a logical plan without running anything until an action (`count`, `collect`) is called. Because nothing is cached, each action executes its plan from the source data again.
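The record-then-execute idea can be shown without Spark at all. The sketch below is a toy model written for illustration (the `LazyFrame` class is hypothetical and is not how Spark is implemented): transformations only append steps to a plan, and every action replays the plan from the source. Real Spark additionally optimizes the whole plan with Catalyst before running it; `df_names.explain()` prints the plan Spark actually built for the example above.

```python
class LazyFrame:
    """Toy model of lazy evaluation, not Spark's implementation."""

    def __init__(self, source, steps=()):
        self._source = source        # zero-argument callable that produces rows
        self._steps = tuple(steps)   # recorded transformations: the "plan"

    def filter(self, predicate):
        # Transformation: record the step and return a new frame; nothing runs
        return LazyFrame(self._source, self._steps + (("filter", predicate),))

    def select(self, fn):
        # Transformation: record a per-row projection; nothing runs
        return LazyFrame(self._source, self._steps + (("map", fn),))

    def _execute(self):
        rows = self._source()        # re-read the source on every action (no cache)
        for kind, fn in self._steps:
            if kind == "filter":
                rows = [row for row in rows if fn(row)]
            else:
                rows = [fn(row) for row in rows]
        return rows

    def count(self):
        return len(self._execute())  # action

    def collect(self):
        return self._execute()       # action


reads = []  # one entry per read of the source


def source():
    reads.append(1)
    return [("Alice", 25), ("Bob", 35), ("Charlie", 45), ("Diana", 28), ("Edward", 50)]


toy_df = LazyFrame(source)
toy_filtered = toy_df.filter(lambda row: row[1] > 30)
toy_names = toy_filtered.select(lambda row: row[0])
print(len(reads))             # 0: only transformations so far
print(toy_filtered.count())   # 3
print(toy_names.collect())    # ['Bob', 'Charlie', 'Edward']
print(len(reads))             # 2: each action read the source again
```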

## 4.3 Resource Management
SparkSession coordinates with the cluster manager (YARN, Kubernetes, etc.) through SparkContext:

```python
# Access SparkContext
sc = spark.sparkContext

# View current resource allocation
print(f"Default Parallelism: {sc.defaultParallelism}")
print(f"Default Minimum Partitions for RDDs: {sc.defaultMinPartitions}")

# spark.default.parallelism only appears here if it was set explicitly
parallelism = spark.conf.get("spark.default.parallelism", "not set")
print(f"Default Parallelism from conf: {parallelism}")

# Other resource configurations. The second argument is the fallback returned
# when the key is unset ("1g" matches Spark's default executor memory).
executor_memory = spark.conf.get("spark.executor.memory", "1g")
executor_cores = spark.conf.get("spark.executor.cores", "not set")

print(f"Executor Memory: {executor_memory}")
print(f"Executor Cores: {executor_cores}")
```

```
Default Parallelism: 2
Default Minimum Partitions for RDDs: 2
Default Parallelism from conf: not set
Executor Memory: 1g
Executor Cores: not set
```


Note: `defaultMinPartitions` is a real SparkContext property. It equals `min(defaultParallelism, 2)` and is the default minimum number of partitions for Hadoop-based inputs such as `sc.textFile` when you do not pass one, which is why it prints 2 here.

# 5. SparkSession Tuning & Optimization

## 5.1 Memory Configuration
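The code below tunes Spark's JVM heap settings:

* `spark.driver.memory "4g"` → allocates 4 GB of heap to the driver (the "head chef" process that plans jobs).
* `spark.executor.memory "8g"` → gives each executor (worker) 8 GB of heap for task execution.
* `spark.memory.fraction 0.8` → makes 80% of the heap, after Spark's fixed 300 MB reservation, available as unified memory for execution and caching (the default is 0.6).
* `spark.memory.storageFraction 0.5` → protects half of that unified region for cached data (0.5 is also the default). Execution can borrow storage memory that is not in use, and cached blocks beyond the protected half can be evicted when execution needs room.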


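```python
# Memory configuration.
# Caution: these are static settings read when the JVM and SparkContext start. A session
# is still running here, so getOrCreate() reuses it and ignores them (see the warning).
# spark.driver.memory in particular must be set before the driver JVM launches,
# e.g. with spark-submit --driver-memory 4g.
spark = SparkSession.builder \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()
```

```
25/05/07 18:38:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
```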
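Spark's tuning guide sizes the unified region as (JVM heap - 300 MB reserved) × `spark.memory.fraction`, with `spark.memory.storageFraction` of that region protected for cached blocks. A back-of-the-envelope sketch for the 8 GB executor above; the `unified_memory_mb` helper is illustrative, and the JVM's usable heap is slightly below the `-Xmx` value, so real figures come out a little lower:

```python
RESERVED_MB = 300  # fixed reservation described in Spark's tuning guide


def unified_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate unified (execution + storage) memory and its protected storage part, in MB."""
    usable = (heap_mb - RESERVED_MB) * memory_fraction
    protected_storage = usable * storage_fraction
    return usable, protected_storage


usable, storage = unified_memory_mb(8 * 1024, memory_fraction=0.8, storage_fraction=0.5)
print(f"unified: {usable:.1f} MB, protected storage: {storage:.1f} MB")
# unified: 6313.6 MB, protected storage: 3156.8 MB
```

Execution and storage share the whole 6.3 GB region; only the 3.2 GB protected part is guaranteed to keep cached blocks when execution is under pressure.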


## 5.2 Execution Tuning

* **`spark.sql.shuffle.partitions = 200`**
  Sets the number of partitions produced by the shuffle in wide operations (e.g., `groupBy`, `join`); 200 is also Spark's default. More partitions mean smaller individual tasks but more scheduling overhead; tune this to match cluster size (e.g., cores × 2–3).

* **`spark.sql.autoBroadcastJoinThreshold = "10m"`**
  Any table whose estimated size is ≤ 10 MB (the default threshold) is automatically broadcast to all executors, turning large-small joins into broadcast hash joins that avoid a shuffle. Setting it to `-1` disables automatic broadcasting.

* **`spark.sql.adaptive.enabled = true`**
  Activates Adaptive Query Execution (AQE), which is on by default since Spark 3.2: Spark can change join strategies, coalesce shuffle partitions, and split skewed partitions at run-time based on actual shuffle statistics, often yielding better performance without manual tuning.


```python
# SQL operations tuning
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")
spark.conf.set("spark.sql.adaptive.enabled", "true")
```
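The "cores × 2–3" rule of thumb can be combined with a data-size floor so that large shuffles still get reasonably small partitions. A sketch under stated assumptions: the `suggest_shuffle_partitions` helper is hypothetical, the shuffle size must be estimated by you (for example from the Spark UI), and the 64 MB target borrows the default of AQE's `spark.sql.adaptive.advisoryPartitionSizeInBytes` as a reasonable size, not a rule Spark enforces here.

```python
import math

MB = 1024 * 1024
GB = 1024 * MB


def suggest_shuffle_partitions(total_cores, shuffle_bytes, multiplier=3,
                               target_partition_bytes=64 * MB):
    """Heuristic: at least `multiplier` tasks per core, and partitions no larger than the target."""
    by_cores = total_cores * multiplier
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    return max(by_cores, by_size)


print(suggest_shuffle_partitions(total_cores=16, shuffle_bytes=1 * GB))   # 48 (core-bound)
print(suggest_shuffle_partitions(total_cores=16, shuffle_bytes=50 * GB))  # 800 (size-bound)
```

With AQE enabled, Spark can also coalesce an over-generous partition count at run-time, so erring on the high side is usually the cheaper mistake.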

## 5.3 I/O Optimization
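* **`spark.sql.files.maxPartitionBytes = "128m"`**
  When Spark scans files (CSV/Parquet/JSON), it packs them into input partitions of at most 128 MB, cutting large splittable files into chunks and grouping small files together; partitions come out smaller when the total input is small relative to the default parallelism. This balances work per task: big enough to keep each task busy, but small enough to create parallelism and avoid memory blow-ups.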

* **`spark.sql.files.openCostInBytes = "4m"`**
  A cost model hint: Spark treats opening a new file as equivalent to reading 4 MB of data. Raising or lowering this value influences how Spark coalesces tiny files into larger splits, helping mitigate the “small‑files problem.”

* **`spark.sql.parquet.compression.codec = "snappy"`**
  Uses Snappy compression for Parquet output. Snappy offers a good trade-off: fast compression/decompression with moderate space savings, resulting in quicker I/O compared to heavier codecs like Gzip.

All three values are Spark's defaults, so the cell below makes them explicit rather than changing behavior.


```python
# I/O tuning
spark.conf.set("spark.sql.files.maxPartitionBytes", "128m")
spark.conf.set("spark.sql.files.openCostInBytes", "4m")
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
```
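The two file settings interact when Spark plans a scan. Below is a simplified Python model of the split-size formula and "next fit decreasing" packing found in Spark 3.x's file-source planning (`FilePartition`), written for illustration rather than as Spark's API. It assumes every file is splittable and that the minimum partition count equals the default parallelism (i.e. `spark.sql.files.minPartitionNum` is unset); the helper names are hypothetical.

```python
MB = 1024 * 1024


def max_split_bytes(file_sizes, max_partition_bytes, open_cost_bytes, min_partitions):
    """Split size: spread the input over min_partitions, capped by maxPartitionBytes."""
    total = sum(size + open_cost_bytes for size in file_sizes)  # each file is charged the open cost
    bytes_per_core = total // min_partitions
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


def plan_file_partitions(file_sizes, max_partition_bytes=128 * MB,
                         open_cost_bytes=4 * MB, min_partitions=2):
    """Return a list of partitions, each a list of chunk sizes in bytes."""
    split = max_split_bytes(file_sizes, max_partition_bytes, open_cost_bytes, min_partitions)

    # Cut each file into chunks of at most `split` bytes, then sort largest first
    chunks = []
    for size in file_sizes:
        for offset in range(0, size, split):
            chunks.append(min(split, size - offset))
    chunks.sort(reverse=True)

    # Next fit: close the current partition when the next chunk would overflow it
    partitions, current, current_size = [], [], 0
    for chunk in chunks:
        if current and current_size + chunk > split:
            partitions.append(current)
            current, current_size = [], 0
        current.append(chunk)
        current_size += chunk + open_cost_bytes
    if current:
        partitions.append(current)
    return partitions


print(len(plan_file_partitions([1 * MB] * 10)))  # 2: ten small files packed five per partition
print(len(plan_file_partitions([1000 * MB])))    # 8: one large file cut into 128 MB chunks
```

The small-file case shows the open cost at work: each 1 MB file "costs" 5 MB, so only five fit in a 25 MB split.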

## 5.4 Dynamic Allocation
* **`spark.dynamicAllocation.enabled = true`**
  Turns on **dynamic executor allocation**: Spark automatically adds or removes executors at run‑time based on the workload’s current demand.

* **`spark.dynamicAllocation.minExecutors = 2`**
  Spark will never go below 2 executors, ensuring a baseline level of parallelism even during idle periods.

* **`spark.dynamicAllocation.maxExecutors = 20`**
  Caps the pool at 20 executors, preventing runaway cluster usage and keeping costs predictable.

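* **`spark.shuffle.service.enabled = true`**
  Enables the external shuffle service, which serves shuffle files from each node independently of the executors, so shuffle output stays available when dynamic allocation removes an executor that produced it. Since Spark 3.0, `spark.dynamicAllocation.shuffleTracking.enabled` is an alternative that instead keeps executors holding needed shuffle data alive.

Together, these settings let Spark scale between 2 and 20 executors, growing during peaks and shrinking when the job is less busy, while keeping shuffle data available. Dynamic allocation requires a cluster manager (Standalone, YARN, or Kubernetes); in local mode Spark ignores it.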


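```python
spark.stop()

# Dynamic allocation needs a cluster manager (Standalone, YARN, Kubernetes).
# No master is set, so this notebook falls back to local mode, where Spark
# ignores these settings; the cell only shows the syntax.
spark = SparkSession.builder \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", 2) \
    .config("spark.dynamicAllocation.maxExecutors", 20) \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()
```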
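How many executors does dynamic allocation aim for? A simplified sketch of the target that Spark's `ExecutorAllocationManager` works toward: enough executors to run every running or pending task, clamped to the configured bounds. It assumes 4 cores per executor and `spark.task.cpus = 1` (illustrative values), and the helper name is hypothetical. The real manager also ramps up gradually, increasing its requests exponentially each round while tasks stay backlogged, and releases executors only after they sit idle for `spark.dynamicAllocation.executorIdleTimeout` (60 s by default).

```python
import math


def target_executors(running_or_pending_tasks, executor_cores=4, task_cpus=1,
                     min_executors=2, max_executors=20, allocation_ratio=1.0):
    """Simplified model: executors needed for all tasks, clamped to [min, max]."""
    tasks_per_executor = executor_cores // task_cpus
    # allocation_ratio mirrors spark.dynamicAllocation.executorAllocationRatio (default 1.0)
    needed = math.ceil(running_or_pending_tasks * allocation_ratio / tasks_per_executor)
    return max(min_executors, min(max_executors, needed))


print(target_executors(0))    # 2: idle, held at minExecutors
print(target_executors(30))   # 8: 30 tasks / 4 slots per executor, rounded up
print(target_executors(500))  # 20: capped at maxExecutors
```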

## 5.5 Serialization Options
* **`spark.serializer = "org.apache.spark.serializer.KryoSerializer"`**
  Switches the serializer Spark uses for JVM objects (for example RDD records that are shuffled or cached in serialized form) from the default Java serializer to **Kryo**, which is typically much more compact and faster. This reduces network traffic and memory footprint for RDD-based workloads. DataFrames use Spark's own binary row format, and PySpark pickles Python objects before they reach the JVM, so DataFrame-only jobs see little change.

* **`spark.kryo.registrationRequired = false`**
  The default. Lets Kryo serialize any class without pre-registering it, at the cost of storing the full class name with each unregistered object. Registering frequently used custom classes (via `spark.kryo.classesToRegister`) saves that space and time.


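```python
# Use Kryo serialization for better performance.
# Caution: spark.serializer is read when the SparkContext starts. A session is already
# running, so getOrCreate() reuses it and ignores this setting (see the warning);
# call spark.stop() first for it to take effect.
spark = SparkSession.builder \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()
```

```
25/05/07 18:38:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
```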
