# General Model with Chunk-based Configuration Dependencies (CCD)

In [1]:
from pyomo.environ import *
from pyomo.opt import SolverFactory

import pandas as pd
import numpy as np

import itertools
import operator
import csv
import os

### Load data

In [2]:
# Benchmark measurements: query runtimes and the memory footprint of
# segments and of their indexes.
df_perf = pd.read_csv('../data/benchmark/runtimes.csv')
df_memory = pd.read_csv('../data/benchmark/memory_consumption.csv')
df_memory_index = pd.read_csv('../data/benchmark/memory_consumption_index.csv')

# Calibration data: Hyrise pos-list scan penalties and storage-device penalties.
df_poslist_scan_penalty = pd.read_csv('../data/calibration/poslist_scan_penalty.csv')
df_storage_penalty = pd.read_csv('../data/calibration/storage_penalty.csv')

# Workload definition and per-chunk access statistics.
df_workload = pd.read_csv('../data/workloads/workload_1/workload.csv')
df_chunk_access = pd.read_csv('../data/workloads/workload_1/chunk_access.csv')

# Index each frame by the key columns used for .loc lookups further down
# (df_workload stays positionally indexed; it is accessed via .iloc).
_index_plan = [
    (df_perf, ['ORDER_BY', 'ENCODING', 'SCAN_COLUMN', 'SELECTIVITY', 'INDEX', 'SCAN_TYPE']),
    (df_memory, ['ORDER_BY', 'ENCODING', 'COLUMN', 'CHUNK_ID']),
    (df_memory_index, ['ORDER_BY', 'ENCODING', 'COLUMN', 'CHUNK_ID']),
    (df_poslist_scan_penalty, ['ENCODING']),
    (df_storage_penalty, ['STORAGE', 'ENCODING', 'INDEX']),
    (df_chunk_access, ['QUERY_ID', 'CHUNK']),
]
for _frame, _keys in _index_plan:
    _frame.set_index(_keys, inplace=True)
del _index_plan, _frame, _keys

### Helper functions (print)

In [30]:
# copied from data_transformation file
# Display labels: position k in each list is the label printed for id k.
DICT_ENCODINGS = [
    "Dictionary",
    "Unencoded",
    "LZ4",
    "RunLength",
    "FoR-SIMD",
]
DICT_COLUMNS = [
    "captain_id",
    "latitude",
    "longitude",
    "timestamp",
    "captain_status",
]

def print_header(display, longest_str, column_dict, storage):
    """Print the storage banner and, depending on `display`, a header row.

    Args:
        display: 'conf_values' / 'conf_keys' print 'CHUNK' plus the column
            labels; 'costs' prints 'Chunk', 'Memory', 'Objective'. Any other
            value prints only the banner.
        longest_str: Width every header cell is left-justified to.
        column_dict: List of column labels.
        storage: Storage device id shown in the banner.
    """
    print('Storage:', storage)
    if display == 'costs':
        labels = ['Chunk', 'Memory', 'Objective']
    elif display in ('conf_values', 'conf_keys'):
        labels = ['CHUNK'] + column_dict
    else:
        return
    print(''.join(label.ljust(longest_str) for label in labels))

def print_conf(model, display, column_dict, encoding_dict):
    """Print every selected chunk configuration, one table per storage device.

    A configuration counts as selected when its binary variable in model.x
    (keyed by (chunk, order, valid_config_id, storage)) rounds to 1.

    display picks the row format:
      'conf_values' - per column: sort flag, index flag and encoding name
      'conf_keys'   - per column: [chunk, encoding id, order, index flag]
      'costs'       - chunk memory (model.m) and its frequency-weighted cost
    Uses the module-level encoding_config, index_config and valid_configs.
    """
    width = max(len(name) for name in encoding_dict) + 10
    selected = [key for key in model.x if round(model.x[key].value) == 1.0]

    for device in model.d:
        print_header(display, width, column_dict, device)
        for key in selected:
            if key[3] != device:
                continue
            chunk, order, conf_id = key[0], key[1], key[2]
            encoding = encoding_config[valid_configs[conf_id][0]]
            index = index_config[valid_configs[conf_id][1]]

            print(str(chunk).ljust(width), end='')

            if display in ['conf_values', 'conf_keys']:
                for col in range(len(column_dict)):
                    if display == 'conf_values':
                        cell = display_conf_values(encoding[col], order, index[col], encoding_dict, col)
                    else:
                        cell = str([chunk, encoding[col], order, index[col]])
                    print(cell.ljust(width), end='')
                print('')

            if display == 'costs':
                print(str(model.m[chunk, order, conf_id]).ljust(width), end='')
                print(str(display_obj(key, model)).ljust(width), end='')
                print('')
        print('')

def display_conf_values(encoding, order, index, encoding_dict, i):
    """Format one configuration cell as '<sort> <index> <encoding name>'.

    The sort flag is 'S' when `order` equals the column position `i`, the
    index flag is 'I' when `index` is non-zero, and '-' marks "not applied".
    """
    sort_flag = 'S' if order == i else '-'
    index_flag = '-' if index == 0 else 'I'
    return sort_flag + ' ' + index_flag + ' ' + encoding_dict[encoding]

def display_obj(item, model):
    """Return the frequency-weighted cost of one selected configuration.

    `item` is a (chunk, order, valid_config_id, storage) key of model.x; the
    per-query cost model.c is weighted by that query's FREQUENCY in df_workload
    and summed over all queries.
    """
    return sum(
        model.c[q, item[0], item[1], item[2], item[3]] * df_workload.iloc[q]['FREQUENCY']
        for q in model.q
    )
    
def print_result(result, model, column_dict=DICT_COLUMNS, encoding_dict=DICT_ENCODINGS):
    """Print the storage budgets, the solver outcome and, if optimal, the configuration.

    Args:
        result: Result object returned by solver.solve(model).
        model: The solved Pyomo model.
        column_dict: Column labels used for the configuration table.
        encoding_dict: Encoding labels, indexed by encoding id.
    """
    print(f'Solving for budget;')
    for d in model.d:
        print(f'  Storage: {d}     Storage Size: {model.b[d].value}')
    print('')
    # Serialize the solver section once and reuse it.
    solver_info = result.json_repn()['Solver'][0]
    condition = solver_info['Termination condition']
    print(f'Result: {condition}', end='')
    if condition == 'optimal':
        print(f" (walltime: {float(solver_info['Wall time']):.4f}s)")
        print(f'Objective: {model.obj.expr()}')
        for d in model.d:
            # Memory actually placed on device d, bytes -> megabytes.
            print(f'  {d}: {model.MemoryBudgetConstraint[d].body()/1_000_000} MB')
        print('')
        # Pass the caller-supplied labels through; previously the module
        # defaults were always used and these parameters were ignored.
        print_conf(model, 'conf_values', column_dict, encoding_dict)
    print('')

### Export

In [4]:
# settings for csv export of configurations
FOLDER = '../data/config/'
MODEL_NAME = 'CCD'
SEPARATOR = '|'
CONVERSION_FACTOR = 1_000_000 # Bytes to Megabytes 

def serialize(val, seperator):
    """Join the entries of an indexed component, converted from bytes to MB.

    Args:
        val: Mapping or Pyomo indexed component (e.g. model.b) whose entries
            expose `.value` in bytes; iterated in key order.
        seperator: String placed between the converted values.

    Returns:
        The floor-divided megabyte values joined by `seperator` ('' if empty).
    """
    # str.join works for separators of any length; the previous
    # "append separator, then drop the last character" approach left a
    # dangling fragment whenever the separator was longer than one character.
    return seperator.join(str(val[key].value // CONVERSION_FACTOR) for key in val)

def serialize_constraint(constraint, storage):
    """Join the current body values of an indexed constraint, converted to MB.

    Args:
        constraint: Indexed constraint whose bodies are evaluated, e.g.
            model.MemoryBudgetConstraint. Previously this argument was ignored
            and the global model's constraint was always read.
        storage: Sized collection of storage devices; entries 0..len(storage)-1
            of `constraint` are serialized.

    Returns:
        The megabyte values joined by SEPARATOR.
    """
    return SEPARATOR.join(
        str(value(constraint[key].body) / CONVERSION_FACTOR)
        for key in range(len(storage))
    )

def segment_memory(chunk, column, encoding, order, index):
    """Return the SIZE_IN_BYTES of one segment, plus its index size when index == 1."""
    key = (order, encoding, column, chunk)
    size = df_memory.loc[key]['SIZE_IN_BYTES']
    if index != 1:
        return size
    size += df_memory_index.loc[key]['SIZE_IN_BYTES']
    return size

def get_file_name(model, folder):
    """Build the export path <folder>/<MODEL_NAME>_<budgets in MB joined by '-'>.csv.

    os.path.join inserts a path separator when `folder` lacks a trailing one,
    so '../data/config' and '../data/config/' yield the same file; plain string
    concatenation would have glued the folder and file names together.
    """
    return os.path.join(folder, MODEL_NAME + '_' + serialize(model.b, '-') + '.csv')

def write_file(file_path, header, lines, d = ','):
    """Write `lines` (preceded by an optional header row) to a CSV file.

    Args:
        file_path: Destination path; an existing file is overwritten.
        header: First row to write; skipped when falsy (None or empty).
        lines: Iterable of rows (each an iterable of fields).
        d: Field delimiter.
    """
    # The csv module requires newline='' on the file object; otherwise its
    # '\r\n' row terminator is translated to '\r\r\n' on Windows, producing
    # blank rows. The with-block closes the file, so no explicit close().
    with open(file_path, mode='w', newline='') as out_file:
        csv_writer = csv.writer(out_file, delimiter=d)
        if header:
            csv_writer.writerow(header)
        csv_writer.writerows(lines)
    
def export_config(model):
    """Write the selected configuration of every chunk to a CSV file under FOLDER.

    One row per (selected chunk configuration, column) with the fields
    CHUNK, COLUMN, ENCODING (id), SORT (order id), INDEX (flag), STORAGE.
    A configuration is selected when its model.x value rounds to 1.
    Uses the module-level encoding_config, index_config, valid_configs and N.
    """
    rows = []
    for key in model.x:
        if round(model.x[key].value) != 1.0:
            continue
        conf = valid_configs[key[2]]
        encoding = encoding_config[conf[0]]
        index = index_config[conf[1]]
        rows.extend(
            [key[0], col, encoding[col], key[1], index[col], key[3]]
            for col in range(len(N))
        )

    header = ['CHUNK', 'COLUMN', 'ENCODING', 'SORT', 'INDEX', 'STORAGE']
    write_file(get_file_name(model, FOLDER), header, rows)

## Helper Functions

In [5]:
def valid_configuration(encoding, index):
    """Return True if every indexed segment uses dictionary encoding.

    Hyrise only supports indexes on dictionary-encoded segments (encoding id 0).
    `encoding` and `index` are per-column vectors; a position with index flag 1
    must have encoding 0 for the configuration to be valid.
    """
    return all(encoding[pos] == 0 for pos, flag in enumerate(index) if flag == 1)

def scan_order_penalty(scan_order, encoding):
    """Return the cost multiplier for a scan at position `scan_order` in a query.

    In Hyrise, scans that run on a position list (every scan after the first)
    are significantly slower than isolated scans. That slowdown is an
    encoding-specific PENALTY measured during calibration; the first scan
    (scan_order == 0) has no penalty (factor 1).
    """
    if scan_order != 0:
        return df_poslist_scan_penalty.loc[encoding]['PENALTY']
    return 1

def storage_penalty(encoding, index, storage):
    """Return the calibrated PENALTY for an encoding/index combination on a storage device."""
    calibration_row = df_storage_penalty.loc[(storage, encoding, index)]
    return calibration_row['PENALTY']

In [27]:
model = ConcreteModel()

# Budget per storage device id. It is in bytes: it bounds the sum of
# SIZE_IN_BYTES-based chunk memory in memory_budget_rule. The sweep cell
# later overwrites it through the mutable model.b Param.
STORAGE_BUDGET = {0:3_000_000_000, 1:3_000_000_000}

# set of order configurations (ORDER_BY level of df_perf)
# NOTE(review): MultiIndex levels are already unique, so .unique() is a no-op.
O = df_perf.index.levels[0].unique()

# set of encodings (ENCODING level of df_perf)
E = df_perf.index.levels[1].unique()

# set of columns (SCAN_COLUMN level of df_perf)
N = df_perf.index.levels[2].unique()

# set of chunks (CHUNK_ID level of df_memory)
M = df_memory.index.levels[3].unique()

# Generate all configurations: one encoding id per column (len(E) ** len(N)
# tuples) and one 0/1 index flag per column (2 ** len(N) tuples).
# NOTE(review): later lookups use these positions directly as ENCODING /
# COLUMN keys of df_perf / df_memory, which assumes those levels hold
# 0..len(E)-1 and 0..len(N)-1 -- confirm against the data files.
encoding_config = [item for item in itertools.product(range(len(E)), repeat=len(N))]
index_config = [item for item in itertools.product(range(2), repeat=len(N))]

# index vector configurations (positions in index_config)
model.i = Set(initialize=range(0, len(index_config)))

# encoding vector configurations (positions in encoding_config)
model.e = Set(initialize=range(0, len(encoding_config)))

# query index: row positions of df_workload (accessed via .iloc elsewhere)
model.q = Set(initialize=range(0, df_workload.shape[0]))

# storage devices (ids are the keys 0..n-1 of STORAGE_BUDGET)
model.d = Set(initialize=range(0, len(STORAGE_BUDGET)))

# Storage budget per storage device; mutable so it can be changed between solves.
model.b = Param(model.d, within=NonNegativeIntegers, initialize=STORAGE_BUDGET, mutable=True)

# Keep only [encoding id, index id] pairs that respect the rule that Hyrise
# only supports indices on dictionary-encoded segments (see valid_configuration).
valid_configs = [[e, i] for e in model.e for i in model.i if valid_configuration(encoding_config[e], index_config[i])]

# set of valid configurations (positions in valid_configs)
model.v = Set(initialize=range(0, len(valid_configs)))

# Binary decision variable x[chunk, order, valid config, storage device] = 1
# when that combination is chosen for the chunk.
model.x = Var(M, O, model.v, model.d, within=Binary) 

In [24]:
def segment_access_init(model, m, s):
    """Initialize model.a[m, s]: the share of query s's column-scan cost charged to chunk m.

    Returns 0 when chunk m is not ACCESSED by the query, otherwise
    1 / (sum of ACCESSED over all chunks of the query). If ACCESSED is a 0/1
    flag (presumably -- confirm), this spreads the cost evenly over the
    accessed chunks.
    """
    query_id = df_workload.iloc[s]['QUERY_ID']
    if df_chunk_access.loc[(query_id, m)]['ACCESSED'] != 0:
        accessed_total = df_chunk_access.loc[(query_id, slice(None))]['ACCESSED'].sum()
        return 1 / accessed_total
    return 0
model.a = Param(M, model.q, within=NonNegativeReals, initialize=segment_access_init)

### Pre-Calculation of Performance Costs for Queries 
Calculating the scan costs from the scanned columns and the defined cost model (see performance_per_query) can take several minutes, so the values are pre-calculated and written to a CSV file.

In [8]:
def performance_per_query(q, m, e, o, i, d):
    """Estimate the cost of workload scan q on chunk m under one configuration.

    Assumes the column scans of one query are AND-connected. The measured TIME
    for (order o, column encoding, column, selectivity, index flag, scan type)
    is scaled by the chunk's access share model.a[m, q], the scan's
    SCAN_FACTOR, the pos-list scan penalty and the storage penalty of device d.
    """
    scan = df_workload.iloc[q]
    column = scan['SCAN_COLUMN']
    scan_order = scan['SCAN_ORDER']

    # Only the first scan (SCAN_ORDER == 0) can be sped up by an index;
    # later scans get index value 0.
    index_value = index_config[i][column] if scan_order == 0 else 0
    column_encoding = encoding_config[e][column]

    lookup_key = (o, column_encoding, column, scan['SELECTIVITY'], index_value, scan['SCAN_TYPE'])
    measured_time = df_perf.loc[lookup_key]['TIME']
    return (measured_time * model.a[m, q] * scan['SCAN_FACTOR']
            * scan_order_penalty(scan_order, column_encoding)
            * storage_penalty(column_encoding, index_value, d))

def pre_calc_performance_per_query():
    """Write the cost of every (query, chunk, order, storage, valid config) to a CSV.

    Rows are emitted in q -> m -> o -> d -> v nesting order (v varies fastest).
    calc_perf_row_id relies on exactly this layout to find a row, so the loop
    order must not change.
    """
    # newline='' as required by the csv module (avoids '\r\r\n' blank rows on
    # Windows); the with-block closes the file.
    with open('../data/pre_calc/perf_per_query.csv', mode='w', newline='') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(['q', 'm', 'o', 'v', 'd', 'time'])

        for q in model.q:
            for m in M:
                for o in O:
                    for d in model.d:
                        for v in model.v:
                            csv_writer.writerow([q, m, o, v, d, performance_per_query(q, m, valid_configs[v][0], o, valid_configs[v][1], d)])

# Disabled because regeneration takes several minutes; uncomment to rebuild the CSV.
# NOTE: the message below is printed even while the call is commented out.
#pre_calc_performance_per_query()
print('Generated Query Performance Data.')

Generated Query Performance Data.


### Pre-Calculation of Memory Consumption of Chunks 
Calculating the memory consumption of every chunk configuration can take several minutes, so these values are pre-calculated as well.

In [11]:
def memory_per_chunk(m, e, o, i):
    """Return the total SIZE_IN_BYTES of chunk m under order o.

    Sums the segment size of every column with the encodings of
    encoding_config[e], plus the index size of every column whose flag in
    index_config[i] is 1.
    """
    encoding = encoding_config[e]
    total = 0
    for column in range(len(N)):
        key = (o, encoding[column], column, m)
        total += df_memory.loc[key]['SIZE_IN_BYTES']
        if index_config[i][column] == 1:
            total += df_memory_index.loc[key]['SIZE_IN_BYTES']
    return total

def pre_calc_memory_per_chunk():
    """Write the memory consumption of every (chunk, order, valid config) to a CSV.

    Rows are emitted in m -> o -> v nesting order (v varies fastest).
    cal_mem_row_id relies on this layout, so the loop order must not change.
    """
    # newline='' as required by the csv module (avoids '\r\r\n' blank rows on
    # Windows); the with-block closes the file.
    with open('../data/pre_calc/memory_per_chunk.csv', mode='w', newline='') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(['m', 'o', 'v', 'memory'])

        for m in M:
            for o in O:
                for v in model.v:
                    csv_writer.writerow([m, o, v, memory_per_chunk(m, valid_configs[v][0], o, valid_configs[v][1])])

# Disabled because regeneration is slow; uncomment to rebuild the CSV.
# NOTE: the message below is printed even while the call is commented out.
#pre_calc_memory_per_chunk()
print('Generated Memory Consumption Data.')

Generated Memory Consumption Data.


### Loading Pre-Calc Values

In [21]:
# Load the pre-calculated tables as float arrays; [1:] drops the header row.
df_pre_calc_perf = np.genfromtxt('../data/pre_calc/perf_per_query.csv', delimiter=',')[1:]
df_pre_calc_mem = np.genfromtxt('../data/pre_calc/memory_per_chunk.csv', delimiter=',')[1:]

# The row-id helpers locate rows purely by arithmetic, so each file must hold
# exactly one row per combination its generator writes. Fail fast on a stale
# file (e.g. generated for another workload or configuration space) instead
# of silently reading coefficients of the wrong configuration.
_EXPECTED_PERF_ROWS = len(model.q) * len(M) * len(O) * len(model.d) * len(model.v)
_EXPECTED_MEM_ROWS = len(M) * len(O) * len(model.v)
if len(df_pre_calc_perf) != _EXPECTED_PERF_ROWS:
    raise ValueError(
        f'perf_per_query.csv has {len(df_pre_calc_perf)} data rows, expected '
        f'{_EXPECTED_PERF_ROWS}; regenerate it with pre_calc_performance_per_query()'
    )
if len(df_pre_calc_mem) != _EXPECTED_MEM_ROWS:
    raise ValueError(
        f'memory_per_chunk.csv has {len(df_pre_calc_mem)} data rows, expected '
        f'{_EXPECTED_MEM_ROWS}; regenerate it with pre_calc_memory_per_chunk()'
    )

# define sections length in pre loaded data sets 
LENGHT_QUERY_SECTION = len(df_pre_calc_perf) / len(model.q)
LENGHT_CHUNK_SECTION = LENGHT_QUERY_SECTION / len(M)
LENGTH_ORDER_SECTION = LENGHT_CHUNK_SECTION / len(O)
LENGHT_STORAGE_SECTION = LENGTH_ORDER_SECTION / len(model.d)

LENGTH_MEM_CHUNK_SECTION = len(df_pre_calc_mem) / len(M)
LENGTH_MEM_ORDER_SECTION = LENGTH_MEM_CHUNK_SECTION / len(O)

def calc_perf_row_id(q, m, o, v, d):
    """Return the (float) row offset of (q, m, o, v, d) in df_pre_calc_perf.

    Mirrors the q -> m -> o -> d -> v write order of the performance pre-calc
    file. Assumes chunk and order ids are 0-based consecutive integers -- TODO
    confirm. Callers convert the result with int().
    """
    offset = LENGHT_QUERY_SECTION * q
    offset += LENGHT_CHUNK_SECTION * m
    offset += LENGTH_ORDER_SECTION * o
    offset += LENGHT_STORAGE_SECTION * d
    return offset + v

def cal_mem_row_id(m, o, v):
    """Return the (float) row offset of (m, o, v) in df_pre_calc_mem.

    Mirrors the m -> o -> v write order of the memory pre-calc file. Assumes
    chunk and order ids are 0-based consecutive integers -- TODO confirm.
    """
    offset = LENGTH_MEM_CHUNK_SECTION * m
    offset += LENGTH_MEM_ORDER_SECTION * o
    return offset + v

### Constraints & Objective Definition and Solving
This cell takes several minutes to execute because loading the parameters is slow.

In [28]:
def performance_init(model, q, m, o, v, d):
    """Initialize model.c[q, m, o, v, d] from the pre-calculated 'time' column (index 5).

    Holds the runtime cost for each encoding, sorting, indexing, selectivity and
    storage combination.
    """
    row = df_pre_calc_perf[int(calc_perf_row_id(q, m, o, v, d))]
    return row[5]
model.c = Param(model.q, M, O, model.v, model.d, within=NonNegativeReals, initialize=performance_init)

def memory_init(model, m, o, v):
    """Initialize model.m[m, o, v]: chunk memory from the pre-calculated 'memory' column (index 3)."""
    row = df_pre_calc_mem[int(cal_mem_row_id(m, o, v))]
    return row[3]
model.m = Param(M, O, model.v, within=NonNegativeReals, initialize=memory_init)

# Objective
def runtime(m):
    """Objective rule: total frequency-weighted runtime of the selected configurations.

    Sums x * c * FREQUENCY over every (query, chunk, order, valid config,
    storage) combination; `m` is the model Pyomo passes to the rule.
    """
    # Fetch each query's FREQUENCY once. Calling df_workload.iloc inside the
    # innermost generator repeated a slow row lookup for every term of a very
    # large sum.
    frequency = {query_id: df_workload.iloc[query_id]['FREQUENCY'] for query_id in m.q}
    return sum(m.x[chunk_id, ordering_id, valid_conf_id, storage_id] *
               m.c[query_id, chunk_id, ordering_id, valid_conf_id, storage_id] *
               frequency[query_id]
               for query_id in m.q
               for chunk_id in M
               for ordering_id in O
               for valid_conf_id in m.v
               for storage_id in m.d)
model.obj = Objective(rule=runtime)

#constraints 

# Size within budget
def memory_budget_rule(m, d):
    """Constraint rule: memory placed on storage device d must not exceed its budget m.b[d].

    Adds the pre-calculated chunk memory m.m of every selected
    (chunk, order, valid config) assigned to device d.
    """
    placed_bytes = sum(
        m.x[chunk_id, ordering_id, valid_conf_id, d] * m.m[chunk_id, ordering_id, valid_conf_id]
        for chunk_id in M
        for ordering_id in O
        for valid_conf_id in m.v
    )
    return placed_bytes <= m.b[d]
model.MemoryBudgetConstraint = Constraint(model.d, rule=memory_budget_rule)

# Exactly one encoding, sorting, indexing and storage configuration per chunk
def one_encoding_sorting_indexing_tiering_config_active_per_chunk_rule(m, i):
    """Constraint rule: chunk i selects exactly one (order, valid config, storage) option."""
    chosen = sum(
        m.x[i, ordering_id, valid_conf_id, storage_id]
        for ordering_id in O
        for valid_conf_id in m.v
        for storage_id in m.d
    )
    return chosen == 1
model.SingleEncodingConstraints = Constraint(M, rule=one_encoding_sorting_indexing_tiering_config_active_per_chunk_rule)

# Solving

In [31]:
# Solve the model with Gurobi using 16 threads, then print the budgets, the
# termination condition and (if optimal) the chosen configuration per device.
solver = SolverFactory('gurobi')
solver.options['threads'] = 16

result = solver.solve(model)
print_result(result, model)

Solving for budget;
  Storage: 0     Storage Size: 3000000000
  Storage: 1     Storage Size: 3000000000

Result: optimal (walltime: 12.5364s)
Objective: 213979951305.09988
  0: 2982.208944 MB
  1: 1365.78666 MB

Storage: 0
CHUNK               captain_id          latitude            longitude           timestamp           captain_status      
0                   S - RunLength       - - Unencoded       - - Unencoded       - - Dictionary      - - LZ4             
1                   S - RunLength       - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        
2                   S - RunLength       - - Unencoded       - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        
3                   - - Dictionary      S - RunLength       - - Dictionary      - - Dictionary      - - FoR-SIMD        
4                   - - Dictionary      S - RunLength       - - Dictionary      - - FoR-SIMD        - - FoR-SIMD        
5                   - - Dictionary      S - RunLeng

In [None]:
# Re-solve for storage device 0 budgets of 0, 1, ..., 10 GB (in bytes) while
# device 1 stays fixed at ssd_budget.
ssd_budget = 3_000_000_000

for step in range(11):
    budget = step * 1_000_000_000
    model.b[0] = budget
    model.b[1] = ssd_budget
    result = solver.solve(model)
    print_result(result, model)
    # export_config(model)  # uncomment to also write each configuration to CSV

Solving for budget;
  Storage: 0     Storage Size: 0
  Storage: 1     Storage Size: 3000000000

Result: optimal (walltime: 6.8865s)
Objective: 20849251310785.26
  0: 0.0 MB
  1: 2992.402208 MB

Storage: 0
CHUNK               captain_id          latitude            longitude           timestamp           captain_status      

Storage: 1
CHUNK               captain_id          latitude            longitude           timestamp           captain_status      
0                   - - LZ4             - - FoR-SIMD        - - FoR-SIMD        S - RunLength       - - LZ4             
1                   S - RunLength       - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        
2                   S - RunLength       - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        
3                   - - Dictionary      S - RunLength       - - FoR-SIMD        - - FoR-SIMD        - - FoR-SIMD        
4                   - - Dictionary      S - RunLength    