In [1]:
import pandas as pd
import numpy as np
from functools import reduce
from datetime import datetime
from collections import Counter
import itertools
import math
import operator
import json

In [2]:
# Config

BENCHMARK = "TPCH"
CHUNK_SIZE = 65535

if BENCHMARK == "TPCH":
    SCALE_FACTOR = 1
    RUNS = 10
    TIME = 60
    STATISTICS_PATH = f"~/Dokumente/repos/example_plugin/TPC-H__SF_{SCALE_FACTOR}.000000__RUNS_{RUNS}__TIME_{TIME}"
elif BENCHMARK == "TPCDS":
    SCALE_FACTOR = 1
    RUNS = 1
    TIME = 60
    STATISTICS_PATH = f"~/Dokumente/repos/example_plugin/TPC-DS__SF_{SCALE_FACTOR}.000000__RUNS_{RUNS}__TIME_{TIME}"
else:
    raise Exception("Unknown benchmark: " + BENCHMARK)

print(f"Model is configured for {BENCHMARK} (chunk size {CHUNK_SIZE}) with scale factor {SCALE_FACTOR}, {TIME} seconds runtime, and at most {RUNS} runs per query")

Model is configured for TPCH (chunk size 65535) with scale factor 1, 60 seconds runtime, and at most 10 runs per query


In [3]:
# Load table scan statistics

path = f"{STATISTICS_PATH}/table_scans.csv"
scans = pd.read_csv(path, sep='|')
EXPECTED_SCAN_COUNT = len(scans)
LOADED_CHUNK_SIZE = CHUNK_SIZE
LOADED_BENCHMARK = BENCHMARK
LOADED_SCALE_FACTOR = SCALE_FACTOR
LOADED_RUNS = RUNS
LOADED_TIME = TIME

print(f"Successfully loaded {path}")

def assert_correct_statistics_loaded():
    assert BENCHMARK == LOADED_BENCHMARK, f"The model is configured to use {BENCHMARK}, but {LOADED_BENCHMARK} is currently loaded.\nEither change the benchmark or re-run all cells"
    assert SCALE_FACTOR == LOADED_SCALE_FACTOR, f"The model is configured to use {SCALE_FACTOR} as scale factor, but data for a scale factor of {LOADED_SCALE_FACTOR} is currently loaded.\nEither change the benchmark or re-run all cells"
    assert RUNS == LOADED_RUNS, f"The model is configured to perform at most {RUNS} runs, but the currently loaded data had at most {LOADED_RUNS} runs.\nEither change the benchmark or re-run all cells"
    assert TIME == LOADED_TIME, f"The model is configured to run for {TIME} seconds, but the currently data had a runtime of {LOADED_TIME} seconds.\nEither change the benchmark or re-run all cells"
    assert CHUNK_SIZE == LOADED_CHUNK_SIZE, f"The model is configured to use {CHUNK_SIZE} as chunk_size, but data for a chunk size of {LOADED_CHUNK_SIZE} is currently loaded.\nEither change the benchmark or re-run all cells"
    assert EXPECTED_SCAN_COUNT == len(scans), f"There should be {EXPECTED_SCAN_COUNT} table scans, but there are only {len(scans)}\nProbably one of the last commands reassigned it unintentionally"
    
    assert 'OPERATOR_POINTER' in scans.columns, f"the statistics in {STATISTICS_PATH} are outdated (column 'OPERATOR_POINTER' in table_scans.csv is missing). Please create them again."

Successfully loaded ~/Dokumente/repos/example_plugin/TPC-H__SF_1.000000__RUNS_10__TIME_60/table_scans.csv


In [4]:
# Validate table scans
assert_correct_statistics_loaded()

# To make sure pruning was not active,
# first fetch table sizes,
table_statistics = pd.read_csv(f"{STATISTICS_PATH}/table_meta_data.csv", sep='|')
table_sizes = dict(zip(table_statistics.table_name, table_statistics.row_count))

# then make sure INPUT_ROWS == table_size
def input_size_matches(row):
    #print(row)
    
    actual_row_count = row['INPUT_ROWS']
    table = row['TABLE_NAME']
    expected_row_count = table_sizes[table]
    return expected_row_count == actual_row_count

data_scans = scans[scans['COLUMN_TYPE'] == 'DATA']
input_size_matches = data_scans.apply(input_size_matches, axis=1)
all_sizes_match = reduce(np.logical_and, input_size_matches) #input_size_matches.apply()

if not all_sizes_match:
    raise Exception("The given statistics were probably created while pruning was active")
else:
    print("OK - looks like pruning was deactivated while the statistics were created")

OK - looks like pruning was deactivated while the statistics were created


In [5]:
# Append additional information to the table scans
assert_correct_statistics_loaded()

print(f"Statistics for {BENCHMARK} contain {len(scans)} table scans")


# Add statistics about selectivity and speed for each operator
scans['selectivity'] = scans['OUTPUT_ROWS'] / scans['INPUT_ROWS']

# TODO: Assumption that reading and writing a row have the same cost
scans['time_per_row'] = scans['RUNTIME_NS'] / (scans['INPUT_ROWS'] + scans['OUTPUT_ROWS'])
scans['time_per_input_row'] = scans['time_per_row']
scans['time_per_output_row'] = scans['time_per_row']


def determine_or_chains(table_scans):
    table_scans['part_of_or_chain'] = False
    
    single_table_scans = table_scans.groupby(['QUERY_HASH', 'TABLE_NAME', 'OPERATOR_POINTER'])
    
    for _, scans in single_table_scans:            
        input_row_frequencies = Counter(scans.INPUT_ROWS)
        or_input_sizes = set([input_size for input_size, frequency in input_row_frequencies.items() if frequency > 1])

        df = pd.DataFrame()
        df['INPUT_ROWS'] = scans['INPUT_ROWS']
        df['OUTPUT_ROWS'] = scans['OUTPUT_ROWS']
        df['part_of_or_chain'] = scans.apply(lambda row: row['INPUT_ROWS'] in or_input_sizes, axis=1)

        for _ in range(len(scans)):
            or_input_sizes |= set(df[df['part_of_or_chain']].OUTPUT_ROWS.unique())
            df['part_of_or_chain'] = df.apply(lambda row: row['INPUT_ROWS'] in or_input_sizes, axis=1)

        or_chains = list(df[df['part_of_or_chain']].index)
        table_scans.iloc[or_chains, table_scans.columns.get_loc('part_of_or_chain')] = True
    
    return table_scans

# Hyrise does not use scans that are part of an OR-chain for pruning
scans = determine_or_chains(scans)


# Like scans are not useful if they start with %
# TODO what if they dont start with % and contain more than one % ? -> up to first % prunable, but is it used?
def benefits_from_sorting(row):    
    description = row['DESCRIPTION']
    if "ColumnLike" in description:
        words = description.split()
        like_criteria = words[-1]
        assert "%" in like_criteria, f"LIKE operators should have an %, but found none in {like_criteria}"
        return like_criteria[1] != '%'
    elif "ExpressionEvaluator" in description and " IN " in description:
        return False
    else:
        return True

scans['benefits_from_sorting'] = scans.apply(benefits_from_sorting, axis=1)
# TODO: valid atm, but feels a bit hacky to assume not benefitting from sorted segments -> not benefitting from pruning
scans['useful_for_pruning'] = scans.apply(lambda row: not row['part_of_or_chain'] and row['benefits_from_sorting'] , axis=1)
EXPECTED_SCAN_COUNT = len(scans)
print(f"Of those, only {len(scans[scans['useful_for_pruning']])} are useful for pruning")

print("TODO: For now, filtering on scans is deactivated. This is because all scans are needed to recognize OR-Chains. Models have to take care themselves whether a scan can contribute to pruning or not")

Statistics for TPCH contain 436 table scans
Of those, only 228 are useful for pruning
TODO: For now, filtering on scans is deactivated. This is because all scans are needed to recognize OR-Chains. Models have to take care themselves whether a scan can contribute to pruning or not


In [6]:
def test_determine_or_chains():
    test = pd.DataFrame()
    test['QUERY_HASH'] = pd.Series(['1']*3  + ['2']*4)
    test['TABLE_NAME'] = pd.Series(['lineitem']*3  + ['part']*4)
    test['OPERATOR_POINTER'] = pd.Series(['0x1'] + ['0x2']*2 + ['0x3']*4)
    test['COLUMN_NAME'] = pd.Series(['l_shipdate', 'l_shipdate', 'l_discount', 'p_brand', 'p_type', 'p_type', 'p_size'])
    test['INPUT_ROWS'] = pd.Series( [6001215, 6001215, 200000, 200000, 199000, 199000, 50000])
    test['OUTPUT_ROWS'] = pd.Series([ 400000,  300000, 200000, 199000,      0,  50000, 20000])
    test_result = determine_or_chains(test)
    assert len(test_result) == 7, "should not filter out any rows"    
    assert len(test_result[test_result['part_of_or_chain']]) == 3, "expected 3 scans, got\n" + str(test_result)
    assert list(test_result['part_of_or_chain']) == [False]*4 + [True]*3
    print("Test OK")

test_determine_or_chains()

Test OK


In [8]:
(scans['RUNTIME_NS'] - scans['SINGLE_RUNTIME_NS']).max()
# TODO can the actual runtime be that much greater than the runtime on the original table?

104114384

In [7]:
# Load query frequency information
assert_correct_statistics_loaded()

def get_query_frequencies():
    plan_cache = pd.read_csv(f"{STATISTICS_PATH}/plan_cache.csv", sep='|')
    return dict(zip(plan_cache.QUERY_HASH, plan_cache.EXECUTION_COUNT))

In [9]:
# Load column statistics - especially interesting: number of distinct values, and columns sorted during statistics creation

# Returns a 2-level-dictionary: distinct_values[TABLE][COLUMN] = number_of_distinct_values
def get_distinct_values_count():        
    # Code
    column_statistics_df = pd.read_csv(f"{STATISTICS_PATH}/column_meta_data.csv", sep='|')
    column_statistics_df['distinct_values'] = np.int32(column_statistics_df['distinct_values'])
    tables_and_columns = column_statistics_df.groupby('table_name')
    distinct_values = {table: dict(zip(column_df.column_name, column_df.distinct_values)) for table, column_df in tables_and_columns }

    
    # Test
    num_tables = len(distinct_values)
    if BENCHMARK == "TPCH":
        assert num_tables == 8, f"TPCH has 8 tables, but got {num_tables}"
    elif BENCHMARK == "TPCDS":
        assert num_tables == 24, f"TPCDS has 24 tables, but got {num_tables}"
    else:
        assert False, "Insert a benchmark specific check here"
    
    return distinct_values

# Returns a dictionary: sorted_columns_during_creation[TABLE] = [column1, column2, ...]
def get_sorted_columns_during_creation():
    # Code
    column_statistics_df = pd.read_csv(f"{STATISTICS_PATH}/column_meta_data.csv", sep='|')
    globally_sorted_columns = column_statistics_df[column_statistics_df['is_globally_sorted'] == 1]
    
    tables_and_columns = globally_sorted_columns.groupby('table_name')
    globally_sorted_columns = {table: list(column_df.column_name) for table, column_df in tables_and_columns }
    
    return globally_sorted_columns

In [10]:
### JOINS ###

assert_correct_statistics_loaded()

def load_join_statistics():
    def line_looks_suspicious(row):
        right_table_name = row['RIGHT_TABLE_NAME']    
        if pd.isnull(right_table_name):
            pass
        elif row['RIGHT_TABLE_ROW_COUNT'] > table_sizes[row['RIGHT_TABLE_NAME']]:
            return True

        left_table_name = row['LEFT_TABLE_NAME']
        if pd.isnull(left_table_name):
            pass
        elif row['LEFT_TABLE_ROW_COUNT'] > table_sizes[row['LEFT_TABLE_NAME']]:
            return True

        return False
    
    def validate_joins(joins):
        is_suspicious = joins.apply(line_looks_suspicious, axis=1)
        suspicious_joins = joins[is_suspicious]
        assert len(suspicious_joins) < 3, f"there are {len(suspicious_joins)} suspicious joins:\n{suspicious_joins[['JOIN_MODE', 'LEFT_TABLE_NAME', 'LEFT_COLUMN_NAME', 'LEFT_TABLE_ROW_COUNT', 'RIGHT_TABLE_NAME', 'RIGHT_COLUMN_NAME', 'RIGHT_TABLE_ROW_COUNT', 'PROBE_TABLE', 'PROBE_COLUMN', 'OUTPUT_ROWS']]}"
    
    joins = pd.read_csv(f"{STATISTICS_PATH}/joins.csv", sep=',')
    validate_joins(joins)
                                                                                           
    return joins

#load_join_statistics().iloc[9:10][['JOIN_MODE', 'LEFT_TABLE_NAME', 'LEFT_COLUMN_NAME', 'LEFT_TABLE_ROW_COUNT', 'RIGHT_TABLE_NAME', 'RIGHT_COLUMN_NAME', 'RIGHT_TABLE_ROW_COUNT', 'PROBE_TABLE', 'PROBE_COLUMN', 'OUTPUT_ROWS']]

In [11]:
class AbstractModel:
    
    def __init__(self, query_frequencies, table_name, table_scans, correlations={}):
        self.query_frequencies = query_frequencies
        self.table_name = table_name
        self.table_scans = table_scans
        self.correlations = correlations
        
    def query_frequency(self, query_hash):
        return self.query_frequencies[query_hash]
        
    def extract_scan_columns(self):
        useful_scans = self.table_scans[self.table_scans['useful_for_pruning']]
        interesting_scan_columns = list(useful_scans['COLUMN_NAME'].unique())
        
        return interesting_scan_columns
    
    def extract_join_columns(self):
        interesting_join_probe_columns = list(self.joins[self.joins['PROBE_TABLE'] == self.table_name]['PROBE_COLUMN'].unique())
        interesting_join_build_columns = list(self.joins[self.joins['BUILD_TABLE'] == self.table_name]['BUILD_COLUMN'].unique())        
        
        return self.uniquify(interesting_join_probe_columns + interesting_join_build_columns)
    
    def extract_interesting_columns(self):        
        return self.uniquify(self.extract_scan_columns() + self.extract_join_columns())
    
    def round_up_to_next_multiple(self, number_to_round, base_for_multiple):
        quotient = number_to_round // base_for_multiple
        if number_to_round % base_for_multiple != 0:
            quotient += 1
        return quotient * base_for_multiple        

    def uniquify(self, seq):
            seen = set()
            return [x for x in seq if not (x in seen or seen.add(x))]    
    
    # return a list of possible clusterings
    def suggest_clustering(self, first_k=1):
        raise NotImplemented()

In [12]:
class SingleTableMdcModel(AbstractModel):
    
    def __init__(self, query_frequencies, table_name, table_scans, table_size, distinct_values, target_chunksize, correlations, joins, sorted_columns_during_creation):
        super().__init__(query_frequencies, table_name, table_scans, correlations)
        self.table_size = table_size
        self.distinct_values = distinct_values
        self.target_chunksize = target_chunksize
        self.joins = joins
        self.sorted_columns_during_creation = sorted_columns_during_creation
    
    def suggest_clustering(self, first_k=1):
        interesting_columns = self.extract_interesting_columns()

        print(interesting_columns)
        
        clustering_columns = itertools.product(interesting_columns, interesting_columns)
        #clustering_columns = itertools.product(interesting_columns, interesting_columns, interesting_columns)
        clustering_columns = filter(lambda x: x[0] <= x[1], clustering_columns)
        #clustering_columns = filter(lambda x: x[1] <= x[2], clustering_columns)
        clustering_columns = [self.uniquify(clustering) for clustering in clustering_columns]
        sort_columns = interesting_columns        
        clusterings_with_runtimes = reduce(lambda x,y: x+y,[self.estimate_total_runtime(clustering_cols, sort_columns) for clustering_cols in clustering_columns])
        clusterings_with_runtimes.sort(key=lambda x: x[2], reverse=False)
        
        return clusterings_with_runtimes[0:first_k]
        
    def estimate_table_scan_runtimes(self, clustering_columns, sorting_columns, split_factors, total_runtimes):        
        def compute_unprunable_parts(row, split_factors):
            def clustering_columns_correlated_to(column):
                return [clustering_column for clustering_column in clustering_columns if column in self.correlations.get(clustering_column, {})]
            
            def correlates_to_clustering_column(column):
                return len(clustering_columns_correlated_to(column)) > 0

            column_name = row['COLUMN_NAME']

            if not row['useful_for_pruning']:
                selectivity = 1
            elif column_name in clustering_columns:
                scan_selectivity = row['selectivity']
                split_factor = split_factors[clustering_columns.index(column_name)]
                selectivity =  self.round_up_to_next_multiple(scan_selectivity, 1 / split_factor)
            elif correlates_to_clustering_column(column_name):
                scan_selectivity = row['selectivity']
                correlated_clustering_columns = clustering_columns_correlated_to(column_name)
                
                # ToDo this is hacky, but for now assume there is just one correlated column
                assert len(correlated_clustering_columns) == 1, f"expected just 1 correlated clustering column, but got {len(correlated_clustering_columns)}"
                
                split_factor = split_factors[clustering_columns.index(correlated_clustering_columns[0])]
                selectivity = min(1, 1.2 * self.round_up_to_next_multiple(scan_selectivity, 1 / split_factor))
            else:
                selectivity = 1
            
            return selectivity
        
        def compute_runtimes(row, sorting_column):
            assert row['estimated_input_rows'] > 1, row
            assert row['runtime_per_input_row'] > 0, row
            assert row['runtime_per_output_row'] > 0, row
            input_row_count = row['estimated_input_rows']
            
            if row['COLUMN_NAME'] == sorting_column and row['benefits_from_sorting']:
                # TODO is this the best way to simulate sorted access?
                input_row_count = np.log2(input_row_count)

            runtime = input_row_count * row['runtime_per_input_row'] + row['OUTPUT_ROWS'] * row['runtime_per_output_row']
            return runtime * self.query_frequency(row['QUERY_HASH'])
        
        scans_per_query = self.table_scans.sort_values(['INPUT_ROWS'], ascending=False).groupby(['QUERY_HASH', 'OPERATOR_POINTER'])
        for _, scans in scans_per_query:
            number_of_scans = len(scans)
            assert number_of_scans > 0 and number_of_scans < 25, f"weird scan length: {number_of_scans}\nScans:\n{scans}"
            # TODO: kinda unrealistic assumption: everything not in the table scan result can be pruned
                          
            unprunable_parts = scans.apply(compute_unprunable_parts, axis=1, args=(split_factors,))            
            unprunable_part = unprunable_parts.product()
            assert unprunable_part > 0, "no unprunable part"
            
            estimated_pruned_table_size = self.round_up_to_next_multiple(unprunable_part * self.table_size, CHUNK_SIZE)
            
            runtimes = pd.DataFrame()
            runtimes['QUERY_HASH'] = scans['QUERY_HASH']
            runtimes['runtime_per_input_row'] = scans['time_per_input_row']
            runtimes['runtime_per_output_row'] = scans['time_per_output_row']
            runtimes['COLUMN_NAME'] = scans['COLUMN_NAME']
            runtimes['benefits_from_sorting'] = scans['benefits_from_sorting']
            # the pruned table inputs should be reflected in 'estimated_input_rows'
            runtimes['estimated_input_rows'] = scans.apply(lambda x: x['INPUT_ROWS'], axis=1)
            runtimes['OUTPUT_ROWS'] = scans['OUTPUT_ROWS']

            runtimes.iloc[0, runtimes.columns.get_loc('estimated_input_rows')] = estimated_pruned_table_size                                    
            assert runtimes['estimated_input_rows'].iloc[0] == estimated_pruned_table_size, f"value is {runtimes.iloc[0]['estimated_input_rows']}, but should be {estimated_pruned_table_size}"
            # TODO modify input sizes of subsequent scans
            
            for sorting_column in sorting_columns:
                scan_runtimes = runtimes.apply(compute_runtimes, axis=1, args=(sorting_column,))
                total_runtimes[sorting_column] += scan_runtimes.sum()

    def estimate_join_runtimes(self, clustering_columns, sorting_columns, total_runtimes):                
        def estimate_join_runtime(row, sorting_column):
                        
            if "JoinHash" in row['DESCRIPTION']:
                probe_column = row['PROBE_COLUMN']
                if row['PROBE_TABLE'] == self.table_name:
                    probe_column_was_sorted = row['PROBE_SORTED'] and probe_column in self.sorted_columns_during_creation.get(self.table_name, {})
                    probe_column_is_sorted = row['PROBE_SORTED'] and probe_column == sorting_column
                    probe_column_is_clustered = row['PROBE_SORTED'] and probe_column in clustering_columns
                else:
                    probe_column_was_sorted = row['PROBE_SORTED'] and probe_column in self.sorted_columns_during_creation.get(row['PROBE_TABLE'], {})
                    probe_column_is_sorted = probe_column_was_sorted
                    probe_column_is_clustered = probe_column_was_sorted
                    
                build_column = row['BUILD_COLUMN']
                if row['BUILD_TABLE'] == self.table_name:
                    build_column_was_sorted = row['BUILD_SORTED'] and build_column in self.sorted_columns_during_creation.get(self.table_name, {})
                    build_column_is_sorted = row['BUILD_SORTED'] and build_column == sorting_column
                    build_column_is_clustered = row['BUILD_SORTED'] and build_column in clustering_columns
                else:
                    build_column_was_sorted = row['BUILD_SORTED'] and build_column in self.sorted_columns_during_creation.get(row['BUILD_TABLE'], {})
                    build_column_is_sorted = build_column_was_sorted
                    build_column_is_clustered = build_column_was_sorted

                time_materialize = row['MATERIALIZE']
                
                probe_weight = 2
                build_weight = 2
                if probe_column_was_sorted:
                    probe_weight = 1
                if build_column_was_sorted:
                    build_weight = 1
                
                
                probe_table_size = row['PROBE_TABLE_SIZE']
                build_table_size = row['BUILD_TABLE_SIZE']
                total_table_size = probe_weight * probe_table_size + build_weight * build_table_size
                
                time_materialize_probe = time_materialize * (probe_weight * probe_table_size / total_table_size)
                time_materialize_build = time_materialize - time_materialize_probe
                
                
                def get_materialize_factor(was_sorted, is_sorted, is_clustered):
                    materialize_factor = 1
                    if is_sorted and is_clustered:
                        if not was_sorted:
                            materialize_factor = 0.5
                        else:
                            materialize_factor = 1
                    elif is_sorted or is_clustered:
                        if not was_sorted:
                            materialize_factor = 0.55
                        else:
                            materialize_factor = 1.1
                    elif was_sorted:
                        # probe column is now neither sorted nor clustered
                        materialize_factor = 2
                    else:
                        # default case: was not sorted before, and is neither sorted nor clustered now. No change
                        materialize_factor = 1
                        
                    return materialize_factor
                
                materialize_probe_factor = get_materialize_factor(probe_column_was_sorted, probe_column_is_sorted, probe_column_is_clustered)
                materialize_build_factor = get_materialize_factor(build_column_was_sorted, build_column_is_sorted, build_column_is_clustered)
                
                time_materialize = time_materialize_probe * materialize_probe_factor + time_materialize_build *  materialize_build_factor
                

                # unchanged
                time_cluster = row['CLUSTER']
                
                # unchanged
                time_build = row['BUILD']
                
                            
                time_probe = row['PROBE']
                probe_factor = 1
                if probe_column_is_sorted and probe_column_is_clustered:
                    if not probe_column_was_sorted:
                        probe_factor = 0.7
                    else:
                        probe_factor = 1
                elif probe_column_is_sorted or probe_column_is_clustered:
                    if not probe_column_was_sorted:
                        probe_factor = 0.9
                    else:
                        probe_factor = 1.1
                elif probe_column_was_sorted:
                    # probe column is now neither sorted nor clustered
                    probe_factor = 1.4
                
                time_probe *= probe_factor                
                
                # unchanged
                time_write_output = row['WRITE_OUTPUT']
                
                
                
                # TODO: how to deal with the difference between RUNTIME_NS and sum(stage_runtimes)?
                runtime = time_materialize + time_cluster + time_build + time_probe + time_write_output
            else:
                runtime = row['RUNTIME_NS']
                
            return runtime * self.query_frequency(row['QUERY_HASH'])
        
        for sorting_column in sorting_columns:
            join_runtimes = self.joins.apply(estimate_join_runtime, axis=1, args=(sorting_column,))
            total_runtimes[sorting_column] += join_runtimes.sum()
                
    def estimate_total_runtime(self, clustering_columns, sorting_columns):
        #print(f"testing clustering {clustering_columns} with sorting columns {sorting_columns}")
        split_factors = self.determine_split_factors(clustering_columns)            
        total_runtimes = {sorting_column: 0 for sorting_column in sorting_columns}
        self.estimate_table_scan_runtimes(clustering_columns, sorting_columns, split_factors, total_runtimes)
        self.estimate_join_runtimes(clustering_columns, sorting_columns, total_runtimes)
        
        clusterings = [[list(zip(clustering_columns, split_factors)), sorting_column, np.int64(total_runtimes[sorting_column])] for sorting_column in sorting_columns]
        return clusterings
    
    def determine_split_factors(self, clustering_columns):
        approximate_split_factor = self.table_size / self.target_chunksize
        individual_distinct_values = [self.distinct_values[column] for column in clustering_columns]        
        log_distinct_values = [math.ceil(0.5+np.log2(x)) for x in individual_distinct_values]
        log_distinct_values_product = reduce(operator.mul, log_distinct_values, 1)
        assert log_distinct_values_product > 0, "cannot have a distinct value count of 0"
        
        global_modification_factor = approximate_split_factor / log_distinct_values_product
        num_dimensions = len(clustering_columns)
        individual_modification_factor = np.power(global_modification_factor, 1.0 / num_dimensions)    
        split_factors = [math.ceil(x * individual_modification_factor) for x in log_distinct_values]
        
        # testing
        actual_split_factor = reduce(operator.mul, split_factors, 1)
        assert actual_split_factor > 0, "there was a split up factor of 0"
        estimated_chunksize = self.table_size / actual_split_factor
        assert estimated_chunksize <= self.target_chunksize, "chunks should be smaller, not larger than target_chunksize"
        allowed_percentage = 0.55
        if estimated_chunksize < allowed_percentage * self.target_chunksize:
            print(f"Warning: chunks should not be too much smaller than target_chunksize: {estimated_chunksize} < {allowed_percentage} * {self.target_chunksize}")
        #assert estimated_chunksize >= allowed_percentage * self.target_chunksize, f"chunks should not be too much smaller than target_chunksize: {estimated_chunksize} < {allowed_percentage} * {self.target_chunksize}"
        
        return split_factors    

In [25]:
assert_correct_statistics_loaded()

def extract_single_table(table_scans, table_name):
    return table_scans[table_scans['TABLE_NAME'] == table_name]

def get_table_names(table_scans):
    return table_scans['TABLE_NAME'].unique()

def extract_probe_side_joins(joins, table_name):
    return joins[joins['PROBE_TABLE'] == table_name]


def default_benchmark_config():    
    if BENCHMARK == "TPCH":
        config = {
            'lineitem': [['l_shipdate', 92 * SCALE_FACTOR]],
            'orders': [['o_orderdate', 23 * SCALE_FACTOR]]
        }
    elif BENCHMARK == "TPCDS":
        config = dict()
    else:        
        raise Exception("unknown benchmark, please provide a default config")
    return config

def get_correlations():
    if BENCHMARK == "TPCH":
        correlations = {
            'lineitem': {
                'l_shipdate': ['l_receiptdate', 'l_commitdate'],
                'l_receiptdate': ['l_shipdate', 'l_commitdate'],
            }
        }
    elif BENCHMARK == "TPCDS":
        correlations = dict()
    else:
        raise Exception("unknown benchmark, please provide correlation information")
        
    return correlations


def format_table_clustering(clustering_config):
    # input format: List of [ [(column, split)+ ], sorting_column, runtime ]
    # output format: List of [ (column, split)+ ] - sorting column integrated if necessary
    
    assert len(clustering_config) == 3, "config should have exactly three entries: clustering columns, sort column, runtime"
    clustering_columns = clustering_config[0]
    assert len(clustering_columns) <= 3, "atm the model is at most 3-dimensional"
    #print(f"clustering columns are {clustering_columns}")
    last_clustering_column = clustering_columns[-1]
    last_clustering_column_name = last_clustering_column[0]
    #print(f"last column is {last_clustering_column_name}")
    sorting_column = clustering_config[1]
    #print(f"sort column is {sorting_column}")
    
    result = clustering_columns
    if last_clustering_column_name != sorting_column:
        result = clustering_columns + [(sorting_column, 1)]
        
    #print(f"in: {clustering_config}")
    #print(f"out: {result}")
    
    return result

def get_config_name(clustering_config):
    # Input: config-dict
    
    # List of lists. Each secondary list contains clustering information for a table
    table_configs = [clustering_config[table] for table in clustering_config]
    config_entries = [[f"{config_entry[0]}-{config_entry[1]}" for config_entry in config] for config in table_configs]
    table_entries = ["_".join(config) for config in config_entries]
    return "_".join(table_entries)


def create_benchmark_configs():
    
    start_time = datetime.now()
    clusterings = {"default" : default_benchmark_config()}
    query_frequencies = get_query_frequencies()
    
    distinct_values = get_distinct_values_count()
    joins = load_join_statistics()    
    sorted_columns_during_creation = get_sorted_columns_during_creation()
    correlations = get_correlations()
    table_names = get_table_names(scans)
    for table_name in table_names:
        start_time_table = datetime.now()
        single_table_scans = extract_single_table(scans, table_name)
        probe_side_joins = joins#extract_probe_side_joins(joins, table_name)
        table_size = table_sizes[table_name]
        if table_size <= 3 * CHUNK_SIZE:
            print(f"Not computing clustering for {table_name}, as it has only {table_size} rows")
            continue

        model = SingleTableMdcModel(query_frequencies, table_name, single_table_scans, table_size, distinct_values[table_name], CHUNK_SIZE, correlations.get(table_name, {}), probe_side_joins, sorted_columns_during_creation)
        table_clusterings = model.suggest_clustering(3)
        for table_clustering in table_clusterings:
            config = default_benchmark_config()
            config[table_name] = format_table_clustering(table_clustering)
            config_name = get_config_name(config)
            clusterings[config_name] = config
        end_time_table = datetime.now()
        print(f"Done computing clustering for {table_name} ({end_time_table - start_time_table})")

            
    end_time = datetime.now()
    print(f"Computed all clusterings in {end_time - start_time}")
    
    return clusterings

create_benchmark_configs()

# TODO:
#  joins costs are multiplied with 0
#  still, the model suggests some join columns - why? are they useful for pruning?

Not computing clustering for customer, as it has only 150000 rows
['o_orderstatus', 'o_orderdate', 'o_orderkey', 'o_custkey']
Done computing clustering for orders (0:00:10.260482)
Not computing clustering for nation, as it has only 25 rows
['p_name', 'p_type', 'p_brand', 'p_container', 'p_size', 'p_partkey']


KeyboardInterrupt: 

In [14]:
assert_correct_statistics_loaded()

def extract_single_table(table_scans, table_name):
    return table_scans[table_scans['TABLE_NAME'] == table_name]

def get_table_names(table_scans):
    return table_scans['TABLE_NAME'].unique()


def default_benchmark_config():    
    if BENCHMARK == "TPCH":
        config = {
            'lineitem': [['l_shipdate', 2]],
            'orders': [['o_orderdate', 2]]
        }
    elif BENCHMARK == "TPCDS":
        config = dict()
    else:        
        raise Exception("unknown benchmark, please provide a default config")
    return config

def get_correlations():
    if BENCHMARK == "TPCH":
        correlations = {
            'lineitem': {
                'l_shipdate': ['l_receiptdate', 'l_commitdate'],
                'l_receiptdate': ['l_shipdate', 'l_commitdate'],
            }
        }
    elif BENCHMARK == "TPCDS":
        correlations = dict()
    else:
        raise Exception("unknown benchmark, please provide correlation information")
        
    return correlations


def format_table_clustering(clustering_config):
    # input format: List of [ [(column, split)+ ], sorting_column, runtime ]
    # output format: List of [ (column, split)+ ] - sorting column integrated if necessary
    
    assert len(clustering_config) == 3, "config should have exactly three entries: clustering columns, sort column, runtime"
    clustering_columns = clustering_config[0]
    assert len(clustering_columns) <= 3, "atm the model is at most 3-dimensional"
    #print(f"clustering columns are {clustering_columns}")
    last_clustering_column = clustering_columns[-1]
    last_clustering_column_name = last_clustering_column[0]
    #print(f"last column is {last_clustering_column_name}")
    sorting_column = clustering_config[1]
    #print(f"sort column is {sorting_column}")
    
    result = clustering_columns
    if last_clustering_column_name != sorting_column:
        result = clustering_columns + [(sorting_column, 1)]
        
    #print(f"in: {clustering_config}")
    #print(f"out: {result}")
    
    return result

def get_config_name(clustering_config):
    # Input: config-dict
    
    # List of lists. Each secondary list contains clustering information for a table
    table_configs = [clustering_config[table] for table in clustering_config]
    config_entries = [[f"{config_entry[0]}-{config_entry[1]}" for config_entry in config] for config in table_configs]
    table_entries = ["_".join(config) for config in config_entries]
    return "_".join(table_entries)


def create_model(table_name, max_dimensions=2):    
    query_frequencies = get_query_frequencies()
    distinct_values = get_distinct_values_count()
    joins = load_join_statistics()    
    sorted_columns_during_creation = get_sorted_columns_during_creation()
    correlations = get_correlations()
    table_names = get_table_names(scans)
    start_time_table = datetime.now()
    single_table_scans = extract_single_table(scans, table_name)
    table_size = table_sizes[table_name]

    model = DisjointClustersModel(max_dimensions, query_frequencies, table_name, single_table_scans, table_size, distinct_values[table_name], CHUNK_SIZE, correlations.get(table_name, {}), joins, sorted_columns_during_creation)
    return model

In [15]:
class DisjointClustersModel(AbstractModel):
    
    def __init__(self, max_dimensions, query_frequencies, table_name, table_scans, table_size, distinct_values, target_chunksize, correlations, joins, sorted_columns_during_creation):
        super().__init__(query_frequencies, table_name, table_scans, correlations)
        self.max_dimensions = max_dimensions
        self.table_size = table_size
        self.distinct_values = distinct_values
        self.target_chunksize = target_chunksize
        self.joins = joins
        self.sorted_columns_during_creation = sorted_columns_during_creation
        
        self.join_column_names = self.extract_join_columns()
        self.scan_column_names = self.extract_scan_columns()
        
    def is_join_column(self, column_name):
        return column_name in self.join_column_names
    
    def is_scan_column(self, column_name):
        return column_name in self.scan_column_names
    
    def suggest_clustering(self, first_k=1):
        interesting_columns = self.extract_interesting_columns()

        print(interesting_columns)
        clustering_columns = itertools.combinations_with_replacement(interesting_columns, self.max_dimensions)
        clustering_columns = [self.uniquify(clustering) for clustering in clustering_columns]
        sort_columns = interesting_columns        
        clusterings_with_runtimes = reduce(lambda x,y: x+y,[self.estimate_total_runtime(clustering_cols, sort_columns) for clustering_cols in clustering_columns])
        clusterings_with_runtimes.sort(key=lambda x: x[2], reverse=False)
        
        return clusterings_with_runtimes[0:first_k]
        
    def estimate_table_scan_runtimes(self, clustering_columns, sorting_columns, split_factors, total_runtimes):        
        def compute_unprunable_parts(row, split_factors):
            def clustering_columns_correlated_to(column):
                return [clustering_column for clustering_column in clustering_columns if column in self.correlations.get(clustering_column, {})]
            
            def correlates_to_clustering_column(column):
                return len(clustering_columns_correlated_to(column)) > 0

            column_name = row['COLUMN_NAME']

            if not row['useful_for_pruning']:
                selectivity = 1
            elif column_name in clustering_columns:
                scan_selectivity = row['selectivity']
                split_factor = split_factors[clustering_columns.index(column_name)]
                selectivity =  self.round_up_to_next_multiple(scan_selectivity, 1 / split_factor)
            elif correlates_to_clustering_column(column_name):
                scan_selectivity = row['selectivity']
                correlated_clustering_columns = clustering_columns_correlated_to(column_name)
                
                # ToDo this is hacky, but for now assume there is just one correlated column
                assert len(correlated_clustering_columns) == 1, f"expected just 1 correlated clustering column, but got {len(correlated_clustering_columns)}"
                
                split_factor = split_factors[clustering_columns.index(correlated_clustering_columns[0])]
                selectivity = min(1, 1.2 * self.round_up_to_next_multiple(scan_selectivity, 1 / split_factor))
            else:
                selectivity = 1
            
            return selectivity
        
        def compute_runtimes(row, sorting_column):
            assert row['estimated_input_rows'] > 1, row
            assert row['runtime_per_input_row'] > 0, row
            assert row['runtime_per_output_row'] > 0, row
            input_row_count = row['estimated_input_rows']
            
            if row['COLUMN_NAME'] == sorting_column and row['benefits_from_sorting']:
                # TODO is this the best way to simulate sorted access?
                input_row_count = np.log2(input_row_count)

            runtime = input_row_count * row['runtime_per_input_row'] + row['OUTPUT_ROWS'] * row['runtime_per_output_row']
            return runtime * self.query_frequency(row['QUERY_HASH'])
        
        scans_per_query = self.table_scans.sort_values(['INPUT_ROWS'], ascending=False).groupby(['QUERY_HASH', 'OPERATOR_POINTER'])
        for _, scans in scans_per_query:
            number_of_scans = len(scans)
            assert number_of_scans > 0 and number_of_scans < 25, f"weird scan length: {number_of_scans}\nScans:\n{scans}"
            # TODO: kinda unrealistic assumption: everything not in the table scan result can be pruned
                          
            unprunable_parts = scans.apply(compute_unprunable_parts, axis=1, args=(split_factors,))            
            unprunable_part = unprunable_parts.product()
            assert unprunable_part > 0, "no unprunable part"
            
            estimated_pruned_table_size = self.round_up_to_next_multiple(unprunable_part * self.table_size, CHUNK_SIZE)
            
            runtimes = pd.DataFrame()
            runtimes['QUERY_HASH'] = scans['QUERY_HASH']
            runtimes['runtime_per_input_row'] = scans['time_per_input_row']
            runtimes['runtime_per_output_row'] = scans['time_per_output_row']
            runtimes['COLUMN_NAME'] = scans['COLUMN_NAME']
            runtimes['benefits_from_sorting'] = scans['benefits_from_sorting']
            # the pruned table inputs should be reflected in 'estimated_input_rows'
            runtimes['estimated_input_rows'] = scans.apply(lambda x: x['INPUT_ROWS'], axis=1)
            runtimes['OUTPUT_ROWS'] = scans['OUTPUT_ROWS']

            runtimes.iloc[0, runtimes.columns.get_loc('estimated_input_rows')] = estimated_pruned_table_size                                    
            assert runtimes['estimated_input_rows'].iloc[0] == estimated_pruned_table_size, f"value is {runtimes.iloc[0]['estimated_input_rows']}, but should be {estimated_pruned_table_size}"
            # TODO modify input sizes of subsequent scans
            
            for sorting_column in sorting_columns:
                scan_runtimes = runtimes.apply(compute_runtimes, axis=1, args=(sorting_column,))
                total_runtimes[sorting_column] += scan_runtimes.sum()

    def estimate_join_runtimes(self, clustering_columns, sorting_columns, total_runtimes):                
        def estimate_join_runtime(row, sorting_column):
                        
            if "JoinHash" in row['DESCRIPTION']:
                probe_column = row['PROBE_COLUMN']
                if row['PROBE_TABLE'] == self.table_name:
                    probe_column_was_sorted = row['PROBE_SORTED'] and probe_column in self.sorted_columns_during_creation.get(self.table_name, {})
                    probe_column_is_sorted = row['PROBE_SORTED'] and probe_column == sorting_column
                    probe_column_is_clustered = row['PROBE_SORTED'] and probe_column in clustering_columns
                else:
                    probe_column_was_sorted = row['PROBE_SORTED'] and probe_column in self.sorted_columns_during_creation.get(row['PROBE_TABLE'], {})
                    probe_column_is_sorted = probe_column_was_sorted
                    probe_column_is_clustered = probe_column_was_sorted
                    
                build_column = row['BUILD_COLUMN']
                if row['BUILD_TABLE'] == self.table_name:
                    build_column_was_sorted = row['BUILD_SORTED'] and build_column in self.sorted_columns_during_creation.get(self.table_name, {})
                    build_column_is_sorted = row['BUILD_SORTED'] and build_column == sorting_column
                    build_column_is_clustered = row['BUILD_SORTED'] and build_column in clustering_columns
                else:
                    build_column_was_sorted = row['BUILD_SORTED'] and build_column in self.sorted_columns_during_creation.get(row['BUILD_TABLE'], {})
                    build_column_is_sorted = build_column_was_sorted
                    build_column_is_clustered = build_column_was_sorted

                time_materialize = row['MATERIALIZE']
                
                probe_weight = 2
                build_weight = 2
                if probe_column_was_sorted:
                    probe_weight = 1
                if build_column_was_sorted:
                    build_weight = 1
                
                
                probe_table_size = row['PROBE_TABLE_SIZE']
                build_table_size = row['BUILD_TABLE_SIZE']
                total_table_size = probe_weight * probe_table_size + build_weight * build_table_size
                
                time_materialize_probe = time_materialize * (probe_weight * probe_table_size / total_table_size)
                time_materialize_build = time_materialize - time_materialize_probe
                
                
                def get_materialize_factor(was_sorted, is_sorted, is_clustered):
                    materialize_factor = 1
                    if is_sorted and is_clustered:
                        if not was_sorted:
                            materialize_factor = 0.5
                        else:
                            materialize_factor = 1
                    elif is_sorted or is_clustered:
                        if not was_sorted:
                            materialize_factor = 0.55
                        else:
                            materialize_factor = 1.1
                    elif was_sorted:
                        # probe column is now neither sorted nor clustered
                        materialize_factor = 2
                    else:
                        # default case: was not sorted before, and is neither sorted nor clustered now. No change
                        materialize_factor = 1
                        
                    return materialize_factor
                
                materialize_probe_factor = get_materialize_factor(probe_column_was_sorted, probe_column_is_sorted, probe_column_is_clustered)
                materialize_build_factor = get_materialize_factor(build_column_was_sorted, build_column_is_sorted, build_column_is_clustered)
                
                time_materialize = time_materialize_probe * materialize_probe_factor + time_materialize_build *  materialize_build_factor
                

                # unchanged
                time_cluster = row['CLUSTER']
                
                # unchanged
                time_build = row['BUILD']
                
                            
                time_probe = row['PROBE']
                probe_factor = 1
                if probe_column_is_sorted and probe_column_is_clustered:
                    if not probe_column_was_sorted:
                        probe_factor = 0.7
                    else:
                        probe_factor = 1
                elif probe_column_is_sorted or probe_column_is_clustered:
                    if not probe_column_was_sorted:
                        probe_factor = 0.9
                    else:
                        probe_factor = 1.1
                elif probe_column_was_sorted:
                    # probe column is now neither sorted nor clustered
                    probe_factor = 1.4
                
                time_probe *= probe_factor                
                
                # unchanged
                time_write_output = row['WRITE_OUTPUT']
                
                
                
                # TODO: how to deal with the difference between RUNTIME_NS and sum(stage_runtimes)?
                runtime = time_materialize + time_cluster + time_build + time_probe + time_write_output
            else:
                runtime = row['RUNTIME_NS']
                
            return runtime * self.query_frequency(row['QUERY_HASH'])
        
        for sorting_column in sorting_columns:
            join_runtimes = self.joins.apply(estimate_join_runtime, axis=1, args=(sorting_column,))
            total_runtimes[sorting_column] += join_runtimes.sum()
                
    def estimate_total_runtime(self, clustering_columns, sorting_columns):
        #print(f"testing clustering {clustering_columns} with sorting columns {sorting_columns}")
        cluster_counts = self.determine_cluster_counts(clustering_columns)            
        total_runtimes = {sorting_column: 0 for sorting_column in sorting_columns}
        self.estimate_table_scan_runtimes(clustering_columns, sorting_columns, cluster_counts, total_runtimes)
        self.estimate_join_runtimes(clustering_columns, sorting_columns, total_runtimes)
        
        clusterings = [[list(zip(clustering_columns, cluster_counts)), sorting_column, np.int64(total_runtimes[sorting_column])] for sorting_column in sorting_columns]
        return clusterings
    
    def determine_cluster_counts(self, clustering_columns):
        # ToDo what if we aim at less than number of chunks clusters, i.e. multiple chunks per cluster?
        target_cluster_count = int(1.1 * self.table_size / self.target_chunksize)
        # idea: fixed size for join columns, variable amount for scan columns
        
        join_columns = list(filter(lambda x: self.is_join_column(x), clustering_columns))
        scan_columns = list(filter(lambda x: self.is_scan_column(x), clustering_columns))
        intersecting_columns = set(join_columns).intersection(set(scan_columns))
        assert len(intersecting_columns) == 0, f"The following columns are used as both join and scan column: {intersecting_columns}"
        
        if len(scan_columns) == 0:
            CLUSTERS_PER_JOIN_COLUMN = math.ceil(math.pow(target_cluster_count, 1/len(join_columns)))
        else: 
            CLUSTERS_PER_JOIN_COLUMN = 3;
        # Assumption: uniform distribution (in the sense that every cluster actually exists)
        num_join_clusters = math.pow(CLUSTERS_PER_JOIN_COLUMN, len(join_columns))
        assert num_join_clusters <= 1.5 * target_cluster_count, f"Would get {num_join_clusters} clusters for join columns, but aimed at at most {target_cluster_count} clusters"
    
        # only applies to scan columns
        desired_scan_clusters_count = math.ceil(target_cluster_count / num_join_clusters)
        individual_distinct_values = [self.distinct_values[column] for column in scan_columns]
        log_distinct_values = [math.ceil(0.5+np.log2(x)) for x in individual_distinct_values]
        log_distinct_values_product = reduce(operator.mul, log_distinct_values, 1)
        assert log_distinct_values_product > 0, "cannot have a distinct value count of 0"

        global_modification_factor = desired_scan_clusters_count / log_distinct_values_product
        num_scan_dimensions = len(scan_columns)
        individual_modification_factor = np.power(global_modification_factor, 1.0 / max(1, num_scan_dimensions))
        
        join_column_cluster_counts = [CLUSTERS_PER_JOIN_COLUMN] * len(join_columns)
        scan_column_cluster_counts = [math.ceil(x * individual_modification_factor) for x in log_distinct_values]
        
        
        # Merge join and scan columns
        join_index = 0
        scan_index = 0
        cluster_counts = []
        for clustering_column in clustering_columns:
            if clustering_column in join_columns:
                cluster_counts.append(join_column_cluster_counts[join_index])
                join_index += 1
            elif clustering_column in scan_columns:
                cluster_counts.append(scan_column_cluster_counts[scan_index])
                scan_index += 1
        assert join_index == len(join_columns), f"Processed the wrong number of join columns: {join_index} instead of {len(join_column_cluster_counts)}"
        assert scan_index == len(scan_columns), f"Processed the wrong number of scan columns: {scan_index} instead of {len(scan_column_cluster_counts)}"
        assert len(cluster_counts) == len(clustering_columns), f"Expected {len(clustering_columns)} cluster counts, but got {len(cluster_counts)}"
        
        # testing
        actual_cluster_count = reduce(operator.mul, cluster_counts, 1)
        assert actual_cluster_count > 0, "there was a split up factor of 0"
        assert actual_cluster_count <= 1.5 * target_cluster_count, f"Wanted at most {target_cluster_count} clusters, but got {actual_cluster_count}\nConfig: {clustering_columns}\nCluster sizes: {cluster_counts}"
        estimated_chunksize = self.table_size / actual_cluster_count
        assert estimated_chunksize <= self.target_chunksize, "chunks should be smaller, not larger than target_chunksize"
        allowed_percentage = 0.55
        if estimated_chunksize < allowed_percentage * self.target_chunksize:
            print(f"Warning: chunks should not be too much smaller than target_chunksize: {estimated_chunksize} < {allowed_percentage} * {self.target_chunksize}")
        #assert estimated_chunksize >= allowed_percentage * self.target_chunksize, f"chunks should not be too much smaller than target_chunksize: {estimated_chunksize} < {allowed_percentage} * {self.target_chunksize}"
        
        return cluster_counts

In [16]:
model = create_model("lineitem", 4)
model.suggest_clustering(20)

['l_shipdate', 'l_quantity', 'l_discount', 'l_receiptdate', 'l_suppkey', 'l_orderkey', 'l_partkey']


[[[('l_shipdate', 4), ('l_suppkey', 3), ('l_orderkey', 3), ('l_partkey', 3)],
  'l_orderkey',
  26385936866],
 [[('l_receiptdate', 4),
   ('l_suppkey', 3),
   ('l_orderkey', 3),
   ('l_partkey', 3)],
  'l_orderkey',
  26399322584],
 [[('l_shipdate', 12), ('l_orderkey', 3), ('l_partkey', 3)],
  'l_orderkey',
  26666941325],
 [[('l_shipdate', 12), ('l_orderkey', 3), ('l_partkey', 3)],
  'l_orderkey',
  26666941325],
 [[('l_shipdate', 12), ('l_orderkey', 3), ('l_partkey', 3)],
  'l_orderkey',
  26666941325],
 [[('l_receiptdate', 4),
   ('l_suppkey', 3),
   ('l_orderkey', 3),
   ('l_partkey', 3)],
  'l_shipdate',
  26669096560],
 [[('l_receiptdate', 12), ('l_orderkey', 3), ('l_partkey', 3)],
  'l_orderkey',
  26672516068],
 [[('l_receiptdate', 12), ('l_orderkey', 3), ('l_partkey', 3)],
  'l_orderkey',
  26672516068],
 [[('l_receiptdate', 12), ('l_orderkey', 3), ('l_partkey', 3)],
  'l_orderkey',
  26672516068],
 [[('l_shipdate', 6), ('l_discount', 2), ('l_orderkey', 3), ('l_partkey', 3)],


In [15]:
assert_correct_statistics_loaded()

def extract_single_table(table_scans, table_name):
    return table_scans[table_scans['TABLE_NAME'] == table_name]

def get_table_names(table_scans):
    return table_scans['TABLE_NAME'].unique()


def default_benchmark_config():    
    if BENCHMARK == "TPCH":
        config = {
            'lineitem': [['l_shipdate', 2]],
            'orders': [['o_orderdate', 2]]
        }
    elif BENCHMARK == "TPCDS":
        config = dict()
    else:        
        raise Exception("unknown benchmark, please provide a default config")
    return config

def get_correlations():
    if BENCHMARK == "TPCH":
        correlations = {
            'lineitem': {
                'l_shipdate': ['l_receiptdate', 'l_commitdate'],
                'l_receiptdate': ['l_shipdate', 'l_commitdate'],
            }
        }
    elif BENCHMARK == "TPCDS":
        correlations = dict()
    else:
        raise Exception("unknown benchmark, please provide correlation information")
        
    return correlations


def format_table_clustering(clustering_config):
    # input format: List of [ [(column, split)+ ], sorting_column, runtime ]
    # output format: List of [ (column, split)+ ] - sorting column integrated if necessary
    
    assert len(clustering_config) == 3, "config should have exactly three entries: clustering columns, sort column, runtime"
    clustering_columns = clustering_config[0]
    assert len(clustering_columns) <= 3, "atm the model is at most 3-dimensional"
    #print(f"clustering columns are {clustering_columns}")
    last_clustering_column = clustering_columns[-1]
    last_clustering_column_name = last_clustering_column[0]
    #print(f"last column is {last_clustering_column_name}")
    sorting_column = clustering_config[1]
    #print(f"sort column is {sorting_column}")
    
    result = clustering_columns
    if last_clustering_column_name != sorting_column:
        result = clustering_columns + [(sorting_column, 1)]
        
    #print(f"in: {clustering_config}")
    #print(f"out: {result}")
    
    return result

def get_config_name(clustering_config):
    # Input: config-dict
    
    # List of lists. Each secondary list contains clustering information for a table
    table_configs = [clustering_config[table] for table in clustering_config]
    config_entries = [[f"{config_entry[0]}-{config_entry[1]}" for config_entry in config] for config in table_configs]
    table_entries = ["_".join(config) for config in config_entries]
    return "_".join(table_entries)


def create_benchmark_configs():    
    start_time = datetime.now()
    clusterings = {"default" : default_benchmark_config()}
    query_frequencies = get_query_frequencies()
    
    distinct_values = get_distinct_values_count()
    joins = load_join_statistics()    
    sorted_columns_during_creation = get_sorted_columns_during_creation()
    correlations = get_correlations()
    table_names = get_table_names(scans)
    for table_name in table_names:
        start_time_table = datetime.now()
        single_table_scans = extract_single_table(scans, table_name)
        table_size = table_sizes[table_name]
        if table_size <= 3 * CHUNK_SIZE:
            print(f"Not computing clustering for {table_name}, as it has only {table_size} rows")
            continue

        model = DisjointClustersModel(query_frequencies, table_name, single_table_scans, table_size, distinct_values[table_name], CHUNK_SIZE, correlations.get(table_name, {}), joins, sorted_columns_during_creation)
        table_clusterings = model.suggest_clustering(3)
        for table_clustering in table_clusterings:
            config = default_benchmark_config()
            config[table_name] = format_table_clustering(table_clustering)
            config_name = get_config_name(config)
            clusterings[config_name] = config
        end_time_table = datetime.now()
        print(f"Done computing clustering for {table_name} ({end_time_table - start_time_table})")
    
    end_time = datetime.now()
    print(f"Computed all clusterings in {end_time - start_time}")
    
    return clusterings

create_benchmark_configs()

# TODO:
#  joins costs are multiplied with 0
#  still, the model suggests some join columns - why? are they useful for pruning?

Not computing clustering for customer, as it has only 150000 rows
['o_orderstatus', 'o_orderdate', 'o_orderkey', 'o_custkey']
Done computing clustering for orders (0:00:09.662568)
Not computing clustering for nation, as it has only 25 rows
['p_name', 'p_type', 'p_brand', 'p_container', 'p_size', 'p_partkey']
Done computing clustering for part (0:00:32.685082)
Not computing clustering for region, as it has only 5 rows
['l_shipdate', 'l_quantity', 'l_discount', 'l_receiptdate', 'l_suppkey', 'l_orderkey', 'l_partkey']
Done computing clustering for lineitem (0:00:40.875373)
Not computing clustering for supplier, as it has only 10000 rows
Computed all clusterings in 0:01:23.580248


{'default': {'lineitem': [['l_shipdate', 2]], 'orders': [['o_orderdate', 2]]},
 'l_shipdate-2_o_custkey-5_o_orderkey-6_o_orderdate-1': {'lineitem': [['l_shipdate',
    2]],
  'orders': [('o_custkey', 5), ('o_orderkey', 6), ('o_orderdate', 1)]},
 'l_shipdate-2_o_custkey-5_o_orderkey-6': {'lineitem': [['l_shipdate', 2]],
  'orders': [('o_custkey', 5), ('o_orderkey', 6)]},
 'l_shipdate-2_o_orderdate-4_o_orderkey-7_o_custkey-1': {'lineitem': [['l_shipdate',
    2]],
  'orders': [('o_orderdate', 4), ('o_orderkey', 7), ('o_custkey', 1)]},
 'l_shipdate-2_o_orderdate-2_p_name-2_p_partkey-2_p_type-1': {'lineitem': [['l_shipdate',
    2]],
  'orders': [['o_orderdate', 2]],
  'part': [('p_name', 2), ('p_partkey', 2), ('p_type', 1)]},
 'l_shipdate-2_o_orderdate-2_p_partkey-3_p_size-2_p_type-1': {'lineitem': [['l_shipdate',
    2]],
  'orders': [['o_orderdate', 2]],
  'part': [('p_partkey', 3), ('p_size', 2), ('p_type', 1)]},
 'l_shipdate-2_o_orderdate-2_p_name-2_p_partkey-2_p_brand-1': {'lineitem'

Outdated code fragments (older model versions) are kept below.

In [16]:
class SimpleModel(AbstractModel):
    
    def __init__(self, table_scans, correlations = {}):
        super().__init__(table_scans, correlations)        
    
    def suggest_clustering(self, first_k=1):
        interesting_columns = self.extract_interesting_columns()

        pairs = itertools.product(interesting_columns, interesting_columns)                
        total_runtimes = [self.estimate_total_runtime(self.table_scans, clustering_columns) for clustering_columns in pairs]
        total_runtimes.sort(key=lambda x: x[1], reverse=False)
        
        return total_runtimes[0:first_k]
        
    
    def estimate_total_runtime(self, single_table, clustering_columns):
        total_runtime = 0
        
        pruning_col = clustering_columns[0]
        sorted_col = clustering_columns[1]
        def compute_runtime(row):
            col_name = row['COLUMN_NAME']
            if pruning_col == sorted_col:
                if col_name == pruning_col:
                    return row['optimal_log_runtime']
                else:
                    if col_name in self.correlations.get(pruning_col, []):
                        # correlated to pruning column -> a lot of pruning, no sortedness
                        # TODO: better measure correlation
                        return 1.2 * row['optimal_runtime']
                    else:
                        return row['RUNTIME_NS']

            else:
                if col_name == pruning_col:
                    return row['optimal_runtime']
                elif col_name == sorted_col:
                    # TODO: should this be affected by correlation?
                    # we will get less chunks, so a linear scan should be close to optimal_runtime,
                    # but log time should beat it anyway
                    return row['log_runtime']
                else:
                    if col_name in self.correlations.get(pruning_col, []):
                        # correlated to pruning column -> a lot of pruning, no sortedness
                        # TODO: better measure correlation
                        return 1.2 * row['optimal_runtime']
                    else:
                        return row['RUNTIME_NS']
                    
        effective_runtime = single_table.apply(compute_runtime, axis=1)
        return [clustering_columns, effective_runtime.sum()]

In [17]:
# Store additional statistics
# TODO keep?

assert_correct_statistics_loaded()

def round_up_to_chunksize(row):
    if row['OUTPUT_ROWS'] % CHUNK_SIZE == 0:
        return row['OUTPUT_ROWS']
    else:
        return row['OUTPUT_ROWS'] + (CHUNK_SIZE - (row['OUTPUT_ROWS'] % CHUNK_SIZE))

scans['pruned_minimum_input_rows'] = scans.apply(round_up_to_chunksize, axis=1)

scans['selectivity'] = scans['OUTPUT_ROWS'] / scans['INPUT_ROWS']
scans['actual_selectivity'] = scans['SINGLE_OUTPUT_ROWS'] / scans['SINGLE_INPUT_ROWS']

scans['time_per_ir'] = scans['RUNTIME_NS'] / scans['INPUT_ROWS']
scans['time_per_or'] = scans['RUNTIME_NS'] / scans['OUTPUT_ROWS']

# optimal runtime assuming perfect pruning, but not sortedness
scans['optimal_runtime'] = scans['time_per_ir'] * scans['pruned_minimum_input_rows']
scans['runtime_gain'] = scans['RUNTIME_NS'] - scans['optimal_runtime']


# log runtime for sorted columns
scans['log_runtime'] = np.log2(scans['RUNTIME_NS'])
scans['optimal_log_runtime'] = np.log2(1+scans['optimal_runtime'])
scans

Unnamed: 0,QUERY_HASH,COLUMN_TYPE,TABLE_NAME,COLUMN_NAME,INPUT_ROWS,OUTPUT_ROWS,RUNTIME_NS,DESCRIPTION,SINGLE_INPUT_ROWS,SINGLE_OUTPUT_ROWS,...,benefits_from_sorting,useful_for_pruning,pruned_minimum_input_rows,actual_selectivity,time_per_ir,time_per_or,optimal_runtime,runtime_gain,log_runtime,optimal_log_runtime
0,bac00c0bdbf62ea,REFERENCE,customer,c_phone,681435,190499,182312504,TableScan Impl: ExpressionEvaluator (SUBSTR(c_...,1500000,419123,...,False,False,196605,0.279415,267.542031,957.026042,5.260010e+07,1.297124e+08,27.441838,25.648562
1,2687bf4da454552b,REFERENCE,customer,c_phone,681352,190434,184210492,TableScan Impl: ExpressionEvaluator (SUBSTR(c_...,1500000,419793,...,False,False,196605,0.279862,270.360243,967.319344,5.315418e+07,1.310563e+08,27.456780,25.663680
2,2bd757c748d34189,REFERENCE,customer,c_phone,681223,190942,186800722,TableScan Impl: ExpressionEvaluator (SUBSTR(c_...,1500000,419992,...,False,False,196605,0.279995,274.213763,978.311330,5.391180e+07,1.328889e+08,27.476925,25.684098
3,7ce8aa4cc8eabfd8,REFERENCE,customer,c_phone,680683,190698,183656825,TableScan Impl: ExpressionEvaluator (SUBSTR(c_...,1500000,419626,...,False,False,196605,0.279751,269.812563,963.076828,5.304650e+07,1.306103e+08,27.452437,25.660754
4,3bf533ddc6f54ed4,REFERENCE,customer,c_phone,680526,190820,183580973,TableScan Impl: ExpressionEvaluator (SUBSTR(c_...,1500000,420284,...,False,False,196605,0.280189,269.763349,962.063583,5.303682e+07,1.305441e+08,27.451841,25.660491
5,74335a369db42f54,REFERENCE,customer,c_phone,682247,190532,183523699,TableScan Impl: ExpressionEvaluator (SUBSTR(c_...,1500000,419794,...,False,False,196605,0.279863,268.998909,963.217197,5.288653e+07,1.306372e+08,27.451391,25.656397
6,aeebd3c094273d3e,REFERENCE,customer,c_phone,681571,190524,182585251,TableScan Impl: ExpressionEvaluator (SUBSTR(c_...,1500000,420242,...,False,False,196605,0.280161,267.888820,958.332026,5.266828e+07,1.299170e+08,27.443995,25.650431
7,6730c267d3eac48a,DATA,orders,o_orderstatus,15000000,7309184,88478994,TableScan Impl: ColumnVsValue o_orderstatus = 'F',15000000,7309184,...,True,True,7339920,0.487279,5.898600,12.105181,4.329525e+07,4.518374e+07,26.398832,25.367705
8,6730c267d3eac48a,DATA,nation,n_name,25,1,36669,TableScan Impl: ColumnVsValue n_name = 'IRAN',25,1,...,True,True,65535,0.040000,1466.760000,36669.000000,9.612412e+07,-9.608745e+07,15.162273,26.518395
9,6ec3126b032024be,DATA,orders,o_orderstatus,15000000,7309184,89032941,TableScan Impl: ColumnVsValue o_orderstatus = 'F',15000000,7309184,...,True,True,7339920,0.487279,5.935529,12.180969,4.356631e+07,4.546663e+07,26.407836,25.376710


In [18]:
GAIN_COLUMN = 'runtime_gain'

scans_groupby_columnname = scans.groupby(['TABLE_NAME', 'COLUMN_NAME'])
sum_of_gains = pd.DataFrame(scans_groupby_columnname[GAIN_COLUMN].sum())
sum_of_gains.sort_values(by=['TABLE_NAME', GAIN_COLUMN], ascending=[True, False])

Unnamed: 0_level_0,Unnamed: 1_level_0,runtime_gain
TABLE_NAME,COLUMN_NAME,Unnamed: 2_level_1
customer,c_phone,1308860000.0
customer,c_mktsegment,37062380.0
lineitem,l_shipdate,3668620000.0
lineitem,l_receiptdate,1472151000.0
lineitem,l_shipmode,1250315000.0
lineitem,l_discount,645719200.0
lineitem,l_quantity,210563500.0
nation,n_name,-7202164000.0
orders,o_orderdate,1378964000.0
orders,o_orderstatus,312702200.0


In [19]:
assert_correct_statistics_loaded()

if BENCHMARK == "TPCH":
    TABLE = "lineitem"
else:    
    TABLE = "customer_demographics"

import itertools

def extract_single_table(table_name):
    return scans[scans['TABLE_NAME'] == table_name]

def extract_interesting_columns(df):
    return list(df['COLUMN_NAME'].unique())


correlations = {
    'l_shipdate': ['l_receiptdate', 'l_commitdate'],
    'l_receiptdate': ['l_shipdate', 'l_commitdate'],
    'l_commitdate': ['l_receiptdate', 'l_shipdate']
}
#correlations = {}
def table_sorting_options(table_name):
    single_table = extract_single_table(table_name)
    interesting_cols = extract_interesting_columns(single_table)
    pairs = itertools.product(interesting_cols, interesting_cols)
    
    total_times = []
    for pair in pairs:
        pruning_col = pair[0]
        sorted_col = pair[1]

        def compute_runtime(row):
            col_name = row['COLUMN_NAME']
            if pruning_col == sorted_col:
                if col_name == pruning_col:
                    return row['optimal_log_runtime']
                else:
                    if col_name in correlations.get(pruning_col, []):
                        # correlated to pruning column -> a lot of pruning, no sortedness
                        # TODO: better measure correlation
                        return 1.2 * row['optimal_runtime']
                    else:
                        return row['RUNTIME_NS']

            else:
                if col_name == pruning_col:
                    return row['optimal_runtime']
                elif col_name == sorted_col:
                    # TODO: should this be affected by correlation?
                    # we will get less chunks, so a linear scan should be close to optimal_runtime,
                    # but log time should beat it anyway
                    return row['log_runtime']
                else:
                    if col_name in correlations.get(pruning_col, []):
                        # correlated to pruning column -> a lot of pruning, no sortedness
                        # TODO: better measure correlation
                        return 1.2 * row['optimal_runtime']
                    else:
                        return row['RUNTIME_NS']

        effective_runtime = single_table.apply(compute_runtime, axis=1)
        total_times.append([pair, effective_runtime.sum()])    
    total_times = pd.DataFrame(total_times, columns=['columns', 'time'])    
    return total_times

options = table_sorting_options(TABLE)
options.sort_values(by=['time'], ascending=True)

Unnamed: 0,columns,time
20,"(l_receiptdate, l_shipdate)",2966448000.0
0,"(l_shipdate, l_shipdate)",3019390000.0
15,"(l_shipmode, l_shipdate)",3188283000.0
10,"(l_discount, l_shipdate)",3792880000.0
5,"(l_quantity, l_shipdate)",4228035000.0
3,"(l_shipdate, l_shipmode)",6154668000.0
2,"(l_shipdate, l_discount)",6791128000.0
23,"(l_receiptdate, l_shipmode)",7020858000.0
1,"(l_shipdate, l_quantity)",7197625000.0
4,"(l_shipdate, l_receiptdate)",7297399000.0


In [20]:
aggregates = pd.read_csv(f"{STATISTICS_PATH}/aggregates.csv", sep=',')

# it looks like column names are mixed up.
# COLUMN_NAME -> actually GROUP_BY_COLUMN_COUNT
# GROUP_BY_COLUMN_COUNT -> actually AGGREGATE_COLUMN_COUNT
# AGGREGATE_COLUMN_COUNT -> actually COLUMN_NAME

COL_NAME = 'AGGREGATE_COLUMN_COUNT'
GROUPBY_COL = 'COLUMN_NAME'
AGG_COL = 'GROUP_BY_COLUMN_COUNT'

# All aggregates have to read the entire table, so we cannot skip chunks.
# But getting all groups consecutive could provide a speedup
# As a result, we care only about aggregates with group by columns

interesting_aggregates = aggregates[aggregates[GROUPBY_COL] > 0]
stats = interesting_aggregates.groupby(['TABLE_NAME', COL_NAME])
out_columns = pd.DataFrame(stats['OUTPUT_ROWS'].max())
out_columns.sort_values(by=['TABLE_NAME', 'OUTPUT_ROWS'], ascending=[True, False])
aggregates[aggregates['COLUMN_TYPE'] == 'DATA']

Unnamed: 0,QUERY_HASH,AGGREGATE_HASH,COLUMN_TYPE,TABLE_NAME,COLUMN_NAME,GROUP_BY_COLUMN_COUNT,AGGREGATE_COLUMN_COUNT,INPUT_ROWS,OUTPUT_ROWS,RUNTIME_NS,DESCRIPTION


In [21]:
scan_time_per_column = scans.groupby(['COLUMN_NAME'])
accumulated_scan_times = pd.DataFrame(scan_time_per_column['RUNTIME_NS'].sum())
total_scan_runtime = accumulated_scan_times['RUNTIME_NS'].sum()
assert total_scan_runtime == scans['RUNTIME_NS'].sum(), f"{total_scan_runtime}, {scans['RUNTIME_NS'].sum()}"
print(f"total scan runtime: {total_scan_runtime}")

scan_time_per_column_prunable = scans[scans['useful_for_pruning']].groupby(['COLUMN_NAME'])
accumulated_prunable_scan_times = pd.DataFrame(scan_time_per_column_prunable['RUNTIME_NS'].sum())
total_prunable_scan_runtime = accumulated_prunable_scan_times['RUNTIME_NS'].sum()
print(f"total prunable scan runtime: {total_prunable_scan_runtime}")
print(f"{100*total_prunable_scan_runtime/total_scan_runtime}% of scan runtime amount to prunable scans")

accumulated_scan_times.sort_values(['RUNTIME_NS'], ascending=False)

total scan runtime: 27362435625
total prunable scan runtime: 14034972749
51.2928488580044% of scan runtime amount to prunable scans


Unnamed: 0_level_0,RUNTIME_NS
COLUMN_NAME,Unnamed: 1_level_1
l_shipdate,8264282387
o_comment,7605398277
o_orderdate,1909147688
c_phone,1839853863
l_receiptdate,1736861963
l_shipmode,1460384674
p_name,1051621410
l_discount,823923974
p_size,649871849
o_orderstatus,612334648


In [22]:
joins = load_join_statistics()

print(joins['PROBE_COLUMN'].unique())

join_time_per_column = joins.groupby(['PROBE_COLUMN'])

accumulated_join_times = pd.DataFrame(join_time_per_column['RUNTIME_NS'].sum())
print(len(accumulated_join_times))
total_join_runtime = accumulated_join_times['RUNTIME_NS'].sum()
#assert total_join_runtime == joins['RUNTIME_NS'].sum(), f"{total_join_runtime},{joins['RUNTIME_NS'].sum()}"
print(f"total join runtime: {total_join_runtime}")

joins[joins.apply(lambda x : x['PROBE_COLUMN'] not in ['o_custkey' ,'n_nationkey' ,'s_nationkey' ,'l_suppkey', 's_suppkey',
 'l_orderkey', 'o_orderkey', 'p_partkey' ,'l_partkey' ,'ps_suppkey',
 'c_nationkey' ,'r_regionkey' ,'c_custkey' ,'ps_partkey'] ,axis=1)]

print(f"for {BENCHMARK}, joins take about {total_join_runtime / total_scan_runtime} times longer than table scans")
accumulated_join_times.sort_values(['RUNTIME_NS'], ascending=False)

['c_custkey' 's_nationkey' 'l_suppkey' 'l_orderkey' 'o_orderkey'
 'ps_partkey' 'l_partkey' 's_suppkey' 'n_nationkey' 'n_regionkey'
 'c_nationkey' 'ps_suppkey' 'p_partkey' 'o_custkey' 'ps_supplycost' nan]
15
total join runtime: 437246611576
for TPCH, joins take about 15.979813258162759 times longer than table scans


Unnamed: 0_level_0,RUNTIME_NS
PROBE_COLUMN,Unnamed: 1_level_1
l_orderkey,164621073876
l_partkey,82724758653
o_orderkey,56692008431
c_custkey,47987057536
l_suppkey,39474563381
o_custkey,18614106082
ps_partkey,13457584606
p_partkey,9783801497
ps_suppkey,2330858192
c_nationkey,921856448
