In [None]:

import awswrangler as wr
from pyathena import connect
import os
import boto3
import json

## Constants and the variable that stores the metrics

In [None]:
# Nested result store: METRICS[table_format][phase] holds per-table load
# metrics ("load_full") or per-run, per-query metrics ("queries_*").
# Both formats share the load / full / incremental phases; only Iceberg gets
# an extra run after OPTIMIZE + VACUUM ("queries_rewrite").
METRICS = {
    table_format: {phase: {} for phase in phases}
    for table_format, phases in (
        ("hive", ("load_full", "queries_full", "queries_incremental")),
        ("iceberg", ("load_full", "queries_full", "queries_incremental", "queries_rewrite")),
    )
}

In [None]:
# Benchmark configuration.
# S3_BASE_PATH is a template placeholder; presumably substituted with the
# real bucket URI before the notebook runs (confirm with the deploy tooling).
S3_BASE_PATH = "{{athena_bucket}}"
ATHENA_SCHEMA = "hive_test"  # database every benchmark table is created in
TABLE_NAMES = ["customer", "lineorder", "part", "supplier", "date"]  # SSB tables
TABLE_TYPES = ["iceberg", "hive"]
INCREMENTAL_DATASET_COUNT = 100  # number of incremental lineorder batches
NUM_QUERIES_TO_RUN = 30  # repetitions of the whole SSB query set per experiment
SF = "s100"  # scale-factor tag used in raw-data paths and raw table names

## Clean all tables in the Athena schema and their S3 paths

In [None]:
# Remove every object under the Iceberg and Hive table prefixes so both
# experiments start from empty locations (raw input data is left alone).
for table_format_prefix in ("iceberg", "hive"):
    wr.s3.delete_objects(f"{S3_BASE_PATH}/{table_format_prefix}/", use_threads=True)

In [None]:
# Drop every table registered in the benchmark schema so the run starts clean.
# The schema itself is created first when missing: nothing else in this
# notebook creates it, and listing tables of a non-existent database
# presumably fails (confirm), which would break a fresh Restart & Run All.
session = boto3.session.Session(region_name='us-east-2')
wr.catalog.create_database(name=ATHENA_SCHEMA, exist_ok=True, boto3_session=session)
tables_to_delete = [
    t["Name"] for t in wr.catalog.get_tables(database=ATHENA_SCHEMA, boto3_session=session)
]
for table_to_delete in tables_to_delete:
    wr.catalog.delete_table_if_exists(database=ATHENA_SCHEMA, table=table_to_delete, boto3_session=session)

## Utils

In [None]:
def get_metrics_from_cursor(cursor):
    """Extract Athena execution statistics from a PyAthena cursor.

    Args:
        cursor: cursor whose last ``execute`` has completed; only the
            statistic attributes read below are used.

    Returns:
        dict with keys
          - "time": total execution time minus queue time (ms),
          - "data_scanned": bytes scanned,
          - "planning_time": query planning time (ms),
          - "execution_time": engine execution time (ms).
        A statistic the cursor reports as None is passed through as None.
        A missing queue time counts as 0, so "time" is still computed
        whenever the total execution time is known.
    """
    total_ms = cursor.total_execution_time_in_millis
    queue_ms = cursor.query_queue_time_in_millis
    # PyAthena exposes statistics Athena did not report as None; subtracting
    # None would raise TypeError and abort a long benchmark loop midway.
    elapsed_ms = None if total_ms is None else total_ms - (queue_ms or 0)
    return {
        "time": elapsed_ms,
        "data_scanned": cursor.data_scanned_in_bytes,
        "planning_time": cursor.query_planning_time_in_millis,
        "execution_time": cursor.engine_execution_time_in_millis,
    }

# One shared PyAthena cursor is used for every statement in this notebook,
# with its staging directory under the benchmark bucket.
athena_cursor = connect(
    s3_staging_dir=f"{S3_BASE_PATH}/athena_staging_dir/",
    region_name="us-east-2",
).cursor()


def execute_query(query):
    """Run ``query`` on the shared Athena cursor.

    Returns the metrics dict built by get_metrics_from_cursor for the
    executed statement.
    """
    executed_cursor = athena_cursor.execute(query)
    return get_metrics_from_cursor(executed_cursor)

In [None]:
# Load every SSB query template from queries/ssb as (name, sql) pairs.
# - Files are read in sorted order: os.listdir order is arbitrary, and the
#   query order determines the order of entries in METRICS.
# - Only *.sql files are loaded, so stray files (backups, checkpoints) are
#   never executed as queries.
# - os.path.splitext strips only the extension, not ".sql" elsewhere in the name.
SSB_QUERY_DIR = "queries/ssb"
ssb_queries = []
for file_name in sorted(os.listdir(SSB_QUERY_DIR)):
    query_name, extension = os.path.splitext(file_name)
    if extension != ".sql":
        continue
    with open(os.path.join(SSB_QUERY_DIR, file_name), 'r') as file:
        ssb_queries.append((query_name, file.read()))

In [None]:
def _qualified_name(table):
    """Prefix an unqualified table name with the Athena schema."""
    return f"{ATHENA_SCHEMA}.{table}"


def _s3_location(*parts):
    """Join ``parts`` under S3_BASE_PATH with '/' separators.

    Passing "" as the last part yields a trailing slash.
    """
    return "/".join([S3_BASE_PATH, *map(str, parts)])


def get_raw_full_external_table_name(table_name):
    """Qualified name of the external table over the full raw dataset."""
    return _qualified_name(f"full_external_{table_name}_{SF}")


def get_raw_incremental_external_table_name(table_name, n):
    """Qualified name of the external table over incremental batch ``n``."""
    return _qualified_name(f"incremental_external_{table_name}_{SF}_{n}")


def get_iceberg_table_name(table_name):
    """Qualified name of the Iceberg table for ``table_name``."""
    return _qualified_name(f"iceberg_{table_name}")


def get_hive_table_name(table_name):
    """Qualified name of the Hive table for ``table_name``."""
    return _qualified_name(f"hive_{table_name}")


def get_raw_full_external_table_location(table_name):
    """S3 prefix (with trailing slash) of the full raw dataset."""
    return _s3_location(SF, table_name, "full", "")


def get_raw_incremental_external_table_location(table_name, n):
    """S3 prefix (with trailing slash) of incremental batch ``n``."""
    return _s3_location(SF, table_name, "incremental", n, "")


def get_iceberg_table_location(table_name):
    """S3 location (no trailing slash) of the Iceberg table."""
    return _s3_location("iceberg", table_name)


def get_hive_table_location(table_name):
    """S3 location (no trailing slash) of the Hive table."""
    return _s3_location("hive", table_name)


In [None]:
# DDL templates for the raw external tables, keyed by SSB table name.
# Each template is filled via str.format with:
#   table_name - fully qualified table name (schema.table)
#   location   - S3 prefix holding the Parquet files
# The SQL text is sent to Athena verbatim, so it must not be reformatted here.
CREATE_EXTERNAL_TABLE_QUERIES = {
    # Customer dimension.
    "customer": """
        CREATE EXTERNAL TABLE {table_name}
        (
                C_CUSTKEY       BigInt,
                C_NAME          varchar(25),
                C_ADDRESS       varchar(25),
                C_CITY          varchar(10), 
                C_NATION        varchar(25), 
                C_REGION        varchar(12), 
                C_PHONE         varchar(15),
                C_MKTSEGMENT    varchar(10) 
        )
        STORED AS PARQUET
        LOCATION  '{location}' 
    """,
    # Fact table; this template is also used for every incremental batch.
    "lineorder": """
        CREATE EXTERNAL TABLE {table_name}
        (
            LO_ORDERKEY             BigInt, 
            LO_LINENUMBER           Int, 
            LO_CUSTKEY              BigInt,
            LO_PARTKEY              BigInt, 
            LO_SUPPKEY              BigInt,
            LO_ORDERDATE            Date,
            LO_ORDERPRIORITY        varchar(15), 
            LO_SHIPPRIORITY         Int, 
            LO_QUANTITY             Int, 
            LO_EXTENDEDPRICE        Int, 
            LO_ORDTOTALPRICE        Int, 
            LO_DISCOUNT             Int,
            LO_REVENUE              int, 
            LO_SUPPLYCOST           BigInt,
            LO_TAX                  Int,
            LO_COMMITDATE           Date,
            LO_SHIPMODE             varchar(10)
        )
        STORED AS PARQUET
        LOCATION  '{location}' 
    """,
    # Part dimension.
    "part": """
        CREATE EXTERNAL TABLE {table_name}
        (
            P_PARTKEY       BigInt,
            P_NAME          varchar(22),
            P_MFGR          varchar(6), 
            P_CATEGORY      varchar(7), 
            P_BRAND         varchar(9), 
            P_COLOR         varchar(11), 
            P_TYPE          varchar(25),
            P_SIZE          Int,
            P_CONTAINER     varchar(10) 
        )
        STORED AS PARQUET
        LOCATION  '{location}' 
    """,
    # Supplier dimension.
    "supplier": """
        CREATE EXTERNAL TABLE {table_name}
        (
            S_SUPPKEY       BigInt,
            S_NAME          varchar(25),
            S_ADDRESS       varchar(25),
            S_CITY          varchar(10),
            S_NATION        varchar(15),
            S_REGION        varchar(12),
            S_PHONE         varchar(15)
        )
        STORED AS PARQUET
        LOCATION  '{location}' 
    """,
    # Date dimension.
    "date": """
        CREATE EXTERNAL TABLE {table_name}
        (
            D_DATEKEY            Date,
            D_DATE               char(18),
            D_DAYOFWEEK          char(8),
            D_MONTH              char(9),
            D_YEAR               int,
            D_YEARMONTHNUM       bigint,
            D_YEARMONTH          char(7),
            D_DAYNUMINWEEK       Int,
            D_DAYNUMINMONTH      Int,
            D_DAYNUMINYEAR       Int,
            D_MONTHNUMINYEAR     int,
            D_WEEKNUMINYEAR      Int,
            D_SELLINGSEASON      varchar(12),
            D_LASTDAYINWEEKFL    Int,
            D_LASTDAYINMONTHFL   Int,
            D_HOLIDAYFL          Int,
            D_WEEKDAYFL          Int
        )
        STORED AS PARQUET
        LOCATION  '{location}' 
    """
}

In [None]:
# DDL templates for the Hive tables, keyed by SSB table name, filled via
# str.format(table_name=..., location=...).
# The Hive tables use the same column definitions, Parquet storage and
# LOCATION clause as the raw external tables; only the values passed to
# .format() differ. Reusing those templates avoids a second, hand-maintained
# copy that can silently drift. dict() makes a shallow copy, so a Hive-specific
# template can be overridden here without touching CREATE_EXTERNAL_TABLE_QUERIES.
CREATE_HIVE_TABLE_QUERIES = dict(CREATE_EXTERNAL_TABLE_QUERIES)

In [None]:
# DDL templates for the Iceberg tables, keyed by SSB table name.
# Each template is filled via str.format with:
#   table_name - fully qualified table name (schema.table)
#   location   - S3 location the Iceberg table writes to
# The column lists mirror the external tables, but character columns are
# declared as `string` rather than varchar/char, and the table type is set
# through TBLPROPERTIES. The SQL text is sent to Athena verbatim.
CREATE_ICEBERG_TABLE_QUERIES = {
    # Customer dimension.
    "customer": """
        CREATE TABLE {table_name}
        (
                C_CUSTKEY       BigInt,
                C_NAME          string,
                C_ADDRESS       string,
                C_CITY          string, 
                C_NATION        string, 
                C_REGION        string, 
                C_PHONE         string,
                C_MKTSEGMENT    string 
        )
        LOCATION  '{location}' 
        TBLPROPERTIES ('table_type' = 'ICEBERG');
    """,
    # Fact table; recreated from this template before the incremental experiment.
    "lineorder": """
        CREATE TABLE {table_name}
        (
            LO_ORDERKEY             BigInt, 
            LO_LINENUMBER           Int, 
            LO_CUSTKEY              BigInt,
            LO_PARTKEY              BigInt, 
            LO_SUPPKEY              BigInt,
            LO_ORDERDATE            Date,
            LO_ORDERPRIORITY        string, 
            LO_SHIPPRIORITY         Int, 
            LO_QUANTITY             Int, 
            LO_EXTENDEDPRICE        Int, 
            LO_ORDTOTALPRICE        Int, 
            LO_DISCOUNT             Int,
            LO_REVENUE              int, 
            LO_SUPPLYCOST           BigInt,
            LO_TAX                  Int,
            LO_COMMITDATE           Date,
            LO_SHIPMODE             string
        )
        LOCATION  '{location}'
        TBLPROPERTIES ('table_type' = 'ICEBERG');
    """,
    # Part dimension.
    "part": """
        CREATE TABLE {table_name}
        (
            P_PARTKEY       BigInt,
            P_NAME          string,
            P_MFGR          string, 
            P_CATEGORY      string, 
            P_BRAND         string, 
            P_COLOR         string, 
            P_TYPE          string,
            P_SIZE          Int,
            P_CONTAINER     string 
        )
        LOCATION  '{location}'  
        TBLPROPERTIES ('table_type' = 'ICEBERG');
    """,
    # Supplier dimension.
    "supplier": """
        CREATE TABLE {table_name}
        (
            S_SUPPKEY       BigInt,
            S_NAME          string,
            S_ADDRESS       string,
            S_CITY          string,
            S_NATION        string,
            S_REGION        string,
            S_PHONE         string
        )
        LOCATION  '{location}'  
        TBLPROPERTIES ('table_type' = 'ICEBERG');
    """,
    # Date dimension.
    "date": """
        CREATE TABLE {table_name}
        (
            D_DATEKEY            Date,
            D_DATE               string,
            D_DAYOFWEEK          string,
            D_MONTH              string,
            D_YEAR               Int,
            D_YEARMONTHNUM       bigint,
            D_YEARMONTH          string,
            D_DAYNUMINWEEK       Int,
            D_DAYNUMINMONTH      Int,
            D_DAYNUMINYEAR       Int,
            D_MONTHNUMINYEAR     Int,
            D_WEEKNUMINYEAR      Int,
            D_SELLINGSEASON      string,
            D_LASTDAYINWEEKFL    Int,
            D_LASTDAYINMONTHFL   Int,
            D_HOLIDAYFL          Int,
            D_WEEKDAYFL          Int
        )
        LOCATION  '{location}' 
        TBLPROPERTIES ('table_type' = 'ICEBERG');
    """
}

## Create Raw Tables

In [None]:
# Register one external table per SSB table over its full raw Parquet dataset.
for ssb_table, ddl_template in CREATE_EXTERNAL_TABLE_QUERIES.items():
    execute_query(
        ddl_template.format(
            table_name=get_raw_full_external_table_name(ssb_table),
            location=get_raw_full_external_table_location(ssb_table),
        )
    )


In [None]:
# Register one external lineorder table per incremental batch
# (batches 0 .. INCREMENTAL_DATASET_COUNT - 1).
lineorder_ddl = CREATE_EXTERNAL_TABLE_QUERIES["lineorder"]
for batch in range(INCREMENTAL_DATASET_COUNT):
    batch_ddl = lineorder_ddl.format(
        table_name=get_raw_incremental_external_table_name("lineorder", batch),
        location=get_raw_incremental_external_table_location("lineorder", batch),
    )
    execute_query(batch_ddl)

## Create Hive Tables

In [None]:
# Create the Hive tables over their (just emptied) S3 locations.
for ssb_table, ddl_template in CREATE_HIVE_TABLE_QUERIES.items():
    hive_ddl = ddl_template.format(
        table_name=get_hive_table_name(ssb_table),
        location=get_hive_table_location(ssb_table),
    )
    execute_query(hive_ddl)
    
    

## Load Hive tables with full load

In [None]:

# Full load: copy each raw external table into its Hive table and keep the
# INSERT's metrics under METRICS["hive"]["load_full"][<table>].
hive_load_metrics = METRICS["hive"]["load_full"]
for table in TABLE_NAMES:
    insert_query = f"""
        INSERT INTO {get_hive_table_name(table)} SELECT * FROM {get_raw_full_external_table_name(table)}
    """
    hive_load_metrics[table] = execute_query(insert_query)
        

## Run SSB Queries for experiment 1

In [None]:
# Point the SSB query placeholders at the Hive tables (these globals are
# reassigned to the Iceberg tables in a later cell).
date_table, supplier_table, lineorder_table, part_table, customer_table = (
    get_hive_table_name(ssb_table)
    for ssb_table in ("date", "supplier", "lineorder", "part", "customer")
)

In [None]:

# Experiment 1 (Hive, full load): run the whole SSB query set
# NUM_QUERIES_TO_RUN times and record per-query metrics for every run.
# The {date_table}/... placeholders are resolved here instead of from the
# date_table/... globals, which a later cell reassigns to the Iceberg tables;
# re-running this cell out of order would otherwise benchmark the wrong format.
hive_tables = {f"{name}_table": get_hive_table_name(name) for name in TABLE_NAMES}

for run_number in range(NUM_QUERIES_TO_RUN):
    METRICS["hive"]["queries_full"][run_number] = {}
    print(f"Running query set:{run_number}")
    for query_name, query in ssb_queries:
        formatted_query = query.format(**hive_tables)

        res = execute_query(formatted_query)

        METRICS["hive"]["queries_full"][run_number][query_name] = res

## Clear the Hive fact table, load it incrementally and run experiment 2

In [None]:
# Empty the Hive lineorder location. The table definition is kept, so the
# incremental INSERTs below refill it.
lineorder_path = get_hive_table_location("lineorder")
wr.s3.delete_objects(path=lineorder_path, use_threads=True)

In [None]:

# Refill Hive lineorder with one INSERT per incremental batch
# (the INSERT metrics are not recorded).
hive_table = get_hive_table_name("lineorder")
for n in range(INCREMENTAL_DATASET_COUNT):
    incremental_table = get_raw_incremental_external_table_name("lineorder", n)
    res = execute_query(f"""
        INSERT INTO {hive_table} SELECT * FROM {incremental_table}
    """)

In [None]:
# Experiment 2 (Hive, incremental load): run the SSB query set
# NUM_QUERIES_TO_RUN times and record per-query metrics for every run.
# Placeholders are resolved here rather than from the date_table/... globals,
# which a later cell reassigns to the Iceberg tables.
hive_tables = {f"{name}_table": get_hive_table_name(name) for name in TABLE_NAMES}

for run_number in range(NUM_QUERIES_TO_RUN):
    METRICS["hive"]["queries_incremental"][run_number] = {}
    print(f"Running query set:{run_number}")
    for query_name, query in ssb_queries:
        formatted_query = query.format(**hive_tables)

        res = execute_query(formatted_query)

        METRICS["hive"]["queries_incremental"][run_number][query_name] = res

## Create Iceberg tables

In [None]:
# Create the Iceberg tables at their own S3 locations.
for ssb_table, ddl_template in CREATE_ICEBERG_TABLE_QUERIES.items():
    iceberg_ddl = ddl_template.format(
        table_name=get_iceberg_table_name(ssb_table),
        location=get_iceberg_table_location(ssb_table),
    )
    execute_query(iceberg_ddl)
    

In [None]:

# Full load: copy each raw external table into its Iceberg table and keep
# the INSERT's metrics under METRICS["iceberg"]["load_full"][<table>].
iceberg_load_metrics = METRICS["iceberg"]["load_full"]
for table in TABLE_NAMES:
    insert_query = f"""
        INSERT INTO {get_iceberg_table_name(table)} SELECT * FROM {get_raw_full_external_table_name(table)}
    """
    iceberg_load_metrics[table] = execute_query(insert_query)
        

In [None]:
# Point the SSB query placeholders at the Iceberg tables (overwrites the
# Hive values assigned earlier).
date_table, supplier_table, lineorder_table, part_table, customer_table = (
    get_iceberg_table_name(ssb_table)
    for ssb_table in ("date", "supplier", "lineorder", "part", "customer")
)

In [None]:

# Experiment 1 (Iceberg, full load): run the SSB query set
# NUM_QUERIES_TO_RUN times and record per-query metrics for every run.
# Placeholders are resolved here rather than from the date_table/... globals,
# which are shared with the Hive cells and depend on execution order.
iceberg_tables = {f"{name}_table": get_iceberg_table_name(name) for name in TABLE_NAMES}

for run_number in range(NUM_QUERIES_TO_RUN):
    METRICS["iceberg"]["queries_full"][run_number] = {}
    print(f"Running query set:{run_number}")
    for query_name, query in ssb_queries:
        formatted_query = query.format(**iceberg_tables)

        res = execute_query(formatted_query)

        METRICS["iceberg"]["queries_full"][run_number][query_name] = res

## Clear the Iceberg fact table, load it incrementally and run experiment 2

In [None]:
# Reset the Iceberg lineorder table before the incremental experiment:
# drop it, clear any files left under ITS OWN location, then recreate it
# empty. The cleanup must target the Iceberg location. The Hive lineorder
# location (lineorder_path) holds the just-reloaded Hive data.
iceberg_lineorder_location = get_iceberg_table_location("lineorder")
iceberg_lineorder_name = get_iceberg_table_name("lineorder")
execute_query(f"DROP TABLE IF EXISTS {iceberg_lineorder_name}")
# Trailing slash so the prefix cannot match a sibling such as ".../lineorder_x".
wr.s3.delete_objects(f"{iceberg_lineorder_location}/", use_threads=True)

query = CREATE_ICEBERG_TABLE_QUERIES["lineorder"]
formatted_query = query.format(table_name=iceberg_lineorder_name, location=iceberg_lineorder_location)
execute_query(formatted_query)

In [None]:

# Refill the recreated Iceberg lineorder table with one INSERT per
# incremental batch (the INSERT metrics are not recorded).
iceberg_table = get_iceberg_table_name("lineorder")
for batch in range(INCREMENTAL_DATASET_COUNT):
    incremental_table = get_raw_incremental_external_table_name("lineorder", batch)
    insert_query = f"""
        INSERT INTO {iceberg_table} SELECT * FROM {incremental_table}
    """
    res = execute_query(insert_query)

In [None]:
# Experiment 2 (Iceberg, incremental load): run the SSB query set
# NUM_QUERIES_TO_RUN times and record per-query metrics for every run.
# Placeholders are resolved here rather than from the date_table/... globals,
# which are shared with the Hive cells and depend on execution order.
iceberg_tables = {f"{name}_table": get_iceberg_table_name(name) for name in TABLE_NAMES}

for run_number in range(NUM_QUERIES_TO_RUN):
    METRICS["iceberg"]["queries_incremental"][run_number] = {}
    print(f"Running query set:{run_number}")
    for query_name, query in ssb_queries:
        formatted_query = query.format(**iceberg_tables)

        res = execute_query(formatted_query)

        METRICS["iceberg"]["queries_incremental"][run_number][query_name] = res

## OPTIMIZE and VACUUM the Iceberg fact table, then rerun the queries

In [None]:
# Compact the incrementally loaded Iceberg lineorder files, then VACUUM.
# The table name is resolved here instead of relying on a global set in an
# earlier cell, so this cell also works when re-run on its own.
iceberg_lineorder_name = get_iceberg_table_name("lineorder")
execute_query(f"OPTIMIZE {iceberg_lineorder_name} REWRITE DATA USING BIN_PACK")
# NOTE(review): VACUUM presumably only expires snapshots / orphan files older
# than the table's retention settings, so right after loading it may remove
# little or nothing; confirm it has the intended effect.
execute_query(f"VACUUM {iceberg_lineorder_name}")

In [None]:
# Experiment 3 (Iceberg after OPTIMIZE + VACUUM): run the SSB query set
# NUM_QUERIES_TO_RUN times and record per-query metrics for every run.
# Placeholders are resolved here rather than from the date_table/... globals,
# which are shared with the Hive cells and depend on execution order.
iceberg_tables = {f"{name}_table": get_iceberg_table_name(name) for name in TABLE_NAMES}

for run_number in range(NUM_QUERIES_TO_RUN):
    METRICS["iceberg"]["queries_rewrite"][run_number] = {}
    print(f"Running query set:{run_number}")
    for query_name, query in ssb_queries:
        formatted_query = query.format(**iceberg_tables)

        res = execute_query(formatted_query)

        METRICS["iceberg"]["queries_rewrite"][run_number][query_name] = res

## Output metrics to json

In [None]:
# Persist every collected metric as JSON, named after the scale factor
# (e.g. "s100_metrics.json"). Integer run numbers become string keys in JSON.
metrics_file_name = f"{SF}_metrics.json"
with open(metrics_file_name, "w") as metrics_file:
    json.dump(METRICS, metrics_file)

In [None]:
# Show the collected metrics as this cell's output.
METRICS