In [None]:
import pandas as pd
import numpy as np

from scipy import stats

import json
import clustering
from clustering.what_if_model import WhatIfModel
from clustering.pqp_input_parser import PQPInputParser
from clustering.util import create_model
import autoreload
import clustering.evaluation

# Enable IPython's autoreload extension so edits to the imported project
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Lift pandas' display limits on column width and row count so the result
# tables below are rendered without truncation.
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)

In [None]:
# Root directory of the TPC-H statistics; one sub-directory per clustering column.
BASE_PATH_TPCH = "~/Dokumente/repos/example_plugin/stats/final/tpch/sf10-2d"

# Clustering column -> statistics directory recorded for that clustering.
TPCH = {
    column: f"{BASE_PATH_TPCH}/{column}"
    for column in ('l_shipdate', 'l_orderkey', 'l_partkey')
}

# Convenience aliases for the individual directories.
TPCH_ORDERKEY = TPCH['l_orderkey']
TPCH_SHIPDATE = TPCH['l_shipdate']
TPCH_PARTKEY = TPCH['l_partkey']

In [None]:
# Reference model for the lineitem table, built from the statistics in the
# l_orderkey directory. It is reused by every evaluation cell below.
m = create_model(
    "lineitem",
    PQPInputParser("tpch", TPCH_ORDERKEY),
    2,
)

# JOIN EVALUATION

In [None]:
# Evaluate the join estimates of the model for a clustering on l_orderkey.
CLUSTERING_COLUMN = "l_orderkey"
SIDED = True
join_results = clustering.evaluation.evaluate_join_step(
    m,
    TPCH[CLUSTERING_COLUMN],
    [CLUSTERING_COLUMN],
    CLUSTERING_COLUMN,
    [100],
    "ALL",
    SIDED,
)

print(f"There are {len(join_results)} joins")

# Squared total error, used only to rank the joins by error magnitude.
join_results['e'] = join_results['TOTAL_ERROR'] ** 2
join_results.sort_values(['e'], ascending=False)[[
    "QUERY_HASH1",
    "DESCRIPTION1",
    "RUNTIME_BASE_MS",
    "RUNTIME_ESTIMATE_MS",
    "RUNTIME_CLUSTERED_MS",
    "RELATIVE_ERROR",
    "TOTAL_ERROR_MS",
]]

In [None]:
# Earlier join results that are handed to plot_join_errors as its second
# argument. None presumably means "no previous run to compare against" -
# TODO confirm against plot_join_errors. Swap in the commented line to keep the
# current results as the baseline for the next run.
old_join_results = None
#old_join_results = join_results

In [None]:
# Print the aggregated error metrics for the joins, then plot the errors of
# the current run together with the stored earlier run (if any).
clustering.evaluation.print_aggregated_metrics(
    join_results,
    m.query_frequencies,
)
fig = clustering.evaluation.plot_join_errors(
    join_results,
    old_join_results,
    m.query_frequencies,
)
#old_join_results = join_results
fig

In [None]:
# Debug view of the raw join statistics recorded in the l_shipdate directory.
# Rows with any missing value are dropped (dropna without inplace so the cell
# reads as one expression producing a fresh frame).
joindbg = pd.read_csv(TPCH_SHIPDATE + "/joins.csv", sep='|').dropna()

# For every join, pick the column name belonging to its build / probe side:
# the value of BUILD_SIDE (resp. PROBE_SIDE) selects the "<side>_COLUMN_NAME"
# column to read. A list comprehension is used instead of
# DataFrame.apply(axis=1) because apply returns a DataFrame (not a Series) when
# no rows are left after dropna, which cannot be assigned to a single column.
joindbg['BUILD_COLUMN'] = [
    row[f"{row['BUILD_SIDE']}_COLUMN_NAME"] for _, row in joindbg.iterrows()
]
joindbg['PROBE_COLUMN'] = [
    row[f"{row['PROBE_SIDE']}_COLUMN_NAME"] for _, row in joindbg.iterrows()
]

# Columns shown for the single query that is being inspected.
viewcols = ['QUERY_HASH', 'IS_FLIPPED', 'PROBE_SORTED', 'BUILD_SORTED', 'PROBE_COLUMN', 'BUILD_COLUMN', 'DESCRIPTION']
joindbg[joindbg['QUERY_HASH'] == '3534234c34669919'][viewcols]

In [None]:
# Run the join simulation for a clustering on l_partkey and show the raw result.
CLUSTERING_COLUMN = "l_partkey"
METRIC = "PIC"
simulation_results = clustering.evaluation.evaluate_join_simulation(
    m,
    TPCH[CLUSTERING_COLUMN],
    [CLUSTERING_COLUMN],
    CLUSTERING_COLUMN,
    [100],
)
#simulation_results[['QUERY_HASH1', 'DESCRIPTION1', f"{METRIC}1", f"{METRIC}2", f"{METRIC}"]].sort_values([f"{METRIC}"], ascending=False)
simulation_results

# SCAN EVALUATION

In [None]:
# Evaluate the scan estimates of the model for a clustering on l_shipdate.
CLUSTERING_COLUMN = "l_shipdate"
scan_results = clustering.evaluation.evaluate_scans(
    m,
    TPCH[CLUSTERING_COLUMN],
    [CLUSTERING_COLUMN],
    CLUSTERING_COLUMN,
    [100],
)
print(f"There are {len(scan_results)} scans on {m.table_name}")

# Squared total error, used only to rank the scans by error magnitude.
scan_results['e'] = scan_results['TOTAL_ERROR'] ** 2
# Shortened description: DESCRIPTION1 without its first three space-separated
# tokens. Series.apply is used instead of a row-wise DataFrame.apply(axis=1):
# only one column is read, and the row-wise form returns a DataFrame for an
# empty result, which cannot be assigned to a single column.
scan_results['DESC'] = scan_results['DESCRIPTION1'].apply(
    lambda description: " ".join(description.split(" ")[3:])
)
scan_results.sort_values(['e'], ascending=False)[[
    "QUERY_HASH",
    "DESC",
    "COLUMN_NAME",
    "RUNTIME_BASE_MS",
    "RUNTIME_ESTIMATE_MS",
    "RUNTIME_CLUSTERED_MS",
    "TOTAL_ERROR_MS",
    "RELATIVE_ERROR",
]]

In [None]:
# Earlier scan results that are handed to plot_scan_errors as its second
# argument. None presumably means "no previous run to compare against" -
# TODO confirm against plot_scan_errors.
old_scan_results = None

In [None]:
# Print the aggregated error metrics for the scans, then plot the errors of
# the current run together with the stored earlier run (if any).
clustering.evaluation.print_aggregated_metrics(
    scan_results,
    m.query_frequencies,
)
fig = clustering.evaluation.plot_scan_errors(
    scan_results,
    old_scan_results,
    m.query_frequencies,
)
#old_scan_results = scan_results
fig

In [None]:
#scan_results['DESCRIPTION1'].apply(lambda x: "Like" in x).any()
# Adapt a copy of the model's table scans to an l_shipdate clustering and keep
# only the scans with one specific BETWEEN predicate.
adapted_scans = m.adapt_scans_to_clustering(
    m.table_scans.copy(),
    ['l_shipdate'],
    'l_shipdate',
    [100],
)
adapted_scans = adapted_scans[
    adapted_scans['DESCRIPTION'] == "l_shipdate BETWEEN UPPER EXCLUSIVE '1994-01-01' AND '1995-01-01'"
]
# NOTE(review): the next expression is evaluated but never rendered, because a
# cell only displays its last expression.
adapted_scans[['QUERY_HASH', 'DESCRIPTION']]
# Displayed output of this cell: the distinct data types among the table scans.
m.table_scans['DATA_TYPE'].unique()

In [None]:
# Run the scan simulation for a clustering on l_partkey and rank the scans by
# the "<METRIC>r" column, largest first.
CLUSTERING_COLUMN = "l_partkey"
METRIC = "IR"
simulation_results = clustering.evaluation.evaluate_scan_simulation(
    m,
    TPCH[CLUSTERING_COLUMN],  # same directory as TPCH_PARTKEY
    [CLUSTERING_COLUMN],
    CLUSTERING_COLUMN,
    [100],
)
simulation_results[[
    'QUERY_HASH1',
    'DESCRIPTION1',
    f'{METRIC}1',
    f'{METRIC}2',
    f'{METRIC}r',
]].sort_values([f'{METRIC}r'], ascending=False)

# AGGREGATE EVALUATION

In [None]:
# Evaluate the aggregate estimates of the model for a clustering on l_orderkey.
CLUSTERING_COLUMN = "l_orderkey"
aggregate_results = clustering.evaluation.evaluate_aggregates(
    m,
    TPCH[CLUSTERING_COLUMN],
    [CLUSTERING_COLUMN],
    CLUSTERING_COLUMN,
    [100],
)
print(f"There are {len(aggregate_results)} aggregates")

# Squared total error, used only to rank the aggregates by error magnitude.
aggregate_results['e'] = aggregate_results['TOTAL_ERROR'] ** 2
aggregate_results.sort_values(['e'], ascending=False)[[
    'QUERY_HASH',
    'DESCRIPTION1',
    'RUNTIME_BASE_MS',
    'RUNTIME_ESTIMATE_MS',
    'RUNTIME_CLUSTERED_MS',
    'TOTAL_ERROR_MS',
    'RELATIVE_ERROR',
    'RUNTIME_ESTIMATE',
]]

In [None]:
# Earlier aggregate results that are handed to plot_aggregate_errors as its
# second argument. None presumably means "no previous run to compare against" -
# TODO confirm against plot_aggregate_errors.
old_aggregate_results = None

In [None]:
# Print the aggregated error metrics for the aggregates, then plot the errors
# of the current run together with the stored earlier run (if any).
clustering.evaluation.print_aggregated_metrics(
    aggregate_results,
    m.query_frequencies,
)
fig = clustering.evaluation.plot_aggregate_errors(
    aggregate_results,
    old_aggregate_results,
    m.query_frequencies,
)
#old_aggregate_results = aggregate_results
fig

In [None]:
# Aggregates whose GROUP_COLUMNS and AGGREGATE_COLUMNS are both 1 (presumably
# column counts - TODO confirm) and whose RUNTIME_NS exceeds 500e6.
m.aggregates[
    (m.aggregates['GROUP_COLUMNS'] == 1)
    & (m.aggregates['AGGREGATE_COLUMNS'] == 1)
    & (m.aggregates['RUNTIME_NS'] > 500e6)
]
#m.aggregates[m.aggregates['COLUMN_NAME'] == 'l_orderkey,l_quantity']

In [None]:
def avg_latency_ms(df, m):
    """Return the frequency-weighted average runtime of the operators in `df`, in milliseconds.

    Parameters:
        df: DataFrame with a 'QUERY_HASH' and a 'RUNTIME_NS' column. It is not
            modified (no helper column is written back), so filtered slices can
            be passed without triggering SettingWithCopyWarning.
        m: object whose `query_frequencies` can be indexed by query hash and
            yields the weight of that query.

    Returns:
        sum(frequency * RUNTIME_NS) / sum(frequency) / 1e6, or NaN when `df`
        has no rows or the frequencies sum to zero.

    Raises:
        Whatever `m.query_frequencies[hash]` raises for a hash it does not
        contain (e.g. KeyError for a dict).
    """
    if len(df) == 0:
        # No operators at all: the weighted mean is undefined.
        return float('nan')

    # Keep the weights in a local Series instead of adding a column to `df`.
    frequencies = df['QUERY_HASH'].apply(lambda query_hash: m.query_frequencies[query_hash])
    total_frequency = frequencies.sum()
    if total_frequency == 0:
        # Avoid dividing by zero when every query has weight 0.
        return float('nan')

    weighted_runtime_ns = (frequencies * df['RUNTIME_NS']).sum()
    return weighted_runtime_ns / total_frequency / 1e6

def avg_latencies(table_name, column_name):
    """Print the average scan, join and aggregate latency measured under one clustering.

    A fresh model is built for `table_name` from the statistics directory
    registered under `column_name` in TPCH. Joins are restricted to those whose
    probe or build table is `table_name`, aggregates to those whose TABLE_NAME
    contains `table_name`. The averages are weighted with the query frequencies
    of that same model, so the function does not depend on a notebook-global
    model built from a different statistics directory.

    Parameters:
        table_name: name of the table to report on (e.g. "lineitem").
        column_name: clustering column; must be a key of TPCH.
    """
    m2 = create_model(table_name, PQPInputParser("tpch", TPCH[column_name]), 2)

    touches_table = (m2.joins['PROBE_TABLE'] == table_name) | (m2.joins['BUILD_TABLE'] == table_name)
    m2.joins = m2.joins[touches_table]
    m2.aggregates = m2.aggregates[m2.aggregates['TABLE_NAME'].apply(lambda x: table_name in x)]

    print(f"average latency with {column_name} clustering:")
    print(f"scans: {avg_latency_ms(m2.table_scans, m2)}  ms")
    print(f"joins: {avg_latency_ms(m2.joins, m2)} ms")
    print(f"aggregates: {avg_latency_ms(m2.aggregates, m2)} ms")
    print()
    
# Report the measured average latencies of lineitem for each clustering, in
# the same order as before: l_orderkey, l_shipdate, l_partkey.
for column_name in ("l_orderkey", "l_shipdate", "l_partkey"):
    avg_latencies("lineitem", column_name)