# System Settings

In [None]:
import os
from pathlib import Path
import pandas as pd
from IPython.display import display
import socket


In [None]:
# Show complete DataFrames: no row cap and no truncation of long cell values.
for option_name in ('display.max_rows', 'display.max_colwidth'):
    pd.set_option(option_name, None)

# Home directory with symlinks resolved, and the current working directory.
home = os.path.realpath(str(Path.home()))
cwd = os.getcwd()
print(f'home: {home}')
print(f'cwd: {cwd}')

# One row per environment variable of the current process.
# NOTE(review): this displays every variable, including any credentials
# that happen to be stored in the environment.
env_df = pd.DataFrame(
    list(os.environ.items()),
    columns=['Environment Variable', 'Value'],
)
display(env_df)

# Host name of this machine and the address that name resolves to.
localhost = socket.gethostname()
local_ip = socket.gethostbyname(localhost)
print(f'localhost: {localhost}')
print(f'ip: {local_ip}')

In [None]:
# Capture the second whitespace-separated field of the first line of
# $SPARK_HOME/RELEASE. The IPython `!` capture yields a list of output lines.
spark_version=!head  -n1 $SPARK_HOME/RELEASE | awk '{print $2}'
# Keep only the first captured line as the version string.
spark_version = spark_version[0]

print(f"Spark version from SPARK_HOME: {spark_version}")
# Same version with the dots removed (e.g. '3.3.1' -> '331').
spark_version_short=''.join(spark_version.split('.'))

# Imports

In [None]:
import findspark
import os

# Initialise findspark with the installation named by SPARK_HOME.
findspark.init(os.environ['SPARK_HOME'])

# Supply a default for SPARK_SUBMIT_OPTS only when it is not already set.
if 'SPARK_SUBMIT_OPTS' not in os.environ:
    os.environ['SPARK_SUBMIT_OPTS'] = '-Dscala.usejavacp=true'

In [None]:
import socket
# Host name of this machine and the address that name resolves to; the same
# two assignments also appear in the System Settings cell.
localhost=socket.gethostname()
local_ip=socket.gethostbyname(localhost)

In [None]:
import atexit
import importlib
import json
import math
import signal
import tempfile
import time
import timeit

import matplotlib
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import platform
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime
from functools import reduce
from IPython.display import display, HTML
from matplotlib import rcParams
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql.functions import col, floor, lit, rank, to_date
from pyspark.sql.types import (DoubleType, FloatType, IntegerType,
                               StringType, StructField, StructType,
                               TimestampType)
import requests
import json
import gzip

rcParams['font.sans-serif'] = 'Courier New'
rcParams['font.family'] = 'Courier New'
rcParams['font.size'] = '12'

%matplotlib inline


# TestTPC

In [None]:
from dataclasses import dataclass
from functools import wraps
from pathlib import Path
from typing import List 
import sqlparse

class TestTPC:
    """Load TPC query files and run them against Spark temp views, recording timings.

    Subclasses supply ``tpctables`` (the table names to register) and a
    default query directory.
    """

    @dataclass
    class query_info:
        # Names of the tables associated with the query.
        tables: List[str]
        # SQL statements of the query file, in file order.
        sql: List[str]

    # Class-level placeholders kept for backward compatibility; __init__ gives
    # every instance its own query_infos / query_ids.
    query_infos = {}
    query_ids =[]

    # Table names to load; overridden by subclasses.
    tpctables=[]

    def __init__(self, spark, table_dir, data_source = 'parquet', tpc_query_path=''):
        """Read every ``*.sql`` file in ``tpc_query_path`` and prepare run state.

        Args:
            spark: object exposing ``sparkSession``, ``read`` and ``sql``.
            table_dir: one directory, or a list/tuple of directories, that
                contain one sub-directory per table.
            data_source: format name passed to ``spark.read.format``.
            tpc_query_path: directory holding the query files.

        Raises:
            OSError: if ``tpc_query_path`` cannot be listed or a file cannot
                be read.
        """
        self.spark = spark
        self.sc = spark.sparkSession.sparkContext
        self.appid = self.sc.applicationId
        self.data_source = data_source
        self.table_loaded = False
        self.result = {}
        self.duration = 0
        self.stopped = False
        self.table_dir=table_dir
        self.tpc_query_path=tpc_query_path

        # Per-instance mapping: filling the class-level dict would make every
        # instance (and every subclass) see the queries of all the others.
        self.query_infos = {}
        for file_name in os.listdir(self.tpc_query_path):
            if not file_name.endswith('.sql'):
                continue
            with open(os.path.join(self.tpc_query_path, file_name), "r") as f:
                # read() keeps the text as written; joining readlines() with
                # "\n" would double every line break.
                statements = [s for s in sqlparse.split(f.read()) if s.strip()]
            query_name = file_name.split(".")[0]
            self.query_infos[query_name] = self.query_info(self.tpctables, statements)
        self.query_ids = sorted(self.query_infos.keys(), key=self._query_sort_key)
        print("http://{}:18080/history/{}/jobs/".format(local_ip, self.sc.applicationId))

    @staticmethod
    def _query_sort_key(name):
        """Sort key that orders names by length first, ignoring a trailing 'a'/'b' variant letter."""
        effective_len = len(name) - 1 if name.endswith(('a', 'b')) else len(name)
        return str(effective_len) + name

    def load_table(self, table):
        """Return a DataFrame for ``table`` read from ``table_dir`` (one path or several)."""
        reader = self.spark.read.format(self.data_source)
        if isinstance(self.table_dir, (list, tuple)):
            return reader.load([os.path.join(t, table) for t in self.table_dir])
        return reader.load(os.path.join(self.table_dir, table))

    def load_tables_as_tempview(self, tables):
        """Register each named table as a temp view under its own name."""
        for table in tables:
            df = self.load_table(table)
            df.createOrReplaceTempView(table)

    def load_all_tables_as_tempview(self):
        """Register every table listed in ``tpctables`` as a temp view."""
        print(f"Loading all tables: {self.tpctables}")
        self.load_tables_as_tempview(self.tpctables)

    def load_query(self, query):
        """Return one DataFrame per SQL statement of the named query.

        Raises:
            KeyError: if ``query`` was not found in the query directory.
        """
        info = self.query_infos[query]
        return [self.spark.sql(q) for q in info.sql]

    def run_query(self, query, action, explain = False, print_result=False, load_table=True):
        """Run one query, time it, and store the outcome in ``self.result``.

        Args:
            query: query name (file name without extension).
            action: callable applied to each statement's DataFrame to trigger
                execution, e.g. ``lambda df: df.collect()``.
            explain: print the plan of each statement before running it.
            print_result: print what ``action`` returned for each statement.
            load_table: register all tables as temp views first.
        """
        if load_table:
            self.load_all_tables_as_tempview()
        print("start query " + query + ", application id " + self.sc.applicationId)
        self.sc.setJobDescription(query)

        queries = self.load_query(query)
        ts = time.time()
        start_time = timeit.default_timer()
        print("{} : {}".format("Start time", start_time))
        outcomes = []
        for q in queries:
            if explain:
                q.explain()
            outcomes.append(action(q))
        end_time = timeit.default_timer()
        duration = end_time - start_time
        display(HTML(('Completed Query. Time(sec): <font size=6pt color=red>{:f}</font>'.format(duration))))

        self.result[query] = {
            'query_name':query,
            'application_id':self.sc.applicationId,
            'application_time_taken':duration,
            'query_status':'pass',
            'start_time':ts}
        self.duration += float(duration)
        if print_result:
            # Printed after timing stops so the output cost is not measured.
            for outcome in outcomes:
                print(outcome)

    def power_run(self, explain=False, print_result=False, load_table=True, action=None):
        """Run every loaded query once, in ``query_ids`` order.

        ``action`` defaults to collecting each DataFrame. Tables are
        registered at most once, before the first query.
        """
        if action is None:
            action = lambda df: df.collect()
        if load_table:
            self.load_all_tables_as_tempview()
        for query_id in self.query_ids:
            self.run_query(query_id, action, explain=explain, print_result=print_result, load_table=False)

    def print_result(self):
        """Print the result dict, the total duration, the application id and each query time."""
        print(self.result)
        print()
        print(f"total duration:\n{self.duration}\n")
        print(self.appid)
        for t in self.result.values():
            print(t['application_time_taken'])
    
class TestTPCH(TestTPC):
    """TestTPC preset with the TPC-H table names and query directory."""

    tpctables = ['customer', 'lineitem', 'nation', 'orders', 'part', 'partsupp', 'region', 'supplier']

    def __init__(self, spark, table_dir, data_source = 'parquet',tpc_query_path = '/opt/spark/tpch-queries/'):
        # The benchmark wrappers forward '' when no directory was chosen;
        # listing '' fails, so fall back to the default location instead.
        query_dir = tpc_query_path or '/opt/spark/tpch-queries/'
        super().__init__(spark, table_dir, data_source, query_dir)
                
class TestTPCDS(TestTPC):
    """TestTPC preset with the TPC-DS table names and query directory."""

    tpctables = [ 'call_center',
         'catalog_page',
         'catalog_returns',
         'catalog_sales',
         'customer',
         'customer_address',
         'customer_demographics',
         'date_dim',
         'household_demographics',
         'income_band',
         'inventory',
         'item',
         'promotion',
         'reason',
         'ship_mode',
         'store',
         'store_returns',
         'store_sales',
         'time_dim',
         'warehouse',
         'web_page',
         'web_returns',
         'web_sales',
         'web_site']

    def __init__(self, spark, table_dir, data_source = 'parquet', tpc_query_path = '/opt/spark/tpcds-queries/'):
        # The benchmark wrappers forward '' when no directory was chosen;
        # listing '' fails, so fall back to the default location instead.
        query_dir = tpc_query_path or '/opt/spark/tpcds-queries/'
        super().__init__(spark, table_dir, data_source, query_dir)

# Create SparkContext

## default config

In [None]:
def convert_to_bytes(size):
    """Convert a size such as ``'512m'``, ``'4G'`` or ``'100'`` to bytes.

    Args:
        size: an int, or a string made of an integer optionally followed by
            a binary unit suffix (``k``, ``m``, ``g``, ``t``, ``p``; any
            case; an optional trailing ``b`` as in ``'64mb'``). Surrounding
            whitespace is ignored. A value without a suffix is already bytes.

    Returns:
        The size in bytes as an int.

    Raises:
        ValueError: if the value is empty or its numeric part is not an
            integer.
    """
    exponents = {'k': 1, 'm': 2, 'g': 3, 't': 4, 'p': 5}
    text = str(size).strip().lower()
    # Accept 'kb' / 'mb' / ... and a bare 'b' by dropping the trailing 'b'.
    if text.endswith('b'):
        text = text[:-1]
    if not text:
        raise ValueError(f"invalid size: {size!r}")
    suffix = text[-1]
    if suffix in exponents:
        return int(text[:-1]) * 1024 ** exponents[suffix]
    return int(text)

def yarn_padding(size):
    """Return the amount to add to ``size`` to reach a whole multiple of 1g.

    The granule is ``convert_to_bytes('1g')``, so ``size`` is compared
    against a byte count. Sizes at or below one granule are padded up to
    exactly one granule.
    """
    granule = convert_to_bytes('1g')
    # Ceiling division, but never fewer than one granule.
    granule_count = max(1, -(-size // granule))
    return granule_count * granule - size

def findjemalloc():
    """Return the second space-separated field of `whereis libjemalloc.so.2`, run over ssh."""
    # NOTE(review): IPython expands `$l` from a variable named `l`, which is
    # not defined inside this function — confirm which host is intended.
    jemallocDir = !ssh $l "whereis libjemalloc.so.2"
    # Only the first captured output line is inspected.
    libjemalloc = jemallocDir[0].split(' ')
    # NOTE(review): index 1 does not exist when that line has no second
    # field; presumably that is the "library not found" case — verify.
    return libjemalloc[1]

def get_py4jzip():
    """Return the path of the py4j zip under ``$SPARK_HOME/python/lib``.

    Returns:
        The first ``py4j*.zip`` match in sorted order.

    Raises:
        KeyError: if ``SPARK_HOME`` is not set.
        FileNotFoundError: if no matching file exists.
    """
    # Local import so the helper does not depend on the notebook's import cell.
    import glob

    lib_dir = os.path.join(os.environ['SPARK_HOME'], 'python', 'lib')
    # Sorted so the choice is deterministic when several versions are present.
    matches = sorted(glob.glob(os.path.join(lib_dir, 'py4j*.zip')))
    if not matches:
        raise FileNotFoundError(f"no py4j*.zip found in {lib_dir}")
    return matches[0]

class SparkContextT:
    """Size a SparkConf from the standalone master's worker list and start a context.

    All sizing is derived from the first worker's cores and memory.
    ``initialize_cfg`` reads ``self.app_name``, which this base class does
    not set; the subclasses in this file assign it in their constructors.
    """

    def __init__(self, executors_per_node, task_per_core, extra_jars, offheap_ratio):
        """Query the master's JSON endpoint and record the sizing parameters.

        Args:
            executors_per_node: number of executors per worker.
            task_per_core: multiplier applied to the total executor cores to
                derive the shuffle partition count.
            extra_jars: value used for both the driver and the executor
                ``extraClassPath``.
            offheap_ratio: off-heap size as a multiple of the executor heap;
                a value <= 0 disables off-heap memory.

        Raises:
            requests.RequestException: if the master cannot be queried.
            RuntimeError: if the master reports no workers.
        """
        res = requests.get("http://"+localhost+":8080/json/", timeout=30)
        res.raise_for_status()
        restout=json.loads(res.text)

        self.workers=restout["workers"]
        if not self.workers:
            # Everything below is sized from workers[0].
            raise RuntimeError("Spark master reports no workers")
        self.num_nodes=len(self.workers)
        self.host_is_worker = local_ip in [l["host"] for l in self.workers]
        self.executors_per_node = executors_per_node
        self.cores_per_executor = int(self.workers[0]["cores"]/executors_per_node)

        self.task_per_core = task_per_core
        self.memory_per_node = str(self.workers[0]['memory'])+"m"
        self.extra_jars = extra_jars
        self.master = 'spark://'+localhost+":7077"
        self.conf = SparkConf()
        self.offheap_ratio=offheap_ratio

    def initialize_cfg(self):
        """Compute the memory split and write all settings into ``self.conf``."""
        num_executors = self.num_nodes*self.executors_per_node
        parallelism = num_executors*self.cores_per_executor*self.task_per_core

        driver_memory = convert_to_bytes('1g')
        # 1g for every executor in the cluster, plus 1g.
        # NOTE(review): the earlier comment here said "1g per core + 1g" —
        # confirm which of the two is intended.
        executor_memory_overhead = num_executors * convert_to_bytes('1g') + convert_to_bytes('1g')

        # Minimum executor memory
        min_memory = convert_to_bytes('1g')

        # Calculate executor on-heap memory. The driver is only accounted for
        # when this host is itself one of the workers.
        num_driver = 1 if self.host_is_worker else 0
        node_memory = convert_to_bytes(self.memory_per_node)
        reserved = ((executor_memory_overhead + min_memory)*self.executors_per_node
                    + (driver_memory + min_memory)*num_driver)
        # Uses the instance's own ratio and executor count rather than
        # whatever globals of the same name happen to exist in the notebook.
        shares = (self.offheap_ratio*num_driver
                  + (1+self.offheap_ratio)*self.executors_per_node)
        executor_memory = math.floor((node_memory - reserved)/shares)
        executor_memory = max(executor_memory, min_memory)

        # Calculate executor off-heap memory
        if self.offheap_ratio > 0:
            enable_offheap = True
            offheap_memory = math.floor(executor_memory*self.offheap_ratio)
        else:
            enable_offheap = False
            offheap_memory = 0

        byte_to_mb = lambda x: int(x/(1024 ** 2))
        driver_memory_mb = byte_to_mb(driver_memory)
        executor_memory_overhead_mb = byte_to_mb(executor_memory_overhead)
        executor_memory_mb = byte_to_mb(executor_memory)
        offheap_memory_mb = byte_to_mb(offheap_memory)

        # NOTE(review): yarn_padding compares its argument against
        # convert_to_bytes('1g'), but a value in MB is passed here, and the
        # returned padding replaces the total. For realistic node sizes the
        # condition below is therefore not met. Left unchanged; confirm the
        # intended rounding before relying on it.
        executor_totalmem_mb = executor_memory_overhead_mb + executor_memory_mb + offheap_memory_mb
        executor_totalmem_mb = yarn_padding(executor_totalmem_mb)
        if byte_to_mb(node_memory) - executor_totalmem_mb*self.executors_per_node > executor_totalmem_mb:
            executor_memory_overhead_mb += 1024

        print(f'''
            executors per node: {self.executors_per_node}
            parallelism: {parallelism}
            executor memory: {executor_memory_mb}m
            offheap memory: {offheap_memory_mb}m
        ''')

        # Joined with os.path.join so the directory separator is present
        # whether or not SPARK_HOME ends with a slash.
        executor_pythonpath = f"{os.path.join(os.environ['SPARK_HOME'], 'python')}:{get_py4jzip()}"

        self.conf.set('spark.app.name', self.app_name)\
            .set('spark.master',self.master)\
            .set('spark.executor.memory', '{:d}m'.format(executor_memory_mb))\
            .set('spark.memory.offHeap.enabled', enable_offheap)\
            .set('spark.memory.offHeap.size','{:d}m'.format(offheap_memory_mb))\
            .set('spark.sql.shuffle.partitions', parallelism)\
            .set('spark.executor.instances', '{:d}'.format(num_executors))\
            .set('spark.executor.cores','{:d}'.format(self.cores_per_executor))\
            .set('spark.task.cpus','{:d}'.format(1))\
            .set('spark.driver.memory', '{:d}m'.format(driver_memory_mb))\
            .set('spark.executor.memoryOverhead', '{:d}m'.format(executor_memory_overhead_mb))\
            .set('spark.driver.maxResultSize', '4g')\
            .set('spark.driver.extraClassPath', self.extra_jars) \
            .set('spark.executor.extraClassPath', self.extra_jars) \
            .set('spark.executorEnv.PYTHONPATH', executor_pythonpath) \
            .set("spark.sql.broadcastTimeout", "4800") \
            .set('spark.serializer','org.apache.spark.serializer.KryoSerializer')\
            .set('spark.kryoserializer.buffer.max','512m')\
            .set('spark.kryo.unsafe',False)\
            .set('spark.sql.adaptive.enabled',True)\
            .set('spark.sql.autoBroadcastJoinThreshold',"10m")\
            .set('spark.sql.catalogImplementation','hive')\
            .set('spark.sql.optimizer.dynamicPartitionPruning.enabled',True)\
            .set('spark.cleaner.periodicGC.interval', '10s')

    def create_cntx(self):
        """Start the SparkContext from ``self.conf`` and store it with its SQLContext.

        Sets ``self.sc`` and ``self.spark`` and displays the effective
        configuration.
        """
        print("spark.serializer: ",self.conf.get("spark.serializer"))
        print("master: ",self.conf.get("spark.master"))

        sc = SparkContext(conf = self.conf,master=self.conf.get("spark.master"))
        sc.setLogLevel('ERROR')

        sc.addPyFile(f"{os.environ['SPARK_HOME']}/python/lib/pyspark.zip")
        sc.addPyFile(get_py4jzip())

        spark = SQLContext(sc)

        # Fixed pause before the session is created.
        time.sleep(30)

        # The session object itself is not kept; only the call is retained.
        SparkSession(sc)

        print("appid: ",sc.applicationId)
        print("SparkConf:")

        df = pd.DataFrame(sc.getConf().getAll(), columns=['key', 'value'])
        display(df)
        self.sc=sc
        self.spark=spark


## Spark

In [None]:
class VanillaSparkContext(SparkContextT):
    """SparkContextT with the application name "vanilla" and a default off-heap ratio of 2.0."""

    def __init__(self, executors_per_node, task_per_core, extra_jars, offheap_ratio=2.0):
        super().__init__(
            executors_per_node=executors_per_node,
            task_per_core=task_per_core,
            extra_jars=extra_jars,
            offheap_ratio=offheap_ratio,
        )
        self.app_name = "vanilla"

## Gluten

In [None]:
class GlutenSparkContext(SparkContextT):
    """SparkContextT named "gluten" that also applies the Gluten plugin settings.

    The default off-heap ratio is 7.0.
    """

    def __init__(self, executors_per_node, task_per_core, extra_jars, offheap_ratio=7.0):
        super().__init__(executors_per_node, task_per_core, extra_jars, offheap_ratio)
        self.app_name = "gluten"

        # Applied one at a time, in the same order as before; the jemalloc
        # lookup still happens only after the preceding keys have been set.
        conf = self.conf
        conf.set('spark.sql.files.maxPartitionBytes', '4g')
        conf.set('spark.plugins','org.apache.gluten.GlutenPlugin')
        conf.set('spark.shuffle.manager','org.apache.spark.shuffle.sort.ColumnarShuffleManager')
        conf.set('spark.gluten.sql.columnar.backend.lib','velox')
        conf.set('spark.gluten.sql.columnar.maxBatchSize',4096)
        conf.set('spark.gluten.sql.columnar.forceShuffledHashJoin',True)
        conf.set('spark.executorEnv.LD_PRELOAD', findjemalloc())
        conf.set('spark.gluten.sql.columnar.coalesce.batches', 'true')
        conf.set('spark.gluten.memory.overAcquiredMemoryRatio','0')

## Create Context

In [None]:
class BenchmarkT:
    """Pairs a Spark context wrapper with a TPC run and gathers the run's artifacts.

    ``collect_profile`` reads ``self.test_tpc``, which is assigned by the
    subclasses' ``initialize``.
    """

    def __init__(self, sct, tabledir, data_source = 'parquet', tpc_query_path=''):
        self.sct = sct
        self.tabledir = tabledir
        self.data_source = data_source
        self.tpc_query_path = tpc_query_path

    def initialize(self):
        """Configure and start the Spark context, then remember its application id."""
        context = self.sct
        context.initialize_cfg()
        context.create_cntx()
        self.appid = context.sc.applicationId
        print("start run: ", self.appid)

    def collect_profile(self):
        """Write query results, per-worker files and the gzipped event log under the profile directory.

        Stops the Spark context before the event log is copied.
        """
        profile_root = f'/opt/spark/work-dir/profile/{self.appid}'

        # --- per-query results, one JSON file each
        os.makedirs(f'{profile_root}/result', exist_ok=True)
        for query_name, outcome in self.test_tpc.result.items():
            with open(f"{profile_root}/result/{query_name}-result.txt", "w") as result_file:
                json.dump(outcome, result_file, indent=4)

        # --- per-worker data, gathered by a task that runs on the executors
        def run_script(partid):
            import subprocess
            import socket
            # Disk usage as reported by `df`.
            df_run = subprocess.run('df', capture_output=True, text=True)
            worker_ip = socket.gethostbyname(socket.gethostname())
            with open("/opt/spark/work-dir/telegraf.out",'r') as telegraf_file:
                telegraf_text = telegraf_file.read()
            return {'host':worker_ip,"df":df_run.stdout,'telegraf':telegraf_text}

        node_count = self.sct.num_nodes
        # One partition per node.
        reports = self.sct.sc.parallelize(range(node_count), node_count).map(run_script).collect()

        # Keep the first report from each host; directories are numbered in
        # the order the hosts are first seen.
        collected_workers = []
        for report in reports:
            host = report['host']
            if host in collected_workers:
                continue
            collected_workers.append(host)
            worker_dir = f"{profile_root}/worker{len(collected_workers)}"
            os.makedirs(worker_dir, exist_ok=True)
            with open(f"{worker_dir}/df.txt",'w') as df_file:
                df_file.write(report['df'])
            with open(f"{worker_dir}/telegraf.out",'w') as telegraf_out:
                telegraf_out.write(report['telegraf'])

        # Report workers known to the master that returned nothing.
        for worker in self.sct.workers:
            if worker['host'] not in collected_workers:
                print("Collection Profile Wrong, profile on worker ", worker['id'], worker['host']," isn't collected")

        # --- event log: stop the context, then compress the log
        self.sct.sc.stop()

        with open(f'/opt/spark/events/{self.appid}', 'rb') as event_log, \
            gzip.open(f'{profile_root}/{self.appid}.gz', 'wb') as packed_log:
            packed_log.writelines(event_log)

        print("appid is ", self.appid)
        print("saved to /opt/spark/work-dir/profile/")
    
class TPCHBenchmark(BenchmarkT):
    """BenchmarkT that tags the application name for TPC-H and runs TestTPCH."""

    def __init__(self, sct, tabledir, data_source = 'parquet', tpc_query_path=''):
        super().__init__(sct, tabledir, data_source, tpc_query_path)
        # Append the benchmark and Spark version to the context's app name.
        self.sct.app_name += f"_tpch_spark{spark_version_short}"

    def initialize(self):
        """Start the context, then build the TPC-H test driver on top of it."""
        super().initialize()
        self.test_tpc = TestTPCH(
            self.sct.spark,
            self.tabledir,
            self.data_source,
            self.tpc_query_path,
        )
        
    
class TPCDSBenchmark(BenchmarkT):
    """BenchmarkT that tags the application name for TPC-DS and runs TestTPCDS.

    Runtime bloom-filter settings are applied to every context; the Gluten
    join-optimisation settings are added for any context that is not a
    VanillaSparkContext.
    """

    def __init__(self, sct, tabledir, data_source = 'parquet', tpc_query_path=''):
        super().__init__(sct, tabledir, data_source, tpc_query_path)
        app_name_suffix = f"_tpcds_spark{spark_version_short}"
        self.sct.app_name = self.sct.app_name + app_name_suffix

        conf = self.sct.conf
        # Common to both kinds of context.
        conf.set('spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold', '0')\
            .set('spark.sql.optimizer.runtime.bloomFilter.enabled', 'true')
        # isinstance so that subclasses of VanillaSparkContext are also
        # treated as vanilla and do not receive the Gluten keys.
        if not isinstance(sct, VanillaSparkContext):
            conf.set('spark.gluten.sql.columnar.joinOptimizationLevel', '18')\
                .set('spark.gluten.sql.columnar.physicalJoinOptimizeEnable', 'true')\
                .set('spark.gluten.sql.columnar.physicalJoinOptimizationLevel', '18')\
                .set('spark.gluten.sql.columnar.logicalJoinOptimizeEnable', 'true')

    def initialize(self):
        """Start the context, then build the TPC-DS test driver on top of it."""
        super().initialize()
        self.test_tpc=TestTPCDS(self.sct.spark, self.tabledir, self.data_source, self.tpc_query_path)
    