# Query plans

- How to read a Spark query plan

In [28]:
import pyspark

In [29]:
pyspark.__version__

'3.5.1'

# Spark context & spark session

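- The Spark session (`SparkSession`) is the unified entry point to the DataFrame and SQL APIs
- The Spark context (`SparkContext`) represents the connection to the cluster and is the entry point to the lower-level RDD API; it is available from a session as `spark.sparkContext`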

# Schema Definition

In [60]:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField("author", StringType(), False),
                     StructField("title", StringType(), False),
                     StructField("pages", IntegerType(), False)])

# In Python DDL: schema = "author STRING, title STRING, pages INT"

In [67]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Define schema for our data using DDL 
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

# Create our static data 
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter","51", "LinkedIn"]], 
        [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
        [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
        [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
        [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
        [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]]


In [65]:
spark.stop()

In [66]:
spark = SparkSession.builder \
        .appName("Schemaexample2") \
        .master("local[*]") \
        .getOrCreate()

In [59]:
conf = SparkConf(loadDefaults=False)
conf.toDebugString()

''

In [68]:
# Create a DataFrame using the schema defined above
blogs_df = spark.createDataFrame(data, schema) 

In [69]:
# Show the DataFrame; it should reflect our table above 
blogs_df.show() 


                                                                                

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535|[twitter, 51, Lin...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [70]:
blogs_df.rdd.getNumPartitions()

16

In [71]:
blogs_df.explain(True)

== Parsed Logical Plan ==
LogicalRDD [Id#855, First#856, Last#857, Url#858, Published#859, Hits#860, Campaigns#861], false

== Analyzed Logical Plan ==
Id: int, First: string, Last: string, Url: string, Published: string, Hits: int, Campaigns: array<string>
LogicalRDD [Id#855, First#856, Last#857, Url#858, Published#859, Hits#860, Campaigns#861], false

== Optimized Logical Plan ==
LogicalRDD [Id#855, First#856, Last#857, Url#858, Published#859, Hits#860, Campaigns#861], false

== Physical Plan ==
*(1) Scan ExistingRDD[Id#855,First#856,Last#857,Url#858,Published#859,Hits#860,Campaigns#861]



In [35]:
# Print the schema used by Spark to process the DataFrame
# (printSchema() prints directly and returns None, so it needs no print() wrapper)
blogs_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)


In [36]:
blogs_df.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[Id#344,First#345,Last#346,Url#347,Published#348,Hits#349,Campaigns#350]




# Spark Optimization

## Dynamic allocation

- spark.dynamicAllocation.enabled true
- spark.dynamicAllocation.minExecutors 2
- spark.dynamicAllocation.schedulerBacklogTimeout 1m
- spark.dynamicAllocation.maxExecutors 20
- spark.dynamicAllocation.executorIdleTimeout 2min

By default spark.dynamicAllocation.enabled is set to false. When enabled with the settings shown here, the Spark driver will request that the cluster manager create two executors to start with, as a minimum (spark.dynamicAllocation.minExecutors). As the task queue backlog increases, new executors will be requested each time the backlog timeout (spark.dynamicAllocation.schedulerBacklogTimeout) is exceeded. In this case, whenever there are pending tasks that have not been scheduled for over 1 minute, the driver will request that a new executor be launched to schedule backlogged tasks, up to a maximum of 20 (spark.dynamicAllocation.maxExecutors). By contrast, if an executor finishes a task and is idle for 2 minutes (spark.dynamicAllocation.executorIdleTimeout), the Spark driver will terminate it.
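
These properties are normally supplied when the application is launched rather than changed on a running session. As one possible way to do that, the sketch below turns the settings listed above into `--conf` flags for `spark-submit`. The helper `to_submit_args` and the script name `my_job.py` are hypothetical and exist only for this example.

```python
import shlex

# Settings from the list above; the keys are Spark properties, the values are this example's choices
dynamic_allocation = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.schedulerBacklogTimeout": "1m",
    "spark.dynamicAllocation.maxExecutors": "20",
    "spark.dynamicAllocation.executorIdleTimeout": "2min",
}


def to_submit_args(conf, script):
    """Build a spark-submit argument list with one --conf flag per property."""
    args = ["spark-submit"]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(script)
    return args


# my_job.py is a hypothetical application script
args = to_submit_args(dynamic_allocation, "my_job.py")
print(shlex.join(args))
```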

## Memory and the shuffle service

![title](memory.png)

# Partitions

- To optimize resource utilization and maximize parallelism, the ideal is at least as many partitions as there are cores on the executor
- A single thread running on a single core can work on a single partition at a time
- **How partitions are created**
    - Spark’s tasks process data as partitions read from disk into memory. Data on disk is laid out in chunks or contiguous file blocks, depending on the store. By default, file blocks on data stores range in size from 64 MB to 128 MB. For example, on HDFS and S3 the default size is 128 MB (this is configurable). A contiguous collection of these blocks constitutes a partition.

- The maximum size of a partition when reading files is set by **spark.sql.files.maxPartitionBytes**, which defaults to 128 MB.
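
To get a feel for how the partition size limit translates into a partition count, here is a rough back-of-the-envelope sketch. It is a deliberate simplification and not Spark's actual planning logic, which also considers file-open cost, the available parallelism, and whether the format is splittable, so real counts can differ. The function name `estimate_read_partitions` is made up for this example.

```python
import math

MIB = 1024 * 1024


def estimate_read_partitions(file_size_bytes, max_partition_bytes=128 * MIB):
    """Rough estimate: chunks of at most max_partition_bytes needed to cover one file.

    Simplification (an assumption of this sketch): ignores file-open cost,
    packing of several small files into one partition, and non-splittable formats.
    """
    if file_size_bytes <= 0:
        return 0
    return math.ceil(file_size_bytes / max_partition_bytes)


one_gib = estimate_read_partitions(1024 * MIB)            # 1 GiB file, 128 MiB limit
small = estimate_read_partitions(10 * MIB)                # file smaller than the limit
halved = estimate_read_partitions(1024 * MIB, 64 * MIB)   # same file, 64 MiB limit
print(one_gib, small, halved)
```

Under these assumptions, halving the limit doubles the estimated partition count for the same file, which is the lever to reach for when there are fewer partitions than cores.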

### Shuffle partitions
- Shuffle partitions are created during the shuffle stage. By default, the number of shuffle partitions is set to **200** in **spark.sql.shuffle.partitions**. You can adjust this number depending on the size of the data set you have, to reduce the number of small partitions being sent across the network to executors’ tasks.

# Caching

In [45]:
 
df = spark.range(1 * 10000000).toDF("id")
df.cache() # Cache the data 
df.count() # Materialize the cache

                                                                                

10000000

In [38]:
df.count() # Now get it from the cache (command took 0.44 seconds)

10000000

# Persist

- persist(StorageLevel.LEVEL) is nuanced, providing control over how your data is cached via StorageLevel. Table 7-2 summarizes the different storage levels. Data on disk is always serialized using either Java or Kryo serialization.

![title](persist.png)

In [39]:
from pyspark import StorageLevel
# Create a DataFrame with 100M records
df = spark.range(1 * 100000000).toDF("id")
df.persist(StorageLevel.DISK_ONLY) # Serialize the data and cache it on disk
df.count() # Materialize the cache (command took 2.08 seconds)


                                                                                

100000000

In [40]:
df.count() # Now get it from the cache (command took 0.38 seconds)

100000000

# Joins 

## Broadcast join

- By default Spark will use a broadcast join if the smaller data set is **less than 10 MB**. This threshold is set by **spark.sql.autoBroadcastJoinThreshold**
- Specifying a value of **-1** in **spark.sql.autoBroadcastJoinThreshold** disables automatic broadcasting, so Spark resorts to a shuffle sort merge join unless a broadcast is requested explicitly

In [72]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import random

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Join Example") \
    .getOrCreate()

# Set autoBroadcastJoinThreshold to -1 to disable broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Define sample data for states and items
states = {0: "AZ", 1: "CO", 2: "CA", 3: "TX", 4: "NY", 5: "MI"}
items = {0: "SKU-0", 1: "SKU-1", 2: "SKU-2", 3: "SKU-3", 4: "SKU-4", 5: "SKU-5"}

# Generate sample data for usersDF
users_data = [(uid, f"user_{uid}", f"user_{uid}@databricks.com", states[random.randint(0, 4)]) for uid in range(1000001)]
usersDF = spark.createDataFrame(users_data, ["uid", "login", "email", "user_state"])

# Generate sample data for ordersDF
orders_data = [(r, r, random.randint(0, 9999), 10 * r * 0.2, states[random.randint(0, 4)], items[random.randint(0, 4)]) for r in range(1000001)]
ordersDF = spark.createDataFrame(orders_data, ["transaction_id", "quantity", "users_id", "amount", "state", "items"])

# Perform the join operation
usersOrdersDF = ordersDF.join(usersDF, col("users_id") == col("uid"))

# Show the joined results
usersOrdersDF.show(truncate=False)



24/07/05 17:09:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/07/05 17:14:24 WARN TaskSetManager: Stage 3 contains a task of very large size (1534 KiB). The maximum recommended task size is 1000 KiB.
24/07/05 17:14:32 WARN TaskSetManager: Stage 4 contains a task of very large size (2977 KiB). The maximum recommended task size is 1000 KiB.

+--------------+--------+--------+--------+-----+-----+---+------+---------------------+----------+
|transaction_id|quantity|users_id|amount  |state|items|uid|login |email                |user_state|
+--------------+--------+--------+--------+-----+-----+---+------+---------------------+----------+
|4731          |4731    |0       |9462.0  |AZ   |SKU-2|0  |user_0|user_0@databricks.com|TX        |
|11856         |11856   |0       |23712.0 |CA   |SKU-1|0  |user_0|user_0@databricks.com|TX        |
|12050         |12050   |0       |24100.0 |CA   |SKU-3|0  |user_0|user_0@databricks.com|TX        |
|17285         |17285   |0       |34570.0 |CO   |SKU-3|0  |user_0|user_0@databricks.com|TX        |
|24361         |24361   |0       |48722.0 |CO   |SKU-0|0  |user_0|user_0@databricks.com|TX        |
|24867         |24867   |0       |49734.0 |TX   |SKU-2|0  |user_0|user_0@databricks.com|TX        |
|32284         |32284   |0       |64568.0 |CA   |SKU-4|0  |user_0|user_0@databricks.com|TX        |


                                                                                

In [44]:
usersOrdersDF.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [users_id#701L], [uid#691L], Inner
   :- Sort [users_id#701L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(users_id#701L, 200), ENSURE_REQUIREMENTS, [plan_id=1009]
   :     +- Filter isnotnull(users_id#701L)
   :        +- Scan ExistingRDD[transaction_id#699L,quantity#700L,users_id#701L,amount#702,state#703,items#704]
   +- Sort [uid#691L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(uid#691L, 200), ENSURE_REQUIREMENTS, [plan_id=1010]
         +- Filter isnotnull(uid#691L)
            +- Scan ExistingRDD[uid#691L,login#692,email#693,user_state#694]


