In [0]:
from datetime import datetime, timedelta
import time

import pandas as pd
import numpy as np

In [0]:
# Target size of each benchmark table, in GB (create_dummy_df turns it into a row count).
table_size = 12.5

# Cardinality of client_id, the column used in the ZORDER BY clause.
number_unique_clients = 15_000
client_ids_to_use = list(map('_{:09}'.format, range(number_unique_clients)))

# Number of random DOUBLE columns added to every table.
number_cols_in_table = 10

# Schema holding every table, followed by one table-name prefix per layout strategy.
SCHEMA_NAME = 'liquid'
PREFIX_SOURCE = 'source'
PREFIX_TRADITIONAL = 'partition_zorder'
PREFIX_CLUSTER = 'liquid_cluster'
PREFIX_CLUSTER_INVERTED = 'liquid_cluster_inverted'

In [0]:
def create_dummy_df(
    size,
    partition_multiplier,
    clients_choose,
    number_cols,
    seed=None,
):
    """Generate a random Spark DataFrame of roughly ``size`` GB.

    The frame has ``number_cols`` columns named ``col_double_1`` ..
    ``col_double_<number_cols>`` filled with uniform random floats, a
    ``reference_date`` column drawn from consecutive days starting at
    2000-01-01, and a ``client_id`` column drawn from ``clients_choose``.

    Args:
        size: Target size in GB; converted to a row count with a fixed
            bytes-per-row estimate.
        partition_multiplier: Distinct dates per GB; the number of distinct
            dates is ``max(round(size * partition_multiplier), 1)``.
        clients_choose: Sequence of client ids to sample from.
        number_cols: Number of random double columns.
        seed: Optional seed. When given, a private ``RandomState`` is used so
            the same arguments always produce the same data. When ``None``
            (the default) NumPy's global random state is used, as before.

    Returns:
        The result of ``spark.createDataFrame`` on the generated pandas frame.
    """
    # Estimated bytes per row: 8 per double column plus 3 + 10 for the two key
    # columns (presumably 3 for the date and 10 for the 10-character client id
    # -- TODO confirm).
    bytes_per_row = 3 + 10 + 8 * number_cols
    number_rows = int(size * 1_024 ** 3 / bytes_per_row)
    print(f'Number rows to insert: {number_rows:,}')

    number_partitions = max(round(size * partition_multiplier), 1)
    print(f'Number parititions: {number_partitions:,}')

    # np.random (module) and RandomState expose the same rand/choice methods,
    # so the unseeded path behaves exactly like the original implementation.
    rng = np.random if seed is None else np.random.RandomState(seed)

    begin_date = datetime(2000, 1, 1)
    end_date = begin_date + timedelta(days=number_partitions)
    dates_choose = np.arange(begin_date, end_date, timedelta(days=1)).astype(datetime)

    columns = {
        'col_double_' + str(i): rng.rand(number_rows)
        for i in range(1, number_cols + 1)
    }
    columns['reference_date'] = rng.choice(dates_choose, number_rows)
    columns['client_id'] = rng.choice(clients_choose, number_rows)

    pandas_df = pd.DataFrame(columns)

    # Convert explicitly instead of relying on pandas inferring a datetime
    # dtype from an object array, then keep only the calendar date.
    pandas_df['reference_date'] = pd.to_datetime(pandas_df['reference_date']).dt.date

    return spark.createDataFrame(pandas_df)

def describe_table(
    table_name,
):
    """Print the file count, partition columns and clustering columns of a table.

    Runs ``DESCRIBE DETAIL`` on ``SCHEMA_NAME.table_name`` and reports the
    ``numFiles``, ``partitionColumns`` and ``clusteringColumns`` fields of
    the first returned row.
    """
    summary = spark.sql(f'DESCRIBE DETAIL {SCHEMA_NAME}.{table_name}').collect()[0]

    print(f'{summary["numFiles"]:,} files')

    labelled_fields = (
        ('Partition cols', 'partitionColumns'),
        ('Clustering cols', 'clusteringColumns'),
    )
    for label, field in labelled_fields:
        print(f'{label}: {summary[field]}')

def optimize_table(table_name, with_zorder):
    """Run ``OPTIMIZE`` on ``SCHEMA_NAME.table_name``.

    When ``with_zorder`` is truthy the statement gets a
    ``ZORDER BY (client_id)`` clause; otherwise the clause is empty. The
    clause is printed before the statement is executed.
    """
    zorder_clause = ''
    if with_zorder:
        zorder_clause = 'ZORDER BY (client_id)'
    print(zorder_clause)

    statement = f'OPTIMIZE {SCHEMA_NAME}.{table_name} {zorder_clause}'
    spark.sql(statement)

def create_table(
    prefix,
    suffix,
    size,
    number_cols,
):
    """Create (or replace) a Delta table whose layout is selected by ``prefix``.

    The table is named ``<prefix>_<size>GB_<suffix>`` (dots in ``size`` become
    underscores) inside ``SCHEMA_NAME`` and has the columns ``reference_date``,
    ``client_id`` and ``col_double_1`` .. ``col_double_<number_cols>``.

    Layout per prefix:
        PREFIX_TRADITIONAL      -> PARTITIONED BY (reference_date)
        PREFIX_CLUSTER          -> CLUSTER BY (reference_date, client_id)
        PREFIX_CLUSTER_INVERTED -> CLUSTER BY (client_id, reference_date)
        PREFIX_SOURCE           -> no partitioning / clustering clause

    After creation ``optimize_table`` is called on the new table, with Z-order
    requested only for ``PREFIX_TRADITIONAL``.

    Args:
        prefix: One of the ``PREFIX_*`` constants.
        suffix: Free-form tag appended to the table name.
        size: Table size in GB; only used to build the name.
        number_cols: Number of ``col_double_*`` DOUBLE columns.

    Returns:
        The table name, without the schema.

    Raises:
        ValueError: If ``prefix`` is not a known layout prefix. Previously this
            case only printed a message and returned ``None``, which callers
            would then splice into a table name.
    """
    layout_clauses = {
        PREFIX_TRADITIONAL: 'PARTITIONED BY (reference_date)',
        PREFIX_CLUSTER: 'CLUSTER BY (reference_date, client_id)',
        PREFIX_CLUSTER_INVERTED: 'CLUSTER BY (client_id, reference_date)',
        PREFIX_SOURCE: '',
    }
    if prefix not in layout_clauses:
        raise ValueError(
            f'wrong prefix: {prefix} (expected one of {list(layout_clauses)})'
        )
    partition_strategy = layout_clauses[prefix]

    size_tag = str(size).replace('.', '_')
    table_name = f'{prefix}_{size_tag}GB_{suffix}'

    cols = ', '.join(
        f'col_double_{i} DOUBLE'
        for i in range(1, number_cols + 1)
    )

    spark.sql(
        f'''

        CREATE OR REPLACE TABLE {SCHEMA_NAME}.{table_name}
        (
            reference_date DATE,
            client_id STRING,
            {cols}
        )
        USING delta
        {partition_strategy}

        '''
    )

    # Only the partitioned layout uses ZORDER; the others get a plain OPTIMIZE.
    with_zorder = prefix == PREFIX_TRADITIONAL
    optimize_table(table_name, with_zorder)

    return table_name

def ingest_values(
    table_name,
    prefix,
):
    """Append the matching source table into ``table_name`` and time the write.

    The source table name is derived from ``table_name`` by swapping
    ``prefix`` for ``PREFIX_SOURCE``; both tables live in ``SCHEMA_NAME``.

    Args:
        table_name: Destination table name, without the schema.
        prefix: Layout prefix that ``table_name`` was built with.

    Returns:
        Elapsed time of the append, in seconds (float).
    """
    # Replace only the first occurrence (the leading prefix for names built by
    # create_table) so a suffix that happens to contain the prefix is untouched.
    source_name = table_name.replace(prefix, PREFIX_SOURCE, 1)

    # perf_counter is monotonic, so the measurement cannot be skewed by
    # wall-clock adjustments during a long write.
    start = time.perf_counter()
    (
        spark.table(SCHEMA_NAME + '.' + source_name)
        .write
        .mode('append')
        .saveAsTable(SCHEMA_NAME + '.' + table_name)
    )
    total_time_seconds = time.perf_counter() - start

    return total_time_seconds

def create_source(
    suffix,
    table_size,
    number_cols_in_table,
    client_ids_to_use,
    partition_multiplier,
):
    """Create the source table for a benchmark run and fill it with random rows.

    The table is created through ``create_table`` with ``PREFIX_SOURCE``,
    the rows come from ``create_dummy_df`` and are appended to it. Nothing is
    returned.
    """
    source_table = create_table(PREFIX_SOURCE, suffix, table_size, number_cols_in_table)

    generated = create_dummy_df(
        table_size,
        partition_multiplier,
        client_ids_to_use,
        number_cols_in_table,
    )

    writer = generated.write.mode('append')
    writer.saveAsTable('.'.join((SCHEMA_NAME, source_table)))

    return None

In [0]:
# Build the source table for the baseline run; partition_multiplier=1.0 gives
# roughly one distinct reference_date per GB.
create_source(
    suffix='ingest',
    table_size=table_size,
    number_cols_in_table=number_cols_in_table,
    client_ids_to_use=client_ids_to_use,
    partition_multiplier=1.0,
)

In [0]:
# Maps each created table name to the seconds its ingestion took.
ingestion_results = {}

# Note: despite its name, table_suffix holds the table-name *prefix* of each layout.
for table_suffix in (PREFIX_TRADITIONAL, PREFIX_CLUSTER, PREFIX_CLUSTER_INVERTED):
    created_table = create_table(
        prefix=table_suffix,
        suffix='ingest',
        size=table_size,
        number_cols=number_cols_in_table,
    )
    describe_table(created_table)

    total_time = ingest_values(table_name=created_table, prefix=table_suffix)
    ingestion_results[created_table] = total_time

    print(f'{table_suffix} {table_size} {total_time:,}')

In [0]:
# Display the ingestion time (seconds) recorded for each table of the baseline run.
ingestion_results

In [0]:
# Pick one random client id and one random reference_date to use as filters in
# the read benchmark. Both end up as single-quoted SQL literals.
#
# np.random.randint excludes its upper bound, so the bound must be len(...),
# not len(...) - 1; otherwise the last element can never be selected (and a
# one-element list raises).
client_to_search = client_ids_to_use[np.random.randint(len(client_ids_to_use))]
client_to_search = "'" + client_to_search + "'"

# Dates are read back from the first table ingested in the baseline run.
available_dates = (
    spark
    .table(SCHEMA_NAME + '.' + list(ingestion_results.keys())[0])
    .select('reference_date')
    .distinct()
    .collect()
)
available_dates = [row[0] for row in available_dates]

if not available_dates:
    raise ValueError('no reference_date values found; was the table ingested?')

date_to_search = available_dates[np.random.randint(len(available_dates))]
date_to_search = "'" + f'{date_to_search:%Y-%m-%d}' + "'"

In [0]:
# Run OPTIMIZE on every ingested table; only tables whose name starts with
# PREFIX_TRADITIONAL get the ZORDER clause.
for table_name in ingestion_results.keys():
    with_zorder = table_name.startswith(PREFIX_TRADITIONAL)
    optimize_table(table_name, with_zorder=with_zorder)

In [0]:
# Seconds per table for a point lookup on client_id / on reference_date.
read_zorder_col_results = {}
read_partition_col_results = {}

# (filter column, quoted SQL literal, dict that receives the timing).
# The client_id lookup runs first for each table, then the reference_date one.
read_lookups = [
    ('client_id', client_to_search, read_zorder_col_results),
    ('reference_date', date_to_search, read_partition_col_results),
]

for table_name in ingestion_results:
    for filter_column, filter_value, timings in read_lookups:
        # perf_counter is monotonic, which makes it the appropriate clock for
        # measuring elapsed time.
        start = time.perf_counter()
        res = (
            spark
            .sql(
                f'''

            SELECT
                *
            FROM
                {SCHEMA_NAME}.{table_name}
            WHERE
                {filter_column} == {filter_value}
            LIMIT 1000
            '''
            )
            .collect()
        )
        end = time.perf_counter()
        total_time_seconds = end - start

        timings[table_name] = total_time_seconds

In [0]:
# Display the seconds each table took to answer the reference_date filter query.
read_partition_col_results

In [0]:
# Display the seconds each table took to answer the client_id filter query.
read_zorder_col_results

In [0]:
# Build a second source table with partition_multiplier=2.0, i.e. about twice
# as many distinct reference_date values as the baseline run.
create_source(
    suffix='mult_part',
    table_size=table_size,
    number_cols_in_table=number_cols_in_table,
    client_ids_to_use=client_ids_to_use,
    partition_multiplier=2.0,
)

In [0]:
# Maps each 'mult_part' table name to the seconds its ingestion took.
ingestion_part_results = {}

# Note: despite its name, table_suffix holds the table-name *prefix* of each layout.
for table_suffix in (PREFIX_TRADITIONAL, PREFIX_CLUSTER, PREFIX_CLUSTER_INVERTED):
    created_table = create_table(
        prefix=table_suffix,
        suffix='mult_part',
        size=table_size,
        number_cols=number_cols_in_table,
    )
    describe_table(created_table)

    total_time = ingest_values(table_name=created_table, prefix=table_suffix)
    ingestion_part_results[created_table] = total_time

    print(f'{table_suffix} {table_size} {total_time:,}')

In [0]:
# Display the ingestion time (seconds) recorded for each 'mult_part' table.
ingestion_part_results

In [0]:
# Raise the cardinality of client_id (the ZORDER column) to 75,000.
# Note: this rebinds the notebook-level number_unique_clients and
# client_ids_to_use, so cells run afterwards see the larger id list.
number_unique_clients = 75_000
client_ids_to_use = list(map('_{:09}'.format, range(number_unique_clients)))

create_source(
    suffix='mult_zorder',
    table_size=table_size,
    number_cols_in_table=number_cols_in_table,
    client_ids_to_use=client_ids_to_use,
    partition_multiplier=1.0,
)

In [0]:
# Maps each 'mult_zorder' table name to the seconds its ingestion took.
ingestion_zorder_results = {}

# Note: despite its name, table_suffix holds the table-name *prefix* of each layout.
for table_suffix in (PREFIX_TRADITIONAL, PREFIX_CLUSTER, PREFIX_CLUSTER_INVERTED):
    created_table = create_table(
        prefix=table_suffix,
        suffix='mult_zorder',
        size=table_size,
        number_cols=number_cols_in_table,
    )
    describe_table(created_table)

    total_time = ingest_values(table_name=created_table, prefix=table_suffix)
    ingestion_zorder_results[created_table] = total_time

    print(f'{table_suffix} {table_size} {total_time:,}')

In [0]:
# Display the ingestion time (seconds) recorded for each 'mult_zorder' table.
ingestion_zorder_results