In [None]:
from pyspark.sql import SparkSession
import os
import pandas as pd
import time
import string
import pathlib
import random
import threading
import time
from urllib.parse import urlsplit, urlunsplit
import requests
import json
from py4j.protocol import Py4JJavaError, Py4JError
import glob
import psutil

In [None]:
# Global configuration
SPARK_MEMORY = 900         # JVM memory size; passed to Spark as f'{SPARK_MEMORY}g'
SPARK_CORES = 60           # thread count N of the local[N] Spark master
DBHOST = 'postgres'        # host name used in the PostgreSQL JDBC URL
# Seconds after which a query's job group is cancelled (3 hours). A previous
# 30-minute value was immediately overwritten and has been dropped.
QUERY_TIMEOUT = 60 * 180

In [None]:
def create_spark():
    """Return the SparkSession used by the benchmarks, creating it if necessary.

    The session uses a ``local[SPARK_CORES]`` master, sets driver and executor
    memory to ``SPARK_MEMORY`` gigabytes, disables off-heap memory and puts the
    PostgreSQL JDBC driver jar on the classpath.
    """
    memory_setting = f'{SPARK_MEMORY}g'
    builder = (
        SparkSession.builder
        .appName("app")
        .master(f'local[{SPARK_CORES}]')
        .config("spark.driver.memory", memory_setting)
        .config("spark.executor.memory", memory_setting)
        .config("spark.memory.offHeap.enabled", False)
        .config("spark.jars", "postgresql-42.3.3.jar")
    )
    return builder.getOrCreate()

## Cluster
# def create_spark():
#     spark = SparkSession.builder \
#         .appName("app") \
#         .master('spark://10.100.42.35:7078') \
#         .config("spark.driver.memory", f'{SPARK_MEMORY}g') \
#         .config("spark.executor.memory", f'{SPARK_MEMORY}g') \
#         .config("spark.driver.host", "10.100.42.223") \
#         .config("spark.driver.bindAddress", "0.0.0.0") \
#         .config("spark.driver.port", "4060") \
#         .config("spark.memory.offHeap.enabled",OFFHEAP) \
#         .config("spark.jars", "postgresql-42.3.3.jar") \
#         .getOrCreate()
#     return spark

In [None]:
def extract_metrics(spark, group_id):
    """Fetch SQL, job and stage metrics for one job group from the Spark REST API.

    The SQL execution is located by comparing the ``description`` of every
    execution reported by the API with ``group_id``.

    Args:
        spark: active SparkSession; its ``sparkContext`` provides ``uiWebUrl``
            and ``applicationId``.
        group_id: job-group identifier to look up.

    Returns:
        A tuple ``(query_details, job_details, job_stages)``:
        ``query_details`` is the JSON of the matching SQL execution,
        ``job_details`` is a list with the JSON of each of its successful,
        running and failed jobs, and ``job_stages`` maps each job id to the
        JSON of those of its stages that the API answered with status 200.
        If no execution matches, ``(None, None, None)`` is returned so that
        callers can always unpack three values.
    """
    ui_url = urlsplit(spark.sparkContext.uiWebUrl)
    # The API is always contacted on localhost; only the port of the advertised
    # UI address is kept. Using the parsed port (instead of slicing at the first
    # ':') also copes with a missing port and with IPv6 host literals.
    netloc = 'localhost' if ui_url.port is None else f'localhost:{ui_url.port}'
    api_url = f'{urlunsplit(ui_url._replace(netloc=netloc))}/api/v1'

    app_id = spark.sparkContext.applicationId
    app_url = f'{api_url}/applications/{app_id}'

    sql_queries = requests.get(app_url + '/sql', params={'length': '100000'}).json()
    # Compare against None explicitly: 0 is a valid execution id.
    query_id = next((q['id'] for q in sql_queries if q['description'] == group_id), None)
    if query_id is None:
        print(f'query with group {group_id} not found')
        return None, None, None
    print(f'query id: {query_id}')

    query_details = requests.get(f'{app_url}/sql/{query_id}',
                                 params={'details': 'true', 'planDescription': 'true'}).json()

    job_ids = (query_details['successJobIds']
               + query_details['runningJobIds']
               + query_details['failedJobIds'])
    job_details = [requests.get(f'{app_url}/jobs/{jid}').json() for jid in job_ids]

    stage_params = {'details': 'true', 'withSummaries': 'true'}
    job_stages = {}
    for job in job_details:
        responses = [requests.get(f'{app_url}/stages/{sid}', params=stage_params)
                     for sid in job['stageIds']]
        # A stage lookup can answer 404; such stages are left out.
        job_stages[job['jobId']] = [r.json() for r in responses if r.status_code == 200]

    return query_details, job_details, job_stages

In [None]:
def import_db(spark, dbname):
    """Register every table of the PostgreSQL schema ``public`` as a Spark temp view.

    The table list is read from ``information_schema.tables`` over JDBC; each
    table whose ``table_schema`` is ``'public'`` is then loaded over JDBC and
    registered under its own name. The name of every registered table is printed.

    Args:
        spark: SparkSession used for reading and for registering the views.
        dbname: database name; it is also used as JDBC user name and password.
    """
    username = password = dbname
    jdbc_url = f'jdbc:postgresql://{DBHOST}:5432/{dbname}'

    def read_jdbc(relation):
        # One JDBC-backed DataFrame for the given relation.
        reader = spark.read.format("jdbc")
        reader = reader.option("url", jdbc_url)
        reader = reader.option("driver", "org.postgresql.Driver")
        reader = reader.option("dbtable", relation)
        reader = reader.option("user", username)
        reader = reader.option("password", password)
        return reader.load()

    catalog = read_jdbc("information_schema.tables").toPandas()
    for entry in catalog.itertuples(index=False):
        if entry.table_schema != 'public':
            continue
        table_name = entry.table_name
        table_df = read_jdbc(table_name)
        print(table_name)
        table_df.createOrReplaceTempView(table_name)

def random_str(size=16, chars=string.ascii_uppercase + string.digits):
    """Return a string of ``size`` characters drawn at random from ``chars``.

    By default the alphabet consists of upper-case ASCII letters and digits.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)

def set_group_id(spark):
    """Assign a fresh random job group to the Spark context and return its id.

    The generated id is used both as group id and as group description.
    """
    fresh_id = random_str()
    context = spark.sparkContext
    context.setJobGroup(fresh_id, fresh_id)
    return fresh_id

def cancel_query(spark, seconds, group_id):
    """Wait ``seconds`` seconds, then cancel all jobs of job group ``group_id``.

    A message is printed before the cancellation, followed by the value
    returned by ``cancelJobGroup`` and a confirmation message.
    """
    time.sleep(seconds)
    print("cancelling jobs with id " + group_id)
    outcome = spark.sparkContext.cancelJobGroup(group_id)
    print(outcome)
    print("cancelled job")

def cancel_query_after(spark, seconds):
    """Start a new job group and schedule its cancellation after ``seconds`` seconds.

    A fresh random group id is set on the Spark context (as id and as
    description) and a background thread running ``cancel_query`` is started.

    The thread is a daemon: it sleeps for the whole timeout even when the
    query has long finished, and as a non-daemon thread it would keep the
    interpreter from exiting until then.

    Returns:
        The id of the newly created job group.
    """
    group_id = random_str()
    spark.sparkContext.setJobGroup(group_id, group_id)
    watchdog = threading.Thread(
        target=cancel_query,
        args=(spark, seconds, group_id),
        name=f'cancel-{group_id}',
        daemon=True,
    )
    watchdog.start()
    return group_id
    
def run_query(spark, file):
    """Read a SQL file, drop some of its lines and run the rest through ``spark.sql``.

    Lines that start with ``limit`` or with ``-`` are removed. The remaining
    lines (which still carry their own line endings) are joined with newlines,
    printed, and executed.

    Returns:
        Whatever ``spark.sql`` returns for the assembled query text.
    """
    with open(file, 'r') as handle:
        kept_lines = [
            line for line in handle
            if not (line.startswith('limit') or line.startswith('-'))
        ]
    query = '\n'.join(kept_lines)

    print("running query: \n" + query)
    return spark.sql(query)

def get_resource_usage(t):
    """Take one sample of system memory and CPU utilisation via psutil.

    Args:
        t: time label stored with the sample.

    Returns:
        dict with the keys ``time``, ``memory`` (result of
        ``psutil.virtual_memory()``), ``cpu`` (per-CPU percentages) and
        ``cpu_total`` (overall percentage).
    """
    memory = psutil.virtual_memory()
    per_cpu = psutil.cpu_percent(interval=None, percpu=True)
    overall = psutil.cpu_percent(interval=None, percpu=False)
    return dict(time=t, memory=memory, cpu=per_cpu, cpu_total=overall)
def explain_str(df):
    """Return the 'extended' explain output of ``df`` as a string.

    Goes through the DataFrame's private JVM handles (``_sc._jvm`` and
    ``_jdf``) and calls ``PythonSQLUtils.explainString`` on its query execution.
    """
    explain = df._sc._jvm.PythonSQLUtils.explainString
    query_execution = df._jdf.queryExecution()
    return explain(query_execution, 'extended')

In [None]:
resource_usage = []

def measure_resource_usage(resource_usage):
    """Append one resource sample per loop iteration to ``resource_usage``.

    Intended as a thread target: the loop keeps running until the attribute
    ``do_run`` of the executing thread is set to a false value (a missing
    attribute counts as true). Samples are labelled with a counter that starts
    at 0 and grows by one per iteration; each iteration ends with a one-second
    sleep.
    """
    worker = threading.current_thread()
    tick = 0
    while True:
        if not getattr(worker, "do_run", True):
            break
        resource_usage.append(get_resource_usage(tick))
        tick += 1
        time.sleep(1)

def benchmark_query(spark, query, respath, run):
    """Run one query file under resource monitoring and store its measurements.

    A sampler thread (``measure_resource_usage``) records memory/CPU usage
    while the query runs; the query's job group is scheduled for cancellation
    after ``QUERY_TIMEOUT`` seconds. Afterwards the resource samples (JSON and
    CSV), the extended plan and, if available, the Spark REST metrics are
    written to ``respath`` with ``run`` as file-name suffix.

    Args:
        spark: active SparkSession.
        query: path of the SQL file to execute.
        respath: existing directory that receives the result files.
        run: run label used in the result file names.

    Returns:
        ``(runtime_seconds, peak_memory_gb)``; ``peak_memory_gb`` is ``None``
        if no resource sample could be taken.

    Raises:
        Whatever running the query raises; the sampler thread is stopped first.
    """
    spark.sparkContext._jvm.System.gc()
    start_time = time.time()

    resource_usage = []

    measure_thread = threading.Thread(target=measure_resource_usage,
                                      args=(resource_usage,), daemon=True)
    measure_thread.start()

    try:
        group_id = cancel_query_after(spark, QUERY_TIMEOUT)
        df1 = run_query(spark, query)
        df1.show()
        diff_time = time.time() - start_time
    finally:
        # Stop the sampler on failure as well; otherwise every failed or
        # timed-out query leaves a thread behind that samples forever.
        measure_thread.do_run = False
        # Wait until the sampler has left its loop so the list is final before
        # it is serialised. The timeout only guards against a stuck sampler.
        measure_thread.join(timeout=5)

    # extract_metrics reports "not found" without usable values; accept both a
    # bare None and a tuple of Nones.
    metrics = extract_metrics(spark, group_id)
    execution, jobs, job_stages = metrics if metrics is not None else (None, None, None)

    with open(respath + f'/resource-usage-{run}.json', 'w') as f:
        json.dump(resource_usage, f, indent=2)
    with open(respath + f'/explain-{run}.txt', 'w') as f:
        f.write(explain_str(df1))

    resource_rows = [[r['time'], r['memory'].used, r['cpu_total']] for r in resource_usage]
    resource_df = pd.DataFrame(resource_rows, columns=['time', 'memory_used', 'cpu_used'])
    resource_df.to_csv(respath + f'/resource-usage-{run}.csv')

    used_bytes = [r['memory'].used for r in resource_usage]
    # bytes -> GB; None when the query finished before the first sample
    peak_memory = max(used_bytes) / (1000 * 1000 * 1000) if used_bytes else None

    if execution is not None:
        with open(respath + f'/execution-{run}.json', 'w') as f:
            json.dump(execution, f, indent=2)
        with open(respath + f'/jobs-{run}.json', 'w') as f:
            json.dump(jobs, f, indent=2)
        with open(respath + f'/stages-{run}.json', 'w') as f:
            json.dump(job_stages, f, indent=2)
    return (diff_time, peak_memory)

def benchmark(spark, dbname, query_file, mode, run):
    """Benchmark one query file in the given mode and return its result row.

    Mode ``"opt"`` sets ``spark.sql.yannakakis.enabled`` to true, mode
    ``"ref"`` sets it to false. Result files are written below
    ``benchmark-results-<dbname>/<query file name>/<mode>``.

    Args:
        spark: active SparkSession.
        dbname: database name, used in the result directory name.
        query_file: path of the SQL file to run.
        mode: ``"opt"`` or ``"ref"``.
        run: run label.

    Returns:
        ``[query_name, runtime, peak_memory, mode, run]``; ``runtime`` and
        ``peak_memory`` are ``None`` if the query failed with a ``Py4JError``
        (for example after being cancelled by the timeout).

    Raises:
        ValueError: if ``mode`` is neither ``"opt"`` nor ``"ref"``. Earlier an
            empty list was returned, which made ``benchmark_all`` build a
            malformed row and fail with a misleading pandas error.
    """
    # Validate first so that an invalid mode leaves no directory behind.
    if mode == "opt":
        enable_statement = "SET spark.sql.yannakakis.enabled = true"
    elif mode == "ref":
        enable_statement = "SET spark.sql.yannakakis.enabled = false"
    else:
        raise ValueError(f'unknown benchmark mode {mode!r}; expected "opt" or "ref"')

    query_name = os.path.basename(query_file)

    respath = f'benchmark-results-{dbname}/' + query_name + "/" + mode
    pathlib.Path(respath).mkdir(parents=True, exist_ok=True)

    spark.sql(enable_statement).show()

    try:
        (runtime, peak_memory) = benchmark_query(spark, query_file, respath, run)
        return [query_name, runtime, peak_memory, mode, run]
    except Py4JError as e:
        print('timeout or error: ' + str(e))
        return [query_name, None, None, mode, run]

def benchmark_all(dbname, mode, runs, queries, group_in_leaves=False, physical_cj=False):
    """Benchmark every query for every run and keep a cumulative results CSV.

    Creates the Spark session, imports the database, applies the session
    settings and then calls ``benchmark`` once per (run, query) pair, runs in
    the outer loop. After each query the accumulated results are written to
    ``benchmark-results-<dbname>/results-<mode>.csv`` and printed; an existing
    results file is loaded first and extended.

    Args:
        dbname: database to import and benchmark.
        mode: benchmark mode passed on to ``benchmark``.
        runs: iterable of run labels.
        queries: iterable of SQL file paths.
        group_in_leaves: value for ``spark.sql.yannakakis.countGroupInLeaves``.
        physical_cj: value for ``spark.sql.yannakakis.physicalCountJoinEnabled``.
    """
    columns = ['query', 'runtime', 'peak_memory', 'mode', 'run', 'group_leaves', 'physical_cj']

    spark = create_spark()
    import_db(spark, dbname)

    # Whole-stage code generation is switched on in either case.
    spark.sql("SET spark.sql.codegen.wholeStage = true").show()
    count_join_statement = (
        "SET spark.sql.yannakakis.physicalCountJoinEnabled = true" if physical_cj
        else "SET spark.sql.yannakakis.physicalCountJoinEnabled = false"
    )
    spark.sql(count_join_statement).show()
    leaves_statement = (
        "SET spark.sql.yannakakis.countGroupInLeaves = true" if group_in_leaves
        else "SET spark.sql.yannakakis.countGroupInLeaves = false"
    )
    spark.sql(leaves_statement).show()

    results_file = f'benchmark-results-{dbname}/results-{mode}.csv'
    if os.path.exists(results_file):
        results_df = pd.read_csv(results_file, index_col=0)
    else:
        results_df = pd.DataFrame([], columns=columns)

    for run in runs:
        for query_file in queries:
            row = benchmark(spark, dbname, query_file, mode, run) + [group_in_leaves, physical_cj]
            row_df = pd.DataFrame([row], columns=columns)
            results_df = pd.concat([results_df, row_df], ignore_index=True)
            # Written after every query so finished results survive a later failure.
            results_df.to_csv(results_file)
            print(results_df)
    

## SNAP Benchmark

### Optimized execution

In [None]:
#### benchmark configuration
# SNAP benchmark in "opt" mode: every dataset gets its own benchmark_all call
# over all of its query files, with physical_cj switched on.
dbname, mode = 'snap', 'opt'
group_in_leaves = False
runs = [str(run_no) for run_no in range(1, 7)]   # quick check: runs = ['1']
####

tables = ['patents', 'wiki', 'google', 'dblp']   # single dataset: tables = ['wiki']

for tablename in tables:
    queries = sorted(glob.glob(f'snap-queries/all/{tablename}-*'))
    print(f'running queries: {queries}')
    benchmark_all(dbname, mode, runs, queries, physical_cj=True)



### Reference execution

In [None]:
#### benchmark configuration
# SNAP benchmark in "ref" mode on a fixed selection of queries.
dbname, mode = 'snap', 'ref'
group_in_leaves = False
runs = [str(run_no) for run_no in range(1, 7)]
####

queries = [
    f'snap-queries/all/{query_name}.sql'
    for query_name in (
        'patents-path02', 'patents-path03', 'patents-path04', 'patents-path05',
        'patents-tree01',
        'wiki-path02',
        'google-path02', 'google-path03', 'google-path04',
        'dblp-path02', 'dblp-path03', 'dblp-path04', 'dblp-path05',
        'dblp-tree01', 'dblp-tree02',
    )
]

print(f'running queries: {queries}')

benchmark_all(dbname, mode, runs, queries)

## LSQB Benchmark

In [None]:
#### benchmark configuration
# LSQB benchmark. `runs` ends up as ['1', '2'] (full set would be '1'..'6').
dbname = 'lsqb'
group_in_leaves = False
physical_cj = True
#mode = 'opt'
runs = ['1', '2']
####

queries = [f'lsqb/sql/{query_name}.sql' for query_name in ('q1', 'q4')]
queries_hints = [f'lsqb/sql/{query_name}-hint.sql' for query_name in ('q1', 'q4')]

print(f'running queries: {queries}')

# Only the "ref" runs 4-6 of q1 are active; every other invocation in this
# cell is commented out.
#benchmark_all(dbname, 'opt', runs, queries, group_in_leaves=False, physical_cj=True)
#benchmark_all(dbname, 'opt', runs, queries_hints, group_in_leaves=False, physical_cj=True)

#benchmark_all(dbname, 'opt', ['1'], ['lsqb/sql/q4.sql', 'lsqb/sql/q4-hint.sql'], group_in_leaves=False, physical_cj=True)
#benchmark_all(dbname, 'opt', ['3', '4', '5', '6'], ['lsqb/sql/q4.sql', 'lsqb/sql/q4-hint.sql'], group_in_leaves=False, physical_cj=True)
#benchmark_all(dbname, 'opt', ['1', '2', '3', '4', '5', '6'], ['lsqb/sql/q4.sql', 'lsqb/sql/q4-hint.sql'], group_in_leaves=False, physical_cj=False)

#benchmark_all(dbname, 'opt', ['1', '2', '3', '4', '5', '6'], ['lsqb/sql/q1-hint.sql'], group_in_leaves=False, physical_cj=False)
#benchmark_all(dbname, 'opt', ['1', '2', '3', '4', '5', '6'], ['lsqb/sql/q1-hint.sql'], group_in_leaves=False, physical_cj=True)
#benchmark_all(dbname, 'opt', ['3', '4', '5', '6'], ['lsqb/sql/q1.sql'], group_in_leaves=False, physical_cj=False)
#benchmark_all(dbname, 'opt', ['3', '4', '5', '6'], ['lsqb/sql/q1.sql'], group_in_leaves=False, physical_cj=True)
benchmark_all(dbname, 'ref', ['4', '5', '6'], ['lsqb/sql/q1.sql'])


#benchmark_all(dbname, 'opt', runs, queries_hints, group_in_leaves=False, physical_cj=True)
#benchmark_all(dbname, 'ref', ['3', '4', '5', '6'], ['lsqb/sql/q4.sql'])

## TPC-H Benchmark

In [None]:
#### benchmark configuration
# TPC-H benchmark. Note that the calls below pass their run labels explicitly
# instead of using `runs`.
dbname = 'tpch'
group_in_leaves = False
runs = [str(run_no) for run_no in range(1, 7)]
#runs = ['x1', 'x2']
####

queries = [
    'tpch-kit/dbgen/queries/postgres/2.sql',
    'tpch-kit/dbgen/queries/postgres/11.sql',
    'tpch-kit/dbgen/queries/postgres/11-hint.sql',
    'tpch-queries/median-1.sql',
    'tpch-queries/median-1-hint.sql',
]
#queries = ['tpch-queries/median-1.sql', 'tpch-queries/median-1-hint.sql' ]
#queries = ['tpch-kit/dbgen/queries/postgres/11.sql', 
#           'tpch-kit/dbgen/queries/postgres/11-hint.sql']
#queries = ['tpch-queries/2-subq.sql'] #, 'tpch-queries/2-subq-hint.sql']

print(f'running queries: {queries}')
benchmark_all(dbname, 'ref', ['3', '4', '5', '6'], queries)
benchmark_all(dbname, 'opt', ['3', '4', '5', '6'], queries,
              group_in_leaves=group_in_leaves, physical_cj=True)
benchmark_all(dbname, 'opt', ['1', '2', '3', '4', '5', '6'], queries,
              group_in_leaves=group_in_leaves, physical_cj=False)
#benchmark_all(dbname, 'opt', runs, queries, group_in_leaves = group_in_leaves, physical_cj=False)


## JOB (IMDB) Benchmark

In [None]:
#### benchmark configuration
# JOB (IMDB) benchmark in "opt" mode with physical_cj switched on.
dbname = 'imdb'
group_in_leaves = False
runs = [str(run_no) for run_no in range(1, 7)]
####

# Query families and their variants, e.g. ('2', 'abcd') -> job/2a.sql ... job/2d.sql
queries = [
    f'job/{number}{variant}.sql'
    for number, variants in (('2', 'abcd'), ('3', 'abc'), ('5', 'abc'),
                             ('17', 'abcdef'), ('20', 'abc'))
    for variant in variants
]

print(f'running queries: {queries}')
benchmark_all(dbname, 'opt', runs, queries, physical_cj=True)
#benchmark_all(dbname, 'ref', runs, queries)

## STATS Benchmark

In [None]:
#### benchmark configuration
# STATS benchmark. The active call runs the hinted queries in "opt" mode with
# group_in_leaves and physical_cj switched on; the printed list is `queries`.
dbname = 'stats'
#mode = 'opt'
runs = [str(run_no) for run_no in range(1, 7)]
#runs = ['04']
#runs = ['01']
####

queries = sorted(glob.glob('stats-queries/*.sql'))
queries_hint = sorted(glob.glob('stats-queries/hints/*.sql'))

print(f'running queries: {queries}')
#benchmark_all(dbname, 'opt', runs, queries, physical_cj=True)
benchmark_all(dbname, 'opt', runs, queries_hint, group_in_leaves=True, physical_cj=True)
#benchmark_all(dbname, 'ref', runs, queries)

## Hetionet Benchmark

In [None]:
#### benchmark configuration
# Hetionet benchmark: "opt" mode with and without physical_cj, then "ref" mode.
dbname = 'hetio'
#mode = 'opt'
runs = [str(run_no) for run_no in range(1, 7)]
#runs = ['04']
#runs = ['01']
####

queries = sorted(glob.glob('hetio/*.sql'))
#queries = ['hetio/CtDpSpD.sql']

print(f'running queries: {queries}')
#benchmark_all(dbname, 'opt', runs, queries, physical_cj=True)
benchmark_all(dbname, 'opt', runs, queries, group_in_leaves=False, physical_cj=True)
benchmark_all(dbname, 'opt', runs, queries, group_in_leaves=False, physical_cj=False)
benchmark_all(dbname, 'ref', runs, queries)
#benchmark_all(dbname, 'ref', runs, queries)

In [None]:
# Scratch session for the interactive cells below: obtain the Spark session
# and register the tables of the 'lsqb' database as temp views.
spark = create_spark()
#import_db(spark, 'stats')
import_db(spark, 'lsqb')

In [None]:
# Inspect a single query with spark.sql.yannakakis.enabled set to false:
# raise the log level to INFO, run the query, show its result and print the
# extended plan. Relies on `spark` from the scratch session cell.
spark.sparkContext.setLogLevel("INFO")
spark.sql("SET spark.sql.yannakakis.enabled = false").show()
#spark.sql("SET spark.sql.yannakakis.enabled = false").show()
#df = run_query(spark, 'stats-queries/142-135.sql')
#df = run_query(spark, 'stats-queries/hints/142-135-hint.sql')
#df = run_query(spark, 'stats-queries/hints/141-068-hint.sql')
df = run_query(spark, 'lsqb/sql/q1.sql')
df.show()
print(explain_str(df))

In [None]:

# Row count of the 'comments' view (overwrites `df` from the previous cell).
df = spark.sql('select count(*) from comments as c')
df.show()

In [None]:
# Issue a CACHE TABLE statement for 'comments' and show the statement's result.
spark.sql('cache table comments').show()

In [None]:
# Scratch check of set difference: removing {a2} from {a1, 'b'} leaves {'b'}.
a1 = a2 = 'a'

{a1, 'b'} - {a2}