## Query Optimization

![Query Optimization](pics/Query%20Optimization.PNG)

### Cache

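- **df.cache()** -> Persists the df using the default storage level (memory first, spilling to disk if needed). Caching is lazy: the data is only materialized when the first action runs on the df.  
*Cached data occupies executor memory that Spark would otherwise have available for execution, so cache only DataFrames that are reused.*
- **df.unpersist()** -> Removes the df from the cache, freeing that memory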

### Reading from JDBC sources

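```python
# Partitioned JDBC read: Spark opens numPartitions parallel connections,
# each reading one range of col_1 (assumes an active SparkSession named `spark`)
df = (spark
      .read
      .jdbc(
        url="jdbc:driver://ip/db_name",
        table="db_name.table_name",
        column="col_1",        # Numeric, date or timestamp column used for partitioning
        lowerBound=1,          # Min value of col_1, used only to compute the partition stride
        upperBound=1000000,    # Max value of col_1, used only to compute the partition stride
        numPartitions=8,       # Number of partitions (and parallel connections)
        properties={"user": "user_name",
                    "password": "user_password"}
      )
)
```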
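`lowerBound` and `upperBound` do not filter rows. They only decide how the range of `col_1` is cut into `numPartitions` slices. Every row in the table is still read, and values outside the bounds fall into the first or last partition. The pure-Python toy model below is loosely based on Spark's range-splitting logic. It shows roughly which WHERE clause each partition runs. The function name `jdbc_partition_predicates` is hypothetical, the model assumes non-negative integer bounds, and the exact clause text may differ between Spark versions.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Toy model (hypothetical helper): split a column's range into one WHERE
    clause per partition. Assumes non-negative integer bounds."""
    if num_partitions < 2:
        return [None]  # a single partition reads the whole table without a filter
    # Stride: distance between consecutive partition boundaries
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            # Last partition is open-ended: rows above upper_bound land here
            predicates.append(lower)
        elif lower is None:
            # First partition is open-ended: rows below lower_bound and NULLs land here
            predicates.append(f"{upper} or {column} is null")
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for predicate in jdbc_partition_predicates("col_1", 1, 1000000, 8):
    print(predicate)
```

With the settings from the read above, the stride is 125000. The first clause is `col_1 < 125001 or col_1 is null` and the last is `col_1 >= 875001`. If `col_1` is skewed, some connections will therefore read far more rows than others.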

### Partitions
Spark's default parallelism usually equals the total number of CPU cores available to the application, and many operations create that many partitions by default.  
All data in a partition resides on a single node of the cluster.

- **Core/Slot** -> A thread available for parallel execution; each slot runs one task at a time.  
*The number of slots is generally decided when the cluster is set up. If it is unknown, check it with:*  
`sc.defaultParallelism`  
`spark.sparkContext.defaultParallelism`

- **Partitions** -> Small chunks of a dataset, each processed by one task.  
*To find the number of partitions of a DataFrame (through its underlying RDD), use:*  
`df.rdd.getNumPartitions()`

The number of partitions should equal the number of cores, or be a multiple of it.  
The recommended size for each partition is about 200MB.  
If partitions would be smaller than that, reduce the count to one partition per core.  
If they would be larger, increase the number of partitions in multiples of the core count.
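The sizing rule above can be worked through with a small calculation. The sketch below is a hypothetical helper, not a Spark API. It takes the 200MB target from these notes and returns a partition count that is a multiple of the core count.

```python
import math

TARGET_PARTITION_MB = 200  # recommended partition size from the notes above


def suggest_num_partitions(dataset_mb, num_cores, target_mb=TARGET_PARTITION_MB):
    """Hypothetical helper: choose a partition count that is a multiple of
    num_cores while keeping partitions near target_mb."""
    needed = math.ceil(dataset_mb / target_mb)  # partitions needed at the target size
    if needed <= num_cores:
        # Small data: one partition per core
        return num_cores
    # Large data: round up to the next multiple of the core count
    return math.ceil(needed / num_cores) * num_cores


print(suggest_num_partitions(500, 8))     # 8
print(suggest_num_partitions(10_240, 8))  # 56
```

For a 10 GB dataset on 8 cores, 52 partitions of at most 200MB are needed. Rounding up to 56 keeps every core busy in each wave of tasks, at roughly 183MB per partition. The result could then be passed to `df.repartition()`.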

### Repartition a DataFrame
- **coalesce()** -> Returns a new df with exactly N partitions, when N is less than the current number of partitions  
*Narrow transformation, so it performs better*  
*No shuffling*  
*Cannot increase the number of partitions*  
*May produce unevenly sized partitions*

- **repartition()** -> Returns a new df with exactly N partitions (N can be larger or smaller than the current number)  
*Wide transformation*  
*Evenly balanced partition sizes*  
*Requires shuffling all data*
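The trade-off between the two methods shows up in a toy model written in plain Python, where partitions are lists of records. This is not Spark's implementation. It is a simplified sketch in which `coalesce` only merges neighbouring partitions and `repartition` redistributes every record round-robin. Both function names are hypothetical stand-ins.

```python
def coalesce(partitions, n):
    """Toy coalesce: merge neighbouring partitions into n groups without
    moving individual records between groups (no shuffle)."""
    if n >= len(partitions):
        return partitions  # cannot increase the partition count
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged


def repartition(partitions, n):
    """Toy repartition: redistribute every record round-robin (a full shuffle)."""
    out = [[] for _ in range(n)]
    records = [record for part in partitions for record in part]
    for i, record in enumerate(records):
        out[i % n].append(record)
    return out


skewed = [[1], [2], [3], list(range(4, 14))]  # partition sizes 1, 1, 1, 10
print([len(p) for p in coalesce(skewed, 2)])     # [2, 11]
print([len(p) for p in repartition(skewed, 2)])  # [7, 6]
print(len(coalesce(skewed, 8)))                  # 4
```

Coalescing keeps the skew because whole partitions are glued together. Repartitioning balances the sizes but has to move every record.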

### Shuffle Partitions
To set the number of partitions used when data is shuffled for joins or aggregations (200 by default):  
`spark.conf.set("spark.sql.shuffle.partitions", value)`

To check the number of shuffle partitions:  
`print(spark.conf.get("spark.sql.shuffle.partitions"))`

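### Adaptive Query Execution (Spark 3)
AQE re-optimizes the query plan at runtime using statistics from completed stages. Among other things, it can coalesce small shuffle partitions. The `spark.sql.adaptive.enabled` setting controls whether AQE is on (it is enabled by default since Spark 3.2.0):  
`spark.conf.get("spark.sql.adaptive.enabled")` checks the current value  
`spark.conf.set("spark.sql.adaptive.enabled", "true")` turns it on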
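A toy model in plain Python helps explain what coalescing shuffle partitions at runtime means. It is a simplified sketch, not Spark's actual algorithm, which also honours other settings such as minimum partition counts. After a shuffle, neighbouring partitions are packed greedily into groups whose total size stays within an advisory target. In Spark that target comes from `spark.sql.adaptive.advisoryPartitionSizeInBytes`, which defaults to 64 MB, and coalescing is governed by `spark.sql.adaptive.coalescePartitions.enabled`. The function name below is hypothetical.

```python
def coalesce_shuffle_partitions(sizes_mb, target_mb=64):
    """Toy model of AQE coalescing: greedily pack contiguous shuffle
    partitions into groups whose total size stays within target_mb."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        if current and current_size + size > target_mb:
            groups.append(current)  # close the group before it overflows
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


sizes = [10, 10, 10, 50, 5, 5, 70]  # shuffle partition sizes in MB
print(coalesce_shuffle_partitions(sizes))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Here seven shuffle partitions become three tasks. This is why, with AQE on, a generous `spark.sql.shuffle.partitions` value costs less than it would without it.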