In [1]:
import findspark
findspark.init() # this must be executed before the below import
from pyspark.sql import SparkSession
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
# Spark session and HDFS client shared by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Python Spark SQL Execution")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "8g")
    .getOrCreate()
)
# pyarrow HDFS handle; query_with_parquets uses it to open partition files and read their Parquet metadata.
fs = pa.hdfs.connect(host='10.77.110.133', port=9001, user='liupengju')



In [3]:
import numpy as np
import time
import rtree
from rtree import index
from partition_tree import PartitionTree

  """Entry point for launching an IPython kernel.


In [4]:
def find_overlap_parquets(query, partition_index):
    """Return the ids of all partitions whose bounding box overlaps the query.

    Args:
        query: sequence of ``(lower, upper)`` pairs, one per dimension.
        partition_index: object exposing ``intersection(border)``, where
            ``border`` is the non-interleaved tuple ``(l_1..l_n, u_1..u_n)``.

    Returns:
        list of whatever ``partition_index.intersection`` yields.
    """
    lower_bounds = []
    for interval in query:
        lower_bounds.append(interval[0])

    upper_bounds = []
    for interval in query:
        upper_bounds.append(interval[1])

    # Non-interleaved layout: all lower bounds first, then all upper bounds.
    border = tuple(lower_bounds + upper_bounds)
    return list(partition_index.intersection(border))


def transform_query_to_sql(query, used_dims, column_name_dict, hdfs_path, querytype=0, pids=None):
    """Translate a range query into Spark SQL over partition Parquet files.

    Args:
        query: flat sequence ``[l_1..l_n, u_1..u_n]``. For the i-th entry of
            ``used_dims`` the exclusive lower bound is ``query[i]`` and the
            exclusive upper bound is ``query[len(used_dims) + i]``.
        used_dims: dimension ids constrained by the query (keys of
            ``column_name_dict``).
        column_name_dict: maps a dimension id to its column name.
        hdfs_path: directory holding the ``partition_<pid>.parquet`` files.
        querytype: selects the statement that is generated:
            0 -- ``SELECT *`` with the range filter,
            1 -- ``SELECT COUNT(*)`` with the range filter,
            2 -- ``SELECT variance(_c0)`` with the range filter,
            3 -- ``SELECT *`` without a filter,
            4 -- ``SELECT _c1,_c2,_c3`` without a filter,
            5 -- a list with one unfiltered ``SELECT *`` per partition id.
            Any other value yields an empty string.
        pids: partition ids to read. When ``None`` or empty, query types 0-4
            read everything under ``hdfs_path`` (previously this raised
            ``UnboundLocalError``) and query type 5 returns an empty list.

    Returns:
        ``(sql, where_clause)``: ``sql`` is a string (or a list of strings for
        query type 5) and ``where_clause`` is the ``and``-joined range
        predicate without the ``WHERE`` keyword.
    """
    num_used = len(used_dims)
    predicates = []
    for i, dim in enumerate(used_dims):
        column = column_name_dict[dim]
        predicates.append(column + '>' + str(query[i]))
        predicates.append(column + '<' + str(query[num_used + i]))
    where_clause = ' and '.join(predicates)
    # With no constrained dimension there is nothing to filter on; a bare
    # "WHERE " would be invalid SQL, so the suffix is dropped entirely.
    where_suffix = "WHERE " + where_clause if where_clause else ""

    has_pids = pids is not None and len(pids) != 0
    if has_pids:
        # Brace list such as '{1,2,3}': one path pattern covering every
        # selected partition file. Built explicitly (instead of str(set(...)))
        # so the text does not depend on element repr or set ordering.
        pids_str = '{' + ','.join(str(pid) for pid in sorted(set(pids))) + '}'
        pq_hdfs_path = hdfs_path + '/partition_' + pids_str + ".parquet"
    else:
        # No partition pruning available: read the whole layout directory.
        pq_hdfs_path = hdfs_path
    source = "parquet.`" + pq_hdfs_path + "`"

    sql = ''
    if querytype == 0:
        sql = "SELECT * FROM " + source + where_suffix
    elif querytype == 1:
        sql = "SELECT COUNT(*) FROM " + source + where_suffix
    elif querytype == 2:
        sql = "SELECT variance(_c0) FROM " + source + where_suffix
    elif querytype == 3:
        sql = "SELECT * FROM " + source
    elif querytype == 4:
        sql = "SELECT _c1,_c2,_c3 FROM " + source
    elif querytype == 5:
        # One statement per partition so the caller can filter and count each
        # partition separately.
        sql = []
        if has_pids:
            for pid in pids:
                sql.append("SELECT * FROM parquet.`" + hdfs_path + '/partition_' + str(pid) + ".parquet`")
    return sql, where_clause

# Core function: record query execution time, scanned data size
def query_with_parquets(query, used_dims, column_name_dict, hdfs_path, querytype=0, partition_tree=None,
                        print_execution_time=False):
    """Run one query through Spark and report timings and row counts.

    Args:
        query: flat bound vector passed to ``transform_query_to_sql`` and to
            ``partition_tree.query_single``.
        used_dims: dimension ids constrained by the query.
        column_name_dict: maps a dimension id to its column name.
        hdfs_path: directory holding the ``partition_<pid>.parquet`` files.
        querytype: statement type understood by ``transform_query_to_sql``.
        partition_tree: object exposing ``query_single(query)`` that returns
            the ids of the partitions to read; ``None`` disables pruning.
        print_execution_time: when true, also print both timings.

    Returns:
        ``(translation_time, execution_time, result_rows, scanned_rows)``.
        ``result_rows`` is the number of rows collected from Spark and
        ``scanned_rows`` is the sum of ``num_rows`` from the Parquet metadata
        of the selected partition files (0 when no partition tree is given).
    """
    start_time = time.time()

    # Resolve the partitions once; the ids drive both the SQL and the
    # metadata scan below.
    pids = None if partition_tree is None else partition_tree.query_single(query)
    sql, where_clause = transform_query_to_sql(query, used_dims, column_name_dict, hdfs_path, querytype, pids)
    if pids is not None:
        print("pids:", pids)
    end_time_1 = time.time()

    # Rows are collected to the driver on purpose: the measured execution
    # time includes materialising the result.
    filter_data_size = []
    if isinstance(sql, list):
        # Per-partition statements are unfiltered, so the range predicate is
        # applied on the DataFrame of each partition.
        for sql_item in sql:
            query_result = spark.sql(sql_item)
            if where_clause:
                query_result = query_result.where(where_clause)
            filter_data_size.append(len(query_result.collect()))
    else:
        filter_data_size.append(len(spark.sql(sql).collect()))
    end_time_2 = time.time()

    # Rows stored in each selected partition file, read from the Parquet
    # metadata rather than by scanning the data.
    data_size_list = []
    for pid in (pids if pids is not None else []):
        par_path = hdfs_path + '/partition_' + str(pid) + ".parquet"
        fw = fs.open(par_path, 'rb')
        try:
            meta = pa.parquet.read_metadata(fw, memory_map=False).to_dict()
        finally:
            # Release the HDFS handle even when the metadata cannot be parsed.
            fw.close()
        data_size_list.append(meta['num_rows'])
    actual_data_size2 = sum(data_size_list)

    query_translation_time = end_time_1 - start_time
    query_execution_time = end_time_2 - end_time_1
    print(f"{data_size_list}--{filter_data_size}")
    if print_execution_time:
        print('query translation time: ', query_translation_time)
        print('query execution time: ', query_execution_time)

    # The third slot used to be a hard-coded 0, which made every aggregated
    # "result size" downstream 0 as well.
    return (query_translation_time, query_execution_time, sum(filter_data_size), actual_data_size2)

In [5]:
def load_query(path):
    """Load a query set from a text file whose values are separated by single spaces.

    Returns the array produced by ``np.genfromtxt`` without any reshaping.
    """
    return np.genfromtxt(path, delimiter=' ')


def kdnode_2_border(kdnode):
    """Flatten a kd-node's per-dimension domains into a bounding-box tuple.

    ``kdnode[0]`` holds one ``[lower, upper]`` pair per dimension. The result
    is non-interleaved: every lower bound first, then every upper bound.
    """
    domains = kdnode[0]
    lows = tuple(bounds[0] for bounds in domains)
    highs = tuple(bounds[1] for bounds in domains)
    return lows + highs


def load_partitions_from_file(path):
    """Load partition (kd-node) descriptions from a comma-separated file.

    Each line is a stretched kd-node:
    ``[num_dims, l1..ln, u1..un, size, id, pid, left_child_id, right_child_id]``.
    Qd-tree layouts stop after ``size`` and carry no trailing tree fields.

    Args:
        path: anything ``np.genfromtxt`` accepts (file name or file-like object).

    Returns:
        One list per line: ``[domains, size]``, followed by the last four
        columns of the line when it has more than ``2 * num_dims + 2``
        columns. ``domains`` is a list of ``[lower, upper]`` pairs. Values
        keep the floating-point type produced by ``np.genfromtxt``. An empty
        file yields an empty list.
    """
    stretched_kdnodes = np.genfromtxt(path, delimiter=',')
    if stretched_kdnodes.size == 0:
        return []
    # genfromtxt collapses a one-line file into a 1-D array; restore the row
    # axis so the 2-D indexing below works for a single partition too.
    stretched_kdnodes = np.atleast_2d(stretched_kdnodes)

    # The dimensionality is taken from the first line and applied to all lines.
    num_dims = int(stretched_kdnodes[0, 0])
    size_col = 2 * num_dims + 1
    has_tree_fields = stretched_kdnodes.shape[1] > size_col + 1

    kdnodes = []
    for record in stretched_kdnodes:
        domains = [[record[1 + k], record[1 + num_dims + k]] for k in range(num_dims)]
        row = [domains, record[size_col]]
        if has_tree_fields:
            # Trailing four columns: id, pid, left_child_id, right_child_id.
            row.extend(record[-4:])
        kdnodes.append(row)
    return kdnodes


# def prepare_partition_index(partition_path):
#     partitions = load_partitions_from_file(partition_path)

#     p = index.Property()
#     p.leaf_capacity = 32
#     p.index_capacity = 32
#     p.near_minimum_overlap_factor = 16
#     p.fill_factor = 0.8
#     p.overwrite = True
#     pidx = index.Index(properties = p)

#     partition_index = index.Index(properties = p)
#     for i in range(len(partitions)):
#         partition_index.insert(i, kdnode_2_border(partitions[i]))

#     return partition_index

# Execute queries serially to avoid the impact of other factors on query execution
def batch_query(queryset, used_dims, column_name_dict, hdfs_path, querytype=0, partition_path=""):
    """Run every query of a workload one after another and print summary statistics.

    Queries are executed serially so that they do not compete with each other
    for resources while being timed.

    Args:
        queryset: 2-D collection with one query per row; a 1-D numpy array is
            treated as a single query.
        used_dims: dimension ids constrained by the queries.
        column_name_dict: maps a dimension id to its column name.
        hdfs_path: directory holding the partition Parquet files.
        querytype: statement type forwarded to ``query_with_parquets``.
        partition_path: file from which the ``PartitionTree`` is loaded.

    Prints the average result size, the average partition row count per
    query, and the total and average query execution time. Returns ``None``.
    """
    # np.genfromtxt yields a 1-D array for a one-line query file; without this
    # the loop below would iterate over individual bounds instead of queries.
    if isinstance(queryset, np.ndarray) and queryset.ndim == 1 and queryset.size > 0:
        queryset = queryset.reshape(1, -1)

    num_queries = len(queryset)
    if num_queries == 0:
        # Nothing to run; also avoids dividing by zero in the averages.
        print('no queries to execute')
        return

    partition_tree = PartitionTree(len(used_dims))
    partition_tree.load_tree(partition_path)

    # Each result is (translation time, execution time, result size, partition rows).
    results = []
    for i in range(num_queries):
        result = query_with_parquets(queryset[i], used_dims, column_name_dict, hdfs_path, querytype, partition_tree)
        print('finish query#', i)
        results.append(result)

    total_response_time = sum(result[1] for result in results)
    result_size = sum(result[2] for result in results)
    actual_data_size = sum(result[3] for result in results)

    avg_result_size = int(result_size // num_queries)
    avg_actual_data_size = int(actual_data_size // num_queries)
    print('average result size: ', avg_result_size)
    print('average actual data size: ', avg_actual_data_size)
    print('total query response time: ', total_response_time)
    print('average query response time: ', total_response_time / num_queries)


In [6]:
# Experiment configuration
# ==== workload ====
scale_factor = 50
problem_type = 2
# lineitem workload lives one level up:
#   '/home/liupengju/pycharmProjects/NORA_JOIN_SIMULATION/NORA_experiments/queryset/'
query_path = '/home/liupengju/pycharmProjects/NORA_JOIN_SIMULATION/NORA_experiments/queryset/tpcds/'  # store_sales
training_set = np.genfromtxt(f"{query_path}prob{problem_type}_{scale_factor}_train.csv", delimiter=',')

# ==== schema ====
used_dims = [1, 2, 3]  # store_sales; lineitem used [1,2,4]
num_dims = 16
column_names = [f'_c{idx}' for idx in range(num_dims)]
column_name_dict = dict(enumerate(column_names))  # dimension id -> column name

# ==== data layouts on HDFS (one directory per partitioning method) ====
# lineitem base: 'hdfs://10.77.110.133:9001/par_nora/'
hdfs_base_path = 'hdfs://10.77.110.133:9001/par_nora/tpcds/'  # tpcds
hdfs_path_nora = f"{hdfs_base_path}NORA/prob{problem_type}/scale{scale_factor}/merged/"
hdfs_path_qdtree = f"{hdfs_base_path}QdTree/prob{problem_type}/scale{scale_factor}/merged/"
hdfs_path_paw = f"{hdfs_base_path}PAW/prob{problem_type}/scale{scale_factor}/merged/"

# ==== query type (see transform_query_to_sql) ====
# 0: SELECT * with filter; 2: SELECT variance(_c0) with filter;
# 5: one unfiltered SELECT * per partition.
querytype = 5

# ==== partition-tree files, one per layout ====
# lineitem base: '/home/liupengju/pycharmProjects/NORA_JOIN_SIMULATION/PartitionLayout/'
partition_base_path = '/home/liupengju/pycharmProjects/NORA_JOIN_SIMULATION/PartitionLayout/tpcds/'  # tpcds
nora_partition_path = f"{partition_base_path}prob{problem_type}_nora_scale{scale_factor}"
qdtree_partition_path = f"{partition_base_path}prob{problem_type}_qdtree_scale{scale_factor}"
paw_partition_path = f"{partition_base_path}prob{problem_type}_paw_scale{scale_factor}"


In [7]:
# Run the training workload against the Qd-Tree layout
batch_query(
    queryset=training_set,
    used_dims=used_dims,
    column_name_dict=column_name_dict,
    hdfs_path=hdfs_path_qdtree,
    querytype=querytype,
    partition_path=qdtree_partition_path,
)

pids: [273]
[655695]--[182396]
finish query# 0
pids: [177]
[506243]--[36240]
finish query# 1
pids: [310]
[837833]--[29626]
finish query# 2
pids: [488, 426]
[846650, 689377]--[4154, 2702]
finish query# 3
pids: [481]
[506531]--[14582]
finish query# 4
pids: [291]
[732576]--[149909]
finish query# 5
pids: [177]
[506243]--[13491]
finish query# 6
pids: [181]
[1163038]--[3201]
finish query# 7
pids: [165]
[617633]--[12852]
finish query# 8
pids: [287]
[725131]--[29996]
finish query# 9
pids: [262, 303]
[713464, 678490]--[6029, 32015]
finish query# 10
pids: [326]
[1029881]--[27439]
finish query# 11
pids: [413]
[504715]--[30794]
finish query# 12
pids: [430]
[543447]--[10616]
finish query# 13
pids: [384, 315, 316]
[595181, 656997, 543601]--[51278, 62508, 64297]
finish query# 14
pids: [473, 474]
[811879, 753574]--[12825, 15688]
finish query# 15
pids: [444]
[1040340]--[501987]
finish query# 16
pids: [143]
[607690]--[111388]
finish query# 17
pids: [406]
[883941]--[7940]
finish query# 18
pids: [396]
[10

[578531]--[62187]
finish query# 153
pids: [299, 438]
[854248, 530855]--[12083, 133559]
finish query# 154
pids: [340]
[709096]--[20579]
finish query# 155
pids: [200]
[920215]--[24731]
finish query# 156
pids: [454]
[655752]--[52406]
finish query# 157
pids: [192, 360, 462, 359]
[632235, 655220, 596445, 612777]--[55016, 92091, 71246, 86151]
finish query# 158
pids: [295]
[681930]--[19725]
finish query# 159
pids: [233, 407]
[861954, 587562]--[114536, 56138]
finish query# 160
pids: [144]
[536358]--[1347]
finish query# 161
pids: [488, 487]
[846650, 513557]--[7418, 7824]
finish query# 162
pids: [286]
[907988]--[23569]
finish query# 163
pids: [401]
[561728]--[34031]
finish query# 164
pids: [326]
[1029881]--[81455]
finish query# 165
pids: [384, 385, 387]
[595181, 954920, 797962]--[21216, 40169, 43903]
finish query# 166
pids: [411, 412]
[609961, 504975]--[78234, 52768]
finish query# 167
pids: [381, 382]
[758523, 578531]--[36705, 19600]
finish query# 168
pids: [260]
[969296]--[57455]
finish query# 

In [8]:
# Run the training workload against the PAW layout
batch_query(
    queryset=training_set,
    used_dims=used_dims,
    column_name_dict=column_name_dict,
    hdfs_path=hdfs_path_paw,
    querytype=querytype,
    partition_path=paw_partition_path,
)

pids: [275]
[655695]--[182396]
finish query# 0
pids: [177]
[506243]--[36240]
finish query# 1
pids: [316]
[837833]--[29626]
finish query# 2
pids: [437]
[860619]--[6856]
finish query# 3
pids: [517]
[506531]--[14582]
finish query# 4
pids: [295]
[499304]--[149909]
finish query# 5
pids: [177]
[506243]--[13491]
finish query# 6
pids: [334]
[588656]--[3201]
finish query# 7
pids: [165]
[617633]--[12852]
finish query# 8
pids: [291]
[725131]--[29996]
finish query# 9
pids: [264, 309]
[713464, 678490]--[6029, 32015]
finish query# 10
pids: [477]
[517523]--[27439]
finish query# 11
pids: [426]
[538330]--[30794]
finish query# 12
pids: [448]
[543447]--[10616]
finish query# 13
pids: [321, 396]
[497567, 595181]--[126805, 51278]
finish query# 14
pids: [508]
[788213]--[28513]
finish query# 15
pids: [470]
[1040340]--[501987]
finish query# 16
pids: [143]
[607690]--[111388]
finish query# 17
pids: [417]
[504366]--[7940]
finish query# 18
pids: [516]
[520591]--[26435]
finish query# 19
pids: [260]
[969296]--[31820

[920215]--[24731]
finish query# 156
pids: [484]
[655752]--[52406]
finish query# 157
pids: [192, 369, 494]
[632235, 605662, 596445]--[55016, 178242, 71246]
finish query# 158
pids: [299]
[681930]--[19725]
finish query# 159
pids: [233, 419]
[861954, 587562]--[114536, 56138]
finish query# 160
pids: [144]
[536358]--[1347]
finish query# 161
pids: [440]
[851271]--[15242]
finish query# 162
pids: [288]
[907988]--[23569]
finish query# 163
pids: [413]
[561728]--[34031]
finish query# 164
pids: [477, 478]
[517523, 512358]--[59251, 22204]
finish query# 165
pids: [396, 397, 399]
[595181, 954920, 503805]--[21216, 40169, 43903]
finish query# 166
pids: [424, 423]
[504975, 609961]--[52768, 78234]
finish query# 167
pids: [393, 394]
[758523, 578531]--[36705, 19600]
finish query# 168
pids: [260]
[969296]--[57455]
finish query# 169
pids: [472, 316, 471]
[507864, 837833, 519901]--[47280, 159319, 67357]
finish query# 170
pids: [453, 454]
[535613, 533821]--[145445, 123307]
finish query# 171
pids: [209]
[773312]

In [9]:
# Run the training workload against the TORN layout (read from the *_nora paths)
batch_query(
    queryset=training_set,
    used_dims=used_dims,
    column_name_dict=column_name_dict,
    hdfs_path=hdfs_path_nora,
    querytype=querytype,
    partition_path=nora_partition_path,
)

pids: [273]
[664121]--[182396]
finish query# 0
pids: [177]
[631477]--[36240]
finish query# 1
pids: [316]
[761868]--[29626]
finish query# 2
pids: [451]
[829520]--[6856]
finish query# 3
pids: [541]
[506097]--[14582]
finish query# 4
pids: [291]
[499304]--[149909]
finish query# 5
pids: [177]
[631477]--[13491]
finish query# 6
pids: [493]
[501072]--[3201]
finish query# 7
pids: [164]
[617633]--[12852]
finish query# 8
pids: [285]
[634254]--[29996]
finish query# 9
pids: [481]
[506275]--[38044]
finish query# 10
pids: [495]
[565824]--[27439]
finish query# 11
pids: [438]
[538330]--[30794]
finish query# 12
pids: [551]
[495833]--[10616]
finish query# 13
pids: [405]
[784491]--[178083]
finish query# 14
pids: [564]
[580300]--[28513]
finish query# 15
pids: [484]
[965817]--[501987]
finish query# 16
pids: [143]
[746538]--[111388]
finish query# 17
pids: [429]
[504366]--[7940]
finish query# 18
pids: [419]
[830848]--[26435]
finish query# 19
pids: [450]
[636520]--[318203]
finish query# 20
pids: [447]
[653473]

[547039]--[669]
finish query# 173
pids: [250]
[573230]--[11560]
finish query# 174
pids: [379]
[507635]--[22701]
finish query# 175
pids: [415]
[522399]--[139577]
finish query# 176
pids: [409]
[834988]--[33040]
finish query# 177
pids: [497]
[627503]--[465411]
finish query# 178
pids: [403]
[800094]--[313528]
finish query# 179
pids: [463]
[545014]--[197202]
finish query# 180
pids: [519]
[549035]--[45390]
finish query# 181
pids: [471]
[499660]--[368793]
finish query# 182
pids: [436]
[637016]--[78039]
finish query# 183
pids: [283]
[663263]--[204846]
finish query# 184
pids: [549]
[571315]--[138370]
finish query# 185
pids: [421]
[642527]--[21244]
finish query# 186
pids: [496]
[561236]--[106576]
finish query# 187
pids: [411]
[649268]--[695]
finish query# 188
pids: [521]
[494975]--[13206]
finish query# 189
pids: [477]
[543160]--[168603]
finish query# 190
pids: [373]
[626788]--[22518]
finish query# 191
pids: [259]
[500465]--[292951]
finish query# 192
pids: [526]
[502040]--[75990]
finish query# 19

In [10]:
# check number of row groups
# import pyarrow as pa
# fs = pa.fs.HadoopFileSystem('192.168.6.62', port=9000, user='hdfs', replication=1)
# path1 = 'hdfs://192.168.6.62:9000/user/cloudray/NORA/prob1/merged/partition_94.parquet'
# path2 = 'hdfs://192.168.6.62:9000/user/cloudray/KDTree/prob1/merged/partition_98.parquet'
# fw1 = fs.open_input_file(path1)
# meta1 = pa.parquet.read_metadata(fw1, memory_map=False)
# print(meta1)
# print(meta1.row_group(0))
# fw1.close()
#
# fw2 = fs.open_input_file(path2)
# meta2 = pa.parquet.read_metadata(fw2, memory_map=False)
# print(meta2)
# print(meta2.row_group(0))
# fw2.close()

In [11]:
# path1 = 'hdfs://10.77.110.133:9001/par_nora/QdTree/prob2/scale1/merged/partition_359.parquet'
# path2 = 'hdfs://10.77.110.133:9001/par_nora/QdTree/prob2/scale100/merged/partition_172.parquet'
# time0=time.time()
# rep_count=5
# for i in range(rep_count):
#     res=spark.sql(f"select * from parquet.`{path1}`").collect()
#     print(len(res))
# #     res=spark.sql(f"select min(_c1),max(_c1),min(_c2),max(_c2),min(_c3),max(_c3) from parquet.`{path2}`").collect()
# #     print(res)
# time1=time.time()
# for i in range(rep_count):
#     res=spark.sql(f"select * from parquet.`{path2}` where _c1>7579307 and _c1<7600000").collect()
#     print(len(res))
# time2=time.time()
# print(f"{(time1-time0)/rep_count}   {(time2-time1)/rep_count}")
# fw=fs.open(path1,'rb')
# meta = pa.parquet.read_metadata(fw, memory_map=False).to_dict()
# print(meta['num_rows'])

# fw=fs.open(path2,'rb')
# meta = pa.parquet.read_metadata(fw, memory_map=False).to_dict()
# print(meta['num_rows'])