In [46]:
from urllib.parse import quote

import numpy as np
import pandas as pd
import sqlalchemy
from dotenv import dotenv_values

from models.utils import distr_maker, model_distr_pack
from preprocessing.instances import instSet_transform

# Connection settings are read from the local .env file.
config = dotenv_values()

# Percent-encode the credentials before embedding them in the URL: a raw '@',
# ':' or '/' inside the username or password would otherwise be parsed as a
# URL delimiter and the connection would target the wrong host/user.
# NOTE(review): this assumes the .env values are stored raw (not already
# percent-encoded) - confirm against the deployed .env files.
db_user = quote(config["USERNAME"], safe="")
db_password = quote(config["PASSWORD"], safe="")
engine = sqlalchemy.create_engine(
    f'postgresql+psycopg2://{db_user}:{db_password}@{config["HOST"]}/{config["DATABASE"]}'
)

# Keep only the single instance type that the estimations below are run against.
SNOWFLAKE_INSTANCE = instSet_transform()
SNOWFLAKE_INSTANCE = SNOWFLAKE_INSTANCE[SNOWFLAKE_INSTANCE["id"] == "c5d.24xlarge"]
SNOWFLAKE_INSTANCE

Unnamed: 0,id,memory_Gib,vcpu_count,clock_ghz,storage_Gib,storage_count,storage_type,network_Gbps,network_is_steady,cost_usdph,loading_comment,id_prefix,id_numstr,id_number,id_slice,id_slice_factor,id_slice_of,id_slice_net,id_slice_sto,calc_net_speed,calc_s3_speed,calc_mem_speed,calc_sto_speed,calc_cpu_real,calc_mem_caching,calc_sto_caching,calc_mem_spooling,calc_sto_spooling
8698,c5d.24xlarge,192.0,96.0,3.0,3600.0,4.0,NVMe,25,True,4.608,,c5d,24,24.0,24.0,1.0,8698,25.0,4.0,3.125,2.5,50,8.0,48.0,96.0,1800.0,96.0,1800.0


In [47]:
# Every column of the aggregated result is stored as a 64-bit integer.
# The key order doubles as the column order of the query result below.
df_types = dict.fromkeys(
    [
        'warehouse_id',
        'cpu_micros',
        'scan_s3',
        'scan_cache',
        'spool_ssd',
        'spool_s3',
        'warehouse_size',
    ],
    'int64',
)

# Per-warehouse totals over a sample of the snowset table, restricted to
# warehouses of size 1.
# NOTE(review): TABLESAMPLE is used without a REPEATABLE clause, so each run
# presumably draws a different sample - confirm whether that is intended.
sql_statement = sqlalchemy.text("""
   SELECT warehouseId,
   sum(systemCpuTime) + sum(userCpuTime) AS cpu_micros,
   sum(persistentReadBytesS3)            AS scan_s3,
   sum(persistentReadBytesCache)         AS scan_cache,
   sum(intDataReadBytesLocalSSD)         AS spool_ssd,
   sum(intDataReadBytesS3)               AS spool_s3,
   avg(warehouseSize)                    AS warehouse_size
  FROM snowset TABLESAMPLE SYSTEM (0.01)
  WHERE warehouseSize = 1
  group by warehouseId
""")

result_df = None
with engine.connect() as conn:
    # Materialise all rows, then label and cast them in one go.
    fetched_rows = conn.execute(sql_statement).fetchall()
    result_df = pd.DataFrame(fetched_rows, columns=list(df_types)).astype(df_types)


Unnamed: 0,warehouse_id,cpu_micros,scan_s3,scan_cache,spool_ssd,spool_s3,warehouse_size
0,8065482011927780317,44680000,5644288,5432662688,0,0,1
1,3216081187640928772,20000,0,0,0,0,1
2,1852444650211263026,750000,0,2588672,0,0,1
3,6500664921228645018,1720000,0,4473856,0,0,1
4,7922643692981235608,2630000,0,13987840,0,0,1
...,...,...,...,...,...,...,...
64,807135403192681547,140220000,1677287424,101814016,0,0,1
65,9065742722817182439,667950000,181395968,15908220896,4791070720,0,1
66,592107993998651373,42020000,0,10134528,40513536,0,1
67,6882855326578019261,3820000,3294208,1955136,27623424,0,1


In [54]:
def snowset_estimate_cache_skew(row):
    """Estimate the cache skew for one aggregated warehouse row.

    Starting from a very small skew, the function repeatedly builds an access
    distribution with ``distr_maker``, packs it into the memory / storage / S3
    bins of ``SNOWFLAKE_INSTANCE`` with ``model_distr_pack`` and compares the
    resulting ``data_s3`` amount with the observed S3 reads (``tail``). The
    skew is nudged in the direction of the error until the relative error is
    at most 1 %, 100 iterations have run, or the skew becomes non-positive.

    Parameters
    ----------
    row : mapping-like (a DataFrame row when used with ``apply(axis=1)``)
        Must provide ``scan_s3``, ``scan_cache`` and ``warehouse_size``.

    Returns
    -------
    The same ``row`` with ``data_scan``, ``cache_skew_tail``, ``cache_skew``,
    ``cache_skew_error`` and ``cache_skew_iter`` added.
    """
    # Per-node volumes; the 1024**3 divisor presumably converts bytes to GiB.
    scanned = (row['scan_s3'] + row['scan_cache']) / row['warehouse_size'] / 1024**3
    tail = row['scan_s3'] / row['warehouse_size'] / 1024**3

    bins = {
        'data_mem': pd.DataFrame(data={'size': SNOWFLAKE_INSTANCE['calc_mem_caching'].round(decimals=0), 'prio': SNOWFLAKE_INSTANCE['calc_mem_speed']}),
        'data_sto': pd.DataFrame(data={'size': SNOWFLAKE_INSTANCE['calc_sto_caching'].round(decimals=0), 'prio': SNOWFLAKE_INSTANCE['calc_sto_speed']}),
        'data_s3': pd.DataFrame(data={'size': scanned, 'prio': SNOWFLAKE_INSTANCE['calc_net_speed']})
    }

    skew = 0.00001
    error = 1
    i = 1

    # The relative error divides by `tail`, so a row without any S3 reads
    # cannot be fitted; it keeps the initial skew/error/iteration values.
    while tail != 0 and error > 0.01 and i < 101 and skew > 0:
        distribution = distr_maker(skew, round(scanned))
        pack = model_distr_pack(bins, distribution)
        if pack.iloc[0]["data_s3"] == 0:
            break

        err_abs = round(pack.iloc[0]['data_s3'] - float(tail), 2)
        error = round(abs(err_abs / tail), 2)
        # Step towards the observed tail; the step is damped by the iteration
        # count and capped at 0.1.
        skew = skew + np.sign(err_abs) * min(0.1, error / (i * 0.5))
        # Count iterations one at a time (the previous `i += i` doubled the
        # counter, ending the search after 7 rounds instead of 100).
        i += 1

    row['data_scan'] = scanned
    row['cache_skew_tail'] = tail
    row['cache_skew'] = skew
    row['cache_skew_error'] = error
    row['cache_skew_iter'] = i

    return row



In [59]:
def snowset_spool_frac_estimation(row):
    """Add ``spool_frac``: spooled volume relative to scanned volume.

    Scanned volume is ``scan_s3 + scan_cache`` and spooled volume is
    ``spool_s3 + spool_ssd``. The row is modified in place and returned.
    """
    total_scanned = row['scan_s3'] + row['scan_cache']
    total_spooled = row['spool_s3'] + row['spool_ssd']

    # A row that scanned nothing gets a fraction of 0 instead of dividing by zero.
    row['spool_frac'] = total_spooled / total_scanned if total_scanned else 0

    return row


In [60]:
# Row-wise apply turns every row into a float64 Series as soon as the helpers
# add float fields, which rounds the 64-bit warehouse ids. Keep the exact ids
# and put them back afterwards (apply(axis=1) preserves the index, so the
# assignment aligns row by row).
warehouse_ids = result_df['warehouse_id']
result_df = (
    result_df
    .apply(snowset_estimate_cache_skew, axis=1)
    .apply(snowset_spool_frac_estimation, axis=1)
)
result_df['warehouse_id'] = warehouse_ids

In [57]:
def snowset_row_est_spool_skew(row):
    """Estimate the spooling skew for one aggregated warehouse row.

    The spooled volume is taken as ``spool_frac * data_scan``. The skew is
    fitted iteratively: an access distribution from ``distr_maker`` is packed
    into the memory / storage / S3 bins of ``SNOWFLAKE_INSTANCE`` with
    ``model_distr_pack`` and the resulting ``data_s3`` amount is compared with
    the observed S3 spooling (``tail``). The search stops once the relative
    error is at most 1 %, after 100 iterations, or when the skew becomes
    non-positive.

    Parameters
    ----------
    row : mapping-like (a DataFrame row when used with ``apply(axis=1)``)
        Must provide ``spool_frac``, ``data_scan``, ``spool_s3`` and
        ``warehouse_size``.

    Returns
    -------
    The same ``row`` with ``spool_skew_tail``, ``spool_skew``,
    ``spool_skew_error`` and ``spool_skew_iter`` added.
    """
    max_iterations = 100

    scanned = row['spool_frac'] * row['data_scan']
    # The 1024**3 divisor presumably converts bytes to GiB.
    tail = row['spool_s3'] / row['warehouse_size'] / 1024 ** 3

    # Too little spooled data to fit anything: return fixed placeholder values.
    # NOTE(review): this placeholder skew (0.0001) differs from the search's
    # starting skew (0.00001) - confirm that is intended.
    if scanned < 1:
        row['spool_skew_tail'] = tail
        row['spool_skew'] = 0.0001
        row['spool_skew_error'] = 0
        row['spool_skew_iter'] = 0

        return row

    bins = {
        'data_mem': pd.DataFrame(data={'size': SNOWFLAKE_INSTANCE['calc_mem_caching'].round(decimals=0), 'prio': SNOWFLAKE_INSTANCE['calc_mem_speed']}),
        'data_sto': pd.DataFrame(data={'size': SNOWFLAKE_INSTANCE['calc_sto_caching'].round(decimals=0), 'prio': SNOWFLAKE_INSTANCE['calc_sto_speed']}),
        'data_s3': pd.DataFrame(data={'size': scanned, 'prio': SNOWFLAKE_INSTANCE['calc_net_speed']})
    }

    skew = 0.00001
    error = 1
    iter_count = 1

    # The relative error divides by `tail`; without S3 spooling there is
    # nothing to fit, so the loop is skipped and the initial values are kept
    # (the same result the in-loop check used to produce, minus the model run).
    while tail != 0 and error > 0.01 and iter_count <= max_iterations and skew > 0:
        dist_est = distr_maker(skew, round(scanned))
        pack = model_distr_pack(bins, dist_est)

        # The model places nothing on S3: the error cannot be evaluated further.
        if not pack.iloc[0]['data_s3']:
            break

        err_abs = pack.iloc[0]['data_s3'] - tail
        error = abs(err_abs / tail)
        # Step towards the observed tail; the step is damped by the iteration
        # count and capped at 0.1.
        skew = skew + np.sign(err_abs) * min(0.1, error / (iter_count * 0.5))
        iter_count += 1

    # Warn only when the iteration budget was really exhausted without
    # reaching the error target (iter_count is one past the iterations run).
    if iter_count > max_iterations and error > 0.01:
        print(["Aborted after 100 iterations, skew might not be very accurate", skew])
    if skew <= 0:
        print(["Skew < 0, this is a weird row.", skew])

    row['spool_skew_tail'] = tail
    row['spool_skew'] = skew
    row['spool_skew_error'] = error
    row['spool_skew_iter'] = iter_count

    return row


In [63]:
# Keep the result instead of only displaying it: generate_params_from_snowflake
# reads the 'spool_skew' column, which exists only on the frame returned here.
result_df = result_df.apply(snowset_row_est_spool_skew, axis=1)
result_df

Unnamed: 0,warehouse_id,cpu_micros,scan_s3,scan_cache,spool_ssd,spool_s3,warehouse_size,data_scan,cache_skew_tail,cache_skew,cache_skew_error,cache_skew_iter,spool_frac,spool_skew_tail,spool_skew,spool_skew_error,spool_skew_iter
0,8.065482e+18,44680000.0,5.644288e+06,5.432663e+09,0.000000e+00,0.0,1.0,5.064818,0.005257,0.00001,1.0,1.0,0.000000,0.0,0.00010,0.0,0.0
1,3.216081e+18,20000.0,0.000000e+00,0.000000e+00,0.000000e+00,0.0,1.0,0.000000,0.000000,0.00001,1.0,1.0,0.000000,0.0,0.00010,0.0,0.0
2,1.852445e+18,750000.0,0.000000e+00,2.588672e+06,0.000000e+00,0.0,1.0,0.002411,0.000000,0.00001,1.0,1.0,0.000000,0.0,0.00010,0.0,0.0
3,6.500665e+18,1720000.0,0.000000e+00,4.473856e+06,0.000000e+00,0.0,1.0,0.004167,0.000000,0.00001,1.0,1.0,0.000000,0.0,0.00010,0.0,0.0
4,7.922644e+18,2630000.0,0.000000e+00,1.398784e+07,0.000000e+00,0.0,1.0,0.013027,0.000000,0.00001,1.0,1.0,0.000000,0.0,0.00010,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
64,8.071354e+17,140220000.0,1.677287e+09,1.018140e+08,0.000000e+00,0.0,1.0,1.656917,1.562096,0.00001,1.0,1.0,0.000000,0.0,0.00010,0.0,0.0
65,9.065743e+18,667950000.0,1.813960e+08,1.590822e+10,4.791071e+09,0.0,1.0,14.984623,0.168938,0.00001,1.0,1.0,0.297774,0.0,0.00001,1.0,1.0
66,5.921080e+17,42020000.0,0.000000e+00,1.013453e+07,4.051354e+07,0.0,1.0,0.009439,0.000000,0.00001,1.0,1.0,3.997575,0.0,0.00010,0.0,0.0
67,6.882855e+18,3820000.0,3.294208e+06,1.955136e+06,2.762342e+07,0.0,1.0,0.004889,0.003068,0.00001,1.0,1.0,5.262262,0.0,0.00010,0.0,0.0


In [None]:
def generate_params_from_snowflake(snowflake_data):
    """Build a parameter dict from one row of estimated Snowflake statistics.

    Reads ``cpu_micros``, ``scan_s3``, ``scan_cache``, ``cache_skew``,
    ``spool_frac`` and ``spool_skew`` from ``snowflake_data`` and combines
    them with three fixed settings (``first_read_from_s3``, ``scaling_param``,
    ``max_instance_count``).
    """
    # Microseconds -> seconds -> hours.
    cpu_hours = snowflake_data['cpu_micros'] / 10**6 / 60**2
    total_reads = snowflake_data['scan_s3'] + snowflake_data['scan_cache']
    spool_fraction = snowflake_data['spool_frac']

    params = {}
    params['cpu_h'] = cpu_hours
    params['total_reads'] = total_reads
    params['cache_skew'] = snowflake_data['cache_skew']
    params['first_read_from_s3'] = False
    params['spooling_fraction'] = spool_fraction
    params['spooling_skew'] = snowflake_data['spool_skew']
    # Spooled volume expressed through the scanned volume.
    params['spooling_read_sum'] = spool_fraction * total_reads
    params['scaling_param'] = 0.95
    params['max_instance_count'] = 128
    return params
