-sandbox

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Query Optimization

We'll explore query plans and optimizations for several examples including logical optimizations and examples with and without predicate pushdown.

##### Objectives
1. Logical optimizations
1. Predicate pushdown
1. No predicate pushdown

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">DataFrame</a>: `explain`

![query optimization](https://files.training.databricks.com/images/aspwd/query_optimization_catalyst.png)

![query optimization aqe](https://files.training.databricks.com/images/aspwd/query_optimization_aqe.png)

Let’s run our set up cell, and get our initial DataFrame stored in the variable `df`. Displaying this DataFrame shows us events data.

In [0]:
%run ./Includes/Classroom-Setup

In [0]:
df = spark.read.parquet(eventsPath)
display(df)

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id
macOS,"List(null, null, null)",warranty,1593878899217692.0,1593878946592107,"List(Montrose, MI)",List(),google,1593878899217692,UA000000107379500
Windows,"List(null, null, null)",press,1593876662175340.0,1593877011756535,"List(Northampton, MA)",List(),google,1593876662175340,UA000000107359357
macOS,"List(null, null, null)",add_item,1593878792892652.0,1593878815459100,"List(Salinas, CA)","List(List(null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1))",youtube,1593878455472030,UA000000107375547
iOS,"List(null, null, null)",mattresses,1593878178791663.0,1593878809276923,"List(Everett, MA)",List(),facebook,1593877903116176,UA000000107370581
Windows,"List(null, null, null)",mattresses,,1593878628143633,"List(Cottage Grove, MN)",List(),google,1593878628143633,UA000000107377108
Windows,"List(null, null, null)",main,,1593878634344194,"List(Medina, MN)",List(),youtube,1593878634344194,UA000000107377161
iOS,"List(null, null, null)",main,,1593877936171803,"List(Mount Pleasant, UT)",List(),direct,1593877936171803,UA000000107370851
macOS,"List(null, null, null)",main,,1593876843215329,"List(Piedmont, AL)",List(),instagram,1593876843215329,UA000000107360961
Android,"List(null, null, null)",warranty,1593878529774474.0,1593879213196400,"List(Rancho Santa Margarita, CA)",List(),instagram,1593878529774474,UA000000107376205
Windows,"List(null, null, null)",main,,1593876713246514,"List(Elyria, OH)",List(),facebook,1593876713246514,UA000000107359805


### Logical Optimization

`explain(..)` prints the query plans, optionally formatted by a given explain mode. Compare the following logical plan & physical plan, noting how Catalyst handled the multiple `filter` transformations.

In [0]:
from pyspark.sql.functions import col

limitEventsDF = (df
                 .filter(col("event_name") != "reviews")
                 .filter(col("event_name") != "checkout")
                 .filter(col("event_name") != "register")
                 .filter(col("event_name") != "email_coupon")
                 .filter(col("event_name") != "cc_info")
                 .filter(col("event_name") != "delivery")
                 .filter(col("event_name") != "shipping_info")
                 .filter(col("event_name") != "press")
                )

limitEventsDF.explain(True)

Of course, we could have written the query originally using a single `filter` condition ourselves. Compare the previous and following query plans.

In [0]:
betterDF = (df
            .filter((col("event_name").isNotNull()) &
                    (col("event_name") != "reviews") &
                    (col("event_name") != "checkout") &
                    (col("event_name") != "register") &
                    (col("event_name") != "email_coupon") &
                    (col("event_name") != "cc_info") &
                    (col("event_name") != "delivery") &
                    (col("event_name") != "shipping_info") &
                    (col("event_name") != "press"))
           )

betterDF.explain(True)

Of course, we wouldn't write the following code intentionally, but in a long, complex query you might not notice the duplicate filter conditions. Let's see what Catalyst does with this query.

In [0]:
stupidDF = (df
            .filter(col("event_name") != "finalize")
            .filter(col("event_name") != "finalize")
            .filter(col("event_name") != "finalize")
            .filter(col("event_name") != "finalize")
            .filter(col("event_name") != "finalize")
           )

stupidDF.explain(True)

### Caching

By default the data of a DataFrame is present on a Spark cluster only while it is being processed during a query -- it is not automatically persisted on the cluster afterwards. (Spark is a data processing engine, not a data storage system.) You can explicity request Spark to persist a DataFrame on the cluster by invoking its `cache` method.

If you do cache a DataFrame, you should always explictly evict it from cache by invoking `unpersist` when you no longer need it.

<img src="https://files.training.databricks.com/images/icon_best_32.png" alt="Best Practice"> Caching a DataFrame can be appropriate if you are certain that you will use the same DataFrame multiple times, as in:

- Exploratory data analysis
- Machine learning model training

<img src="https://files.training.databricks.com/images/icon_warn_32.png" alt="Warning"> Aside from those use cases, you should **not** cache DataFrames because it is likely that you'll *degrade* the performance of your application.

- Caching consumes cluster resources that could otherwise be used for task execution
- Caching can prevent Spark from performing query optimizations, as shown in the next example

### Predicate Pushdown

Here is example reading from a JDBC source, where Catalyst determines that *predicate pushdown* can take place.

In [0]:
%scala
// Ensure that the driver class is loaded
Class.forName("org.postgresql.Driver")

In [0]:
jdbcURL = "jdbc:postgresql://54.213.33.240/training"

# Username and Password w/read-only rights
connProperties = {
    "user" : "training",
    "password" : "training"
}

ppDF = (spark
        .read
        .jdbc(
            url=jdbcURL,                  # the JDBC URL
            table="training.people_1m",   # the name of the table
            column="id",                  # the name of a column of an integral type that will be used for partitioning
            lowerBound=1,                 # the minimum value of columnName used to decide partition stride
            upperBound=1000000,           # the maximum value of columnName used to decide partition stride
            numPartitions=8,              # the number of partitions/connections
            properties=connProperties     # the connection properties
        )
        .filter(col("gender") == "M")   # Filter the data by gender
       )

ppDF.explain()

Note the lack of a **Filter** and the presence of a **PushedFilters** in the **Scan**. The filter operation is pushed to the database and only the matching records are sent to Spark. This can greatly reduce the amount of data that Spark needs to ingest.

### No Predicate Pushdown

In comparison, caching the data before filtering eliminates the possibility for the predicate push down.

In [0]:
cachedDF = (spark
            .read
            .jdbc(
                url=jdbcURL,
                table="training.people_1m",
                column="id",
                lowerBound=1,
                upperBound=1000000,
                numPartitions=8,
                properties=connProperties
            )
           )

cachedDF.cache()
filteredDF = cachedDF.filter(col("gender") == "M")

filteredDF.explain()

In addition to the **Scan** (the JDBC read) we saw in the previous example, here we also see the **InMemoryTableScan** followed by a **Filter** in the explain plan.

This means Spark had to read ALL the data from the database and cache it, and then scan it in cache to find the records matching the filter condition.

Remember to clean up after ourselves!

In [0]:
cachedDF.unpersist()

### Clean up classroom

In [0]:
%run ./Includes/Classroom-Cleanup

-sandbox
&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>