# Apache Spark Introduction

* Introduction
* Apache Spark and Hadoop MapReduce
* Spark Core
* RDD - Resilient Distributed Dataset
* Spark DataFrame and SQL
* Spark Streaming
* Example of creating a simple Spark application

## 1. Introduction

![image.png](attachment:25a4bd53-b7d9-45b8-a298-0af92cf6d9a7.png)

- **Spark Core:** the general execution engine that underpins Spark; all other functionality is built on top of it. It provides in-memory computing and the ability to reference datasets in external storage systems.
- **Spark SQL:** a component on top of Spark Core that introduced a data abstraction called SchemaRDD (later renamed DataFrame), providing support for structured and semi-structured data.
- **Spark Streaming:** leverages Spark Core's fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD (Resilient Distributed Dataset) transformations on those mini-batches.
- **MLlib (Machine Learning Library):** a distributed machine learning framework on top of Spark that benefits from the speed of Spark's distributed, memory-based architecture.
- **GraphX:** a distributed graph-processing framework. It provides an API for expressing graph computations that can model user-defined graphs using the Pregel abstraction API, along with an optimized runtime for this abstraction.

## 2. Apache Spark Vs Hadoop MapReduce

**The operating mechanism of MapReduce (MR):**
- Input data is read from HDFS (the storage component of Hadoop) → processed by the specified operations → the output is written back to HDFS → that output is loaded again → the next operation is performed → its output is again written to HDFS. This `[read-process-write]` sequence repeats until the job is complete
- Because each step must write its output to HDFS before the next step can run, storage and replication overhead grows and processing latency increases, since most of the work is done on disks, whose I/O throughput is inherently low

![image.png](attachment:238cf886-65c0-4544-8b3a-4d37688ce10b.png)
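To make the `[read-process-write]` cycle concrete, here is a toy, single-machine simulation in plain Python. It is not Hadoop's API: each stage reads its input from a file, processes it, and writes its result to a new file before the next stage may start. The helper `run_stage`, the file names, and the two word-count stages are hypothetical, chosen only for illustration.

```python
import json
import os
import tempfile
from collections import Counter


def run_stage(stage_fn, in_path, out_path):
    # [read] load the previous stage's output from disk
    with open(in_path) as f:
        records = json.load(f)
    # [process] apply this stage's logic
    result = stage_fn(records)
    # [write] persist the result before the next stage can run
    with open(out_path, "w") as f:
        json.dump(result, f)
    return out_path


def tokenize(lines):
    # Stage 1 ("map"-like): split each line into words
    return [word for line in lines for word in line.split()]


def count_words(words):
    # Stage 2 ("reduce"-like): count occurrences of each word
    return dict(Counter(words))


work_dir = tempfile.mkdtemp()
input_path = os.path.join(work_dir, "input.json")
with open(input_path, "w") as f:
    json.dump(["spark is fast", "hadoop is reliable", "spark is popular"], f)

stage1_out = run_stage(tokenize, input_path, os.path.join(work_dir, "stage1.json"))
stage2_out = run_stage(count_words, stage1_out, os.path.join(work_dir, "stage2.json"))

with open(stage2_out) as f:
    word_counts = json.load(f)
print(word_counts)
```

Every intermediate result ends up on disk (three files for two stages), which is exactly the overhead the Spark model tries to avoid.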

**The operating mechanism of Spark**
- To overcome the shortcomings of Hadoop MapReduce, Spark introduces a new concept, the RDD (Resilient Distributed Dataset), which acts as the `basic data structure` in Spark
- An RDD is defined as an abstraction over a collection of `immutable elements` (read-only once created)
- Input data from the storage system only needs to be `loaded once`; the steps that transform and process it are planned, optimized, and executed continuously until the output is returned at the end of the job
- The entire process takes place in `memory (RAM)`, spilling to disk when memory runs out, to take advantage of its much higher I/O performance; this can make execution `10–100 times` faster than Hadoop MapReduce

![image.png](attachment:12f18425-0a82-4ef3-bc68-d55f0cde7417.png)

## 3. Spark Core

**SparkContext:**
- Core functionality for low-level programming and cluster interaction
- Creates RDDs
- Performs transformations and defines actions

**SparkSession:**
- Extends and includes SparkContext functionality
- Higher-level abstractions like DataFrames and Datasets
- Supports structured querying using SQL or DataFrame API
- Provides data source APIs, machine learning algorithms, and streaming capabilities

![image.png](attachment:050b9449-feeb-416c-9012-ebd3ed683c4d.png)

**A Spark program will include 2 main components:**

- **Driver Program:** a JVM process that contains the main() function, like any other JVM program. It coordinates the processing logic and hands work out to the Executors. The Driver program holds the SparkSession.
- **Executors:** worker processes that carry out the computations sent by the Driver. Data to be processed can be loaded directly into an Executor's memory.


## 4. RDD - Resilient Distributed Dataset

**What are RDDs?**
- Immutable: transformations create new RDDs
- Distributed: data partitioned and processed in parallel
- Resilient: lineage tracks transformations for fault tolerance
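- Lazily evaluated: transformations are only recorded in an execution plan, which is optimized and evaluated when an action needs a result
- Fault-tolerant operations: transformations such as map and filter, and actions such as reduce, collect, count, and saveAsTextFile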

![image.png](attachment:cd866a85-3527-4ca5-83d2-d26da12dc3cd.png)

## 5. Spark DataFrame and Spark SQL

When executing, Spark SQL will still `call down` to the Core layer below, using `RDD for calculation`. Some important features of Spark SQL can be summarized as follows:
- Built on top of the Spark Core layer, inheriting all the features that RDD has.
- Works with Datasets and DataFrames (structured, distributed collections of data)
- High performance, scalability, and fault tolerance
- Interoperable with the other components of the Spark framework (such as Streaming, MLlib, and GraphX)
- Includes 2 components: the `Dataset API` and the `Catalyst Optimizer`.

![image.png](attachment:85b377e4-a280-4787-a83b-5a6f5b924dde.png)

**Advantages of DataFrames over RDDs:**
- Leverage Spark's Catalyst Optimizer: DataFrames use the Catalyst Optimizer, a powerful optimization engine that improves performance by applying advanced optimizations.
- Specific optimization techniques:
    - **Predicate Pushdown:** Data filtering (filter) is pushed to a `lower level`, helping to reduce the amount of data processed from the source.
    - **Constant Folding:** `Precalculate constant values` in expressions to reduce calculation work at runtime.
    - **Loop Unrolling:** Optimize the loop to `reduce the number of iterations` and increase execution speed.

## 6. Spark Streaming

- **Traditional Stream Processing**

Traditionally, distributed stream processing has been implemented with a `record-at-a-time` processing model. This architecture can achieve very `low` latency: input records are processed by a pipeline of operators, and results can be produced within `milliseconds`.

However, this model has drawbacks: it is not efficient at recovering from node failures or from straggler nodes (nodes that are slower than the others in the cluster). It can either recover from a failure very quickly by using a lot of extra resources, or use minimal extra resources but recover slowly.

![image.png](attachment:4f0ff4c6-ad5b-4ffd-b976-ca815c7026c2.png)

- **Micro-Batch Stream Processing (DStream)**

The streaming computation is modeled as a continuous series of `small, map/reduce-style batch jobs` on small chunks of the stream data. In other words, Spark Streaming divides the data from the input stream into micro-batches, and the output is computed for each batch. Breaking the streaming computation down into smaller tasks brings two advantages:
- Spark's agile task scheduling can recover very quickly from failures and stragglers by spreading the recomputation across `multiple nodes`
- The output is the same no matter how many times a task is recomputed; this property is called `deterministic`

Unfortunately, the micro-batch model cannot achieve millisecond-level latencies, only `second-level` ones


![image.png](attachment:43b5f324-c983-49d9-b1ca-8e7812a9a85a.png)

- **Structured Streaming in Spark Streaming**

**Drawbacks of DStream (old version of Spark Streaming):**
- Lack of single API for batch and stream processing
- Lack of separation between logical and physical plan
- Lack of native support for event-time windows

**Structured Streaming (latest version of Spark Streaming):**
- A single, unified programming model and interface for batch and stream processing
- A broader definition of stream processing

**Unbounded table: each new record in the stream is appended as a new row:**

![image.png](attachment:9e5ac3c1-deed-4df3-9cc1-107c42e360aa.png)

**Incremental execution on streaming data:**

![image.png](attachment:16630f57-2689-4210-b3a2-8f4981c65705.png)

## 7. Example for creating a simple Spark application

- **Create SparkSession**

```python
from pyspark.sql import SparkSession
```

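```python
spark = (
    SparkSession.builder
    .appName("Ingest checkin table into bronze")
    # Standalone cluster manager URL
    .master("spark://spark-master:7077")
    # Hive metastore used as the table catalog (together with enableHiveSupport below)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    # S3A filesystem settings for an S3-compatible object store (path-style access, no SSL)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # SimpleAWSCredentialsProvider reads fs.s3a.access.key / fs.s3a.secret.key (not shown here)
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    # Delta Lake SQL extension and catalog (the Delta Lake jars must be on the classpath)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Default location for managed tables
    .config("spark.sql.warehouse.dir", "s3a://lakehouse/")
    .enableHiveSupport()
    .getOrCreate()
)
```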

```python
import random

# Generate 100 rows of random sample data (values differ on every run)
names = ["Alice", "Bob", "Charlie", "David", "Emma", "Frank", "Grace", "Hannah", "Isaac", "Julia"]
data = [{"Name": random.choice(names), "Age": random.randint(20, 40)} for _ in range(100)]

df = spark.createDataFrame(data)
```

```python
df.show(5)
```

```
+---+-----+
|Age| Name|
+---+-----+
| 32| Emma|
| 28|Isaac|
| 40|Julia|
| 24|David|
| 26|Alice|
+---+-----+
only showing top 5 rows
```

```python
df.printSchema()
```

```
root
 |-- Age: long (nullable = true)
 |-- Name: string (nullable = true)
```

```python
print("The number of rows", df.count())
print("The number of columns", len(df.columns))
```

```
The number of rows 100
The number of columns 2
```

- **Stop the SparkSession**

```python
spark.stop()
```