
# Monitoring Performance of Jobs

Optimizing performance from both the job and the cluster perspective is a critical part of production Delta Lake performance. We commonly use the methods listed in the table below to monitor job performance.

|Feature|Use|Link|
|-------|---|----|
|Ganglia Metrics: Cluster Tuning | Tuning the cluster configuration settings that govern performance | https://docs.databricks.com/clusters/configure.html, https://www.youtube.com/watch?v=9fa8dnKbfsU |
|Spark UI: DAG Tuning | Tuning the stages of a Spark job and its query execution plan | https://databricks.com/session/understanding-query-plans-and-spark-uis |
|Logs | Spark driver and cluster logs that provide execution and autoscaling details | https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html |
|EXPLAIN: Physical Plan | The physical plan provides the fundamental information about how the query executes | https://docs.databricks.com/sql/language-manual/sql-ref-syntax-qry-explain.html |

The following blog posts are useful for understanding how Delta Lake performance tuning works:
* [Photon, The Next Generation Query Engine on the Databricks Lakehouse Platform](https://www.databricks.com/blog/2021/06/17/announcing-photon-public-preview-the-next-generation-query-engine-on-the-databricks-lakehouse-platform.html)
* [Faster MERGE Performance With Low-Shuffle MERGE and Photon](https://www.databricks.com/blog/2022/10/17/faster-merge-performance-low-shuffle-merge-and-photon.html)
* [Understanding your Apache Spark Application Through Visualization](https://www.databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html)


# Cluster Performance

Cluster performance tuning is an important step when quantifying Delta performance. Because Delta and Spark are distributed, they scale well horizontally: you can add more nodes to meet performance SLAs. As a general rule, use autoscaling on Spark clusters to reduce costs while still handling peak-throughput workloads.
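As a sketch of what enabling autoscaling looks like in configuration, the hypothetical helper below builds a request body for the Databricks Clusters API `clusters/create` endpoint that uses an `autoscale` range instead of a fixed `num_workers`. The cluster name, runtime version string, and node type are placeholders; check the Clusters API reference for the full set of fields before relying on it.

```python
import json


def autoscaling_cluster_spec(name: str, spark_version: str, node_type_id: str,
                             min_workers: int, max_workers: int,
                             photon: bool = True) -> dict:
    """Build a hypothetical clusters/create request body that uses autoscaling."""
    if not 0 <= min_workers <= max_workers:
        raise ValueError("require 0 <= min_workers <= max_workers")
    return {
        "cluster_name": name,
        "spark_version": spark_version,  # placeholder runtime version string
        "node_type_id": node_type_id,    # placeholder, cloud-specific instance type
        # Autoscaling range: the cluster can add workers up to max_workers under
        # load and remove them down to min_workers when demand drops.
        "autoscale": {"min_workers": min_workers, "max_workers": max_workers},
        # Selects the Photon engine (or the standard engine) for the cluster.
        "runtime_engine": "PHOTON" if photon else "STANDARD",
    }


spec = autoscaling_cluster_spec("perf-tuning-demo", "13.3.x-scala2.12", "i3.xlarge", 2, 8)
print(json.dumps(spec, indent=2))
```

The validation guards against an inverted range, which would otherwise only be rejected when the request reaches the API.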


## Live Metrics - Metrics UI

You can observe Databricks cluster performance live in the Ganglia UI, which runs on the cluster itself. Ganglia metrics are available only on Databricks Runtime 12.2 and below; on clusters running Databricks Runtime 13 and above, Ganglia is replaced by the cluster Metrics UI.

<img src='https://raw.githubusercontent.com/morganmazouchi/Performance-with-Photon/main/Images/Ganglia%20UI.png' width="1500">


# Spark UI

Databricks exposes the Spark UI, which provides a large number of statistics for measuring the performance of your jobs. Every Spark job consists of a series of stages, each made up of tasks that run in parallel, and these stages form a directed acyclic graph (DAG). Examining these DAGs helps identify bottleneck stages and shows where more performance can be extracted.

<img src='https://raw.githubusercontent.com/morganmazouchi/Performance-with-Photon/main/Images/Spark%20UI-%20Job%20159.png' width="800">
<img src='https://raw.githubusercontent.com/morganmazouchi/Performance-with-Photon/main/Images/Spark%20UI-%20Job%20166%20-%20No%20Photon.png' width="1300">
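The same stage statistics shown in the Spark UI are also exposed by Spark's monitoring REST API (`/api/v1/applications/<app-id>/stages`), where each stage record carries fields such as `stageId`, `name`, `status`, and `executorRunTime` (summed task run time, in milliseconds). The sketch below assumes you have already fetched and parsed that JSON; how you reach the endpoint on your cluster is left out, and the sample records are hypothetical and heavily trimmed. It ranks completed stages by total executor run time to surface likely bottlenecks.

```python
from typing import Iterable, List, Tuple


def top_bottleneck_stages(stages: Iterable[dict], n: int = 3) -> List[Tuple[int, str, int]]:
    """Return (stageId, name, executorRunTime) for the n completed stages
    with the largest total executor run time."""
    completed = [s for s in stages if s.get("status") == "COMPLETE"]
    ranked = sorted(completed, key=lambda s: s.get("executorRunTime", 0), reverse=True)
    return [(s["stageId"], s["name"], s["executorRunTime"]) for s in ranked[:n]]


# Hypothetical stage records for illustration only.
sample = [
    {"stageId": 0, "name": "scan LendingClub_silver", "status": "COMPLETE", "executorRunTime": 48_000},
    {"stageId": 1, "name": "window row_number", "status": "COMPLETE", "executorRunTime": 12_500},
    {"stageId": 2, "name": "aggregate", "status": "COMPLETE", "executorRunTime": 95_300},
    {"stageId": 3, "name": "collect", "status": "SKIPPED", "executorRunTime": 0},
]

for stage_id, name, run_ms in top_bottleneck_stages(sample, n=2):
    print(f"stage {stage_id:>3}  {run_ms / 1000:8.1f}s  {name}")
```

Ranking by summed task time rather than stage duration favors the stages doing the most work, which is usually where tuning pays off.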


## Speed up queries by identifying execution bottlenecks in Query Plans
A common methodology for speeding up queries is to first identify the longest-running query operators. Because this is a distributed system and operators can execute in parallel, we care more about the total time spent in tasks than about the exact wall-clock time of an operator. Each query operator comes with a rich set of statistics. For a scan operator, for example, metrics include the number of files and the amount of data read, the time spent waiting for cloud storage, and the time spent reading files. These metrics make it easy to answer questions such as which table should be optimized or whether a join could be improved. The all-blue operators in this query plan's DAG confirm that Photon was disabled when the query ran.

<img src='https://raw.githubusercontent.com/morganmazouchi/Performance-with-Photon/main/Images/Databricks%20Shell%20-%20Details%20for%20Query%20299.png' width="1200">
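To see why total task time and wall-clock time diverge for a parallel operator, here is a small arithmetic sketch. The task start and end times are hypothetical, not read from the screenshot above.

```python
from typing import List, Tuple


def wall_clock_vs_task_time(tasks: List[Tuple[float, float]]) -> Tuple[float, float]:
    """Given (start, end) times in seconds for the tasks of one operator,
    return (wall_clock_span, total_task_time)."""
    if not tasks:
        return 0.0, 0.0
    # Wall clock: from the first task start to the last task end.
    wall_clock = max(end for _, end in tasks) - min(start for start, _ in tasks)
    # Total task time: the sum of every task's own duration.
    total = sum(end - start for start, end in tasks)
    return wall_clock, total


# Hypothetical: four tasks of one scan operator running in parallel on different cores.
tasks = [(0.0, 10.0), (0.0, 9.0), (1.0, 10.0), (2.0, 10.0)]
wall, total = wall_clock_vs_task_time(tasks)
print(f"wall clock: {wall:.1f}s, total task time: {total:.1f}s")
```

Here the operator finishes in 10 seconds of wall-clock time but consumes 36 seconds of task time, so the summed figure is the better measure of how much work it actually costs the cluster.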

In [0]:
%run ./00-setup $mode="reuse"


username: mojgan.mazouchi@databricks.com
userhome: dbfs:/user/mojgan.mazouchi@databricks.com/PhotonPerformance
database: PhotonPerformance_mojgan_mazouchi_databricks_com_db


In [0]:
# Disable Photon, including its Parquet writer, sort, and window support, to capture a non-Photon baseline plan
spark.conf.set("spark.databricks.photon.enabled", "false")
spark.conf.set("spark.databricks.photon.parquetWriter.enabled", "false")
spark.conf.set("spark.databricks.photon.window.enabled", "false")
spark.conf.set("spark.databricks.photon.sort.enabled", "false")
spark.conf.set("spark.databricks.photon.window.experimental.features.enabled", "false")

In [0]:
%scala
spark.sql("""EXPLAIN SELECT
  T_len.EmpLength,
  T_rate.IntRate,
  count(DISTINCT T.addr_state) cnt_loan_by_state,
  avg(loan_amnt) avg_loan_by_state,
  min(DISTINCT annual_inc) as min_annual_income,
  max(DISTINCT annual_inc) as max_annual_income,
  sum(total_pymnt) totalPayment_by_state
FROM
  LendingClub_silver T
  LEFT JOIN 
  (SELECT row_number() OVER(PARTITION BY addr_state ORDER BY avg_cur_bal DESC) as row_num_avgBal_state, *
  FROM LendingClub_EmpLength) T_len on T_len.emp_length = T.emp_length and T_len.avg_cur_bal BETWEEN 1 AND 2000
  LEFT JOIN LendingClub_IntRate T_rate on T_rate.int_rate = T.int_rate
WHERE
  (annual_inc> 16000) AND loan_status == "Current"
GROUP BY
  1,
  2
HAVING EmpLength IN ('3-5Years', '1year', 'Under1year')""").collect().foreach(println)

## Photon execution analysis in Query Plan
If you are using Photon on Databricks clusters, you can see Photon at work in the Spark UI. The following screenshot shows the query details DAG, which contains two indications of Photon. First, Photon operator names start with `Photon`, such as `PhotonGroupingAgg`. Second, Photon operators and stages are colored orange in the DAG, whereas non-Photon ones are blue.

<img src='https://raw.githubusercontent.com/morganmazouchi/Performance-with-Photon/main/Images/Databricks%20Shell%20-%20Details%20for%20Query%20272.png' width="1800">
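Because Photon operator names start with `Photon`, the plan text printed by the EXPLAIN cells in this notebook can also be checked programmatically. The sketch below counts such operators with a regular expression; the sample plan is hypothetical and abbreviated, and operator names other than `PhotonGroupingAgg` are illustrative.

```python
import re
from collections import Counter

# Photon operator names begin with "Photon" followed by an upper-case letter,
# e.g. PhotonGroupingAgg.
_PHOTON_OP = re.compile(r"\bPhoton[A-Z]\w*")


def photon_operator_counts(plan_text: str) -> Counter:
    """Count Photon operators mentioned in an EXPLAIN plan string."""
    return Counter(_PHOTON_OP.findall(plan_text))


# Hypothetical, abbreviated physical plan for illustration only.
plan = """== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- PhotonResultStage
   +- PhotonGroupingAgg(keys=[EmpLength#1, IntRate#2], ...)
      +- PhotonShuffleExchangeSource
         +- PhotonShuffleMapStage
            +- PhotonGroupingAgg(keys=[EmpLength#1, IntRate#2], ...)
               +- PhotonScan parquet LendingClub_silver[...]
"""

counts = photon_operator_counts(plan)
for op, n in counts.most_common():
    print(op, n)
```

An empty result suggests the query ran without Photon. From a Python cell, `spark.sql("EXPLAIN SELECT ...").collect()[0][0]` should return the plan text as a single string to pass in.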

In [0]:
# Enable Photon and its support for the Parquet writer, sort, and window functions
spark.conf.set("spark.databricks.photon.enabled", "true")
spark.conf.set("spark.databricks.photon.parquetWriter.enabled", "true")
spark.conf.set("spark.databricks.photon.window.enabled", "true")
spark.conf.set("spark.databricks.photon.sort.enabled", "true")
spark.conf.set("spark.databricks.photon.window.experimental.features.enabled", "true")
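The two configuration cells above set the same five Photon flags to opposite values. A small hypothetical helper keeps them in sync so the baseline and Photon runs differ only in this switch; it sets exactly the keys used in this notebook and nothing more.

```python
PHOTON_CONFS = (
    "spark.databricks.photon.enabled",
    "spark.databricks.photon.parquetWriter.enabled",
    "spark.databricks.photon.window.enabled",
    "spark.databricks.photon.sort.enabled",
    "spark.databricks.photon.window.experimental.features.enabled",
)


def set_photon(spark, enabled: bool) -> None:
    """Set every Photon flag used in this notebook to the same value."""
    value = "true" if enabled else "false"
    for key in PHOTON_CONFS:
        spark.conf.set(key, value)
```

Calling `set_photon(spark, False)` before the baseline EXPLAIN and `set_photon(spark, True)` before the Photon run would reproduce the two cells above.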

In [0]:
%scala
spark.sql("""EXPLAIN SELECT
  T_len.EmpLength,
  T_rate.IntRate,
  count(DISTINCT T.addr_state) cnt_loan_by_state,
  avg(loan_amnt) avg_loan_by_state,
  min(DISTINCT annual_inc) as min_annual_income,
  max(DISTINCT annual_inc) as max_annual_income,
  sum(total_pymnt) totalPayment_by_state
FROM
  LendingClub_silver T
  LEFT JOIN 
  (SELECT row_number() OVER(PARTITION BY addr_state ORDER BY avg_cur_bal DESC) as row_num_avgBal_state, *
  FROM LendingClub_EmpLength) T_len on T_len.emp_length = T.emp_length and T_len.avg_cur_bal BETWEEN 1 AND 100
  LEFT JOIN LendingClub_IntRate T_rate on T_rate.int_rate = T.int_rate
WHERE
  (annual_inc> 16000) AND loan_status == "Current"
GROUP BY
  1,
  2
HAVING EmpLength IN ('3-5Years', '1year', 'Under1year')""").collect().foreach(println)

### Photon-Enabled Clusters

After enabling advice text (`set spark.databricks.adviceGenerator.acceleratedWithPhoton.enabled = true;`), you can trace Photon activity on Photon-enabled clusters in the INFO entries of the driver logs, under the Log4j output. Look specifically for "Accelerated with photon" in the logs to find out how much your queries and workloads were accelerated by Photon.

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> 
**Advice text is disabled by default**, so you must enable it before running your queries.

<img src='https://raw.githubusercontent.com/morganmazouchi/Performance-with-Photon/main/Images/log%204j%20output.png' width="2500">
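Once advice text is enabled and a query has run, the relevant lines can be pulled out of a saved copy of the driver's Log4j output. The sketch below assumes log4j-style lines in which the level appears as its own whitespace-separated token; the sample lines, logger name, and advice text are hypothetical placeholders, not real Databricks log content.

```python
from typing import Iterable, Iterator


def photon_advice_lines(log_lines: Iterable[str],
                        phrase: str = "Accelerated with photon") -> Iterator[str]:
    """Yield INFO-level log lines containing `phrase` (case-insensitive).

    Assumes the log level is a separate whitespace-delimited token,
    e.g. "... INFO SomeLogger: message".
    """
    needle = phrase.lower()
    for line in log_lines:
        if "INFO" in line.split() and needle in line.lower():
            yield line.rstrip("\n")


# Hypothetical log lines for illustration only.
sample_log = [
    "23/05/01 17:42:10 INFO HypotheticalLogger: Accelerated with Photon <placeholder advice text>\n",
    "23/05/01 17:42:11 WARN HypotheticalLogger: unrelated warning\n",
    "23/05/01 17:42:12 INFO HypotheticalLogger: unrelated info message\n",
]

for line in photon_advice_lines(sample_log):
    print(line)
```

Because the function accepts any iterable of strings, an open file handle to a downloaded log file can be passed in directly.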

In [0]:
%sql

set spark.databricks.adviceGenerator.acceleratedWithPhoton.enabled = true;

|key|value|
|---|-----|
|spark.databricks.adviceGenerator.acceleratedWithPhoton.enabled|True|


In [0]:
%sql
SELECT
  T_len.EmpLength,
  T_rate.IntRate,
  count(DISTINCT T.addr_state) cnt_loan_by_state,
  avg(loan_amnt) avg_loan_by_state,
  min(DISTINCT annual_inc) as min_annual_income,
  max(DISTINCT annual_inc) as max_annual_income,
  sum(total_pymnt) totalPayment_by_state
FROM
  LendingClub_silver T
  LEFT JOIN 
  (SELECT row_number() OVER(PARTITION BY addr_state ORDER BY avg_cur_bal DESC) as row_num_avgBal_state, *
  FROM LendingClub_EmpLength) T_len on T_len.emp_length = T.emp_length and T_len.avg_cur_bal BETWEEN 1 AND 10
  LEFT JOIN LendingClub_IntRate T_rate on T_rate.int_rate = T.int_rate
WHERE
  (annual_inc> 16000) AND loan_status == "Current"
GROUP BY
  1,
  2
HAVING EmpLength IN ('3-5Years', '1year', 'Under1year')

|EmpLength|IntRate|cnt_loan_by_state|avg_loan_by_state|min_annual_income|max_annual_income|totalPayment_by_state|
|---------|-------|-----------------|-----------------|-----------------|-----------------|---------------------|
|1year|HighRate|50|15511.792100074785|16144.0|6000000.0|1684574234.8221097|
|Under1year|HighRate|50|15980.667546948356|16112.0|650000.0|1687129366.610441|
|3-5Years|MediumRate|50|14170.33729233472|16215.0|6200000.0|7440206431.133909|
|Under1year|MediumRate|50|14203.34233557374|16200.0|1300001.0|3668164600.199508|
|3-5Years|ExtremelyHighRate|50|17773.226341758895|16248.0|1100000.0|1239034098.0444412|
|1year|StandardRate|50|13561.58000986254|16380.0|1848400.0|3045121903.0125365|
|Under1year|ExtremelyHighRate|49|17769.116921558954|16500.0|585000.0|569117572.940545|
|Under1year|lowRate|50|14589.907453726864|19000.0|780000.0|309534402.7199998|
|3-5Years|lowRate|50|14139.463474827246|20000.0|1574060.0|630242833.5599998|
|3-5Years|HighRate|50|16039.83564721813|16034.0|3000000.0|3452779463.6565747|
