<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Terminology" data-toc-modified-id="Terminology-0.1">Terminology</a></span><ul class="toc-item"><li><span><a href="#SparkContext" data-toc-modified-id="SparkContext-0.1.1">SparkContext</a></span></li><li><span><a href="#SparkSession" data-toc-modified-id="SparkSession-0.1.2">SparkSession</a></span></li><li><span><a href="#Transformations" data-toc-modified-id="Transformations-0.1.3">Transformations</a></span></li><li><span><a href="#Actions" data-toc-modified-id="Actions-0.1.4">Actions</a></span></li><li><span><a href="#Spark-Configuration-Files" data-toc-modified-id="Spark-Configuration-Files-0.1.5">Spark Configuration Files</a></span></li></ul></li></ul></li><li><span><a href="#Spark-SQL:" data-toc-modified-id="Spark-SQL:-1">Spark SQL:</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Creating-the-Spark-Entry-point-(SparkSession)" data-toc-modified-id="Creating-the-Spark-Entry-point-(SparkSession)-1.0.1">Creating the Spark Entry point (SparkSession)</a></span></li><li><span><a href="#Data-Preparation" data-toc-modified-id="Data-Preparation-1.0.2">Data Preparation</a></span></li></ul></li></ul></li></ul></div>

In [1]:
import pyspark
import datetime
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [3]:
pyspark.__version__

'3.5.0'

- [Spark 3.0.1 documentation](https://spark.apache.org/docs/3.0.1/index.html)
- [Spark Python API](https://spark.apache.org/docs/latest/api/python/index.html#)
- [PySpark API Reference](https://spark.apache.org/docs/latest/api/python/reference/index.html#api-reference)
- [Transformations and Actions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.streaming.html#transformations-and-actions)
- [Broadcast and Accumulator](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#broadcast-and-accumulator)

- One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. When running SQL from within another programming language the results will be returned as a `Dataset`/`DataFrame`.
- The `Dataset` API is available in Scala and Java. Python does not support the Dataset API.
- The `DataFrame` API is available in Scala, Java, Python, and R.
   

### Terminology

<center><img src="../assets/spark/cluster-overview.png" width=400></center>

PySpark architecture is the underlying structure that enables the Python API for Apache Spark to distribute and process data across clusters efficiently. It comprises various components that work together to perform distributed data processing and analysis. Here's a detailed explanation of PySpark architecture:

-   `Driver Program`: The driver program is the entry point of a PySpark application. It's the main Python script that defines the sequence of operations to be executed on the Spark cluster. The driver program interacts with the SparkContext to coordinate tasks and collect results.
-   `SparkContext (Driver)`: The SparkContext is the central component of the driver program. It represents the connection to the Spark cluster and manages the allocation of resources, including memory and CPU, on the cluster. The SparkContext coordinates with the cluster manager to distribute tasks to worker nodes.
-   `Cluster Manager`: The cluster manager (e.g., Apache Mesos, Hadoop YARN, Kubernetes, or Spark's standalone cluster manager) manages the resources of the cluster. It grants executors to the application based on available resources and scheduling policies; the driver then schedules tasks on those executors.
-   `Executor (Worker Nodes)`: Executors are processes launched on the worker nodes of the Spark cluster, and they are where the actual computations take place. Each executor runs tasks on its allocated resources (CPU, memory), stores data in memory and on disk, and communicates with the driver program and other executors.
-   `Task`: A task is a unit of work that is executed on an executor. Tasks are based on the operations defined in the driver program and include transformations and actions on RDDs or DataFrames.
-   `Job`: A job is the set of tasks launched when an action is triggered in the driver program. Spark splits each job into stages at shuffle boundaries, and each stage runs one task per partition. An action on an RDD or DataFrame triggers one or more jobs.
-   `Distributed Data`: Data is split into partitions that are spread across the executors; a single executor typically holds several partitions. Tasks run in parallel, one per partition, which enables efficient distributed computing.
-   `Resilient Distributed Dataset (RDD)`: RDDs are the fundamental data structures in PySpark. They represent distributed collections of data, partitioned across the cluster. RDDs support transformations and actions, and they provide fault tolerance through lineage information.
-   `DataFrame and Catalyst Optimizer`: DataFrames are higher-level abstractions that provide schema-aware, optimized querying capabilities. The Catalyst Optimizer optimizes DataFrame queries and execution plans for better performance.
-   `Spark Core`: Spark Core is the foundation of the Spark architecture. It includes essential libraries and APIs for distributed task scheduling, memory management, and fault tolerance.
-   `Cluster Communication`: The driver program communicates with the executors and the cluster manager to distribute tasks and collect results. Communication happens via serialized data, allowing data to be transferred efficiently across the network.
-   `Storage Management`: Spark provides various storage levels for caching and persisting RDDs in memory or on disk. This helps reduce recomputation and improves performance.
-   `Shuffle and Data Movement`: Shuffle is the process of redistributing data across partitions. This often occurs when data needs to be grouped, aggregated, or joined. Because a shuffle moves data over the network, it is expensive, and Spark's optimizer tries to avoid unnecessary shuffles.
-   `Cluster Monitoring and Management`: Cluster monitoring tools track the status of the cluster, resource utilization, task execution progress, and failures. Monitoring helps administrators optimize cluster performance and troubleshoot issues.
-   `Serialization`: Data serialization is used to convert data structures into a format that can be transmitted across the network or stored. PySpark uses serialization to send tasks, data, and results between driver and executor nodes.

PySpark's architecture is designed to harness the power of distributed computing while abstracting away the complexities of managing clusters and parallelism. It allows developers to focus on defining high-level operations and let Spark handle the distribution, optimization, and fault tolerance of data processing tasks.
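
The partition, task, and job vocabulary above can be made concrete with a plain-Python analogy. The sketch below is not Spark code: a thread pool stands in for the executors, and the helper names (`split_into_partitions`, `run_task`, `run_job`) are invented for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal, contiguous partitions."""
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def run_task(partition):
    """One task: process a single partition and return a partial result."""
    return sum(x * x for x in partition)


def run_job(data, num_partitions=4):
    """The 'driver': split the data, run one task per partition, merge results."""
    partitions = split_into_partitions(data, num_partitions)
    # The thread pool is a stand-in for executors running tasks in parallel.
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_results = list(pool.map(run_task, partitions))
    return sum(partial_results)


total = run_job(list(range(10)), num_partitions=3)
```

The driver-side function only splits the work and merges partial results; the per-partition function never sees data outside its own partition, which is the property that lets Spark run tasks independently.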

#### SparkContext

The SparkContext is the entry point to the underlying Spark engine and was the primary entry point to Spark before version 2.0. It is responsible for coordinating the resources and orchestrating the processing of data in a Spark application.

The SparkContext sets up Spark's internal services, including the DAG scheduler and the task scheduler, and establishes the connection to the cluster manager. Through it, an application can also read from and write to external storage systems such as Hadoop Distributed File System (HDFS), Apache Cassandra, and Apache HBase (some of which require additional connectors).

In summary, the SparkContext is responsible for low-level programming of Spark, including job scheduling, task dispatching, and cluster management.

#### SparkSession

The SparkSession, introduced in Spark 2.0, is a higher-level entry point to Spark that provides a single unified interface to interact with Spark. The SparkSession combines the functionality of the SparkContext, SQLContext, and HiveContext into a single object.

The SparkSession integrates with Spark SQL, the Spark module for structured data processing. It allows Spark applications to read and write data in various file formats, such as CSV, JSON, and Parquet, and to execute SQL queries against that data. In Scala and Java, the SparkSession also gives access to the Dataset API, a type-safe extension of the DataFrame API; in PySpark you work with DataFrames only.

In summary, the SparkSession provides a high-level API for working with Spark that combines the functionality of the SparkContext, SQLContext, and HiveContext.

#### Transformations

- **Selecting Columns**

```python
df.select("column1", "column2")
```

- **Filtering Rows**

```python
df.filter(df.column1 == "value")
```

- **Grouping and Aggregating**

```python
df.groupBy("column1").agg({"column2": "sum"})
```

- **Joining**

```python
joined_df = df1.join(df2, "common_column")
```

- **Ordering**

```python
df.orderBy("column1")
```

- **Windowing**

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window = Window.partitionBy("column1").orderBy("column2")
df.withColumn("row_number", row_number().over(window))
```
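
Transformations like the ones above are lazy: they only describe a computation, and nothing runs until an action is called. The following plain-Python sketch mimics that behavior. `LazyDataset` is a toy class invented for illustration, not part of PySpark, and it ignores partitioning and optimization entirely.

```python
class LazyDataset:
    """Toy stand-in for a DataFrame/RDD: records steps, runs them on demand."""

    def __init__(self, source, lineage=()):
        self._source = source          # the original data
        self.lineage = tuple(lineage)  # recorded (name, function) steps

    # --- transformations: return a new object, compute nothing ---
    def map(self, fn):
        return LazyDataset(self._source, self.lineage + (("map", fn),))

    def filter(self, predicate):
        return LazyDataset(self._source, self.lineage + (("filter", predicate),))

    # --- actions: replay the lineage and return a concrete result ---
    def collect(self):
        rows = iter(self._source)
        for kind, fn in self.lineage:
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return list(rows)

    def count(self):
        return len(self.collect())


calls = []


def double(x):
    calls.append(x)  # side effect lets us observe when work actually happens
    return x * 2


ds = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
work_before_action = len(calls)  # no element has been processed yet
result = ds.collect()            # the action triggers the computation
```

The recorded `lineage` is also a rough picture of how an RDD can be recomputed after a failure: the steps are replayed from the source rather than restored from a copy.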

#### Actions

- **Counting Rows**

    ```python
    df.count()
    ```

- **Collecting Data**

    ```python
    df.collect()
    ```

- **Writing Data**

    ```python
    # CSV
    df.write.csv("path/to/output.csv", header=True)

    # JSON
    df.write.json("path/to/output.json")

    # Parquet
    df.write.parquet("path/to/output.parquet")
    ```

- **Showing Data**

    ```python
    df.show()
    ```

- **Summarizing Data**

    ```python
    df.describe().show()
    ```

#### Spark Configuration Files

- `$ code $SPARK_HOME/conf`
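
That directory ships with template files such as `spark-defaults.conf.template`; copying it to `spark-defaults.conf` gives a place for default properties. A minimal fragment might look like the following. The property names are real Spark settings, but the values are illustrative assumptions, not recommendations.

```
# $SPARK_HOME/conf/spark-defaults.conf  (whitespace-separated key/value pairs)
spark.master                            local[*]
spark.driver.memory                     2g
spark.sql.shuffle.partitions            200
spark.sql.autoBroadcastJoinThreshold    10485760
```

Values in this file act as defaults: properties set in code on the session or passed to `spark-submit` take precedence over them.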

## Spark SQL:

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col, rand, desc, asc, sum as Fsum, udf     ## udf => UserDefinedFunction

#### Creating the Spark Entry point (SparkSession)

In [2]:
spark = SparkSession.builder \
        .appName("auto") \
        .getOrCreate()
# Enable eager evaluation for better formatting of the output


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/11/25 18:11:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10000000)
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

spark.conf.get("spark.sql.sources.bucketing.enabled")

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

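# Disable broadcast joins (-1 turns the size threshold off) and adaptive
# query execution, which could otherwise switch join strategies at runtime
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", False)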
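
The effect of the threshold values used in this cell can be summarized with a small plain-Python function. This is a simplified model under stated assumptions, not Spark's planner: the real decision also depends on the join type, hints, and the quality of the size statistics. The name `should_broadcast` is made up for this sketch.

```python
def should_broadcast(estimated_size_bytes, threshold_bytes):
    """Simplified rule: broadcast only if the size estimate fits the threshold.

    A threshold of -1 disables automatic broadcast joins entirely.
    """
    if threshold_bytes < 0:
        return False
    return 0 <= estimated_size_bytes <= threshold_bytes


small_ok = should_broadcast(5_000_000, 10_000_000)        # fits under 10 MB
large_rejected = should_broadcast(50_000_000, 10_000_000)  # too large
disabled = should_broadcast(5_000_000, -1)                 # threshold turned off
```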

In [5]:
spark.conf.get("spark.sql.warehouse.dir")

'file:/Users/am/mydocs/Software_Development/notes_hub/nbs/spark-warehouse'

In [4]:
# spark.conf.get("spark.sql.parquet.filterPushDown")

In [23]:
# spark.sparkContext.getConf().getAll()
# spark

#### Data Preparation

```python
# CSV
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

# JSON
df = spark.read.json("path/to/file.json")

# Parquet
df = spark.read.parquet("path/to/file.parquet")
```

In [9]:
DATA_DIR = os.environ['DATA'] + '/IBM_Data_Analysis'

In [10]:
# ! head -3 {DATA_DIR}/imports-85.csv

In [11]:
column_names = ['symboling', 'normalized-losses', 'make', 'fuel-type', 'aspiration',
       'num-of-doors', 'body-style', 'drive-wheels', 'engine-location',
       'wheel-base', 'length', 'width', 'height', 'curb-weight', 'engine-type',
       'num-of-cylinders', 'engine-size', 'fuel-system', 'bore', 'stroke',
       'compression-ratio', 'horsepower', 'peak-rpm', 'city-mpg',
       'highway-mpg', 'price']

In [12]:
sqldf = spark.read.csv(DATA_DIR + "/imports-85.csv/", header=False, inferSchema=True) # pyspark.sql.dataframe.DataFrame

In [13]:
# sqldf.columns

In [16]:
columns = sqldf.columns

for old_col, new_col in zip(columns, column_names):
    sqldf = sqldf.withColumnRenamed(old_col, new_col)
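
One caveat with the loop above: `zip` silently stops at the shorter list, so a missing name would leave trailing columns with their default names (`_c0`, `_c1`, ... for a headerless CSV). A small guard, sketched here in plain Python with a hypothetical helper `build_rename_map`, makes the mismatch explicit before any renaming happens.

```python
def build_rename_map(old_names, new_names):
    """Pair old and new column names positionally, refusing length mismatches."""
    if len(old_names) != len(new_names):
        raise ValueError(
            f"expected {len(old_names)} names, got {len(new_names)}"
        )
    return dict(zip(old_names, new_names))


# Only the first three columns are shown to keep the example short.
old = ["_c0", "_c1", "_c2"]
new = ["symboling", "normalized-losses", "make"]
rename_map = build_rename_map(old, new)
```

If the lengths match, `sqldf.toDF(*column_names)` should give the same result as the loop in a single call.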

In [47]:
# # Create a temporary view against which we can run SQL queries.
# # createOrReplaceTempView returns None, so do not reassign the DataFrame.
# sqldf.createOrReplaceTempView('auto')

In [48]:
# spark.sql("SELECT * FROM auto LIMIT 2").show()

In [17]:
sqldf.printSchema()

root
 |-- symboling: integer (nullable = true)
 |-- normalized-losses: string (nullable = true)
 |-- make: string (nullable = true)
 |-- fuel-type: string (nullable = true)
 |-- aspiration: string (nullable = true)
 |-- num-of-doors: string (nullable = true)
 |-- body-style: string (nullable = true)
 |-- drive-wheels: string (nullable = true)
 |-- engine-location: string (nullable = true)
 |-- wheel-base: double (nullable = true)
 |-- length: double (nullable = true)
 |-- width: double (nullable = true)
 |-- height: double (nullable = true)
 |-- curb-weight: integer (nullable = true)
 |-- engine-type: string (nullable = true)
 |-- num-of-cylinders: string (nullable = true)
 |-- engine-size: integer (nullable = true)
 |-- fuel-system: string (nullable = true)
 |-- bore: string (nullable = true)
 |-- stroke: string (nullable = true)
 |-- compression-ratio: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- peak-rpm: string (nullable = true)
 |-- city-mpg: integer (nu

In [18]:
# sqldf.describe()

In [20]:
sqldf.take(2)

[Row(symboling=3, normalized-losses='?', make='alfa-romero', fuel-type='gas', aspiration='std', num-of-doors='two', body-style='convertible', drive-wheels='rwd', engine-location='front', wheel-base=88.6, length=168.8, width=64.1, height=48.8, curb-weight=2548, engine-type='dohc', num-of-cylinders='four', engine-size=130, fuel-system='mpfi', bore='3.47', stroke='2.68', compression-ratio=9.0, horsepower='111', peak-rpm='5000', city-mpg=21, highway-mpg=27, price='13495'),
 Row(symboling=3, normalized-losses='?', make='alfa-romero', fuel-type='gas', aspiration='std', num-of-doors='two', body-style='convertible', drive-wheels='rwd', engine-location='front', wheel-base=88.6, length=168.8, width=64.1, height=48.8, curb-weight=2548, engine-type='dohc', num-of-cylinders='four', engine-size=130, fuel-system='mpfi', bore='3.47', stroke='2.68', compression-ratio=9.0, horsepower='111', peak-rpm='5000', city-mpg=21, highway-mpg=27, price='16500')]
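
The rows above show `'?'` used as a missing-value placeholder, which helps explain why columns such as `normalized-losses` were inferred as `string` in the schema. The sketch below is a deliberately simplified plain-Python model of type inference (integer, then double, then string); Spark's actual inference handles more types and edge cases, and `infer_column_type` is a hypothetical helper.

```python
def infer_column_type(values):
    """Return 'integer', 'double', or 'string' for a column of raw CSV text."""

    def all_parse(parser):
        # True only if every value in the column parses with `parser`.
        try:
            for v in values:
                parser(v)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"


# Illustrative values; "?" is the placeholder visible in the rows above.
losses_type = infer_column_type(["?", "164"])
symboling_type = infer_column_type(["3", "1"])
wheel_base_type = infer_column_type(["88.6", "94.5"])
```

A single unparseable placeholder is enough to push the whole column to `string`. Passing `nullValue="?"` to `spark.read.csv` is one way that should let such columns be inferred as numeric instead.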

- [75. Databricks | Pyspark | Performance Optimization - Bucketing](https://www.youtube.com/watch?v=fp0PN9Y9QiY)
- [74. Databricks | Pyspark | Interview Question: Sort-Merge Join (SMJ)](https://www.youtube.com/watch?v=DFtvA5O58X4&list=PLgPb8HXOGtsQeiFz1y9dcLuXjRh8teQtw&index=70)

- [Broadcast Join](https://www.youtube.com/watch?v=4ck3KOfSCzE)

- `spark.sql.autoBroadcastJoinThreshold`
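
As a rough picture of what a broadcast join does, the plain-Python sketch below builds a lookup table from the small side and probes it from each partition of the large side. It is an analogy under simplifying assumptions (inner join, in-memory lists of dicts), not Spark's implementation; the function name and the sample rows are made up.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join partitioned 'large' rows with a small table on `key`."""
    # Build phase: the small side becomes one lookup table. In Spark this
    # table would be shipped (broadcast) to every executor.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    # Probe phase: each partition is joined locally, so no rows of the
    # large side need to move between partitions.
    joined_partitions = []
    for partition in large_partitions:
        joined = []
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**row, **match})
        joined_partitions.append(joined)
    return joined_partitions


# Made-up sample data: two partitions of a "large" table and a small table.
cars = [
    [{"make": "audi", "doors": "four"}, {"make": "bmw", "doors": "two"}],
    [{"make": "audi", "doors": "two"}, {"make": "saab", "doors": "four"}],
]
makers = [{"make": "audi", "country": "DE"}, {"make": "bmw", "country": "DE"}]

result = broadcast_hash_join(cars, makers, key="make")
```

The large side keeps its partitioning and is never shuffled, which is why this strategy only pays off when the other side is small enough to copy everywhere.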

In [6]:
df = spark.range(1, 10000, 1, 10).select(col("id"), rand(10).alias("Attribute"))

In [7]:
df.show(5)

+---+------------------+
| id|         Attribute|
+---+------------------+
|  1|0.1709497137955568|
|  2|0.8051143958005459|
|  3|0.5775925576589018|
|  4|0.9476047869880925|
|  5|   0.2093704977577|
+---+------------------+
only showing top 5 rows


In [6]:
# df.write.partitionBy("Gender").parquet(output_path, mode="overwrite")

In [7]:
df.count()

9999

In [8]:
df.write.format("parquet").saveAsTable("non_bucketed_table")
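
The table name suggests a later comparison with a bucketed table. The idea behind bucketing is that each row is assigned to one of a fixed number of buckets by hashing its key, so rows with equal keys always land in the same bucket. The plain-Python sketch below illustrates only that assignment; `zlib.crc32` is a stand-in chosen for determinism (Spark uses its own hash function), and the helper names are hypothetical.

```python
import zlib


def bucket_id(key, num_buckets):
    """Map a key to a bucket index in [0, num_buckets)."""
    # crc32 is a deterministic stand-in; Spark uses its own hash function.
    digest = zlib.crc32(str(key).encode("utf-8"))
    return digest % num_buckets


def write_bucketed(rows, key, num_buckets):
    """Group rows into `num_buckets` lists by the hash of their key."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_id(row[key], num_buckets)].append(row)
    return buckets


rows = [{"id": i} for i in range(1, 101)]
buckets = write_bucketed(rows, key="id", num_buckets=4)
```

In PySpark the corresponding writer call is `bucketBy`, for example `df.write.bucketBy(4, "id").saveAsTable(...)` with a table name of your choosing; when two tables are bucketed the same way on the join key, Spark can often join them without a shuffle.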

In [30]:
! open file:/Users/am/mydocs/Software_Development/notes_hub/nbs/spark-warehouse