In [1]:
import pandas as pd
import numpy as np
import panel as pn
import json
import param
import hvplot.pandas

pn.extension('tabulator')
pd.set_option('display.max_columns', 500)

# Load and Transform Data

In [2]:
def convert_elapsed_time_to_minutes(elapsedTime):
    if elapsedTime[-2:] == "ms":
        time = elapsedTime[:-2]
        return float(time) / 60**2
    unit = elapsedTime[-1:]
    time = float(elapsedTime[:-1])
    if unit == 's':
        return time / 60
    elif unit == 'm':
        return time
    elif unit == 'h':
        return time * 60
    
def convert_byte_to_gigabyte(value):
    byte = float(value[:-1])
    return byte / 10**9

# This is not all the columns with bytes. It excludes columns that contains failed query stats properties
columns_with_bytes = [
    'userMemoryReservation',
    'totalMemoryReservation',
    'peakUserMemoryReservation',
    'peakTotalMemoryReservation',
    'peakTaskUserMemory',
    'peakTaskTotalMemory',
    'physicalInputDataSize',
    'internalNetworkInputDataSize',
    'rawInputDataSize',
    'physicalWrittenDataSize',
    'revocableMemoryReservation',
    'processedInputDataSize',
    'peakRevocableMemoryReservation',
    'peakTaskRevocableMemory',
]

def load_data():
    df = pd.read_csv('_select_queryid_json_parse_querystats_as_queryStats_query_state__202312141659.csv', converters={'queryStats':json.loads}, sep="|")
    new_df = df.join(pd.json_normalize(df["queryStats"]))
    new_df.drop(columns=[
        'queryStats', 
        'stageGcStatistics', 
        'operatorSummaries', 
        'dynamicFiltersStats.dynamicFilterDomainStats',
        'dynamicFiltersStats.dynamicFiltersCompleted',
        'dynamicFiltersStats.lazyDynamicFilters',
        'dynamicFiltersStats.replicatedDynamicFilters',
        'dynamicFiltersStats.totalDynamicFilters',
    ], inplace=True)
    new_df['elapsedTime'] = new_df['elapsedTime'].apply(convert_elapsed_time_to_minutes)
    for col in columns_with_bytes:
        new_df[col] = new_df[col].apply(convert_byte_to_gigabyte)
    return new_df

# cache data to improve dashboard performance
if 'data' not in pn.state.cache.keys():

    df = load_data()

    pn.state.cache['data'] = df.copy()

else: 

    df = pn.state.cache['data']

In [4]:
df.tail()

Unnamed: 0,queryid,query_state,cluster_name,analysisTime,blockedDrivers,blockedReasons,completedDrivers,completedTasks,createTime,cumulativeUserMemory,dispatchingTime,elapsedTime,executionStartTime,executionTime,failedCpuTime,failedCumulativeUserMemory,failedInputBlockedTime,failedInternalNetworkInputDataSize,failedInternalNetworkInputPositions,failedOutputBlockedTime,failedOutputDataSize,failedOutputPositions,failedPhysicalInputDataSize,failedPhysicalInputPositions,failedPhysicalInputReadTime,failedPhysicalWrittenDataSize,failedProcessedInputDataSize,failedProcessedInputPositions,failedRawInputDataSize,failedRawInputPositions,failedScheduledTime,failedTasks,finishingTime,fullyBlocked,inputBlockedTime,internalNetworkInputDataSize,internalNetworkInputPositions,lastHeartbeat,logicalWrittenDataSize,outputBlockedTime,outputDataSize,outputPositions,peakRevocableMemoryReservation,peakTaskRevocableMemory,peakTaskTotalMemory,peakTaskUserMemory,peakTotalMemoryReservation,peakUserMemoryReservation,physicalInputDataSize,physicalInputPositions,physicalInputReadTime,physicalWrittenDataSize,planningTime,processedInputDataSize,processedInputPositions,progressPercentage,queuedDrivers,queuedTime,rawInputDataSize,rawInputPositions,resourceWaitingTime,revocableMemoryReservation,runningDrivers,runningTasks,scheduled,spilledDataSize,totalBlockedTime,totalCpuTime,totalDrivers,totalMemoryReservation,totalScheduledTime,totalTasks,userMemoryReservation,writtenPositions,endTime
9995,20231204_101613_05621_7cehs,FINISHED,trino-analytics-git-ed2e0fb-3032,666.86ms,8264,[],38640,28,2023-12-04T10:16:13.390Z,4.696416e+16,37.65us,58.94,2023-12-04T10:16:14.182Z,58.93m,0.00ns,0.0,0.00ns,0B,0,0.00ns,0B,0,0B,0,0.00ns,0B,0B,0,0B,0,0.00ns,0,0.00ms,False,100.98d,55.850575,2445286228,2023-12-04T11:15:08.966Z,0B,3.84d,0B,0,2.175153,0.054379,1.002211,1.002211,16.24807,14.072918,23.338934,1460932692,1.43h,0.0,1.15m,44.021001,1460932692,63.726622,13698,126.00ms,23.338934,1460932692,666.89ms,2.175153,32,191,True,0B,333.32d,3.68h,60634,16.037355,1.30d,219,13.862202,0,
9996,20231204_101613_05621_7cehs,FINISHED,trino-analytics-git-ed2e0fb-3032,666.86ms,8309,[],27089,17,2023-12-04T10:16:13.390Z,1.026045e+16,37.65us,13.96,2023-12-04T10:16:14.182Z,13.94m,0.00ns,0.0,0.00ns,0B,0,0.00ns,0B,0,0B,0,0.00ns,0B,0B,0,0B,0,0.00ns,0,0.00ms,False,22.20d,33.07911,1453061291,2023-12-04T10:30:10.747Z,0B,2.64d,0B,0,1.500169,0.038657,1.002211,1.002211,15.05741,13.874627,16.178732,946151095,54.19m,0.0,1.15m,28.214951,946151095,44.676254,25204,126.00ms,16.178732,946151095,666.89ms,1.500169,32,202,True,0B,74.93d,1.46h,60634,15.013399,6.68h,219,13.51323,0,
9997,20231204_101613_05621_7cehs,FINISHED,trino-analytics-git-ed2e0fb-3032,666.86ms,8253,[],42503,38,2023-12-04T10:16:13.390Z,6.258861e+16,37.65us,79.2,2023-12-04T10:16:14.182Z,1.31h,0.00ns,0.0,0.00ns,0B,0,0.00ns,0B,0,0B,0,0.00ns,0B,0B,0,0B,0,0.00ns,0,0.00ms,False,135.97d,63.233378,2770429438,2023-12-04T11:35:06.961Z,0B,4.14d,0B,0,2.176311,0.054518,1.002211,1.002211,16.24807,14.072918,25.284497,1613964547,1.61h,0.0,1.15m,48.75069,1613964547,70.097635,9845,126.00ms,25.284497,1613964547,666.89ms,2.176311,33,181,True,0B,447.57d,4.62h,60634,14.934248,1.75d,219,12.757938,0,
9998,20231204_101613_05621_7cehs,FINISHED,trino-analytics-git-ed2e0fb-3032,666.86ms,8246,[],45472,46,2023-12-04T10:16:13.390Z,7.419956e+16,37.65us,94.2,2023-12-04T10:16:14.182Z,1.57h,0.00ns,0.0,0.00ns,0B,0,0.00ns,0B,0,0B,0,0.00ns,0B,0B,0,0B,0,0.00ns,0,0.00ms,False,162.38d,67.900347,2988261837,2023-12-04T11:50:11.019Z,0B,4.26d,0B,0,2.189743,0.055074,1.002211,1.002211,16.24807,14.072918,26.643098,1719821351,1.74h,0.0,1.15m,52.053024,1719821351,74.994228,6884,126.00ms,26.643098,1719821351,666.89ms,2.189743,32,173,True,0B,533.99d,5.34h,60634,15.251443,2.08d,219,13.0617,0,
9999,20231204_101618_16412_byunn,FINISHED,trino-etl-git-ed2e0fb-3032,273.25ms,65,[],0,0,2023-12-04T10:16:18.237Z,0.0,43.31us,3.79,2023-12-04T10:16:18.607Z,3.79m,0.00ns,0.0,0.00ns,0B,0,0.00ns,0B,0,0B,0,0.00ns,0B,0B,0,0B,0,0.00ns,0,0.00ms,False,0.00ns,0.0,0,2023-12-04T10:20:05.139Z,0B,0.00ns,0B,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0.00ns,0.0,42.21ms,0.0,0,0.0,0,97.20ms,0.0,0,273.30ms,0.0,1,2,True,0B,4.08h,1.00ms,66,0.0,1.00ms,2,0.0,0,


# Global Input Widgets

In [5]:
query_state_input_widget = pn.widgets.Select(name="Query State", options=["FAILED", "FINISHED"])
cluster_input_widget = pn.widgets.Select(name="Cluster", options=["adhoc", "etl", "analytics"])
queryid_input_widget = pn.widgets.Select(name="Query ID", options=list(df[df['query_state'] == "FAILED"]["queryid"].unique()))

@pn.depends(query_state_input_widget.param.value, cluster_input_widget.param.value, watch=True)
def update_query_ids(query_state_input_widget,cluster_input_widget):
    filtered_df = df[
        (df['query_state'] == query_state_input_widget) & 
        (df['cluster_name'].str.contains(cluster_input_widget))
    ]

    query_ids = list(filtered_df["queryid"].unique())
    queryid_input_widget.options = query_ids
    queryid_input_widget.value = "" if len(query_ids) == 0 else query_ids[0]

header = pn.Row(query_state_input_widget, cluster_input_widget, queryid_input_widget)
header

# Column Over Elapsed Time Section

## Input Widgets

In [6]:
columns = [
       'blockedDrivers', 'completedDrivers',
       'completedTasks',
       'failedOutputDataSize', 'failedOutputPositions',
       'failedPhysicalInputDataSize', 'failedPhysicalInputPositions',
       'failedPhysicalInputReadTime', 'failedPhysicalWrittenDataSize',
       'failedProcessedInputDataSize', 'failedProcessedInputPositions',
       'failedRawInputDataSize', 'failedRawInputPositions',
       'failedTasks',
       'internalNetworkInputDataSize',
       'internalNetworkInputPositions',
       'logicalWrittenDataSize',
       'outputDataSize', 'outputPositions', 'peakRevocableMemoryReservation',
       'peakTaskRevocableMemory', 'peakTaskTotalMemory', 'peakTaskUserMemory',
       'peakTotalMemoryReservation', 'peakUserMemoryReservation',
       'physicalInputDataSize', 'physicalInputPositions',
       'physicalInputReadTime', 'physicalWrittenDataSize',
       'processedInputDataSize', 'processedInputPositions',
       'progressPercentage', 'rawInputDataSize',
       'rawInputPositions',
       'revocableMemoryReservation', 'runningDrivers', 'runningTasks',
       'spilledDataSize', 'totalBlockedTime',
       'totalCpuTime', 'totalDrivers', 'totalMemoryReservation',
        'totalTasks', 'userMemoryReservation',
       'writtenPositions']
y_axis_input_widget = pn.widgets.Select(name="Y Axis", value="progressPercentage", options=columns)

## Plot Function

In [7]:
def create_column_over_time_plot(query_id="", y_axis="progressPercentage"):
    df_from_query_id = df[df['queryid'] == query_id].sort_values(by="elapsedTime")
    return df_from_query_id.hvplot(
        x='elapsedTime',
        y=y_axis,line_width=2,
        legend=False,
        width=700
    )

## Table

In [8]:
def create_column_over_time_table(query_id="", y_axis="progressPercentage"):
    df_from_query_id = df[df['queryid'] == query_id].sort_values(by="elapsedTime")
    res_df = pd.concat([df_from_query_id[y_axis], df_from_query_id["elapsedTime"], df_from_query_id["queryid"]], axis=1)
    return pn.widgets.Tabulator(res_df, page_size=10, pagination="remote")

In [9]:
over_time_section_name = pn.pane.Markdown("""
## Column Over time
""")

bound_column_over_time_plot = pn.bind(
    create_column_over_time_plot, 
    query_id = queryid_input_widget,
    y_axis = y_axis_input_widget,
)

bound_column_over_time_table = pn.bind(
    create_column_over_time_table, 
    query_id = queryid_input_widget,
    y_axis = y_axis_input_widget,
)

column_over_time_section = pn.Column(over_time_section_name, y_axis_input_widget, pn.Row(bound_column_over_time_plot, bound_column_over_time_table))
column_over_time_section

# Change in Column Values Over Time

## Input Widgets

In [10]:
options = [
    "progressPercentage",
    "processedInputPositions",
    'userMemoryReservation',
    'totalMemoryReservation',
    'peakUserMemoryReservation',
    'peakTotalMemoryReservation',
    'peakTaskUserMemory',
    'peakTaskTotalMemory',
    'physicalInputDataSize',
    'internalNetworkInputDataSize',
    'rawInputDataSize',
    'physicalWrittenDataSize',
    'revocableMemoryReservation',
    'processedInputDataSize',
    'peakRevocableMemoryReservation',
    'peakTaskRevocableMemory',
]
options.sort()
column_input_widget = pn.widgets.Select(name="Column", value="progressPercentage", options=options)

## Plot Function

In [11]:
def create_delta_graph(query_id="", delta_col="progressPercentage"):
    df_from_query_id = df[df['queryid'] == query_id].sort_values(by="elapsedTime")
    delta_df = df_from_query_id[delta_col].diff().fillna(0)
    new = pd.concat([delta_df, df_from_query_id["elapsedTime"]], axis=1)
    return new.hvplot(x="elapsedTime", y=delta_col, line_width=2,legend=False,width=700)


## Table Function

In [12]:
def create_delta_table(query_id="", delta_col="progressPercentage"):
    df_from_query_id = df[df['queryid'] == query_id].sort_values(by="elapsedTime")
    delta_df = df_from_query_id[delta_col].diff().fillna(0)
    new = pd.concat([delta_df, df_from_query_id["elapsedTime"]], axis=1)
    return pn.widgets.Tabulator(new, page_size=10, pagination="remote")


In [13]:
delta_col_section_name = pn.pane.Markdown("""
## Change in Column Over time
""")
bound_delta_graph = pn.bind(create_delta_graph, query_id=queryid_input_widget, delta_col=column_input_widget)
bound_delta_table = pn.bind(create_delta_table, query_id=queryid_input_widget, delta_col=column_input_widget)

delta_column_section = pn.Column(delta_col_section_name, column_input_widget, pn.Row(bound_delta_graph, bound_delta_table))
delta_column_section

## Quick Stats

In [14]:
def stats_block(value, column):
    val = pn.widgets.StaticText(value=value, styles={'font-size': '30px', 'margin': 'auto'})
    block = pn.Card(val, title=column, styles={'margin-right': '10px'})
    return block

def create_stats_blocks(query_id=""):
    df_from_query_id = df[df['queryid'] == query_id].sort_values(by="elapsedTime")
    stats_blocks = []
    
    # Elapsed Time Stats Block
    elapsed_time = round(df_from_query_id["elapsedTime"].max(), 3)
    stats_blocks.append(stats_block(f'{elapsed_time} mins', "elapsedTime"))
    
    gigabytes_cols = [
        'peakTaskUserMemory',
        'peakTaskTotalMemory',
        'rawInputDataSize',
        'physicalInputDataSize'
    ]
    for col in gigabytes_cols:
        max_val = round(df_from_query_id[col].max(), 3)
        stats_blocks.append(stats_block(f'{max_val} GB', col))
    
    return pn.Row(*stats_blocks)

bound_stats_blocks = pn.bind(create_stats_blocks, query_id=queryid_input_widget)
quick_stats = pn.Row(bound_stats_blocks)

# Layout Template

In [15]:
#Layout using Template
template = pn.template.FastListTemplate(
    title='Queries Data Visualization Dashboard', 
    main=[
        header,
        quick_stats,
        column_over_time_section,
        delta_column_section
    ],
    accent_base_color="#88d8b0",
    header_background="#88d8b0",
)
template.show()
# template.servable();

Launching server at http://localhost:50579


<panel.io.server.Server at 0x1bbb7b3d0>

