#### Spark Internals

Example to remember -> A classroom has to count bags of candy

The classroom is the cluster, the teacher is the driver, a desk with two students is an executor, and each student is a core. Two students at the same desk share the same supplies (pencils, notebooks, and so on), just as the cores of one executor share that executor's memory.
Candy bags are our dataset and each candy bag is a partition. Each candy piece is a record in our dataset.

**spark.speculation**

Apache Spark has the ‘speculative execution’ feature to handle the slow tasks in a stage due to environmental issues like slow network, disk, etc. If one task is running slowly in a stage, the Spark driver can launch a speculation task for it on a different host. Between the regular task and its speculation task, the Spark system will later take the result from the first successfully completed task and kill the slower one.

So, if I set `spark.speculation` to true -> Spark will re-launch one or more tasks if they are running slowly in a stage.

**spark.memory.fraction**

`spark.memory.fraction` expresses the size of M, the unified memory region shared by execution and storage, as a fraction of (JVM heap space - 300 MiB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.

Within M, execution memory is the part used for computation in shuffles, joins, sorts and aggregations; storage memory is the part used for caching.
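As a rough worked example of the formula above, the sketch below computes the size of M and of the remaining space for a given heap. The helper names and the 4096 MiB heap are illustrative assumptions, not Spark APIs.

```python
# Illustrative arithmetic only; these helpers are not part of Spark.
RESERVED_MIB = 300  # fixed reserve subtracted from the JVM heap


def unified_memory_mib(heap_mib, memory_fraction=0.6):
    """Size of M in MiB: (heap - 300 MiB) * spark.memory.fraction."""
    usable = heap_mib - RESERVED_MIB
    if usable <= 0:
        raise ValueError("heap must exceed the 300 MiB reserve")
    return usable * memory_fraction


def user_memory_mib(heap_mib, memory_fraction=0.6):
    """Space left for user data structures and Spark's internal metadata."""
    usable = heap_mib - RESERVED_MIB
    return usable - unified_memory_mib(heap_mib, memory_fraction)


heap = 4096  # hypothetical 4 GiB executor heap
print(f"M    = {unified_memory_mib(heap):.1f} MiB")
print(f"user = {user_memory_mib(heap):.1f} MiB")
```

With the default fraction of 0.6, a 4096 MiB heap leaves roughly 2277.6 MiB for M and roughly 1518.4 MiB for everything else.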

##### Shuffling

##### Narrow and wide transformations
* In a narrow transformation (select, filter, cast, union, contains, map, flatMap, mapPartitions, sample, drop, cache, and coalesce when numPartitions is reduced), all the elements required to compute the records in a single partition live in a single partition of the parent RDD. Only a limited subset of partitions is used to calculate the result. map() and filter() are typical examples: they compute data that lives on a single partition, so no data moves between partitions to execute them.
* In a wide transformation (distinct, groupBy, sort, join, orderBy, repartition, cartesian, intersection, reduceByKey, groupByKey), the elements required to compute the records in a single partition may live in many partitions of the parent RDD. groupByKey() and reduceByKey() are typical examples: they compute data that lives on many partitions, so data moves between partitions to execute them. Because they shuffle the data, they are also called shuffle transformations. Note that collect is an action rather than a transformation: it moves data to the driver, not between partitions.
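The difference can be sketched in plain Python, with lists standing in for partitions. This is a toy model, not Spark code: `narrow_map`, `wide_group_by_key` and the crc32-based partitioner are hypothetical names chosen for illustration.

```python
import zlib
from collections import defaultdict


def narrow_map(partitions, fn):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[fn(record) for record in part] for part in partitions]


def partition_for(key, num_partitions):
    """Deterministic hash partitioner (crc32 is an illustrative choice)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def wide_group_by_key(partitions, num_partitions):
    """Wide: records with the same key are moved (shuffled) to one partition."""
    shuffled = [defaultdict(list) for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            shuffled[partition_for(key, num_partitions)][key].append(value)
    return [dict(bucket) for bucket in shuffled]


# Two input partitions of (key, value) records.
data = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]

doubled = narrow_map(data, lambda kv: (kv[0], kv[1] * 2))
grouped = wide_group_by_key(data, num_partitions=2)
print(doubled)
print(grouped)
```

In the narrow case the partition layout is untouched. In the wide case the two `"a"` records start in different partitions and must end up in the same one, which is the data movement the text calls a shuffle.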

Spark supports two types of shared variables: broadcast variables and accumulators.

**Accumulators**, which are variables that are only “added” to, such as counters and sums, through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.

As a user, you can create named or unnamed accumulators. A named accumulator will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the “Tasks” table.

Tracking accumulators in the UI can be useful for understanding the progress of running stages.

An accumulator is created from an initial value v by calling `SparkContext.accumulator(v)`. Tasks running on a cluster can then add to it using the add method or the += operator. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map().

**BROADCAST VARIABLES (and accumulators)**. For the exam remember that broadcast variables are immutable and lazily replicated across all nodes in the cluster when an action is triggered. Broadcast variables are efficient at scale, as they avoid the cost of serializing data for every task. They can be used in the context of RDDs or Structured APIs.

In Spark RDD and DataFrame, broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster so that tasks can access them. Instead of sending this data along with every task, Spark distributes broadcast variables to the machines using efficient broadcast algorithms to reduce communication costs.

For example, assume you receive a two-letter state code in a file and want to transform it to the full state name (for example CA to California, NY to New York, etc.) by doing a lookup against a reference mapping. In some instances this data could be large, and you may have many such lookups (like zip codes).
Instead of distributing this information along with each task over the network (which adds overhead and takes time), we can use a broadcast variable to cache the lookup data on each machine, and tasks use this cached data while executing the transformations.

When you run a Spark RDD or DataFrame job that defines and uses broadcast variables, Spark does the following:

* Spark breaks the job into stages separated by distributed shuffles, and actions are executed within the stage.
* Stages are then broken into tasks.
* Spark broadcasts the common (reusable) data needed by tasks within each stage.
* The broadcast data is cached in serialized form and deserialized before each task executes.

You should create and use broadcast variables for data that is shared across multiple stages and tasks.
Note that broadcast variables are not sent to executors by the `sc.broadcast(variable)` call; instead, they are sent to executors when they are first used.

The Spark Broadcast is created using the broadcast(v) method of the SparkContext class. This method takes the argument v that you want to broadcast.

A broadcast variable is entirely cached on each worker node so it doesn't need to be shipped or shuffled between nodes with each stage.

`spark.sql.autoBroadcastJoinThreshold` -> property used to configure the broadcasting of a dataframe without the use of the broadcast() operation. Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE tableName COMPUTE STATISTICS noscan has been run. (Default 10485760 (10 MB))

Broadcast threshold: the default is 10 MB, but values up to 300 MB have been used in practice. The limit is controlled by `spark.sql.autoBroadcastJoinThreshold`.

A custom accumulator class can be defined by extending `org.apache.spark.util.AccumulatorV2` in Java or Scala or `pyspark.AccumulatorParam` in Python.

An accumulator is a way of updating a value inside a variety of transformations and propagating that value to the driver node in an efficient and fault-tolerant way.

It provides a mutable variable that a Spark cluster can safely update on a per-row basis.

Join operations are a common type of transformation in big data analytics in which two data sets, in the form of tables or DataFrames, are merged over a common matching key. Join operations trigger a large amount of data movement across Spark executors.

At the heart of these transformations is how Spark computes what data to produce, what keys and associated data to write to the disk, and how to transfer those keys and data to nodes as part of operations like groupBy(), join(), agg(), sortBy(), and reduceByKey(). This movement is commonly referred to as the shuffle.


-> **Join, big table-to-small table**

When the table is small enough to fit into the memory of a single worker node, we can optimize our join. It can often be more efficient to use a broadcast join. What this means is that we will replicate our small DataFrame onto every worker node in the cluster. Now this sounds expensive. However, what this does is prevent us from performing the all-to-all communication during the entire join process. Instead, we perform it only once at the beginning and then let each individual worker node perform the work without having to wait or communicate with any other worker node.

**BROADCAST JOIN** (Broadcast Hash Join, also known as a map-side-only join): a join of a large DataFrame with a smaller DataFrame. It reduces data shuffling by broadcasting the smaller DataFrame to all nodes in the cluster. This is an efficient, low-cost join strategy that can be used in a PySpark application.

Broadcasting publishes the data to all the nodes of a cluster. It avoids shuffling the large DataFrame, so comparatively little data crosses the network.

The broadcast hash join is employed when two data sets, one small (fitting in the driver’s and executor’s memory) and another large enough to ideally be spared from movement, need to be joined over certain conditions or columns. Using a Spark broadcast variable, the smaller data set is broadcasted by the driver to all Spark executors, and subsequently joined with the larger data set on each executor.

By default Spark will use a broadcast join if the smaller data set is less than 10 MB. This configuration is set in `spark.sql.autoBroadcastJoinThreshold`; you can decrease or increase the size depending on how much memory you have on each executor and in the driver. If you are confident that you have enough memory you can use a broadcast join with DataFrames larger than 10 MB (even up to 100 MB).

The default value of spark.sql.autoBroadcastJoinThreshold is 10MB but my table size is 15 MB. Hence, the broadcast join will not take place.
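The 15 MB case above can be checked with a small sketch of the threshold rule. This is a simplification: Spark compares its own size estimate from table statistics, not the real file size, and `is_auto_broadcast_candidate` is a hypothetical helper, not a Spark function.

```python
# Default of spark.sql.autoBroadcastJoinThreshold: 10485760 bytes (10 MB).
DEFAULT_THRESHOLD_BYTES = 10485760
MB = 1024 * 1024


def is_auto_broadcast_candidate(table_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Return True if a table of this estimated size is within the threshold.

    A threshold of -1 disables automatic broadcasting altogether.
    """
    if threshold_bytes < 0:
        return False
    return table_size_bytes <= threshold_bytes


default_case = is_auto_broadcast_candidate(15 * MB)           # 15 MB, default threshold
raised_case = is_auto_broadcast_candidate(15 * MB, 100 * MB)  # threshold raised to 100 MB
disabled_case = is_auto_broadcast_candidate(1 * MB, -1)       # broadcasting disabled
print(default_case, raised_case, disabled_case)
```

Only the second call qualifies: the 15 MB table is too large for the default threshold, and a threshold of -1 rules out even a 1 MB table.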

The BHJ is the easiest and fastest join Spark offers, since it does not involve any shuffle of the data set; all the data is available locally to the executor after a broadcast. You just have to be sure that you have enough memory both on the Spark driver’s and the executors’ side to hold the smaller data set in memory.
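To see why no shuffle is needed, here is a minimal pure-Python sketch of the idea, assuming rows are dicts and the large side is a list of partitions. The function name and sample data are hypothetical; Spark's real implementation works on its internal row format.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large side against a broadcast hash table."""
    # Build phase: hash the small side once. This table stands in for the
    # copy that every executor would receive from the driver.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    # Probe phase: each partition is joined locally, so the large side
    # never leaves its partition.
    joined = []
    for part in large_partitions:
        out = []
        for row in part:
            for match in lookup.get(row[key], []):
                out.append({**match, **row})
        joined.append(out)
    return joined


# Hypothetical sample data: a small items table and two transaction partitions.
items = [{"itemId": 1, "name": "pen"}, {"itemId": 2, "name": "ink"}]
transactions = [
    [{"itemId": 1, "qty": 3}, {"itemId": 3, "qty": 1}],
    [{"itemId": 2, "qty": 5}, {"itemId": 1, "qty": 2}],
]
result = broadcast_hash_join(transactions, items, "itemId")
print(result)
```

The output keeps the partitioning of the large side, and the row with `itemId` 3 is dropped because it has no match in the small table (inner join).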

In Spark 3.0, you can use `joinedDF.explain('mode')` to display a readable and digestible output. The modes include 'simple', 'extended', 'codegen', 'cost', and 'formatted'.

Specifying a value of -1 in `spark.sql.autoBroadcastJoinThreshold` will cause Spark to always resort to a shuffle sort merge join (-1 disables broadcasting).

Broadcast is not a valid join type. To join the DataFrame `itemsDf` with the larger DataFrame `transactionsDf` on column `itemId`, I can use this code (with `broadcast` imported from `pyspark.sql.functions`):

`transactionsDf.join(broadcast(itemsDf), "itemId")`

This implies an inner join (the default in DataFrame.join()).

**Shuffle Sort Merge Join**, as the name indicates, involves a sort operation. Shuffle Sort Merge Join has 3 phases.
* Shuffle Phase – both datasets are shuffled
* Sort Phase – records are sorted by key on both sides
* Merge Phase – iterate over both sides and join based on the join key.

Shuffle Sort Merge Join is preferred when both datasets are big and cannot fit in memory – with or without shuffle.
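The three phases can be sketched in plain Python. This is a toy model under stated assumptions: rows are dicts, partitions are lists, and the crc32-based partitioner and function names are illustrative choices rather than Spark internals.

```python
import zlib


def shuffle(rows, key, num_partitions):
    """Shuffle phase: route rows with the same key to the same partition."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        idx = zlib.crc32(str(row[key]).encode("utf-8")) % num_partitions
        parts[idx].append(row)
    return parts


def merge_sorted(left, right, key):
    """Merge phase: walk two key-sorted lists and emit matching pairs."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Locate the run of equal keys on the right side, then pair it
            # with every left row that carries the same key.
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                out.extend({**left[i], **r} for r in right[j:j_end])
                i += 1
            j = j_end
    return out


def sort_merge_join(left_rows, right_rows, key, num_partitions=2):
    """Inner equi-join: shuffle both sides, sort each partition, then merge."""
    joined = []
    left_parts = shuffle(left_rows, key, num_partitions)
    right_parts = shuffle(right_rows, key, num_partitions)
    for lp, rp in zip(left_parts, right_parts):
        lp_sorted = sorted(lp, key=lambda r: r[key])  # sort phase
        rp_sorted = sorted(rp, key=lambda r: r[key])
        joined.extend(merge_sorted(lp_sorted, rp_sorted, key))
    return joined


# Hypothetical sample data.
left = [{"id": 2, "l": "b"}, {"id": 1, "l": "a"}, {"id": 2, "l": "c"}]
right = [{"id": 2, "r": "x"}, {"id": 3, "r": "y"}, {"id": 1, "r": "z"}]
result = sort_merge_join(left, right, "id")
print(sorted(result, key=lambda r: (r["id"], r["l"])))
```

Because both sides use the same partitioner, matching keys always meet in the same partition, and the merge only ever compares keys for order and equality, which is why this strategy suits equi-joins on sortable keys.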

`spark.sql.join.preferSortMergeJoin` is set to true by default, as this is preferred when datasets are big on both sides.
Spark will pick Broadcast Hash Join if a dataset is small. In our case both datasets are small, so to force a Sort Merge Join we set `spark.sql.autoBroadcastJoinThreshold` to -1, which disables Broadcast Hash Join.

Sort merge join doesn’t work on non-equi joins. Both shuffle and sort are expensive operations. Use this join when broadcast hash and shuffle hash joins are not possible.

The sort-merge algorithm is an efficient way to merge two large data sets over a common key that is sortable, unique, and can be assigned to or stored in the same partition; that is, two data sets with a common hashable key that end up being on the same partition. This join scheme has two phases: a sort phase followed by a merge phase. By default, the SortMergeJoin is enabled via `spark.sql.join.preferSortMergeJoin`.

When to use a shuffle sort merge join -> When each key within two large data sets can be sorted and hashed to the same partition by Spark.

When different join strategy hints are specified on both sides of a join -> Spark prioritizes the broadcast hint over the merge.

##### Spark accumulator variables

* Accumulators provide a shared, mutable variable that a Spark cluster can safely update on a per-row basis.
* For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will be applied only once, meaning that restarted tasks will not update the value.
* In transformations, each task’s update can be applied more than once if tasks or job stages are re-executed.
* You can define your own custom accumulator class by extending `org.apache.spark.util.AccumulatorV2` in Java or Scala or `pyspark.AccumulatorParam` in Python.
* The Spark UI doesn't display all accumulators used by your application.

Spark accumulators are shared variables that are only “added” to through an associative and commutative operation, and are used to implement counters (similar to MapReduce counters) or sums.
By default Spark supports creating accumulators of any numeric type and provides the capability to add custom accumulator types.

Broadcast variables are shared, immutable variables that are cached on every machine in the cluster instead of being serialized with every single task. The canonical use case is to pass around a small table that does fit in memory on executors.

Programmers can create following accumulators
* named accumulators
* unnamed accumulators

When you create a named accumulator, you can see it in the Spark web UI under the “Accumulator” tab. On this tab you will see two tables: the first table, “accumulable”, consists of all named accumulator variables and their values; the second table, “Tasks”, shows the value of each accumulator modified by a task.

Unnamed accumulators are not shown in the Spark web UI. For all practical purposes it is advisable to use named accumulators.

Spark by default provides accumulator methods for long, double and collection types. All these methods are present in the SparkContext class and return:
* LongAccumulator 
* DoubleAccumulator
* CollectionAccumulator

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. An accumulator is created from an initial value v by calling `SparkContext.accumulator(v)`.

* In Apache Spark all transformations are evaluated lazily and all the actions are evaluated eagerly. In this case, the only command that will be evaluated lazily is df.join(). Eagerly evaluated commands are df.collect(), df.take(), df.show(), df.saveAsTable() (all actions)

* Which command does not generate a shuffle? -> It must be a narrow transformation, for example map(). Wide dependencies generate a shuffle (for example orderBy, repartition, distinct, join).

SHUFFLE: A shuffle is the process by which data is compared across partitions. We use shuffle to redistribute data among different executors or even among machines. Shuffle is an expensive operation.

The following Spark config property represents the number of partitions used in a wide transformation like join():

`spark.sql.shuffle.partitions`

-> Configures the number of partitions to use when shuffling data for joins or aggregations. (Default 200)

You can change the default number of 200 partitions:

`spark.conf.set("spark.sql.shuffle.partitions", 100)`

Spark provides `spark.sql.shuffle.partitions` and `spark.default.parallelism` configurations to work with parallelism or partitions. 

Before we jump into the differences, let’s understand what the Spark shuffle is. The Spark **shuffle** is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions (functions like groupBy() and join() can result in a shuffle among the executors or even among the machines). Spark shuffle is a very expensive operation as it moves the data between executors or even between worker nodes in a cluster. Spark automatically triggers the shuffle when we perform aggregation and join operations on RDD and DataFrame.

`spark.default.parallelism` was introduced with RDD, hence this property is only applicable to RDDs. Its default value is the number of all cores on all nodes in a cluster; in local mode, it is the number of cores on your system.

Whereas `spark.sql.shuffle.partitions` was introduced with DataFrame and only works with DataFrames; its default value is 200.

When we program in Spark we start with "read" and then, for example, "select, filter, group by, filter and finally write". For Spark it is the opposite: it works backwards, starting from "write" and then looking at which transformation precedes it.

Shuffles introduce (demarcate) stage boundaries:
-> shuffle write (at the end of the stage before the shuffle)
-> shuffle read (at the start of the stage after the shuffle)
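A simplified way to picture this is to cut a linear chain of operations wherever a shuffle is required. The sketch below is a toy model: `split_into_stages` is a hypothetical helper, and real Spark plans are graphs in which an operation such as groupBy may do partial work on both sides of the shuffle.

```python
# Operations treated as wide in this sketch, taken from the wide list in these notes.
WIDE = {"groupBy", "join", "orderBy", "distinct", "repartition"}


def split_into_stages(operations):
    """Group a linear chain of operations into stages, cutting at each shuffle."""
    stages, current = [], []
    for op in operations:
        if op in WIDE and current:
            # The running stage ends with a shuffle write; the wide
            # operation opens the next stage with a shuffle read.
            stages.append(current)
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


plan = ["read", "select", "filter", "groupBy", "filter", "write"]
stages = split_into_stages(plan)
for number, stage in enumerate(stages, start=1):
    print(f"stage {number}: {stage}")
```

For the read, select, filter, group by, filter, write example, the single shuffle at the group by yields two stages.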

**Caching data** explicitly accomplishes the same thing. The first execution caches some results, and subsequent executions can read from the cache. For example, if we cached at step 3, we would skip the first 3 operations.

##### Catalyst optimizer and query optimization

Spark uses two engines to optimize and run the queries - **Catalyst and Tungsten**, in that order. Catalyst basically generates an optimized physical query plan from the logical query plan by applying a series of transformations like predicate pushdown, column pruning, and constant folding on the logical plan. This optimized query plan is then used by Tungsten to generate optimized code, that resembles hand written code, by making use of Whole-stage Codegen functionality introduced in Spark 2.0.

Catalyst optimizer > This is the mechanism that makes our queries so fast.
It is an extensible query optimizer that contains a general library for representing trees and applying rules to manipulate them. It has several public extension points, including external data sources and user-defined types.

At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasiquotes) in a novel way to build an extensible query optimizer. Catalyst is based on functional programming constructs in Scala and designed with these two key purposes:
* Easily add new optimization techniques and features to Spark SQL
* Enable external developers to extend the optimizer (e.g. adding data source specific rules, support for new data types, etc.)

Catalyst contains a general library for representing trees and applying rules to manipulate them. On top of this framework, it has libraries specific to relational query processing (e.g., expressions, logical query plans), and several sets of rules that handle different phases of query execution: analysis, logical optimization, physical planning, and code generation to compile parts of queries to Java bytecode. For the latter, it uses another Scala feature, quasiquotes, that makes it easy to generate code at runtime from composable expressions. Catalyst also offers several public extension points, including external data sources and user-defined types. As well, Catalyst supports both rule-based and cost-based optimization.
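The "trees plus rules" idea can be illustrated with a small Python analogue. Catalyst itself is written in Scala and uses pattern matching; the classes and functions below (`Literal`, `Column`, `Add`, `transform_up`, `constant_folding`) are hypothetical stand-ins that only mimic the shape of a rule-based rewrite.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Column:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


def transform_up(node, rule):
    """Apply `rule` to every node of the tree, children first (bottom-up)."""
    if isinstance(node, Add):
        node = Add(transform_up(node.left, rule), transform_up(node.right, rule))
    return rule(node)


def constant_folding(node):
    """Rule: replace Add(Literal, Literal) with a single precomputed Literal."""
    if (
        isinstance(node, Add)
        and isinstance(node.left, Literal)
        and isinstance(node.right, Literal)
    ):
        return Literal(node.left.value + node.right.value)
    return node


# The expression x + (1 + 2) is rewritten to x + 3.
expr = Add(Column("x"), Add(Literal(1), Literal(2)))
optimized = transform_up(expr, constant_folding)
print(optimized)
```

A rule only describes one local rewrite; the tree traversal applies it everywhere, which is what makes it easy to add new optimizations without touching the rest of the optimizer.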

##### Project Tungsten

Project Tungsten will be the largest change to Spark’s execution engine since the project’s inception. It focuses on substantially improving the efficiency of memory and CPU for Spark applications, to push performance closer to the limits of modern hardware. The goal of Project Tungsten is to improve Spark execution by optimising Spark jobs for CPU and memory efficiency. This effort includes three initiatives:

* Memory Management and Binary Processing: leveraging application semantics to manage memory explicitly and eliminate the overhead of JVM object model and garbage collection.
* Cache-aware computation: algorithms and data structures to exploit memory hierarchy.
* Code generation: using code generation to exploit modern compilers and CPUs.

Improvements of project Tungsten take place at code generation (the second-generation Tungsten engine, introduced in Spark 2.0, uses this approach to generate compact RDD code for final execution).

![query optimization](https://files.training.databricks.com/images/aspwd/query_optimization_catalyst.png)

* Unresolved logical plan -> the instructions: what the developer logically wants to happen. Columns, UDFs, etc. are not yet resolved, so they may not exist or the code may contain typos.
This logical plan only represents a set of abstract transformations; its purpose is purely to convert the user’s set of expressions into the most optimized version. It does this by converting user code into an **unresolved logical plan**. This plan is unresolved because although your code might be valid, the tables or columns that it refers to might or might not exist. Spark uses the catalog, a repository of all table and DataFrame information, to resolve columns and tables in the analyzer. The analyzer might reject the unresolved logical plan if the required table or column name does not exist in the catalog. If the analyzer can resolve it, the result is passed through the Catalyst Optimizer, a collection of rules that attempt to optimize the logical plan by pushing down predicates or selections.
* (Metadata catalog) Then analysis happens; this is where column names, table names, etc. are evaluated.
* Logical plan -> here we make sure that we are not referring to non-existent columns and that, for example, an order by has something to sort on. Spark may then rewrite and reorder the logical sequence of operations. From this we get the OPTIMIZED LOGICAL PLAN.
* Next come the physical plans, where the Catalyst optimizer determines that there are multiple ways of executing a query. They represent what the engine will actually do. A physical plan is distinct from the logical plan because all of the optimizations have been applied. Each possible physical plan is evaluated according to a cost model and one is selected. After successfully creating an optimized logical plan, Spark begins the physical planning process. The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model.
* Finally we have code generation. Once the process is done, the selected physical plan is compiled down to RDDs. Physical planning results in a series of RDDs and transformations.

**CATALOG** The catalog is an interface through which the user may create, drop, alter or query underlying databases, tables, functions, etc.

##### Adaptive Query Execution (AQE)

-> Spark Query optimization technique that simplifies tuning of shuffle partition number.

AQE is a new feature of Spark 3.0. It is disabled by default in Spark 3.0 (and enabled by default since Spark 3.2.0), and enabling it is recommended. It collects runtime statistics from the finished stages of the query plan and uses them to re-optimize the execution plan of the remaining stages. Adaptive Query Execution re-optimises queries at materialisation points.
AQE does not apply to all kinds of queries, but only to queries that are not streaming queries and that contain at least one exchange (typically expressed through a join, aggregate, or window operator) or one subquery.

Adaptive Query Execution features are dynamically switching join strategies and dynamically optimising skew joins.

As soon as one or more of these stages finish materialization, the framework marks them complete in the physical query plan and updates the logical query plan accordingly, with the runtime statistics retrieved from completed stages. Based on these new statistics, the framework then runs the optimizer (with a selected list of logical optimization rules), the physical planner, as well as the physical optimization rules, which include the regular physical rules and the adaptive-execution-specific rules, such as coalescing partitions, skew join handling, etc. Now that we’ve got a newly optimized query plan with some completed stages, the adaptive execution framework will search for and execute new query stages whose child stages have all been materialized, and repeat the above execute-reoptimize-execute process until the entire query is done.

The AQE framework is shipped with three features:

- Dynamically coalescing shuffle partitions
- Dynamically switching join strategies
- Dynamically optimizing skew joins
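The first feature, coalescing shuffle partitions, can be sketched as a greedy merge of adjacent small partitions. This is a simplified model, not Spark's actual algorithm: the function name, the sizes and the 64 MB target are hypothetical values chosen for illustration.

```python
def coalesce_partitions(partition_sizes, target_size):
    """Greedily merge adjacent shuffle partitions up to `target_size`.

    Returns a list of groups; each group lists the original partition indices
    that would be read together by a single task.
    """
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(partition_sizes):
        # Close the running group if adding this partition would overshoot.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


sizes_mb = [70, 30, 20, 10, 50]  # hypothetical shuffle partition sizes after a stage
groups = coalesce_partitions(sizes_mb, target_size=64)
print(groups)
```

Here five shuffle partitions are reduced to three groups, so three tasks run instead of five. Only adjacent partitions are merged, and a partition larger than the target is left on its own rather than split.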


**Data skew** can severely downgrade the performance of join queries. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled` are true.

AQE converts a sort-merge join to a broadcast hash join when the runtime statistics of either join side are smaller than the broadcast hash join threshold. Which property is used to enable this feature? -> `spark.sql.adaptive.localShuffleReader.enabled` (together with `spark.sql.adaptive.enabled`; it lets Spark read the shuffle data locally after the conversion).

`storesDF.describe("sqft")` returns a DataFrame containing summary statistics only for column sqft in DataFrame storesDF.

`explain(..)` prints the query plans, optionally formatted by a given explain mode.

In [0]:
df = spark.read.parquet("/mnt/training/ecommerce/events/events.parquet")
df.show(7)

+-------+------------------+----------+------------------------+----------------+--------------------+--------------------+--------------+--------------------------+-----------------+
| device|         ecommerce|event_name|event_previous_timestamp| event_timestamp|                 geo|               items|traffic_source|user_first_touch_timestamp|          user_id|
+-------+------------------+----------+------------------------+----------------+--------------------+--------------------+--------------+--------------------------+-----------------+
|  macOS|{null, null, null}|  warranty|        1593878899217692|1593878946592107|      {Montrose, MI}|                  []|        google|          1593878899217692|UA000000107379500|
|Windows|{null, null, null}|     press|        1593876662175340|1593877011756535|   {Northampton, MA}|                  []|        google|          1593876662175340|UA000000107359357|
|  macOS|{null, null, null}|  add_item|        1593878792892652|1593878815459100

In [0]:
from pyspark.sql.functions import col

limitEventsDF = (df
                 .filter(col("event_name") != "reviews")
                 .filter(col("event_name") != "checkout")
                 .filter(col("event_name") != "register")
                 .filter(col("event_name") != "email_coupon")
                 .filter(col("event_name") != "cc_info")
                 .filter(col("event_name") != "delivery")
                 .filter(col("event_name") != "shipping_info")
                 .filter(col("event_name") != "press")
                )

limitEventsDF.explain(True)

== Parsed Logical Plan ==
'Filter NOT ('event_name = press)
+- Filter NOT (event_name#4083 = shipping_info)
   +- Filter NOT (event_name#4083 = delivery)
      +- Filter NOT (event_name#4083 = cc_info)
         +- Filter NOT (event_name#4083 = email_coupon)
            +- Filter NOT (event_name#4083 = register)
               +- Filter NOT (event_name#4083 = checkout)
                  +- Filter NOT (event_name#4083 = reviews)
                     +- Relation [device#4081,ecommerce#4082,event_name#4083,event_previous_timestamp#4084L,event_timestamp#4085L,geo#4086,items#4087,traffic_source#4088,user_first_touch_timestamp#4089L,user_id#4090] parquet

== Analyzed Logical Plan ==
device: string, ecommerce: struct<purchase_revenue_in_usd:double,total_item_quantity:bigint,unique_items:bigint>, event_name: string, event_previous_timestamp: bigint, event_timestamp: bigint, geo: struct<city:string,state:string>, items: array<struct<coupon:string,item_id:string,item_name:string,item_revenue_in_u

##### Caching

What is the difference between caching and persistence? In Spark they are synonymous. Two API calls, cache() and persist(), offer these capabilities. The latter provides more control over how and where your data is stored—in memory and on disk, serialized and unserialized. Both contribute to better performance for frequently accessed DataFrames or tables.


By default the data of a DataFrame is present on a Spark cluster only while it is being processed during a query -- it is not automatically persisted on the cluster afterwards. (Spark is a data processing engine, not a data storage system.) You can explicitly request Spark to persist a DataFrame on the cluster by invoking its `cache` method.

If you do cache a DataFrame, you should always explicitly evict it from cache by invoking `unpersist` when you no longer need it. (`cachedDF.unpersist()`)

The elements that should be cached are frequently accessed DataFrames or tables. (As a general rule, you should use memory caching judiciously, as it can incur resource costs in serializing and deserializing, depending on the StorageLevel used, don't cache dataframes that are too big to fit in memory).

<img src="https://files.training.databricks.com/images/icon_best_32.png" alt="Best Practice"> Caching a DataFrame can be appropriate if you are certain that you will use the same DataFrame multiple times, as in:

- Exploratory data analysis
- Machine learning model training
- Common use cases for caching are scenarios where you want to access a large data set repeatedly for queries or transformations, for example DataFrames that are commonly accessed for frequent transformations during ETL or when building data pipelines.

<img src="https://files.training.databricks.com/images/icon_warn_32.png" alt="Warning"> Aside from those use cases, you should **not** cache DataFrames because it is likely that you'll *degrade* the performance of your application.

- Caching consumes cluster resources that could otherwise be used for task execution
- Caching can prevent Spark from performing query optimizations, as shown in the next example
- You should not cache or persist DataFrames that are too big to fit in memory

When you use cache() or persist(), the DataFrame is not fully cached until you invoke an action that goes through every record (e.g., count()). If you use an action like take(1), only one partition will be cached because Catalyst realizes that you do not need to compute all the partitions just to retrieve one record.

When using DataFrame.persist(), data on disk is always serialized, using either Java or Kryo serialization.

Difference between cache() and persist():
* cache() always uses the default storage level (MEMORY_AND_DISK). It stores as many partitions in memory across the Spark executors as memory allows. While a DataFrame may be fractionally cached, partitions cannot be fractionally cached (e.g., if you have 8 partitions but only 4.5 partitions can fit in memory, only 4 will be cached). If not all your partitions are cached, the ones that are not have to be recomputed when you access the data again, slowing down your Spark job.
* persist() takes an optional StorageLevel argument that specifies exactly where to cache the data (default MEMORY_AND_DISK). It is the more nuanced of the two, giving you control over how your data is cached.
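
The "8 partitions, room for 4.5" arithmetic can be sketched in plain Python. This is only an illustration: the function name, the equal partition sizes and the memory figures are assumptions of mine, not anything Spark exposes.

```python
import math

def cached_partitions(total_partitions: int, partition_mb: float, free_memory_mb: float) -> int:
    """Whole partitions that fit in memory, assuming equally sized partitions."""
    fit = math.floor(free_memory_mb / partition_mb)  # a partition is cached whole or not at all
    return min(total_partitions, fit)

# 8 partitions of 100 MB each and 450 MB of free memory: room for 4.5 partitions
in_memory = cached_partitions(total_partitions=8, partition_mb=100, free_memory_mb=450)
print(in_memory)      # 4
print(8 - in_memory)  # 4 partitions do not fit in memory
```

The half partition that would "almost fit" is simply not cached in memory, which is why the count rounds down.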

Data is always serialized when stored on disk, whereas you need to specify if you wish to serialize data in memory.

Example:
"Cache a df as serialized Java objects in the JVM; if the df does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed; replicate each partition on two cluster nodes."
-> `df.persist(StorageLevel.MEMORY_AND_DISK_SER_2)`

* The default storage level for a DataFrame is StorageLevel.MEMORY_AND_DISK.
* The DataFrame class does not have an uncache() operation.
* Explicit caching can decrease application performance by interfering with the Catalyst optimizer's ability to optimize some queries.
* In Spark 3.0.0 the default storage level is MEMORY_AND_DISK for both cache() and persist(). It was changed to MEMORY_AND_DISK_DESER in later versions (Spark 3.1.1). If the exam offers MEMORY_AND_DISK_DESER as an option, choose it; otherwise MEMORY_AND_DISK is the correct answer.

* MEMORY_ONLY: This is the default behavior of the cache() method for RDDs. It stores DataFrames or RDDs as deserialized objects in JVM memory. When there is not enough memory available, some partitions of the DataFrame are not saved and are recomputed when needed. This takes more memory and, unlike with RDDs, it would be slower than the MEMORY_AND_DISK level, because recomputing the unsaved partitions also means rebuilding the in-memory columnar representation of the underlying table, which is expensive.
* MEMORY_ONLY_SER: The same as MEMORY_ONLY, except that it stores the RDD as serialized objects in JVM memory. It needs less memory (space-efficient) than MEMORY_ONLY because the objects are stored serialized, at the cost of some extra CPU cycles to deserialize them, which can mean more processing time.
* MEMORY_AND_DISK: This is the default behavior for DataFrames and Datasets. At this storage level the DataFrame is stored in JVM memory as deserialized objects. When the required storage is greater than the available memory, the excess partitions are stored on disk and read from disk when needed. (Data is stored directly as objects in memory, but if there’s insufficient memory the rest is serialized and stored on disk.)
* MEMORY_AND_DISK_SER: Like MEMORY_AND_DISK, but data is serialized when stored in memory. (Data is always serialized when stored on disk.)
* DISK_ONLY: At this storage level the DataFrame is stored only on disk, and the CPU computation time is usually high.

If we use the memory-only option and the RDD does not fit in memory, Spark discards what does not fit, and if that data is needed again it re-runs the preceding sequence of transformations. In these cases the combined memory-and-disk option is very useful.

The memory-and-disk option leaves data on disk when memory fills up.

The serialized memory-and-disk option (MEMORY_AND_DISK_SER) keeps the serialized representation of the data in memory.

The **persist** method lets you choose the type of persistence.

Bear in mind that data is always serialized when stored on disk, whereas you need to specify if you wish to serialize data in memory (for example with MEMORY_AND_DISK_SER_2). StorageLevel.MEMORY_AND_DISK_SER_2 is the same as the MEMORY_AND_DISK_SER storage level, but it replicates each partition on two cluster nodes.

StorageLevel.MEMORY_AND_DISK_SER is the same as the MEMORY_AND_DISK storage level, except that it keeps the DataFrame objects serialized in memory as well as on disk (where they go when memory runs out).

The "Storage" tab of the Spark UI is useful for investigating your cached DataFrames.

The amount of memory available to each executor is controlled by `spark.executor.memory`. During map and shuffle operations, Spark writes to and reads from the local disk’s shuffle files, so there is heavy I/O activity. This can result in a bottleneck, because the default configurations are suboptimal for large-scale Spark jobs.

Not only can you cache DataFrames, but you can also cache the tables or views derived from DataFrames. 

**Spark SQL** can cache tables using an in-memory columnar format by calling **spark.catalog.cacheTable("tableName")** or **dataFrame.cache()**. Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call **spark.catalog.uncacheTable("tableName")** to remove the table from memory.

If you run `CACHE TABLE MyTableName` in SQL, though, caching is eager by default and the entire table is cached right away. You can choose lazy caching in SQL like so:

`CACHE LAZY TABLE MyTableName`

##### Predicate pushdown

Predicate push down is another feature of Spark and Parquet that can improve query performance by reducing the amount of data read from Parquet files. Predicate push down works by evaluating filtering predicates in the query against metadata stored in the Parquet files. Parquet can optionally store statistics (in particular the minimum and maximum value for a column chunk) in the relevant metadata section of its files and can use that information to take decisions, for example, to skip reading chunks of data if the provided filter predicate value in the query is outside the range of values stored for a given column. 

Predicate pushdown refers to the filter conditions, typically the WHERE clause, which determine the rows to be returned. It relates to which rows are filtered, not which columns.
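
To remember how the min/max statistics are used, here is a minimal sketch in plain Python. The `RowGroup` class and `row_groups_to_read` function are hypothetical stand-ins, not the Parquet or Spark API; they only model the "skip a chunk whose value range cannot match the filter" decision.

```python
from dataclasses import dataclass

@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet chunk and its column statistics."""
    name: str
    min_value: int
    max_value: int

def row_groups_to_read(row_groups, lower_bound):
    """Keep only the chunks that may contain rows with value > lower_bound."""
    return [rg.name for rg in row_groups if rg.max_value > lower_bound]

row_groups = [
    RowGroup("rg0", min_value=1, max_value=100),
    RowGroup("rg1", min_value=101, max_value=200),
    RowGroup("rg2", min_value=201, max_value=300),
]

# Filter: WHERE value > 150. rg0 cannot contain a match, so it is never read.
print(row_groups_to_read(row_groups, lower_bound=150))  # ['rg1', 'rg2']
```

Note that rg1 is still read in full even though only part of it matches: the statistics allow skipping whole chunks, and the remaining rows are filtered afterwards.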

**Catalyst** basically generates an optimized physical query plan from the logical query plan by applying a series of transformations like predicate pushdown, column pruning, and constant folding on the logical plan.

* Predicate pushdown corresponds to the WHERE clause in the SQL query. If the predicates can be used directly by an external system (like a relational database) or for partition pruning (as with Parquet), less data has to be transferred or loaded from disk.
* Constant folding is the process of recognizing and evaluating constant expressions at compile time rather than computing them at runtime. It is not specific to Catalyst; it is a standard compilation technique. It is better to compute an expression once than to repeat the computation for each row.
* Projection pruning has much the same benefits as predicate pushdown. If some columns are not used, the downstream data source can discard them on read.
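
Constant folding is easy to picture with a toy expression tree. The sketch below is my own illustration in plain Python (the tuple representation and the `fold` function are made up for this note and have nothing to do with Catalyst's actual classes).

```python
import operator

OPS = {"+": operator.add, "-": operator.sub, "*": operator.mul}

def fold(expr):
    """Fold constant sub-expressions of a tiny expression tree.

    An expression is a number, a column name (str), or a tuple (op, left, right).
    """
    if not isinstance(expr, tuple):
        return expr
    op, left, right = expr
    left, right = fold(left), fold(right)
    if isinstance(left, (int, float)) and isinstance(right, (int, float)):
        return OPS[op](left, right)  # both sides are constants: evaluate once
    return (op, left, right)

# salary * (60 * 60 * 24): the constant part is computed once, not once per row
expr = ("*", "salary", ("*", ("*", 60, 60), 24))
print(fold(expr))  # ('*', 'salary', 86400)
```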

In [0]:
%scala
// Ensure that the driver class is loaded
Class.forName("org.postgresql.Driver")

In [0]:
from pyspark.sql.functions import col  # needed for the filter below

jdbcURL = "jdbc:postgresql://54.213.33.240/training"

# Username and Password w/read-only rights
connProperties = {
    "user" : "training",
    "password" : "training"
}

ppDF = (spark
        .read
        .jdbc(
            url=jdbcURL,                  # the JDBC URL
            table="training.people_1m",   # the name of the table
            column="id",                  # the name of a column of an integral type that will be used for partitioning
            lowerBound=1,                 # the minimum value of columnName used to decide partition stride
            upperBound=1000000,           # the maximum value of columnName used to decide partition stride
            numPartitions=8,              # the number of partitions/connections
            properties=connProperties     # the connection properties
        )
        .filter(col("gender") == "M")   # Filter the data by gender
       )

ppDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(training.people_1m) [numPartitions=8] [id#4149,firstName#4150,middleName#4151,lastName#4152,gender#4153,birthDate#4154,ssn#4155,salary#4156] PushedFilters: [*IsNotNull(gender), *EqualTo(gender,M)], ReadSchema: struct<id:int,firstName:string,middleName:string,lastName:string,gender:string,birthDate:timestam...




Note the lack of a **Filter** and the presence of a **PushedFilters** in the **Scan**. The filter operation is pushed to the database and only the matching records are sent to Spark. This can greatly reduce the amount of data that Spark needs to ingest.

In [0]:
 # Caching the data before filtering eliminates the possibility for the predicate push down.

cachedDF = (spark
            .read
            .jdbc(
                url=jdbcURL,
                table="training.people_1m",
                column="id",
                lowerBound=1,
                upperBound=1000000,
                numPartitions=8,
                properties=connProperties
            )
           )

cachedDF.cache()
filteredDF = cachedDF.filter(col("gender") == "M")

filteredDF.explain()

== Physical Plan ==
*(1) Filter (isnotnull(gender#4169) AND (gender#4169 = M))
+- InMemoryTableScan [id#4165, firstName#4166, middleName#4167, lastName#4168, gender#4169, birthDate#4170, ssn#4171, salary#4172], [isnotnull(gender#4169), (gender#4169 = M)]
      +- InMemoryRelation [id#4165, firstName#4166, middleName#4167, lastName#4168, gender#4169, birthDate#4170, ssn#4171, salary#4172], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Scan JDBCRelation(training.people_1m) [numPartitions=8] [id#4165,firstName#4166,middleName#4167,lastName#4168,gender#4169,birthDate#4170,ssn#4171,salary#4172] PushedFilters: [], ReadSchema: struct<id:int,firstName:string,middleName:string,lastName:string,gender:string,birthDate:timestam...




In addition to the **Scan** (the JDBC read) we saw in the previous example, here we also see the **InMemoryTableScan** followed by a **Filter** in the explain plan.

This means Spark had to read ALL the data from the database and cache it, and then scan it in cache to find the records matching the filter condition.

In [0]:
cachedDF.unpersist() # For DataFrames there is no method such as uncache().

Out[5]: DataFrame[id: int, firstName: string, middleName: string, lastName: string, gender: string, birthDate: timestamp, ssn: string, salary: int]

`spark.sql("CACHE LAZY TABLE flights_tbl")` -> Only cache the table when it is first used, instead of immediately.

Uncache a table named MY_TABLE -> `spark.catalog.uncacheTable("MY_TABLE")`. For tables there is no method like unpersist().

The Spark catalog lets you cache or uncache tables. You can also do it with SQL: `spark.sql("UNCACHE TABLE table_name")`.

##### Partitioning

Partitioning of the DataFrame defines the layout of the DataFrame or Dataset’s physical distribution across the cluster.

The Spark API uses the term core to mean a thread available for parallel execution. We can also call it a slot, to avoid confusion with the number of cores in the underlying CPU; the two numbers are not necessarily equal.

In most cases if you create a cluster you should know how many cores you have. However to check you can use:
`sc.defaultParallelism`
`spark.sparkContext.defaultParallelism`

In local mode you have a number of cores on the local machine. 

Number of partitions of a DataFrame: `df.rdd.getNumPartitions()`

A partition is a small piece of the total data set.

In [0]:
df = spark.read.parquet("/mnt/training/ecommerce/events/events.parquet")
df.rdd.getNumPartitions()

Out[6]: 4

In [0]:
# Access "SparkContext" through "SparkSession" to get the number of cores or slots.
# Use the "defaultParallelism" attribute to get the number of cores in a cluster.

print(spark.sparkContext.defaultParallelism)

# print(sc.defaultParallelism) "SparkContext" is also provided in Databricks notebooks as the variable "sc".

8


- **coalesce()** --> returns a new DF with exactly N partitions when N < current number of partitions (narrow transformation).
Can only be used to reduce the number of partitions; it cannot increase it. If you ask for more partitions, it does not throw an error but simply ignores the request.
- **repartition()** --> returns a new DF with exactly N partitions (wide transformation).
Can be used to reduce or increase the number of partitions. Repartition incurs a full shuffle of the data, regardless of whether one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of partitions or when you are looking to partition by a set of columns:
`df.repartition(5, col("DEST_COUNTRY_NAME"))`


`df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)`

You can also pass a column name as a string; it comes after the number of partitions. Example: `transactionsDf.repartition(24, "itemId")`

So if I want to increase the number of partitions I have only one option (repartition); if I want to reduce it I can choose. Coalesce is a narrow transformation and is faster because it avoids a shuffle, but it cannot guarantee an even distribution of records across the partitions (for example, I can end up with a few partitions holding 80% of the data). Repartition gives a uniform distribution, but it is a wide transformation, so it is slower because of the added cost of the shuffle.
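
The trade-off can be modeled with plain Python lists. This is a toy model under my own assumptions: both functions are hypothetical, Spark's real coalesce groups partitions differently (it takes data locality into account), and the real repartition moves data over the network. The point is only "merge whole partitions" versus "redistribute every record".

```python
def coalesce(partitions, n):
    """Merge existing partitions into n groups without moving individual records."""
    if n >= len(partitions):
        return partitions  # coalesce cannot increase the partition count
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)  # whole partitions are appended as-is
    return merged

def repartition(partitions, n):
    """Redistribute every record round-robin across n partitions (full shuffle)."""
    records = [r for part in partitions for r in part]
    return [records[i::n] for i in range(n)]

# Skewed input: 4 partitions holding 8, 1, 1 and 2 records
parts = [list(range(8)), [8], [9], [10, 11]]

print([len(p) for p in coalesce(parts, 2)])     # [9, 3]
print([len(p) for p in repartition(parts, 2)])  # [6, 6]
print(len(coalesce(parts, 8)))                  # 4
```

The skew of the input survives the coalesce (9 records against 3), while the repartition evens it out at the price of touching every record.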

Example: a DataFrame is very large and has a large number of partitions, more than there are executors in the cluster. Assuming that there is one core per executor -> performance will be suboptimal because not all partitions can be processed at the same time; the extra partitions have to wait for a free core.

Example:

I have a DataFrame (df1) which I want to repartition on the country column and create a new DataFrame (df2) I use -> `df2 = df1.repartition("Country")`

How many partitions will I have in df2? It depends on the `spark.sql.shuffle.partitions` value. If the value for `spark.sql.shuffle.partitions=200` then your new DataFrame df2 will have 200 partitions.

---
You have a DataFrame df with 20 partitions and you want to write it as Parquet to a given path. How many Parquet files will be created by the write operation? 20 Parquet files.

The number of Parquet files depends on the number of DataFrame partitions.

**Dynamic partition pruning (DPP)** -> to skip over the data you don’t need in a query’s results.
The key optimization technique in DPP is to take the result of the filter from the dimension table and inject it into the fact table as part of the scan operation to limit the data read. It is enabled by default, so you don’t have to configure it explicitly. All this happens dynamically when you perform joins between two tables.

The typical scenario where DPP is optimal is when you are joining two tables: a fact table (partitioned over multiple columns) and a dimension table (nonpartitioned).

-> Allows you to read only as much data as you need.

It works based on the PushDownPredicate property. Using this, Spark reads only the partitions that are needed for the processing, rather than processing all of them.
Processing the whole data set and then filtering it is slow. Pushing the filter down so that it runs before the processing reduces the overhead.

`spark.sql.optimizer.dynamicPartitionPruning.enabled` -> When enabled, Spark generates a predicate for the partition column when it is used as a join key, so that only the required data is read.

repartitionedDF = df.repartition(8)

df2 = df1.repartition(10, "Country")

The API is `df.repartition(numPartitions, *cols)`. Repartitioning on a column creates a hash-partitioned DataFrame. If you pass only the column, as in `df1.repartition("Country")`, the number of partitions comes from `spark.sql.shuffle.partitions`: with the value set to 200, the new DataFrame has 200 partitions.
However, you can override `spark.sql.shuffle.partitions` by passing numPartitions in the API call, so `df1.repartition(10, "Country")` gives df2 10 partitions.
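
What "hash partitioned" means can be sketched without Spark. The function below is a hypothetical illustration: `zlib.crc32` is just a deterministic stand-in I picked (Spark uses its own internal hash function), and the sample rows are invented. The idea it shows is the real one: partition id = hash(column value) mod numPartitions, so rows with the same value always land in the same partition.

```python
import zlib
from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    """Assign each row to partition hash(row[key]) mod num_partitions."""
    partitions = defaultdict(list)
    for row in rows:
        # crc32 is a deterministic stand-in for Spark's internal hash function
        bucket = zlib.crc32(row[key].encode("utf-8")) % num_partitions
        partitions[bucket].append(row)
    return partitions

rows = [
    {"Country": "Italy", "amount": 10},
    {"Country": "Spain", "amount": 20},
    {"Country": "Italy", "amount": 30},
    {"Country": "Japan", "amount": 40},
]

partitions = hash_partition(rows, key="Country", num_partitions=10)
for bucket, members in sorted(partitions.items()):
    print(bucket, [r["Country"] for r in members])
```

With 10 partitions and only 3 distinct countries, most partitions stay empty, which is worth keeping in mind when repartitioning on a low-cardinality column.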

`spark.sql.files.maxPartitionBytes` -> The configuration to change if you want to control the maximum partition size when reading files into a DataFrame.

`spark.executor.cores` -> The configuration to change if you want to control the number of cores available to each executor.

In [0]:
repartitionedDF.rdd.getNumPartitions()

Out[9]: 8

In [0]:
coalesceDF = df.coalesce(8)

In [0]:
coalesceDF.rdd.getNumPartitions()

Out[11]: 4

We always want the number of partitions to be a multiple of the number of slots (cores); that way every slot is used and every slot is assigned a task.

* It is advised that each partition is roughly 200 MB (based on efficiency).
* CSV files are large on disk but small in RAM.
* Parquet files are highly compressed on disk but not in RAM.

Example: I have 10 partitions and 8 slots (cores). Is it better to increase to 16 partitions or decrease to 8? It depends: after decreasing to 8, the partitions should not be larger than 200 MB each; if they would be, it is better to choose 16 partitions.
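
The "8 or 16?" decision can be written down as a small calculation. This is a sketch under the assumptions of these notes (a 200 MB target and evenly sized partitions); `choose_num_partitions` is a name I made up, and the data sizes are invented.

```python
import math

def choose_num_partitions(data_mb: float, slots: int, target_mb: float = 200) -> int:
    """Smallest multiple of `slots` that keeps partitions at or below target_mb."""
    needed = math.ceil(data_mb / target_mb)    # partitions required by size alone
    waves = max(1, math.ceil(needed / slots))  # full rounds of tasks over all slots
    return waves * slots

# 8 slots, 1.5 GB of data: 8 partitions of about 188 MB are fine
print(choose_num_partitions(data_mb=1500, slots=8))  # 8
# 8 slots, 2.4 GB of data: 8 partitions would be 300 MB each, so go up to 16
print(choose_num_partitions(data_mb=2400, slots=8))  # 16
```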

##### Maximizing Spark parallelism

Much of Spark’s efficiency is due to its ability to run multiple tasks in parallel at scale.

In data management parlance, a partition is a way to arrange data into a subset of configurable and readable chunks or blocks of contiguous data on disk. These subsets of data can be read or processed independently and in parallel, if necessary, by more than a single thread in a process.

Spark is embarrassingly efficient at processing its tasks in parallel. For large-scale workloads a Spark job will have many stages, and within each stage there will be many tasks. 

To optimize resource utilization and maximize parallelism, the ideal is at least as many partitions as there are cores on the executor. If there are more partitions than there are cores on each executor, all the cores are kept busy. You can think of partitions as atomic units of parallelism: a single thread running on a single core can work on a single partition.

The size of a partition in Spark is dictated by `spark.sql.files.maxPartitionBytes`. The default is 128 MB.

Shuffle partitions are created during the shuffle stage. By default, the number of shuffle partitions is set to 200 in spark.sql.shuffle.partitions. You can adjust this number depending on the size of the data set you have, to reduce the amount of small partitions being sent across the network to executors’ tasks.

The default value for spark.sql.shuffle.partitions is too high for smaller or streaming workloads; you may want to reduce it to a lower value such as the number of cores on the executors or less. 

There is no magic formula for the number of shuffle partitions to set for the shuffle stage; the number may vary depending on your use case, data set, number of cores, and the amount of executor memory available. It's a trial and error approach.

Created during operations like groupBy() or join(), also known as wide transformations, shuffle partitions consume both network and disk I/O resources.

Wide transformations have to shuffle data, and once the data is shuffled it has to be repartitioned.
Unlike with repartition and coalesce, we do not specify how many partitions to use.
The problem is the number of partitions we end up with.

Spark engineers chose 200 as the default number of shuffle partitions. However, we can change it, see below.

Default shuffle partitions:
`spark.conf.get("spark.sql.shuffle.partitions")`
`spark.conf.get("spark.sql.shuffle.partitions", "8")`

In [0]:
spark.conf.get("spark.sql.shuffle.partitions")

Out[12]: '200'

In [0]:
# Assuming that the data set isn't too large, you could configure the default number of shuffle partitions to match the number of cores:

spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)
print(spark.conf.get("spark.sql.shuffle.partitions"))

8


* Don't let the partition size grow beyond 200 MB per 8 GB of total core memory (for small data, use 3 partitions per core)

* Always err on the side of too many small partitions rather than too few large ones

* Size the default shuffle partitions by dividing the largest shuffle stage input by the target partition size.
Example: 4 TB / 200 MB = 20000 shuffle partitions
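
The sizing rule above as a quick calculation. The function name is mine; the only inputs are the largest shuffle stage input and the target partition size from the rule. The 20000 figure assumes decimal units (1 TB = 1,000,000 MB); with binary units the result comes out slightly higher.

```python
import math

def shuffle_partition_count(shuffle_input_mb: float, target_partition_mb: float = 200) -> int:
    """Largest shuffle stage input divided by the target partition size, rounded up."""
    return math.ceil(shuffle_input_mb / target_partition_mb)

# 4 TB taken as 4,000,000 MB (decimal units) with a 200 MB target
print(shuffle_partition_count(4_000_000))        # 20000
# 4 TB taken as 4 * 1024 * 1024 MB (binary units)
print(shuffle_partition_count(4 * 1024 * 1024))  # 20972
```

The result is what you would pass to `spark.conf.set("spark.sql.shuffle.partitions", ...)`, possibly rounded up to a multiple of the number of cores.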

Shuffle is very expensive. The best number of partitions depends on the data. Too many small partitions can also slow down the query, so it is important to have the right number of partitions.

When writing a DataFrame to storage, the number of DataFrame partitions determines the number of data files written.

##### Bucketing

Bucketing is another file organization approach with which you can control the data that is specifically written to each file. This can help avoid shuffles later when you go to read the data because data with the same bucket ID will all be grouped together into one physical partition. This means that the data is prepartitioned according to how you expect to use that data later on, meaning you can avoid expensive shuffles when joining or aggregating.
Rather than partitioning on a specific column (which might write out a ton of directories), it’s probably worthwhile to explore bucketing the data instead. This will create a certain number of files and organize our data into those “buckets”.

Example with scala:

`val numberBuckets = 10`

`val columnToBucketBy = "count"`

`csvFile.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
`

We can eliminate the Exchange (shuffle) step from the shuffle sort merge join scheme if we create partitioned buckets for the columns on which we want to perform frequent equi-joins. Presorting and reorganizing data in this way boosts performance, as it allows us to skip the expensive Exchange operation and go straight to WholeStageCodegen.

Adaptive Query Execution (AQE) is now able to dynamically coalesce shuffle partitions at runtime. This means that you can set `spark.sql.shuffle.partitions` based on the largest data set your application processes and allow AQE to reduce the number of partitions automatically when there is less data to process.

The `spark.sql.adaptive.enabled` configuration option controls whether AQE is turned on/off.

One of the most important questions for Adaptive Query Execution is when to reoptimize. Spark operators are often pipelined and executed in parallel processes. However, a shuffle or broadcast exchange breaks this pipeline. We call them materialization points and use the term “query stages” to denote subsections bounded by these materialization points in a query. Each query stage materializes its intermediate result and the following stage can only proceed if all the parallel processes running the materialization have completed. This provides a natural opportunity for reoptimization, for it is when data statistics on all partitions are available and successive operations have not started yet.

When the query starts, the Adaptive Query Execution framework first kicks off all the leaf stages — the stages that do not depend on any other stages. As soon as one or more of these stages finish materialization, the framework marks them complete in the physical query plan and updates the logical query plan accordingly, with the runtime statistics retrieved from completed stages. Based on these new statistics, the framework then runs the optimizer (with a selected list of logical optimization rules), the physical planner, as well as the physical optimization rules, which include the regular physical rules and the adaptive-execution-specific rules, such as coalescing partitions, skew join handling, etc. Now that we’ve got a newly optimized query plan with some completed stages, the adaptive execution framework will search for and execute new query stages whose child stages have all been materialized, and repeat the above execute-reoptimize-execute process until the entire query is done.

In Spark 3.0, the AQE framework is shipped with three features:

* Dynamically coalescing shuffle partitions
* Dynamically switching join strategies
* Dynamically optimizing skew joins

In [0]:
spark.conf.get("spark.sql.adaptive.enabled")

Out[14]: 'true'
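
To picture what "dynamically coalescing shuffle partitions" does, here is a simplified sketch in plain Python. It is not Spark's implementation: the function is hypothetical, the partition sizes are invented, and the 64 MB target is an assumption on my side (I believe it matches the default advisory partition size, but check the configuration docs). It merges adjacent small shuffle partitions until a group would exceed the target size.

```python
def coalesce_shuffle_partitions(sizes_mb, target_mb):
    """Greedily merge adjacent shuffle partitions into groups of at most ~target_mb."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        # Close the current group if adding this partition would exceed the target
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups

# Sizes (in MB) of 8 shuffle partitions as measured after the map stage
sizes = [10, 20, 5, 70, 30, 40, 2, 3]
print(coalesce_shuffle_partitions(sizes, target_mb=64))
# [[0, 1, 2], [3], [4], [5, 6, 7]]
```

The 8 shuffle partitions end up as 4 reduce tasks. A partition that is already larger than the target (index 3) stays on its own; splitting large partitions is the job of the skew-join handling, not of coalescing.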

##### Out-of-Memory errors

An out-of-memory error occurs when either the driver or an executor does not have enough memory to collect or process the data allocated to it.

The OutOfMemory Exception can occur at the Driver or Executor level.

* Driver is a Java process where the main() method of our Java/Scala/Python program runs. It executes the code and creates a SparkSession/ SparkContext which is responsible to create Data Frame, Dataset, RDD to execute SQL, perform Transformation & Action, etc.
* Executors are launched at the start of a Spark Application with the help of Cluster Manager. These can be dynamically launched and removed by the Driver as and when required. It runs an individual task and returns the result to the Driver. It can also persist data in the worker nodes for re-usability.

* OutOfMemory at the Driver Level:
An OutOfMemory error can occur here due to incorrect usage of Spark. The driver in the Spark architecture is only supposed to be an orchestrator and is therefore given less memory than the executors. You should always be aware of which operations or tasks are loaded onto your driver. An operation called without thinking about the size of its result can be the cause of the error, for example `collect()`.
* OutOfMemory at the Executor Level:
There are a few common reasons for this failure:
  * Inefficient queries
  * High concurrency
  * Incorrect configuration

##### Garbage collection tuning

JVM garbage collection can be a problem when you have large “churn” in terms of the RDDs stored by your program. (It is usually not a problem in programs that just read an RDD once and then run many operations on it.) When Java needs to evict old objects to make room for new ones, it will need to trace through all your Java objects and find the unused ones. The main point to remember here is that the cost of garbage collection is proportional to the number of Java objects, so using data structures with fewer objects (e.g. an array of Ints instead of a LinkedList) greatly lowers this cost. An even better method is to persist objects in serialized form, as described above: now there will be only one object (a byte array) per RDD partition. Before trying other techniques, the first thing to try if GC is a problem is to use serialized caching.

GC can also be a problem due to interference between your tasks’ working memory (the amount of space needed to run the task) and the RDDs cached on your nodes. We will discuss how to control the space allocated to the RDD cache to mitigate this.

-> A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old enough or Survivor2 is full, it is moved to Old. Finally, when Old is close to full, a full GC is invoked.

Possible strategies in order to decrease garbage collection time:
- Use structured APIs and create fewer objects
- Increase the Java Heap Size

You can gather statistics on how frequently garbage collection occurs and the amount of time it takes by adding `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` to the Java options. To find the GC statistics, check the workers' log files.

Which property is used to set FAIR scheduler so you can allocate jobs to different resource pools? -> `spark.scheduler.mode`

The `spark.scheduler.allocation.file` configuration points to the file used to create and configure FAIR scheduler pools.