# Workload Insights Notebook

This notebook analyzes the denormalized representation of a query workload. The denormalized dataset combines information about applications, queries, plans, and runtime metrics, and the plans are annotated with signatures that identify common subexpressions. The analyses below are intended to support data-driven decisions, such as which subexpressions to materialize as views.
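As a rough mental model of the two signature kinds used later (a toy sketch, not Peregrine's actual signature scheme), a strict signature can be pictured as a hash over an operator, its parameters, and its children's signatures. A non-strict signature leaves out run-specific values so that recurring runs of the same job can match. The `Op` class, the split into `params` and `literals`, the `signature` function, and the use of SHA-256 are all hypothetical.

```python
import hashlib
from dataclasses import dataclass, field
from typing import List


@dataclass
class Op:
    """Hypothetical plan node used only for this illustration."""
    name: str
    params: str = ""      # stable parameters, e.g. a table or column name
    literals: str = ""    # run-specific values, e.g. a date constant
    children: List["Op"] = field(default_factory=list)


def signature(op: Op, strict: bool = True) -> str:
    """Toy Merkle-style signature: hash of this node plus its children's signatures.

    With strict=False the run-specific literals are dropped, so plans that
    differ only in such values get the same signature.
    """
    parts = [op.name, op.params]
    if strict:
        parts.append(op.literals)
    parts.extend(signature(child, strict) for child in op.children)
    return hashlib.sha256("|".join(parts).encode()).hexdigest()


# Two runs of the same job that differ only in a filter literal.
scan = Op("Relation", params="sales")
monday = Op("Filter", params="date", literals="2020-01-06", children=[scan])
tuesday = Op("Filter", params="date", literals="2020-01-07", children=[scan])

print(signature(monday) == signature(tuesday))                              # False
print(signature(monday, strict=False) == signature(tuesday, strict=False))  # True
```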


## Features
1. Sample Rows
2. Query and Operator Count
3. Operator Frequency
4. Overlapping Queries
5. Overlapping Computations
6. Selected Views
7. SparkCruise Savings
8. Filter Selectivity
9. Exchange Operator Statistics
10. Recurring Jobs

## Setup

In [None]:
from bokeh.plotting import figure
from bokeh.embed import file_html
from bokeh.resources import CDN
import pandas as pd

# `spark`, `display`, and `displayHTML` are provided by the notebook runtime.
artifactDir = "/peregrine/"
# Spark DDL schema of the denormalized IR files ("name type" pairs).
schema = "AppID string, AppName string, NormAppName string, UserName string, ClusterName string, Subscription string, AppSubmitTime long, AppStartTime long, AppEndTime long, AppWallClockTime long, QueryID long, AppQueryID string, QueryStartTime long, QueryEndTime long, QueryWallClockTime long, OperatorName string, OperatorID long, TreeLevel long, ParentID long, ChildCount long, LogicalName string, StrictSignature string, NonStrictSignature string, PRowCount long, PExclusiveTime long, PSerialTime long, Parameters string, Bytes long, RowCount long, ExclusiveTime long, MaxMemory long, InputCard long, AvgRowLength long, InputDataset string"

def readIR(name):
    """Load a pipe-separated IR file with a header row, applying the explicit schema."""
    return (spark.read.format("csv")
            .option("sep", "|")
            .option("header", "true")
            .schema(schema)
            .load(artifactDir + name))

logicalExps = readIR("logical_ir")
logicalExps.createOrReplaceTempView("LogicalExps")
physicalExps = readIR("physical_ir")
physicalExps.createOrReplaceTempView("PhysicalExps")
# Non-leaf logical operators: the subexpressions analyzed in the rest of the notebook.
analysisExps = spark.sql("SELECT * FROM LogicalExps WHERE ChildCount > 0")
analysisExps.createOrReplaceTempView("AnalysisExps")
# Strict signatures that occur in more than one distinct query.
repeatSubexps = spark.sql("SELECT StrictSignature FROM AnalysisExps GROUP BY StrictSignature HAVING COUNT(DISTINCT AppQueryID) > 1")
repeatSubexps.createOrReplaceTempView("RepeatSubexps")
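The `RepeatSubexps` view keeps the strict signatures that appear in more than one distinct query (`COUNT(DISTINCT AppQueryID) > 1`). The same rule in plain Python, applied to made-up `(AppQueryID, StrictSignature)` pairs, makes the definition concrete. Note that a signature repeated only inside a single query does not count as a repeat.

```python
from collections import defaultdict

# Hypothetical (AppQueryID, StrictSignature) rows from AnalysisExps.
rows = [
    ("app1_q1", "sigA"), ("app1_q1", "sigB"),
    ("app2_q1", "sigA"), ("app2_q1", "sigC"),
    ("app3_q1", "sigC"), ("app3_q1", "sigC"),
    ("app4_q1", "sigD"), ("app4_q1", "sigD"),
]

# Mirrors: GROUP BY StrictSignature HAVING COUNT(DISTINCT AppQueryID) > 1
queries_per_sig = defaultdict(set)
for query_id, sig in rows:
    queries_per_sig[sig].add(query_id)

repeat_subexps = {sig for sig, queries in queries_per_sig.items() if len(queries) > 1}
print(sorted(repeat_subexps))  # ['sigA', 'sigC']
```

Here `sigD` occurs twice but only in `app4_q1`, so it is not a repeated subexpression.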

## Sample Rows
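A random sample of 10 records from the physical intermediate representation, restricted to operators that have a strict signature and a nonzero exclusive time.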


In [None]:
sampleQuery = """
SELECT AppID, AppName, AppStartTime, QueryID, QueryWallClockTime, OperatorName, LogicalName, StrictSignature, PRowCount, PExclusiveTime
FROM PhysicalExps 
WHERE LENGTH(StrictSignature) > 0 AND PExclusiveTime > 0 
ORDER BY rand() 
LIMIT 10"""
sample = spark.sql(sampleQuery)
display(sample)

## Query and Operator Count

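Number of distinct queries, total operator instances, and distinct operator types in the workload. The counts are over `AnalysisExps`, that is, non-leaf logical operators only.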

In [None]:
queryOpCountQuery = """
SELECT COUNT(DISTINCT AppQueryID) AS QueryCount, COUNT(*) AS OperatorCount, COUNT(DISTINCT OperatorName) AS DistinctOperators
FROM AnalysisExps"""
totalQueryOpCount = spark.sql(queryOpCountQuery)
display(totalQueryOpCount)

## Operator Frequency
Frequency of each logical operator in the workload.


In [None]:
logicalOpFreqQuery = """
SELECT OperatorName, COUNT(*) AS Frequency
FROM LogicalExps
GROUP BY OperatorName
ORDER BY Frequency DESC"""
logicalOpFreq = spark.sql(logicalOpFreqQuery)
display(logicalOpFreq)

Frequency of each physical operator in the workload.


In [None]:
physicalOpFreqQuery = """
SELECT OperatorName, COUNT(*) AS Frequency
FROM PhysicalExps
GROUP BY OperatorName
ORDER BY Frequency DESC"""
physicalOpFreq = spark.sql(physicalOpFreqQuery)
display(physicalOpFreq)

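## Overlapping Queries
Queries with overlapping computations: the number of queries that contain at least one subexpression whose strict signature also appears in another query (`QueriesWithOneOverlap`), the number that contain at least two such operators (`QueriesWithTwoOverlaps`), and the first as a percentage of all queries (`OverlapPercent`).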


In [None]:
queryCountQuery = """
SELECT COUNT(DISTINCT AppQueryID) AS QueryCount 
FROM AnalysisExps"""
queriesWithRepeatQuery = """
SELECT COUNT(DISTINCT AppQueryID) AS QueriesWithOneOverlap
FROM AnalysisExps
WHERE StrictSignature IN 
( SELECT StrictSignature
FROM RepeatSubexps )"""
queriesWithTwoRepeatsQuery = """
SELECT COUNT(*) AS QueriesWithTwoOverlaps FROM (
SELECT AppQueryID, COUNT(*) AS Repeats 
FROM AnalysisExps
WHERE StrictSignature IN 
( SELECT StrictSignature
FROM RepeatSubexps )
GROUP BY AppQueryID
HAVING Repeats > 1
ORDER BY Repeats DESC)"""
allQueryOverlapQuery = """
SELECT *, CAST((QueriesWithOneOverlap/QueryCount)*100 AS Decimal(38,2)) AS OverlapPercent 
FROM ({}) AS R1, ({}) AS R2, ({}) AS R3 """.format(queryCountQuery, queriesWithRepeatQuery, queriesWithTwoRepeatsQuery)
queryOverlap = spark.sql(allQueryOverlapQuery)
display(queryOverlap)
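The three subqueries combined above reduce to simple set and counting operations. Here is a plain-Python sketch on hypothetical rows. As in the SQL, `QueriesWithTwoOverlaps` counts operator rows with a repeated signature, so one repeated signature occurring twice in the same query also counts as two.

```python
from collections import Counter, defaultdict

# Hypothetical (AppQueryID, StrictSignature) rows from AnalysisExps.
rows = [
    ("q1", "sigA"), ("q1", "sigB"),
    ("q2", "sigA"), ("q2", "sigB"), ("q2", "sigC"),
    ("q3", "sigC"), ("q3", "sigD"),
    ("q4", "sigE"),
]

# Signatures seen in more than one distinct query (the RepeatSubexps view).
queries_per_sig = defaultdict(set)
for query_id, sig in rows:
    queries_per_sig[sig].add(query_id)
repeated = {sig for sig, qs in queries_per_sig.items() if len(qs) > 1}

query_count = len({query_id for query_id, _ in rows})
# Number of operator rows with a repeated signature, per query.
repeats_per_query = Counter(query_id for query_id, sig in rows if sig in repeated)
queries_with_one_overlap = len(repeats_per_query)
queries_with_two_overlaps = sum(1 for n in repeats_per_query.values() if n > 1)
overlap_percent = round(100 * queries_with_one_overlap / query_count, 2)

print(query_count, queries_with_one_overlap, queries_with_two_overlaps, overlap_percent)
# 4 3 2 75.0
```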

## Overlapping Computations

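Overlapping computations per operator: how many instances of each operator compute a subexpression that also occurs in another query, and how often each such subexpression repeats on average. Operators with no repeated subexpressions do not appear in the result.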


In [None]:
opFreqQuery = """
SELECT OperatorName, COUNT(*) AS Total
FROM AnalysisExps
GROUP BY OperatorName
ORDER BY Total DESC"""
opRepeatSubexpQuery = """
SELECT OperatorName, COUNT(*) AS Repeats
FROM AnalysisExps
WHERE StrictSignature IN 
( SELECT StrictSignature
FROM RepeatSubexps )
GROUP BY OperatorName
ORDER BY Repeats DESC"""
opDistinctRepeatQuery = """
SELECT OperatorName, COUNT(*) AS DistinctRepeats
FROM ( SELECT DISTINCT OperatorName, StrictSignature 
FROM AnalysisExps
WHERE StrictSignature IN 
( SELECT StrictSignature
FROM RepeatSubexps ))
GROUP BY OperatorName
ORDER BY DistinctRepeats DESC"""
opAvgRepeatsQuery = """
SELECT R1.OperatorName, Total, Repeats, DistinctRepeats, CAST(Repeats/DistinctRepeats AS Decimal(38,2)) AS AvgRepFrequency, CAST((Repeats/Total)*100 AS Decimal(38,2)) AS RepeatPercent  
FROM ({}) AS R1, ({}) AS R2, ({}) AS R3
WHERE R1.OperatorName = R2.OperatorName AND R2.OperatorName = R3.OperatorName
ORDER BY RepeatPercent DESC""".format(opFreqQuery, opRepeatSubexpQuery, opDistinctRepeatQuery)
opAvgRepeats = spark.sql(opAvgRepeatsQuery)
display(opAvgRepeats)
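The per-operator columns are defined as follows. `Total` is the number of instances of the operator. `Repeats` is the number whose signature is in `RepeatSubexps`, and `DistinctRepeats` is the number of distinct such signatures. `AvgRepFrequency` is `Repeats / DistinctRepeats`, and `RepeatPercent` is `Repeats / Total * 100`. A plain-Python sketch on hypothetical `(OperatorName, AppQueryID, StrictSignature)` rows:

```python
from collections import defaultdict

# Hypothetical (OperatorName, AppQueryID, StrictSignature) rows from AnalysisExps.
rows = [
    ("Join", "q1", "j1"), ("Join", "q2", "j1"), ("Join", "q3", "j1"), ("Join", "q3", "j2"),
    ("Filter", "q1", "f1"), ("Filter", "q2", "f1"), ("Filter", "q2", "f2"), ("Filter", "q4", "f3"),
]

# Repeated signatures: those seen in more than one distinct query.
queries_per_sig = defaultdict(set)
for _, query_id, sig in rows:
    queries_per_sig[sig].add(query_id)
repeated = {sig for sig, qs in queries_per_sig.items() if len(qs) > 1}

stats = {}
for op in sorted({name for name, _, _ in rows}):
    op_sigs = [sig for name, _, sig in rows if name == op]
    repeats = [sig for sig in op_sigs if sig in repeated]
    if not repeats:
        continue  # the inner join in the SQL drops operators without repeats
    stats[op] = {
        "Total": len(op_sigs),
        "Repeats": len(repeats),
        "DistinctRepeats": len(set(repeats)),
        "AvgRepFrequency": round(len(repeats) / len(set(repeats)), 2),
        "RepeatPercent": round(100 * len(repeats) / len(op_sigs), 2),
    }

for op, s in stats.items():
    print(op, s)
```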

## Selected Views
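Per-operator summary of the selected views: `ViewCount` is the number of distinct views whose root has the given logical operator name, and `ViewRepeats` is the number of times those views occur in the physical plans.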

In [None]:
# Only StrictSignature is used from the selected views, so the columns are read as strings named by the header row.
selViews = spark.read.format("csv").option("sep", "|").option("header", "true").load(artifactDir + "views_ir")
selViews.createOrReplaceTempView("SelViews")
views = spark.sql("SELECT DISTINCT StrictSignature FROM SelViews")
views.createOrReplaceTempView("Views")

In [None]:
distinctViewsQuery = """
SELECT P.LogicalName AS LogicalName, COUNT(DISTINCT P.StrictSignature) AS ViewCount
FROM Views V, PhysicalExps P
WHERE V.StrictSignature = P.StrictSignature 
GROUP BY P.LogicalName
ORDER BY ViewCount DESC"""

viewSubexprsOpsQuery = """
SELECT P.LogicalName AS LogicalName, COUNT(*) AS ViewRepeats
FROM Views V, PhysicalExps P
WHERE V.StrictSignature = P.StrictSignature 
GROUP BY P.LogicalName
ORDER BY ViewRepeats DESC"""

joinViewQuery = """
SELECT X.LogicalName, ViewCount, ViewRepeats
FROM ({}) AS X, ({}) AS Y
WHERE X.LogicalName = Y.LogicalName""".format(distinctViewsQuery,viewSubexprsOpsQuery)

joinView = spark.sql(joinViewQuery)
display(joinView)

## SparkCruise Savings
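Statistics that indicate the potential savings of each selected view: how often it occurs in the workload, and its average serial time, output row count, and row length.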


In [None]:
avgBenefitsQuery = """
SELECT LogicalName, NumOccurrences, AvgSerialTime_ms, AvgRowCount, AvgRowLength_bytes
FROM (
SELECT P.LogicalName AS LogicalName, P.StrictSignature AS Id, COUNT(*) AS NumOccurrences, AVG(PSerialTime) AS AvgSerialTime_ms, AVG(PRowCount) AS AvgRowCount, AVG(AvgRowLength) AS AvgRowLength_bytes
FROM Views V, PhysicalExps P
WHERE V.StrictSignature = P.StrictSignature
GROUP BY P.LogicalName, P.StrictSignature)
"""

avgBenefits = spark.sql(avgBenefitsQuery)
display(avgBenefits)
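The query reports raw statistics rather than a single savings number. One back-of-the-envelope way to read them, assuming a view is computed once and then reused by its other occurrences, is as follows. The compute time avoided is about `(NumOccurrences - 1) * AvgSerialTime_ms`, and the materialized view takes about `AvgRowCount * AvgRowLength_bytes` of storage. This is our simplifying assumption, not SparkCruise's cost model, and the function name and input values are hypothetical.

```python
def estimate_view_benefit(num_occurrences, avg_serial_time_ms, avg_row_count, avg_row_length_bytes):
    """Hypothetical estimate of (time saved in ms, view size in MB) for one view.

    Assumes the view is computed once and every other occurrence reuses it.
    """
    saved_ms = max(num_occurrences - 1, 0) * avg_serial_time_ms
    size_mb = avg_row_count * avg_row_length_bytes / (1024 * 1024)
    return saved_ms, size_mb


# Made-up statistics for one view: 5 occurrences, 12 s serial time, 2M rows of 64 bytes.
saved_ms, size_mb = estimate_view_benefit(5, 12_000, 2_000_000, 64)
print(saved_ms, round(size_mb, 2))  # 48000 122.07
```

Under these assumptions, ranking views by time saved per megabyte stored is one simple way to compare them.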

## Filter Selectivity
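Selectivity of filters applied directly on top of a scan (a leaf operator): the fraction of the filter's input rows that it outputs, plotted as a cumulative distribution over all such filters.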


In [None]:
# F is a FilterExec operator and T is its leaf child (a scan).
filterSelQuery = """
SELECT F.AppQueryID, F.OperatorName AS FilterOp, F.RowCount AS PassCount, T.OperatorName AS ScanOp, F.InputCard AS TotalCount, CAST(F.RowCount/F.InputCard AS Decimal(38, 5)) AS FilterSel
FROM PhysicalExps F, PhysicalExps T
WHERE F.AppQueryID = T.AppQueryID AND
F.OperatorName = 'FilterExec' AND
F.InputCard > 0 AND
T.ParentID = F.OperatorID AND
T.RowCount > 0 AND
T.ChildCount = 0
ORDER BY FilterSel"""

filterSel = spark.sql(filterSelQuery).toPandas()
# toPandas() returns Decimal objects for the DECIMAL column; convert to float for plotting.
filterSel['FilterSel'] = filterSel['FilterSel'].astype(float)
# Empirical CDF: fraction of filters whose selectivity is <= each value (ties share the max rank).
filterSel['ecdf'] = filterSel['FilterSel'].rank(method='max', pct=True)

p = figure(width=400, height=400, title="Filter Selectivity")
p.line(filterSel['FilterSel'], filterSel['ecdf'], line_width=2)
p.xaxis.axis_label = "Filter Selectivity"
p.yaxis.axis_label = "CDF"
html = file_html(p, CDN, "Filter Selectivity")
displayHTML(html)
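For reference, the empirical CDF of a sample assigns to each value x the fraction of observations less than or equal to x, that is, F(x) = (number of observations <= x) / n. Below is a standard-library sketch that computes it for a list of selectivities. The function name `ecdf` and the sample values are illustrative.

```python
from bisect import bisect_right


def ecdf(values):
    """Return (x, F(x)) pairs for the empirical CDF of `values`.

    F(x) is the fraction of observations <= x, so tied values
    share the same cumulative fraction.
    """
    xs = sorted(values)
    n = len(xs)
    return [(x, bisect_right(xs, x) / n) for x in sorted(set(xs))]


selectivities = [0.5, 0.01, 0.5, 0.9]
print(ecdf(selectivities))  # [(0.01, 0.25), (0.5, 0.75), (0.9, 1.0)]
```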

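## Exchange Operator Statistics
How many rows are shuffled in real-world Spark workloads? The cell below groups `ShuffleExchangeExec` operators into buckets by row count (`PRowCount`) and reports the average and maximum shuffled data size in MB for each bucket.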

In [None]:
def getBucket(rowCount):
    """Map a row count to a labeled bucket; the numeric prefix makes the labels sort in order."""
    if rowCount is None:
        return None
    rows = int(rowCount)
    # Upper bounds are inclusive; each branch is reached only if the previous one failed.
    if rows <= 1:
        return "1. 0-1"
    elif rows <= 1e3:
        return "2. 1-1k"
    elif rows <= 1e5:
        return "3. 1k-100k"
    elif rows <= 1e6:
        return "4. 100k-1M"
    elif rows <= 1e8:
        return "5. 1M-100M"
    elif rows <= 1e9:
        return "6. 100M-1B"
    else:
        return "7. >1B"

spark.udf.register("getBucket", getBucket)
exBucketQuery = """
SELECT Bucket AS RowCountBucket, CAST(AVG(MB) AS Decimal(38,2)) AS AvgDataSizeInMB, CAST(MAX(MB) AS Decimal(38,2)) AS MaxDataSizeInMB, COUNT(*) AS Count
FROM (
SELECT getBucket(PRowCount) AS Bucket, Bytes/(1024.0*1024) AS MB
FROM PhysicalExps
WHERE PRowCount > 0 AND Bytes > 0 AND OperatorName LIKE '%ShuffleExchangeExec%')
GROUP BY Bucket
ORDER BY RowCountBucket"""
exBucket = spark.sql(exBucketQuery)
display(exBucket)
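The `if`/`elif` chain in `getBucket` can also be written as a binary search over the bucket upper bounds with `bisect`, which keeps all boundaries in one list. The sketch below uses the same boundaries and labels, with inclusive upper bounds as in `getBucket`. `BOUNDS`, `LABELS`, and `get_bucket` are illustrative names.

```python
from bisect import bisect_left

# Inclusive upper bound of each bucket; anything above the last bound is ">1B".
BOUNDS = [1, 10**3, 10**5, 10**6, 10**8, 10**9]
LABELS = ["1. 0-1", "2. 1-1k", "3. 1k-100k", "4. 100k-1M", "5. 1M-100M", "6. 100M-1B", "7. >1B"]


def get_bucket(row_count):
    """Same mapping as getBucket, using binary search over the upper bounds."""
    if row_count is None:
        return None
    # bisect_left returns the index of the first bound >= rows, so bounds are inclusive.
    return LABELS[bisect_left(BOUNDS, int(row_count))]


for rows in [0, 1, 1000, 1001, 10**9, 10**9 + 1]:
    print(rows, get_bucket(rows))
```

If preferred, it could be registered in the same way as the original, with `spark.udf.register("getBucket", get_bucket)`.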

## Recurring Jobs
Recurring jobs share the same non-strict signature at the root level (`TreeLevel = 0`). The query below counts such signatures per subscription. To capture temporal patterns, we can take the intersection of the result sets from a few consecutive days and then split the intersection into hourly and daily repeat patterns based on `AppSubmitTime`.

In [None]:
recurringJobsQuery = """
SELECT Subscription, NonStrictSignature, COUNT(*) AS Count
FROM AnalysisExps
WHERE TreeLevel = 0 AND LENGTH(NonStrictSignature) > 0
GROUP BY Subscription, NonStrictSignature
HAVING COUNT(*) > 1
ORDER BY Count DESC"""
recurringJobs = spark.sql(recurringJobsQuery)
display(recurringJobs)
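The temporal step described above is not implemented in the notebook. Below is a minimal standard-library sketch of one way to do it. It rests on several assumptions. `AppSubmitTime` is treated as epoch milliseconds, which the notebook does not state. The input is hypothetical `(day, NonStrictSignature, AppSubmitTime)` records, and the subscription grouping is ignored. A signature is kept only if it occurs on every observed day. It is then labeled hourly or daily from the median gap between consecutive submissions, using a tolerance chosen arbitrarily.

```python
from collections import defaultdict
from statistics import median

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


def classify_recurring(records, tolerance=0.1):
    """Hypothetical sketch: records are (day, non_strict_signature, submit_time_ms).

    Keeps signatures seen on every day, then labels each by the median gap
    between consecutive submissions (within `tolerance` of an hour or a day).
    """
    sigs_per_day = defaultdict(set)
    times_per_sig = defaultdict(list)
    for day, sig, submit_ms in records:
        sigs_per_day[day].add(sig)
        times_per_sig[sig].append(submit_ms)

    # Intersection of the per-day signature sets.
    recurring = set.intersection(*sigs_per_day.values()) if sigs_per_day else set()

    patterns = {}
    for sig in recurring:
        times = sorted(times_per_sig[sig])
        gaps = [b - a for a, b in zip(times, times[1:])]
        if not gaps:
            patterns[sig] = "other"
            continue
        gap = median(gaps)
        if abs(gap - HOUR_MS) <= tolerance * HOUR_MS:
            patterns[sig] = "hourly"
        elif abs(gap - DAY_MS) <= tolerance * DAY_MS:
            patterns[sig] = "daily"
        else:
            patterns[sig] = "other"
    return patterns


# Two made-up days: sigH runs every hour, sigD once a day, sigX only on day 1.
records = [("d1", "sigH", h * HOUR_MS) for h in range(24)]
records += [("d2", "sigH", DAY_MS + h * HOUR_MS) for h in range(24)]
records += [("d1", "sigD", 2 * HOUR_MS), ("d2", "sigD", DAY_MS + 2 * HOUR_MS)]
records += [("d1", "sigX", 5 * HOUR_MS)]
print(classify_recurring(records))
```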