## User-defined functions (UDFs) and vectorized UDFs

In Spark, a user-defined function (UDF) lets you define your own column transformations in Python or Scala when the built-in functions are not enough. A regular UDF operates on one row at a time. Python UDFs carry a noticeable performance overhead: each row's values must be serialized, sent from the JVM to a Python worker process, and the result sent back. In addition, Spark's optimizer treats a UDF as a black box and cannot optimize the logic inside it.

Vectorized UDFs (also called pandas UDFs), introduced in Spark 2.3, make Python UDFs more efficient. Instead of receiving one value at a time, the function receives batches of a column as `pandas.Series` objects, transferred between the JVM and Python using Apache Arrow, and can apply vectorized pandas operations to each batch. This reduces the per-row serialization and function-call overhead.
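
To make the difference in call overhead concrete, here is a minimal plain-Python sketch that counts how often a function is invoked when it is applied row by row versus once per batch. It uses neither Spark nor pandas; `CallCounter`, `apply_row_at_a_time`, `apply_batched`, and the batch size of 4 are hypothetical names and values chosen only for illustration.

```python
from typing import Callable, List


class CallCounter:
    """Wraps a function and counts how many times it is invoked."""

    def __init__(self, fn: Callable):
        self.fn = fn
        self.calls = 0

    def __call__(self, arg):
        self.calls += 1
        return self.fn(arg)


def apply_row_at_a_time(fn: Callable[[int], int], column: List[int]) -> List[int]:
    # One invocation per row, similar in spirit to a regular Python UDF.
    return [fn(value) for value in column]


def apply_batched(
    fn: Callable[[List[int]], List[int]], column: List[int], batch_size: int
) -> List[int]:
    # One invocation per batch of rows, similar in spirit to a vectorized UDF.
    result: List[int] = []
    for start in range(0, len(column), batch_size):
        result.extend(fn(column[start:start + batch_size]))
    return result


column = list(range(10))

row_fn = CallCounter(lambda x: x + 1)
batch_fn = CallCounter(lambda batch: [x + 1 for x in batch])

row_result = apply_row_at_a_time(row_fn, column)
batch_result = apply_batched(batch_fn, column, batch_size=4)

print(row_result == batch_result)    # True: both approaches compute the same values
print(row_fn.calls, batch_fn.calls)  # 10 3
```

Both approaches produce the same values, but the batched version crosses the function boundary 3 times instead of 10. In Spark, each crossing also involves moving data between the JVM and Python, so the savings are larger than this sketch suggests.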

Let's see an example of both a regular UDF and a vectorized UDF:

In [0]:
# Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Creating a Spark session
spark = SparkSession.builder.appName('UDF_example').getOrCreate()

# Sample dataframe
df = spark.createDataFrame([(1,), (2,), (3,)], ['value'])

# Regular UDF
def add_one(x):
    return x + 1

add_one_udf = udf(add_one, IntegerType())
df.withColumn('value_plus_one', add_one_udf(df['value'])).show()

# Vectorized UDF
@pandas_udf('int')
def add_one_vectorized(s: pd.Series) -> pd.Series:
    return s + 1

df.withColumn('value_plus_one_vectorized', add_one_vectorized(df['value'])).show()

+-----+--------------+
|value|value_plus_one|
+-----+--------------+
|    1|             2|
|    2|             3|
|    3|             4|
+-----+--------------+

+-----+-------------------------+
|value|value_plus_one_vectorized|
+-----+-------------------------+
|    1|                        2|
|    2|                        3|
|    3|                        4|
+-----+-------------------------+



## Spark Internals

Apache Spark is a distributed computing system that processes large datasets in parallel. Understanding its internals helps in optimizing Spark applications for better performance. Here are some key components:

- **Driver Program**: The process that runs the application's `main` function and creates the SparkContext. It is the entry point of a Spark application and coordinates all the work done on the cluster.
- **SparkContext**: It's the client-side object that represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables on that cluster.
- **Cluster Manager**: An external service that allocates cluster resources to Spark applications. Spark supports several cluster managers, including Standalone, YARN, Kubernetes, and Mesos (deprecated since Spark 3.2).
- **Executor**: Once the SparkContext connects to the Cluster Manager, it acquires executors on nodes in the cluster. Executors are Spark processes that run computations and store data for your application.
- **Task**: It's a unit of work that can be run on a partition of a distributed dataset and gets executed on a single executor.
- **Job**: A parallel computation consisting of multiple tasks that get spawned in response to a Spark action (e.g., save, collect).
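- **Stage**: Each job is divided into stages at shuffle boundaries. A stage is a set of tasks that run the same code on different partitions without exchanging data with each other; a stage starts only after the stages it depends on have finished.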

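Putting these together: the driver creates the SparkContext, which asks the cluster manager for executors. Each action then triggers a job, the job is split into stages, and each stage is split into tasks that the driver schedules on the executors.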
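
The relationship between jobs, stages, and tasks can also be sketched in plain Python. The snippet below is a simplified model, not Spark's scheduler: it starts a new stage at every wide (shuffle) operation and assumes one task per partition in each stage. The `Op` class, `split_into_stages`, the example operations, and the partition count are all hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Op:
    name: str
    wide: bool  # True if the operation needs a shuffle (e.g. groupBy, join)


def split_into_stages(ops: List[Op]) -> List[List[str]]:
    """Start a new stage at every wide (shuffle) operation."""
    stages: List[List[str]] = [[]]
    for op in ops:
        if op.wide and stages[-1]:
            stages.append([])
        stages[-1].append(op.name)
    return stages


# Hypothetical job: the operations triggered by a single action.
job = [
    Op("read", wide=False),
    Op("filter", wide=False),
    Op("groupBy", wide=True),
    Op("map", wide=False),
]

stages = split_into_stages(job)
num_partitions = 4  # assumed partition count for every stage

for stage_id, stage_ops in enumerate(stages):
    # In this model, each stage runs one task per partition.
    print(f"stage {stage_id}: ops={stage_ops}, tasks={num_partitions}")
```

In this model the job has two stages of four tasks each. Real Spark splits the work of a wide operation across both sides of the shuffle, so treat the grouping of operations here as an approximation.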

## Query Optimization

Query optimization lets Spark execute DataFrame and SQL operations efficiently. It is handled by Spark's Catalyst optimizer, which works in the following phases:

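- **Analysis**: The parsed query is resolved against the catalog, so that column names and types are known. This produces the analyzed logical plan.
- **Logical Optimization**: Rule-based transformations are applied to the logical plan to optimize it. Examples include predicate pushdown, constant folding, and boolean simplification.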
- **Physical Planning**: The logical plan is translated into one or more physical plans. The best plan is selected based on cost estimation.
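- **Code Generation**: Spark generates Java code for the selected physical plan (whole-stage code generation) and compiles it to bytecode that runs on the JVM, which avoids much of the overhead of interpreting the plan operator by operator.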

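Because Catalyst applies these optimizations automatically, developers using the DataFrame and SQL APIs can focus on application logic and leave many low-level tuning decisions to Spark. Code that the optimizer cannot inspect, such as UDFs, does not benefit in the same way.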
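
To give a feel for rule-based logical optimization, here is a minimal plain-Python sketch of constant folding and boolean simplification on a tiny expression tree. It is not Catalyst's implementation: the tuple-based expression format and the `optimize` and `is_literal` functions are hypothetical and cover only the handful of operators shown.

```python
from typing import Any

# Expressions are nested tuples: ("lit", value), ("col", name),
# ("add", left, right), ("gt", left, right), ("and", left, right).


def is_literal(expr: Any, value: Any) -> bool:
    """True if expr is a literal holding exactly the given value."""
    return expr[0] == "lit" and expr[1] is value


def optimize(expr: Any) -> Any:
    """Apply constant folding and boolean simplification bottom-up."""
    op = expr[0]
    if op in ("lit", "col"):
        return expr

    left, right = optimize(expr[1]), optimize(expr[2])

    # Constant folding: evaluate operators whose inputs are both literals.
    if left[0] == "lit" and right[0] == "lit":
        if op == "add":
            return ("lit", left[1] + right[1])
        if op == "gt":
            return ("lit", left[1] > right[1])
        if op == "and":
            return ("lit", left[1] and right[1])

    # Boolean simplification: `true AND x` is x, `false AND x` is false.
    if op == "and":
        for this, other in ((left, right), (right, left)):
            if is_literal(this, True):
                return other
            if is_literal(this, False):
                return ("lit", False)

    return (op, left, right)


# (Age > 20 + 5) AND (1 > 0)
predicate = (
    "and",
    ("gt", ("col", "Age"), ("add", ("lit", 20), ("lit", 5))),
    ("gt", ("lit", 1), ("lit", 0)),
)

print(optimize(predicate))  # ('gt', ('col', 'Age'), ('lit', 25))
```

The optimizer folds `20 + 5` into `25`, folds `1 > 0` into `true`, and then drops the redundant `AND true`, leaving a single comparison to evaluate per row.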

Let's see an example that demonstrates query optimization in Spark.

In [0]:
from pyspark.sql import functions as F

# Sample dataframe
data = [('Alice', 25), ('Bob', 30), ('Catherine', 29)]
columns = ['Name', 'Age']
df = spark.createDataFrame(data, columns)

# Applying multiple transformations
df_transformed = df.filter(F.col('Age') > 25).withColumn('Age after 5 years', F.col('Age') + 5)

# Show the execution plan
df_transformed.explain(True)

== Parsed Logical Plan ==
'Project [Name#32, Age#33L, '`+`('Age, 5) AS Age after 5 years#36]
+- Filter (Age#33L > cast(25 as bigint))
   +- LogicalRDD [Name#32, Age#33L], false

== Analyzed Logical Plan ==
Name: string, Age: bigint, Age after 5 years: bigint
Project [Name#32, Age#33L, (Age#33L + cast(5 as bigint)) AS Age after 5 years#36L]
+- Filter (Age#33L > cast(25 as bigint))
   +- LogicalRDD [Name#32, Age#33L], false

== Optimized Logical Plan ==
Project [Name#32, Age#33L, (Age#33L + 5) AS Age after 5 years#36L]
+- Filter (isnotnull(Age#33L) AND (Age#33L > 25))
   +- LogicalRDD [Name#32, Age#33L], false

== Physical Plan ==
*(1) Project [Name#32, Age#33L, (Age#33L + 5) AS Age after 5 years#36L]
+- *(1) Filter (isnotnull(Age#33L) AND (Age#33L > 25))
   +- *(1) Scan ExistingRDD[Name#32,Age#33L]

== Photon Explanation ==
Photon does not fully support the query because:
		Unsupported node: Scan ExistingRDD[Name#32,Age#33L].

Reference node:
	Scan ExistingRDD[Name#32,Age#33L]

In the optimized logical plan, the `cast(... as bigint)` expressions on the literals have been folded into constants and an `isnotnull(Age)` check has been added to the filter. The `*(1)` prefix in the physical plan marks operators that are compiled together by whole-stage code generation.



## Partitioning

Partitioning is a fundamental concept in Spark that allows it to distribute data across the cluster and parallelize operations. Here's a brief overview:

- **What is Partitioning?**
  - Partitioning is the process of dividing a large dataset into smaller chunks (partitions) that can be processed in parallel.

- **Why is it Important?**
  - When rows that share a key are stored in the same partition, key-based operations such as joins and aggregations need less data shuffling across the network, which improves performance.
  - It allows Spark to parallelize operations, leading to faster execution.

- **Types of Partitioning:**
  - **Hash Partitioning**: Data is partitioned based on the hash value of the partition key.
  - **Range Partitioning**: Data is partitioned based on a range of values of the partition key.

- **Managing Partitions:**
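  - `repartition()` can increase or decrease the number of partitions and always performs a full shuffle. `coalesce()` only decreases the number of partitions and avoids a full shuffle by merging existing partitions.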
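
The two partitioning schemes above can be sketched in plain Python. This is a conceptual illustration only: Spark uses its own hash function rather than Python's built-in `hash`, and it picks range boundaries by sampling the data. The functions `hash_partition` and `range_partition`, the sample keys, and the boundaries `[10, 20]` are hypothetical.

```python
from bisect import bisect_right
from typing import Dict, List


def hash_partition(keys: List[int], num_partitions: int) -> Dict[int, List[int]]:
    """Assign each key to partition hash(key) % num_partitions."""
    partitions: Dict[int, List[int]] = {p: [] for p in range(num_partitions)}
    for key in keys:
        partitions[hash(key) % num_partitions].append(key)
    return partitions


def range_partition(keys: List[int], boundaries: List[int]) -> Dict[int, List[int]]:
    """Assign each key to the range it falls in; boundaries must be sorted."""
    partitions: Dict[int, List[int]] = {p: [] for p in range(len(boundaries) + 1)}
    for key in keys:
        partitions[bisect_right(boundaries, key)].append(key)
    return partitions


keys = [1, 7, 12, 18, 23, 29, 7, 12]

# Equal keys always land in the same partition, but neighbouring keys may not.
print(hash_partition(keys, 3))

# Keys below 10, from 10 up to (not including) 20, and 20 or above.
print(range_partition(keys, [10, 20]))
```

With hash partitioning, equal keys are always co-located, which is what joins and aggregations need. With range partitioning, each partition additionally holds a contiguous range of keys, which is useful for sorted output.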

Let's see an example demonstrating partitioning in Spark.

In [0]:
# Checking the number of partitions in the original dataframe
num_partitions = df.rdd.getNumPartitions()

# Repartitioning the dataframe into 3 partitions
df_repartitioned = df.repartition(3)
num_partitions_repartitioned = df_repartitioned.rdd.getNumPartitions()

# Reducing the number of partitions using coalesce
df_coalesced = df_repartitioned.coalesce(2)
num_partitions_coalesced = df_coalesced.rdd.getNumPartitions()

num_partitions, num_partitions_repartitioned, num_partitions_coalesced

(4, 3, 2)

## Streaming API

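Spark Streaming provides a high-level API for processing real-time data streams using the same batch processing model as Spark's core API. The DStream-based API described here is the original streaming API; newer applications typically use Structured Streaming, which is built on DataFrames. Here are some key points: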

- **DStreams**: At the heart of Spark Streaming is the concept of Discretized Stream or DStream, which represents a continuous stream of data. DStreams can be created by ingesting data from various sources like Kafka, Flume, and HDFS.
- **Micro-batching**: Spark Streaming processes data using a micro-batching approach. The incoming data is divided into small batches, and these batches are processed using Spark's core engine.
- **Stateful Operations**: Spark Streaming provides built-in functions to maintain and query state information over time, such as `updateStateByKey` and `window` operations.
- **Integration with Other Spark Components**: Spark Streaming can be easily integrated with other Spark components like MLlib and GraphX for machine learning and graph processing on streaming data.
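
Micro-batching and stateful processing can be illustrated without Spark. The plain-Python sketch below groups timestamped lines into one-second batches, counts words in each batch, and keeps a running total across batches, loosely mirroring what a per-batch `reduceByKey` and a stateful `updateStateByKey` would compute. The functions `micro_batches` and `word_count` and the sample events are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def micro_batches(
    events: Iterable[Tuple[float, str]], batch_interval: float
) -> List[List[str]]:
    """Group (timestamp, line) events into batches of `batch_interval` seconds."""
    batches: Dict[int, List[str]] = {}
    for timestamp, line in events:
        batches.setdefault(int(timestamp // batch_interval), []).append(line)
    return [batches[index] for index in sorted(batches)]


def word_count(batch: List[str]) -> Counter:
    """Count the words in a single batch."""
    return Counter(word for line in batch for word in line.split())


events = [
    (0.2, "spark streaming"),
    (0.7, "spark"),
    (1.1, "streaming api"),
    (2.5, "spark api"),
]

running_total: Counter = Counter()  # state carried across batches
for batch in micro_batches(events, batch_interval=1.0):
    counts = word_count(batch)    # result for this batch only
    running_total.update(counts)  # stateful update across batches
    print(dict(counts), dict(running_total))
```

Each batch is processed independently with ordinary batch logic; only the running total carries information from one batch to the next.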

Let's see a simple example of Spark Streaming (Note: This is a conceptual example and won't produce real-time streaming output in this notebook environment).

In [0]:
from pyspark.streaming import StreamingContext

# Creating a StreamingContext from the existing SparkContext with a batch interval of 1 second
ssc = StreamingContext(spark.sparkContext, 1)

# Creating a DStream that reads data from localhost:9999
lines = ssc.socketTextStream('localhost', 9999)

# Splitting each line into words
words = lines.flatMap(lambda line: line.split(' '))

# Counting each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Printing the first ten elements of each RDD generated in this DStream
wordCounts.pprint()

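# Start the computation and let it run for up to 10 seconds, then stop streaming
# but keep the SparkContext alive for the rest of the notebook.
# Note: To run this code, you would need to first run a Netcat server using 'nc -lk 9999' in a terminal.
ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop(stopSparkContext=False)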



## Delta Lake

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It's designed to provide reliability, performance, and data integrity to data lakes. Here are some key features and benefits of Delta Lake:

- **ACID Transactions**: Delta Lake ensures that data is stored reliably in the data lake by providing full ACID transaction capabilities.
- **Scalable Metadata Handling**: It can handle petabyte-scale tables with billions of partitions and files.
- **Time Travel (Data Versioning)**: Delta Lake provides snapshots of data, enabling developers to access and revert to earlier versions of data for audits, rollbacks, or reproducing experiments.
- **Unified Batch and Streaming Source and Sink**: With Delta Lake, you can run batch and streaming workloads simultaneously on the same dataset.
- **Schema Enforcement and Evolution**: It ensures data integrity by enforcing schemas and allowing for schema evolution.

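Delta Lake stores table data as Parquet files alongside a transaction log (the `_delta_log` directory) and is used through the standard Spark DataFrame reader and writer. This means you can adopt Delta Lake in existing Spark code largely by changing the format to `delta`.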
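
The idea behind time travel can be sketched with a toy, in-memory commit log. This is a conceptual model only, not Delta Lake's actual log format or API: each commit records which data files were added and removed, and a snapshot of any version is obtained by replaying the log up to that version. The `VersionedTable` class and the file names are hypothetical.

```python
from typing import Dict, Iterable, List, Set


class VersionedTable:
    """Toy table whose state is an append-only log of commits."""

    def __init__(self) -> None:
        self.log: List[Dict[str, Set[str]]] = []

    def commit(self, add: Iterable[str] = (), remove: Iterable[str] = ()) -> int:
        """Record one commit and return its version number."""
        self.log.append({"add": set(add), "remove": set(remove)})
        return len(self.log) - 1

    def snapshot(self, version: int = -1) -> Set[str]:
        """Replay the log up to `version` (default: latest) to get the live files."""
        if version < 0:
            version = len(self.log) - 1
        files: Set[str] = set()
        for entry in self.log[: version + 1]:
            files -= entry["remove"]
            files |= entry["add"]
        return files


table = VersionedTable()
v0 = table.commit(add={"part-0.parquet"})
v1 = table.commit(add={"part-1.parquet"})
# A rewrite that replaces part-0 with part-2 in a single commit.
v2 = table.commit(add={"part-2.parquet"}, remove={"part-0.parquet"})

print(sorted(table.snapshot()))    # latest: ['part-1.parquet', 'part-2.parquet']
print(sorted(table.snapshot(v0)))  # version 0: ['part-0.parquet']
```

Because old commits are never modified, earlier versions stay readable after later writes, and a reader sees either all of a commit or none of it.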

Let's see a simple example of using Delta Lake (Note: This is a conceptual example and might require additional setup in a real environment).

In [0]:
# Importing required libraries for Delta Lake
from delta.tables import DeltaTable

# Creating a sample dataframe
data = [('John', 'Doe', 29), ('Jane', 'Doe', 25)]
columns = ['first_name', 'last_name', 'age']
df = spark.createDataFrame(data, columns)

# Writing the dataframe in Delta Lake format (overwrite so the cell can be re-run)
path = '/tmp/delta-table'
df.write.format('delta').mode('overwrite').save(path)

# Reading from Delta Lake
df_delta = spark.read.format('delta').load(path)
df_delta.show()

# Note: Outside Databricks, this requires the `delta-spark` package and a Spark session configured with the Delta Lake extensions.

+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|      John|      Doe| 29|
|      Jane|      Doe| 25|
+----------+---------+---+

