# Running queries using the JSON files produced by the Scala code

Install and import all needed packages

In [None]:
%%bash
# Install every third-party package imported by the next cell.
pip install numpy
pip install pandas
pip install pyspark
# psutil is imported for the resource-usage sampling but was not installed here.
pip install psutil

In [None]:
import json
import time
import numpy as np
import csv
import multiprocessing
import signal
import pandas as pd
import os
import threading
import random
from pyspark.sql import SparkSession
from py4j.protocol import Py4JJavaError, Py4JError
import psutil
import string

Function for running one query. This means:
*  run the original query 5 times (after one initial warm-up run, which is only used to check for a timeout and is not timed)
*  run the rewritten queries 5 times (after one initial warm-up run, which is not timed either) and drop the created tables after each run
*  take the runtimes and calculate the mean, median and standard deviation separately for the original and for the rewritten query
*  compare the runtimes of the original query, the rewritten query, and the rewritten query plus the rewriting time (how long the Scala code took)
*  save everything in a CSV output file

In [None]:
# create a spark session
def create_spark():
    """Return the local SparkSession used for the benchmark runs.

    The session is configured with ``local[SPARK_CORES]`` as master,
    ``SPARK_MEMORY`` gigabytes for both driver and executor, off-heap memory
    switched off, and ``postgresql-42.3.3.jar`` registered through
    ``spark.jars``. ``getOrCreate()`` is used, so an already existing session
    is returned instead of a new one.
    """
    memory = f'{SPARK_MEMORY}g'
    settings = (
        ("spark.driver.memory", memory),
        ("spark.executor.memory", memory),
        ("spark.memory.offHeap.enabled", False),
        ("spark.jars", "postgresql-42.3.3.jar"),
    )

    builder = SparkSession.builder.appName("app").master(f'local[{SPARK_CORES}]')
    for key, value in settings:
        builder = builder.config(key, value)
    return builder.getOrCreate()

# import the database into Spark from PostgreSQL
def import_db(spark, dbname):
    """Register every table of a PostgreSQL database as a Spark temp view.

    Args:
        spark: the SparkSession to read with and to register the views in.
        dbname: benchmark name. "JOB" is mapped to the database "imdb"; any
            other name is lower-cased and used as the database name. The same
            string is also used as user name and password.

    The table list is taken from ``information_schema.tables``; only tables in
    the ``public`` schema are loaded. Each table name is printed once its
    DataFrame has been created, and the view gets the table's name.
    """
    dbname = "imdb" if dbname == "JOB" else dbname.lower()
    username = dbname
    password = dbname

    # Both the catalog lookup and the table reads must hit the same server, so
    # the URL is built once from DBHOST (the catalog lookup used to hard-code
    # the host name "postgres").
    url = f'jdbc:postgresql://{DBHOST}:5432/{dbname}'

    def read_table(table):
        # One JDBC-backed DataFrame for the given (optionally schema-qualified) table.
        return spark.read.format("jdbc") \
            .option("url", url) \
            .option("driver", "org.postgresql.Driver") \
            .option("dbtable", table) \
            .option("user", username) \
            .option("password", password) \
            .load()

    df_tables = read_table("information_schema.tables")

    for idx, row in df_tables.toPandas().iterrows():
        if row.table_schema != 'public':
            continue
        table_name = row.table_name
        df = read_table(table_name)

        print(table_name)
        df.createOrReplaceTempView(table_name)

In [None]:
# functions for handling TO and cancelling those queries in case of a TO
def measure_resource_usage(resource_usage):
    """Append one resource snapshot per second to `resource_usage`.

    Meant to run as a thread target: sampling continues until the attribute
    ``do_run`` of the executing thread object is set to a falsy value (a
    missing attribute counts as "keep running"). Each snapshot is tagged with
    the number of completed one-second iterations, starting at 0.
    """
    worker = threading.current_thread()
    elapsed = 0
    while True:
        if not getattr(worker, "do_run", True):
            break
        snapshot = get_resource_usage(elapsed)
        resource_usage.append(snapshot)
        elapsed += 1
        time.sleep(1)

def get_resource_usage(t):
    """Return a dict describing the current memory and CPU usage.

    Keys: 'time' (the given `t`, passed through unchanged), 'memory'
    (``psutil.virtual_memory()``), 'cpu' (``psutil.cpu_percent`` per CPU) and
    'cpu_total' (``psutil.cpu_percent`` over all CPUs). Both CPU readings use
    ``interval=None``, i.e. they do not block.
    """
    snapshot = {'time': t}
    snapshot['memory'] = psutil.virtual_memory()
    snapshot['cpu'] = psutil.cpu_percent(interval=None, percpu=True)
    snapshot['cpu_total'] = psutil.cpu_percent(interval=None, percpu=False)
    return snapshot
    
def cancel_query(spark, seconds, group_id):
    """Sleep for `seconds`, then cancel the Spark job group `group_id`.

    Blocks the calling thread for the whole delay, so it is intended to be run
    in a separate thread. Progress messages and the return value of
    ``cancelJobGroup`` are printed.
    """
    time.sleep(seconds)
    announcement = "cancelling jobs with id " + group_id
    print(announcement)
    outcome = spark.sparkContext.cancelJobGroup(group_id)
    print(outcome)
    print("cancelled job")

def cancel_query_after(spark, seconds):
    """Put subsequent Spark jobs into a fresh job group and schedule its cancellation.

    A random 16-character id (upper-case letters and digits) is set as both
    job group id and description on the SparkContext. A background thread
    running ``cancel_query`` then cancels that group after `seconds` seconds.

    Returns:
        The generated job group id.
    """
    alphabet = string.ascii_uppercase + string.digits
    picked = []
    for _ in range(16):
        picked.append(random.choice(alphabet))
    group_id = ''.join(picked)

    spark.sparkContext.setJobGroup(group_id, group_id)

    canceller = threading.Thread(target=cancel_query, args=(spark, seconds, group_id))
    canceller.start()
    return group_id

In [None]:
# function to run the query 6 times checking for TO and calculate and saving all values
def run_query(benchmark, query, spark):
    """Benchmark one query in its original and rewritten form and append the result to the CSV.

    Args:
        benchmark: benchmark name, first part of the JSON file names.
        query: query id, second part of the JSON file names.
        spark: SparkSession with the benchmark tables registered.

    Steps:
      1. Read ``rewritten/{benchmark}_{query}_output.json`` (original query,
         list of rewritten statements, rewriting time) and
         ``rewritten/{benchmark}_{query}_drop.json`` (drop statements), adapt
         the statements to Spark SQL and run the drop statements once.
      2. Warm-up run of the original query and of the rewritten statements,
         each under a job group that is cancelled after ``TIMEOUT`` seconds.
         A ``Py4JError`` during a warm-up marks that variant as timed out.
      3. Five timed runs of every variant that did not time out (original and
         rewritten alternate within each iteration; the drop statements run
         after every rewritten run). A failed timed run is recorded as "-".
      4. Mean / median / standard deviation over the successful runs, the
         comparison labels, and one row appended to
         ``results/SPA_Scala_comparison_TO_augment_server.csv``.

    Exceptions other than ``Py4JError`` raised during a warm-up propagate to
    the caller; the resource sampling thread is stopped in every case.
    """
    print(benchmark, query)
    with open(f'rewritten/{benchmark}_{query}_output.json', 'r') as file:
        json_data = json.load(file)

    # get the original and rewritten query
    original_query = json_data["original_query"]
    rewritten_query_list = json_data["rewritten_query"]
    rewriting_time = json_data["time"]

    # change the queries such that they can be executed in SparkSQL (without changing the output)
    rewritten_query_list_spark = [rewritten_query.replace(" UNLOGGED TABLE ", " VIEW ")
                                                 .replace("TIMESTAMP(0)", "TIMESTAMP")
                                                 .replace("$", "_")
                                                 .replace("CREATE VIEW", "CREATE TEMPORARY VIEW")
                                  for rewritten_query in rewritten_query_list]

    # get the drop queries
    with open(f'rewritten/{benchmark}_{query}_drop.json', 'r') as file:
        json_drop = json.load(file)
    drop_query_list = json_drop["rewritten_query"]

    drop_query_list_spark = [drop_query.lower()
                                       .replace("drop view", "drop view if exists")
                                       .replace("drop table", "drop table if exists")
                             for drop_query in drop_query_list]

    # start from a clean state in case an earlier run left views behind
    _run_statements(spark, drop_query_list_spark)

    timeout_flag_orig = True
    timeout_flag_rewr = True

    # the first run is just a warm up run and to check for the time out
    resource_usage = []
    measure_thread = threading.Thread(target=measure_resource_usage, args=(resource_usage, ))
    try:
        spark.sparkContext._jvm.System.gc()
        start_time = time.time()
        measure_thread.start()

        cancel_query_after(spark, TIMEOUT)
        _run_statements(spark, [original_query])
        end_time = time.time()
        print(end_time - start_time)

        timeout_flag_orig = False
    except Py4JError as e:
        print('timeout or error orig: ' + str(e))
    finally:
        # stop the sampler on every path; it used to keep running forever after a timeout
        measure_thread.do_run = False

    resource_usage = []
    measure_thread = threading.Thread(target=measure_resource_usage, args=(resource_usage, ))
    try:
        spark.sparkContext._jvm.System.gc()
        measure_thread.start()

        cancel_query_after(spark, TIMEOUT)
        _run_statements(spark, rewritten_query_list_spark)

        timeout_flag_rewr = False

        _run_statements(spark, drop_query_list_spark)
    except Py4JError as e:
        print('timeout or error rewr: ' + str(e))
    finally:
        measure_thread.do_run = False

    # The cancel thread of the last warm-up is still pending and targets the
    # job group that is currently set. Move the timed runs into their own
    # group so that this delayed cancel cannot abort one of them.
    spark.sparkContext.setJobGroup(f'{benchmark}_{query}_timed', f'timed runs of {benchmark} {query}')

    print(timeout_flag_orig, timeout_flag_rewr)

    run_orig = not timeout_flag_orig
    run_rewr = not timeout_flag_rewr

    list_original = []
    list_rewritten = []
    if run_orig or run_rewr:
        if run_orig and run_rewr:
            print("orig+rewr")
        elif run_rewr:
            print("rewritten")
        else:
            print("orig")

        # take times for 5 runs (run 2-6) of every variant that survived the warm-up
        for i in range(5):
            print(i)
            if run_orig:
                list_original.append(_timed_run(spark, [original_query]))
            if run_rewr:
                list_rewritten.append(_timed_run(spark, rewritten_query_list_spark))
                # drop all created tables
                _run_statements(spark, drop_query_list_spark)

    if run_orig:
        orig_mean, orig_med, orig_std = _summarize(list_original)
    else:
        orig_mean = "TO"
        orig_med = "TO"
        orig_std = "-"
        list_original = ["-"] * 5

    if run_rewr:
        rewr_mean, rewr_med, rewr_std = _summarize(list_rewritten)
        rewr_mean_plus_rewr = rewr_mean + rewriting_time
        rewr_med_plus_rewr = rewr_med + rewriting_time
    else:
        rewr_mean = "TO"
        rewr_med = "TO"
        rewr_std = "-"
        rewr_mean_plus_rewr = "TO"
        rewr_med_plus_rewr = "TO"
        list_rewritten = ["-"] * 5

    if run_orig and run_rewr:
        orig_or_rewr_mean, orig_or_rewr_or_equal, orig_or_rewr_plus_rewr_mean = \
            _compare_means(orig_mean, rewr_mean, rewr_mean_plus_rewr)
    elif run_rewr:
        # original query is a TO and the rewritten not
        orig_or_rewr_mean = orig_or_rewr_or_equal = orig_or_rewr_plus_rewr_mean = "rewr"
    elif run_orig:
        # rewritten query is a TO and the original not
        orig_or_rewr_mean = orig_or_rewr_or_equal = orig_or_rewr_plus_rewr_mean = "orig"
    else:
        # original and rewritten query are TOs
        orig_or_rewr_mean = orig_or_rewr_or_equal = orig_or_rewr_plus_rewr_mean = "-"

    list_output = [benchmark, query] + [orig_mean, rewr_mean, rewr_mean_plus_rewr, orig_or_rewr_mean, orig_or_rewr_or_equal, \
                                        orig_or_rewr_plus_rewr_mean, rewriting_time] + \
                    list_original + [orig_med, orig_std] + list_rewritten + [rewr_med, rewr_std, rewr_med_plus_rewr]

    file_path = "results/SPA_Scala_comparison_TO_augment_server.csv"
    with open(file_path, 'a', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(list_output)


def _run_statements(spark, statements):
    """Execute each SQL statement in order and show its result."""
    for statement in statements:
        spark.sql(statement).show()


def _timed_run(spark, statements):
    """Run `statements` once after a JVM GC; return the elapsed seconds, or "-" if the run failed."""
    spark.sparkContext._jvm.System.gc()
    try:
        started = time.time()
        _run_statements(spark, statements)
        return time.time() - started
    except Exception as e:
        # a single failed run must not abort the remaining runs; report it instead of hiding it
        print('error in timed run: ' + str(e))
        return "-"


def _summarize(times):
    """Return (mean, median, std) of the successful entries of `times`; NaN for each if none succeeded."""
    valid = [t for t in times if t != "-"]
    if not valid:
        # np.mean([]) would also yield nan, but only after emitting RuntimeWarnings
        nan = float("nan")
        return nan, nan, nan
    return np.mean(valid), np.median(valid), np.std(valid)


def _compare_means(orig_mean, rewr_mean, rewr_mean_plus_rewr):
    """Return the three comparison labels (mean, mean with 0.05 s tolerance, mean incl. rewriting time)."""
    if np.isnan(orig_mean) or np.isnan(rewr_mean):
        # no successful timed run for one variant: there is nothing to compare
        return "-", "-", "-"
    faster = "rewr" if orig_mean > rewr_mean else "orig"
    faster_or_equal = "equal 0.05" if abs(rewr_mean - orig_mean) < 0.05 else faster
    faster_with_rewriting = "rewr" if orig_mean > rewr_mean_plus_rewr else "orig"
    return faster, faster_or_equal, faster_with_rewriting

In [None]:
# Global configuration
# Memory in gigabytes given to the Spark driver and to the executor (see create_spark)
SPARK_MEMORY = 120
# Number of local cores Spark runs on, used as local[SPARK_CORES] (see create_spark)
SPARK_CORES = 4
# Host name of the PostgreSQL server the tables are imported from (see import_db)
DBHOST = 'postgres'
# Seconds after which a warm-up run is cancelled and counted as a timeout (see run_query)
TIMEOUT = 100

Create the output CSV file and write its header row. The running times of each query are appended to it afterwards.

In [None]:
file_path = "results/SPA_Scala_comparison_TO_augment_server.csv"

# Column names; the order has to match the row that run_query appends for each query.
names = ["bench", "query", "orig mean", "rewr mean", "rewr mean+rewr", "orig/rewr(mean)", "orig/rewr/equal", "orig/rewr+rewr(mean)", "rewriting", 
         "orig 1", "orig 2", "orig 3", "orig 4", "orig 5", "orig med", "orig_std", "rewr 1", "rewr 2", "rewr 3", "rewr 4", "rewr 5", "rewr med", 
         "rewr_std", "rewr med+rewr", ]

# Opening the file for writing fails if the output directory is missing, so create it first.
os.makedirs(os.path.dirname(file_path), exist_ok=True)

# 'w' truncates an existing file: every execution of this cell starts a fresh result table.
with open(file_path, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(names)

Connect to Spark for each dataset and execute all queries:

### STATS

In [None]:
# Get the Spark session and register the public tables of the "stats" database as temp views.
spark = create_spark()
import_db(spark, "STATS")

In [None]:
# Run every STATS query for which a rewriting output file exists, in sorted file order.
folder_path = 'rewritten/'
files = sorted(os.listdir(folder_path))
output_files = [file for file in files if file.endswith('_output.json') and file.startswith('STATS')]

for file in output_files:
    # File names look like <benchmark>_<query>_output.json. Split only at the
    # first underscore and strip the fixed suffix, so that a query id which
    # itself contains underscores is passed on completely.
    file_split = file.split("_", 1)
    run_query(file_split[0], file_split[1][:-len("_output.json")], spark)

### SNAP

In [None]:
# Get the Spark session and register the public tables of the "snap" database as temp views.
spark = create_spark()
import_db(spark, "SNAP")

In [None]:
# Run every SNAP query for which a rewriting output file exists, in sorted file order.
folder_path = 'rewritten/'
files = sorted(os.listdir(folder_path))
output_files = [file for file in files if file.endswith('_output.json') and file.startswith('SNAP')]

for file in output_files:
    # File names look like <benchmark>_<query>_output.json. Split only at the
    # first underscore and strip the fixed suffix, so that a query id which
    # itself contains underscores is passed on completely.
    file_split = file.split("_", 1)
    run_query(file_split[0], file_split[1][:-len("_output.json")], spark)

### JOB

In [None]:
# Get the Spark session and register the public tables of the JOB data as temp views
# (import_db maps "JOB" to the database "imdb").
spark = create_spark()
import_db(spark, "JOB")

In [None]:
# Run every JOB query for which a rewriting output file exists, in sorted file order.
folder_path = 'rewritten/'
files = sorted(os.listdir(folder_path))
output_files = [file for file in files if file.endswith('_output.json') and file.startswith('JOB')]

for file in output_files:
    # File names look like <benchmark>_<query>_output.json. Split only at the
    # first underscore and strip the fixed suffix, so that a query id which
    # itself contains underscores is passed on completely.
    file_split = file.split("_", 1)
    run_query(file_split[0], file_split[1][:-len("_output.json")], spark)

### LSQB

In [None]:
# Get the Spark session and register the public tables of the "lsqb" database as temp views.
spark = create_spark()
import_db(spark, "LSQB")

In [None]:
# Run every LSQB query for which a rewriting output file exists, in sorted file order.
folder_path = 'rewritten/'
files = sorted(os.listdir(folder_path))
output_files = [file for file in files if file.endswith('_output.json') and file.startswith('LSQB')]

for file in output_files:
    # File names look like <benchmark>_<query>_output.json. Split only at the
    # first underscore and strip the fixed suffix, so that a query id which
    # itself contains underscores is passed on completely.
    file_split = file.split("_", 1)
    run_query(file_split[0], file_split[1][:-len("_output.json")], spark)

### HETIO

In [None]:
# Get the Spark session and register the public tables of the "hetio" database as temp views.
spark = create_spark()
import_db(spark, "HETIO")

In [None]:
# Run every HETIO query for which a rewriting output file exists, in sorted file order.
folder_path = 'rewritten/'
files = sorted(os.listdir(folder_path))
output_files = [file for file in files if file.endswith('_output.json') and file.startswith('HETIO')]

for file in output_files:
    # File names look like <benchmark>_<query>_output.json. Split only at the
    # first underscore and strip the fixed suffix, so that a query id which
    # itself contains underscores is passed on completely.
    file_split = file.split("_", 1)
    run_query(file_split[0], file_split[1][:-len("_output.json")], spark)