In [None]:
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
from datetime import datetime
import bokeh
import bokeh.io
import bokeh.plotting
from datetime import datetime as dt
import dateutil.parser
import shutil
import ipywidgets as widgets
from IPython.display import Javascript
import os
from pyspark.sql.session import SparkSession
from bokeh.embed import components
# Send Bokeh output to the notebook; hide_banner=False leaves the load banner visible.
bokeh.io.output_notebook(hide_banner=False)
from bokeh.plotting import figure, show
from bokeh.palettes import brewer, d3
from pyspark.sql.types import StructType
import json


# Shared Spark session used by every helper below; requests 2G of driver memory.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "2G")
    .getOrCreate()
)

In [None]:
# Default set of columns to chart; commented entries are kept as toggles.
measures = [
    'worker_id',
    'triggerExecution',
    'commitTime',
    'memoryUsedMiB',
    'diskUsedGiB',
    # 'rocksdbSstFileSizeGiB',
    # 'allUpdatesTime',
    # 'allRemovalsTime',
    'numRowsTotal',
]

def get_path(experiment):
    """Return the ``file://`` URI of an experiment's telemetry log.

    The log is looked up under ``<cwd>/telemetry/<experiment>.log.json``.
    When that plain file is absent, the URI of the ``.gz`` variant is
    returned instead (without checking that the compressed file exists).
    """
    plain = f"{os.getcwd()}/telemetry/{experiment}.log.json"
    suffix = "" if os.path.exists(plain) else ".gz"
    return f"file://{plain}{suffix}"

def read_metric(experiment: str):
    """Load one experiment's telemetry log and project its per-batch metrics.

    The schema is read from ``metric_schema.json`` (resolved relative to the
    current working directory), the log located by ``get_path`` is loaded with
    that schema and registered as the temp view ``metrics_table`` (replaced on
    every call), and a flattened projection is returned.

    Args:
        experiment: Experiment name; used both to locate the log file and as
            the literal value of the ``experiment`` column.

    Returns:
        A Spark DataFrame with the columns ``ts``, ``experiment``,
        ``triggerExecution``, ``memoryUsedMiB``, ``diskUsedGiB``,
        ``commitTime``, ``worker_id``, ``numRowsTotal``, ``jvmMemoryUsedMiB``,
        ``numPut``, ``numGet``, ``serializationTime``, ``deserializationTime``,
        ``sortTime``, ``putAndGetTagTime``, ``numPutPerSec`` and
        ``numGetPerSec``. Rows whose ``timestamp`` is null are dropped.
    """
    # Parse the schema once, straight from the file handle.
    with open("metric_schema.json", "r") as f:
        schema = StructType.fromJson(json.load(f))

    spark.read.json(get_path(experiment), schema=schema).createOrReplaceTempView("metrics_table")

    # Only the first state operator (stateOperators[0]) is inspected.
    # The "/ 16" divisors are presumably the partition count ("p16" in the
    # experiment names) -- TODO confirm.
    # `experiment` is interpolated into the SQL text as a string literal, so it
    # must not contain a single quote.
    # diskUsedGiB is projected so that every entry of the default `measures`
    # list can be looked up on the collected rows.
    df = spark.sql(f"""
    --start-sparksql
    select ts,
    experiment,
        triggerExecution,
        memoryUsedMiB,
        diskUsedGiB,
        commitTime,
        worker_id,
        numRowsTotal,
        jvmMemoryUsedMiB,
        numPut,
        numGet,
        serializationTime,
        deserializationTime,
        sortTime,
        putAndGetTagTime,
        numPut / putAndGetTagTime as numPutPerSec,
        numGet / putAndGetTagTime as numGetPerSec
        from (
        select
            *,
            timestamp(timestamp) as ts,
            '{experiment}' as experiment,
            durationMs.triggerExecution / 1000 as triggerExecution,
            s.memoryUsedBytes,
            s.memoryUsedBytes / 1024 / 1024 as memoryUsedMiB,
            s.commitTimeMs,
            s.commitTimeMs / 1000 as commitTime,
            disk_used / 1024 / 1024 / 1024 as diskUsedGiB,
            coalesce(s.customMetrics.putTagCount, 0) / 16 as numPut,
            coalesce(s.customMetrics.getTagCount, 0) / 16 as numGet,
            coalesce(s.customMetrics.sortTimer, 0) / 1000 / 16 as sortTime,
            coalesce(s.customMetrics.toStateTimer, 0) / 1000 / 16 as serializationTime,
            coalesce(s.customMetrics.fromStateTimer, 0) / 1000 / 16 as deserializationTime,
            coalesce(s.customMetrics.updateTagCacheTimer, 0) / 1000 / 16 as putAndGetTagTime,
            s.customMetrics.rocksdbSstFileSize,
            s.customMetrics.rocksdbSstFileSize / 1024 / 1024 / 1024 as rocksdbSstFileSizeGiB,
            s.allUpdatesTimeMs,
            s.allUpdatesTimeMs / 1000 as allUpdatesTime,
            s.allRemovalsTimeMs,
            s.allRemovalsTimeMs / 1000 as allRemovalsTime,
            s.numRowsTotal,
            split(worker_name, '-')[4] as worker_id,
            executor_memory / 1024 / 1024 as jvmMemoryUsedMiB
        from
            (
                select
                    *,
                    stateOperators[0] as s
                from
                    metrics_table
                where
                    timestamp is not null
            )
        )
    --end-sparksql
    """)

    return df



def apply_sql(df, view, sql):
    """Expose *df* as the temp view *view*, then run *sql* on the shared session.

    Any existing temp view with the same name is replaced. Returns the
    DataFrame produced by ``spark.sql(sql)``.
    """
    df.createOrReplaceTempView(view)
    result = spark.sql(sql)
    return result

def render_experiments(experiments, measures):
    """Plot every measure for every experiment, one chart per measure.

    Each experiment is loaded once through ``read_metric``. The ``worker_id``
    measure is drawn as circles and every other measure as a line.
    """
    # Loaded frames are kept in the same order as `experiments`.
    frames = [read_metric(name) for name in experiments]

    for measure in measures:
        glyph = 'circle' if measure == 'worker_id' else 'line'
        render_experiment(experiments, frames, measure, glyph)

    # No persist() is issued in this function; unpersist() is still called on
    # every frame once plotting is done.
    for frame in frames:
        frame.unpersist()
            
def render_experiment(experiments, dataframes, measure, plot_type):
    """Draw one Bokeh chart of *measure* over time, overlaying all experiments.

    Args:
        experiments: Experiment names, used as legend labels.
        dataframes: DataFrames aligned index-by-index with *experiments*; each
            must expose a ``ts`` column and the *measure* column.
        measure: Name of the column plotted on the y axis.
        plot_type: ``'line'`` draws a line; any other value draws circles.
    """
    p = figure(x_axis_label='ts', y_axis_label=measure, x_axis_type='datetime', plot_width=1200, plot_height=400)
    palette = d3['Category20'][20]
    for idx, experiment in enumerate(experiments):
        df = dataframes[idx]
        # Collect once and derive x and y from the same rows: two separate
        # collect() calls run the job twice and are not guaranteed to return
        # rows in the same order, which could mis-pair timestamps and values.
        rows = df.select('ts', measure).collect()
        x = [r['ts'] for r in rows]
        y = [r[measure] for r in rows]
        # Wrap around the 20-colour palette instead of raising IndexError when
        # more than 20 experiments are overlaid.
        color = palette[idx % len(palette)]
        if plot_type == 'line':
            p.line(x, y, line_color=color, legend_label=experiment, line_width=2)
        else:
            p.circle(x, y, color=color, legend_label=experiment, size=2)
    p.legend.background_fill_alpha = 0.5
    # Clicking a legend entry toggles the visibility of that experiment.
    p.legend.click_policy="hide"
    show(p)

def print_workers(experiments):
    """Print, per experiment, each worker's first and last metric timestamp.

    For every experiment the metrics are loaded with ``read_metric``,
    registered as a temp view named after the experiment, aggregated per
    worker and shown untruncated, ordered by the last timestamp.

    The query uses only columns that ``read_metric`` projects (``experiment``,
    ``worker_id``, ``ts``). The previous query referenced ``worker_name`` and
    ``timestamp``, which are not in that projection and so could not be
    resolved.

    Args:
        experiments: Experiment names; each is also used verbatim as the temp
            view name, so it must be a valid SQL identifier.
    """
    for experiment in experiments:
        view = experiment
        sql = f"""--start-sparksql
            select
                t.experiment,
                t.worker_id,
                min(t.ts) as min_timestamp,
                max(t.ts) as max_timestamp
            from
                {view} as t
            group by
                t.experiment, t.worker_id
            order by
                max_timestamp asc
        --end-sparksql"""
        df = read_metric(experiment)
        df = apply_sql(df, view, sql)
        df.show(truncate=False)

In [None]:
# Measures charted by this cell; commented entries are kept as toggles.
measures = [
    'triggerExecution',
    'jvmMemoryUsedMiB',
    'commitTime',
    'memoryUsedMiB',
    # 'worker_id',
    # 'numRowsTotal',
    'numPutPerSec',
    'numGetPerSec',
    'putAndGetTagTime',
    'numPut',
    'numGet',
    'serializationTime',
    'deserializationTime',
    'sortTime',
]
#bloom_r5000_p16_a50000_b50_f200000002_t60_x45G_hdfs -> 1 version
#bloom_r5000_p16_a50000_b50_f200000003_t60_x45G_hdfs -> 0 version

# Experiments overlaid in the charts below. Uncomment entries to include
# earlier runs; only the uncommented names are loaded.
experiments = [
    # "bloom_r5000_p16_a50000_b500_f20000000_t60_x45G_hdfs_javaser",
    # "bloom_r5000_p16_a50000_b50000_f200000_t60_x45G_hdfs_javaser",
    # "bloom_r5000_p16_a50000_b500_f20000000_t60_x45G_hdfs",
    # "bloom_r5000_p16_a50000_b50000_f200000_t60_x45G_hdfs",
    # "bloom_r5000_p16_a50000_b50000_f300000_t60_x45G_hdfs",

    # "bloom_r10000_p16_a50000_b500_f30000000_t60_x30G_fluxstore",
    # "bloom_r10000_p16_a50000_b500_f50000000_t60_x45G_fluxstore",
    # "bloom_r10000_p16_a50000_b500_f40000000_t60_x45G_fluxstore",
    # "bloom_r10000_p16_a50000_b500_f5000000_t60_x45G_fluxstore",
    # "bloom_r10000_p16_a50000_b50000_f500000_t60_x45G_fluxstore",
    # "bloom_r5000_p16_a50000_b50000_f500000_t60_x45G_fluxstore",
    # "bloom_r10000_p16_a50000_b500_f60000000_t60_x45G_fluxstore",
    # "bloom_r10000_p16_a50000_b50000_f200000_t60_x45G_fluxstorecomp",
    # "bloom_r10000_p16_a50000_b50000_f200000_t60_x45G_fluxstore",
    # "bloom_r10000_p16_a50000_b50000_f200000_t60_x45G_hdfs",

    # "bloom_r5000_p16_a50000_b50000_f200000_t60_x45G_hdfs",
    # "bloom_r20000_p16_a50000_b50000_f200000_t60_x45G_fluxstore",
    # "bloom_r5000_p16_a50000_b50000_f600000_t60_x45G_fluxstore",

    # "bloom_r5000_p16_a50000_b50000_f200000_t60_x45G_hdfs",
    # "bloom_r5000_p16_a50000_b50000_f300000_t60_x45G_fluxstore",
    # "bloom_r8000_p16_a50000_b50000_f200000_t60_x45G_fluxstore",

    # "bloom_r5000_p16_a50000_b500_f20000000_t60_x45G_hdfs",
    # "bloom_r5000_p16_a50000_b500_f30000000_t60_x45G_fluxstore",
    # "bloom_r8000_p16_a50000_b500_f20000000_t60_x45G_fluxstore",

    # "bloom_r5000_p16_a50000_b500_f30000000_t60_x45G_fluxstorecomp",
    # "bloom_r8000_p16_a50000_b500_f20000000_t60_x45G_fluxstorecomp",

    # "bloom_r10000_p16_a50000_b50000_f400000_t60_x45G_fluxstorecomp",
    "bloom_r10000_p16_a50000_b50000_f400000_t30_x45G_fluxstorecomp",
    "bloom_r15000_p16_a50000_b50000_f400000_t10_x45G_fluxstorecomp",
]

# Echo the active selection so the cell output records which runs were plotted.
print(experiments)
# Draw one chart per entry in `measures`, overlaying every selected experiment.
render_experiments(experiments, measures)
