In [25]:
import os 
import sys 
import time 
import numpy as np
import json

from pyspark.sql import SparkSession 
from pyspark.conf import SparkConf

from pyspark.context import SparkContext
from pyspark.sql.types import (
    DoubleType, LongType, StringType, StructField, StructType)

import platform,socket,re,uuid,json,psutil,logging


# Schemas for all table types here. These should be in separate scripts when
# refactoring code.
CUSTOMER_SCHEMA = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", LongType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DoubleType()),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType()),
])

LINEITEM_SCHEMA = StructType([
    StructField("l_orderkey", LongType()),  
    StructField("l_partkey", LongType()),
    StructField("l_suppkey", LongType()),
    StructField("l_linenumber", LongType()),
    StructField("l_quantity", DoubleType()),
    StructField("l_extendedprice", DoubleType()),
    StructField("l_discount", DoubleType()),
    StructField("l_tax", DoubleType()),
    StructField("l_returnflag", StringType()),
    StructField("l_linestatus", StringType()),
    StructField("l_shipdate", StringType()),
    StructField("l_commitdate", StringType()),
    StructField("l_receiptdate", StringType()),
    StructField("l_shipinstruct", StringType()),
    StructField("l_shipmode", StringType()),
    StructField("l_comment", StringType())
])

NATION_SCHEMA = StructType([
    StructField("n_nationkey", LongType()), 
    StructField("n_name", StringType()),
    StructField("n_regionkey", LongType()),
    StructField("n_comment", StringType()),
])

ORDER_SCHEMA = StructType([
    StructField("o_orderkey", LongType()),
    StructField("o_custkey", LongType()),
    StructField("o_orderstatus", StringType()),
    StructField("o_totalprice", DoubleType()),
    StructField("o_orderdate", StringType()),
    StructField("o_orderpriority", StringType()),
    StructField("o_clerk", StringType()),
    StructField("o_shippriority", LongType()),
    StructField("o_comment", StringType())
])

PART_SCHEMA = StructType([
    StructField("p_partkey", LongType()),    
    StructField("p_name", StringType()),
    StructField("p_mfgr", StringType()),
    StructField("p_brand", StringType()),
    StructField("p_type", StringType()),
    StructField("p_size", LongType()),
    StructField("p_container", StringType()),
    StructField("p_retailprice", DoubleType()),
    StructField("p_comment", StringType()),
])

PARTSUPP_SCHEMA = StructType([
    StructField("ps_partkey", LongType()),
    StructField("ps_suppkey", LongType()),
    StructField("ps_availqty", LongType()),
    StructField("ps_supplycost", DoubleType()),
    StructField("ps_comment", StringType())
])

REGION_SCHEMA = StructType([
    StructField("r_regionkey", LongType()),   
    StructField("r_name", StringType()),
    StructField("r_comment", StringType()),  
])

SUPPLIER_SCHEMA = StructType([
    StructField("s_suppkey", LongType()),    
    StructField("s_name", StringType()),
    StructField("s_address", StringType()),
    StructField("s_nationkey", LongType()),
    StructField("s_phone", StringType()),
    StructField("s_acctbal", DoubleType()),
    StructField("s_comment", StringType())
])

TABLE_SCHEMA_MAP = {
        "customer": CUSTOMER_SCHEMA,
        "lineitem": LINEITEM_SCHEMA,
        "nation": NATION_SCHEMA,
        "region": REGION_SCHEMA,
        "orders": ORDER_SCHEMA,
        "part": PART_SCHEMA,
        "partsupp": PARTSUPP_SCHEMA,
        "supplier": SUPPLIER_SCHEMA,
}

CURRENT_FILE_PATH = os.path.dirname(os.getcwd())
if "training_data" not in CURRENT_FILE_PATH:
    CURRENT_FILE_PATH += "/training_data"
print(CURRENT_FILE_PATH)
    
def getSystemInfo():
    info={}
    try:
        info['platform']=platform.system()
        info['platform-release']=platform.release()
        info['platform-version']=platform.version()
        info['architecture']=platform.machine()
        info['num_cpus'] = os.cpu_count()
        info['hostname']=socket.gethostname()
        info['ip-address']=socket.gethostbyname(socket.gethostname())
        info['mac-address']=':'.join(re.findall('..', '%012x' % uuid.getnode()))
        info['processor']=platform.processor()
        info['ram']=str(round(psutil.virtual_memory().total / (1024.0 **3)))+" GB"
        info['total_storage']=str(round(psutil.disk_usage(CURRENT_FILE_PATH).total / (1024.0 **3)))+" GB"
        info['free_storage']=str(round(psutil.disk_usage(CURRENT_FILE_PATH).free / (1024.0 **3)))+" GB"
    except Exception as e:
        logging.exception(e)
    return info

# read in 22 TPCH queries
TPCH_QUERIES = {}
for i in range(1, 23):
    with open(f"{CURRENT_FILE_PATH}/queries/{i}.sql") as f:
        TPCH_QUERIES[i] = f.read() 
    


/home/hoped/spark-autotuner/training_data


In [37]:
def run_queries_og(parameters, n=10, debug=False, find_median_runtime=True):
    '''
    Run TPC-H queries 10 times and take the median runtime of each query 
    to generate a single training run for a set of parameters.
    
    Input: 
    parameters: list of parameter dictionaries 
    debug: if true will print out params and result time, false suppresses print statements
    
    Returns: 
    training_data dictionary with params and results 
    '''
    result = {'params': [p.copy() for p in parameters], 'runtimes': {'total': []}}
    spark = None
    # add chosen parameter values to spark
    param_name_index = {}
    try:
        conf = SparkConf(loadDefaults=False)
        spark_params = []
        for i, param in enumerate(parameters):
            if param['spark_param']:
                spark_params.append((param['name'], str(param['cur_value'])))
                param_name_index[param['name']] = i

        conf.setAll(spark_params)
        spark  = SparkSession.builder.config(conf=conf).getOrCreate()
                
    except Exception as e:
        if spark:
            spark.stop()
        # this might happen because some parameters are related,
        # and we might have made an impossible parameter assignment
        result = {'params':parameters, 'runtimes': {}, 'msg': str(e)}
        if debug:
            print("error when setting ", parameters, e)
        return result
    
    configurations = spark.sparkContext.getConf().getAll()
    if debug:
        print("Configuration")
    for item in configurations: 
        if debug:
            print(item)
        if param_name_index.get(item[0]) is not None:
            assert item[1] == param[param_name_index.get(item[0])]['cur_value'], f'Spark session param {item} != {param[param_name_index.get(item[0])]}'
    
    # load tables
    if debug:
        print("loading tables")
    tables = {}
    for table_name, table_schema in TABLE_SCHEMA_MAP.items():
        table = spark.read.csv(f"{CURRENT_FILE_PATH}/{SF_STR}/{table_name}.tbl", sep = "|",
                               schema=table_schema)
        table.createOrReplaceTempView(table_name)
        tables[table_name] = table
    
    if debug:
        print("running queries")
    # take median of n runs for each query
    for j in range(n):
        result['runtimes']['total'].append(0)
        for qnum, qtext in TPCH_QUERIES.items(): 
                # Measure execution time of sql query.
            try:
                start_time = time.time()
                results = spark.sql(qtext, **tables)
                end_time = time.time()
                query_time = end_time - start_time
                result['runtimes'].setdefault(qnum, []).append(query_time)
                result['runtimes']['total'][-1] += query_time
            except:
                if debug:
                    print(f"failed while running query {qnum}...  ")
    if debug:
        print("done running queries")
    if find_median_runtime:
        # take median of all runtimes as final output
        for key, times in result['runtimes'].items():
            result['runtimes'][key] = np.median(times)
            if debug:
                print(key, result['runtimes'][key])
    # reset spark so we can load new param config next time
    spark.stop()
    #spark.newSession()#_instantiatedContext attribute of the session to None after calling session.stop().
    
    return result

In [38]:
sf = 1 # GB, default table scale factor
job_name = 'local_run'
SF_STR = f"sf{sf}"

In [39]:
result = run_queries_og([], n=100, find_median_runtime=False)
result['runtimes']['total'], np.median(result['runtimes']['total']), max(result['runtimes']['total'])

([0.2102336883544922,
  0.19797110557556152,
  0.18102312088012695,
  0.17831206321716309,
  0.18066120147705078,
  0.22466421127319336,
  0.18740582466125488,
  0.17984366416931152,
  0.1779019832611084,
  0.17885327339172363,
  0.17812252044677734,
  0.2745347023010254,
  0.18263888359069824,
  0.2833106517791748,
  0.1967933177947998,
  0.21156930923461914,
  0.23160982131958008,
  0.2412569522857666,
  0.17873525619506836,
  0.17847228050231934,
  0.17816162109375,
  0.18703699111938477,
  0.1779642105102539,
  0.20836806297302246,
  0.1785109043121338,
  0.17832612991333008,
  0.1867237091064453,
  0.17916083335876465,
  0.18141889572143555,
  0.296567440032959,
  0.17829656600952148,
  0.21231412887573242,
  0.24968457221984863,
  0.18040180206298828,
  0.17844295501708984,
  0.20157527923583984,
  0.18844246864318848,
  0.17975687980651855,
  0.1783885955810547,
  0.17870235443115234,
  0.1829688549041748,
  0.3233530521392822,
  0.22744369506835938,
  0.19604873657226562,
  0.1

In [33]:
def run_queries(parameters, n=10, debug=False, find_median_runtime=True):
    '''
    Run TPC-H queries 10 times and take the median runtime of each query 
    to generate a single training run for a set of parameters.
    
    Input: 
    parameters: list of parameter dictionaries 
    debug: if true will print out params and result time, false suppresses print statements
    
    Returns: 
    training_data dictionary with params and results 
    '''
    result = {'params': [p.copy() for p in parameters], 'runtimes': {'total': []}}
    spark = None
    # add chosen parameter values to spark
    param_name_index = {}
    try:
        conf = SparkConf(loadDefaults=False)
        spark_params = []
        for i, param in enumerate(parameters):
            if param['spark_param']:
                spark_params.append((param['name'], str(param['cur_value'])))
                param_name_index[param['name']] = i

        conf.setAll(spark_params)
        spark  = SparkSession.builder.config(conf=conf).getOrCreate()
        spark.catalog.clearCache() # clear cache
                
    except Exception as e:
        if spark:
            spark.stop()
        # this might happen because some parameters are related,
        # and we might have made an impossible parameter assignment
        result = {'params':parameters, 'runtimes': {}, 'msg': str(e)}
        if debug:
            print("error when setting ", parameters, e)
        return result
    
    configurations = spark.sparkContext.getConf().getAll()
    if debug:
        print("Configuration")
    for item in configurations: 
        if debug:
            print(item)
        if param_name_index.get(item[0]) is not None:
            assert item[1] == param[param_name_index.get(item[0])]['cur_value'], f'Spark session param {item} != {param[param_name_index.get(item[0])]}'
    
    # load tables
    if debug:
        print("loading tables")
    tables = {}
    for table_name, table_schema in TABLE_SCHEMA_MAP.items():
        table = spark.read.csv(f"{CURRENT_FILE_PATH}/{SF_STR}/{table_name}.tbl", sep = "|",
                               schema=table_schema)
        table.createOrReplaceTempView(table_name)
        tables[table_name] = table
    
    if debug:
        print("running queries")
        
    # take median of n runs for each query
    for j in range(n):
        spark.catalog.clearCache() # clear cache before each run
        result['runtimes']['total'].append(0)
        for qnum, qtext in TPCH_QUERIES.items(): 
                # Measure execution time of sql query.
            try:
                start_time = time.time()
                results = spark.sql(qtext, **tables)
                end_time = time.time()
                query_time = end_time - start_time
                result['runtimes'].setdefault(qnum, []).append(query_time)
                result['runtimes']['total'][-1] += query_time
            except:
                if debug:
                    print(f"failed while running query {qnum}...  ")
    if debug:
        print("done running queries")
    if find_median_runtime:
        # take median of all runtimes as final output
        for key, times in result['runtimes'].items():
            result['runtimes'][key] = np.median(times)
            if debug:
                print(key, result['runtimes'][key])
    # reset spark so we can load new param config next time
    spark.catalog.clearCache() # clear cache at the end of each run just in case?
    spark.stop()
    #spark.newSession()#_instantiatedContext attribute of the session to None after calling session.stop().
    
    return result

In [36]:
result = run_queries([], n=100, find_median_runtime=False)
result['runtimes']['total'], np.median(result['runtimes']['total']), max(result['runtimes']['total'])

([0.2587094306945801,
  0.20500421524047852,
  0.17894196510314941,
  0.17899274826049805,
  0.18035173416137695,
  0.23854613304138184,
  0.17979025840759277,
  0.17872023582458496,
  0.17866897583007812,
  0.17903900146484375,
  0.22313928604125977,
  0.2436084747314453,
  0.17853474617004395,
  0.23714065551757812,
  0.23153400421142578,
  0.2521653175354004,
  0.24042224884033203,
  0.270127534866333,
  0.17946720123291016,
  0.18448901176452637,
  0.17935895919799805,
  0.1782224178314209,
  0.23639488220214844,
  0.24494051933288574,
  0.18637585639953613,
  0.17913818359375,
  0.17864012718200684,
  0.17867469787597656,
  0.178422212600708,
  0.21886754035949707,
  0.18841195106506348,
  0.1786661148071289,
  0.17829394340515137,
  0.1783301830291748,
  0.25531625747680664,
  0.2540466785430908,
  0.23046302795410156,
  0.17897796630859375,
  0.1786348819732666,
  0.20357036590576172,
  0.21613764762878418,
  0.20429015159606934,
  0.22131133079528809,
  0.18821978569030762,
  0

In [None]:
results_combo = []
for i in range(20):
    result = run_queries_og([], n=20, find_median_runtime=False)
    results_combo.append(result['runtimes']['total'])

In [None]:
first_times = [r[0] for r in results_combo]
all_times = []
for r in results_combo:
    all_times += r

def get_stats(runtimes):
    print(f'median: {round(np.median(runtimes), 5)}, average: {round(np.average(runtimes), 5)}, std: {round(np.std(runtimes), 5)}, min: {round(min(runtimes), 5)}, max: {round(max(runtimes), 5)}')

print(get_stats(first_times))
print(get_stats(all_times))
print('---')
for r in results_combo:
    print(get_stats(r))