In [None]:
import pandas as pd
import pyarrow.parquet as pq
import sys
import numpy as np
import time
import random
import pickle
import matplotlib.pyplot as plt

from lsm_tree.GenOne.lsm_tree_original_test import LSMTree
from lsm_tree.GenOne.lsm_tree_cuckoo_filter_mem_test import LSMTreeCuckoo
from lsm_tree.GenOne.lsm_tree_bloom_filter_mem_test import LSMTreeBloom
from tqdm import tqdm
from statistics import stdev, mean

## Loading Data

In [None]:
df_parquet = pq.read_table('Data/2019-01-01_performance_fixed_tiles.parquet').to_pandas()

In [None]:
SEGMENTS_DIRECTORY_CKF = 'segments/cuckoo/'
SEGMENT_BASENAME_CKF = 'LSMTreeCuckoo'
WAL_BASENAME_CKF = 'wal_file_cuckoo'

SEGMENTS_DIRECTORY_BF = 'segments/bloom/'
SEGMENT_BASENAME_BF = 'LSMTreeBloom'
WAL_BASENAME_BF = 'wal_file_bloom'

size = sys.getsizeof(df_parquet[['quadkey','avg_d_kbps']])/1000000
print('Whole DataFrame      : ',size , len(df_parquet))

Whole DataFrame      :  395.040068 4877036


In [None]:
df_parquet['hashed_vals'] = df_parquet['tile'].apply(lambda x: abs(hash(x)))
df_parquet.head()

Unnamed: 0,quadkey,tile,avg_d_kbps,avg_u_kbps,avg_lat_ms,tests,devices,hashed_vals
0,231113112003202,"POLYGON((-90.6591796875 38.4922941923613, -90....",66216,12490,13,28,4,6628451788224452118
1,1322111021111001,"POLYGON((110.352172851562 21.2893743558604, 11...",102598,37356,13,15,4,6338773450739873531
2,3112203030003110,"POLYGON((138.592529296875 -34.9219710361638, 1...",24686,18736,18,162,106,6510551075776564950
3,320000130321312,"POLYGON((-87.637939453125 40.225024210605, -87...",17674,13989,78,364,4,334763833012859542
4,320001332313103,"POLYGON((-84.7430419921875 38.9209554204673, -...",441192,218955,22,14,1,8652878199503431291


In [None]:
cardinality = df_parquet['hashed_vals'].nunique()/len(df_parquet)
cardinality

1.0

## LSM Tree Testing Functions 

In [None]:
def insert_testing(lsm, df):
    start_time = time.time()
    for num, row in enumerate(df.iterrows()):   
        lsm.db_set(str(row[1][-1]), str(row[1][1]))   
    end_time = time.time()
    insertion_time = end_time - start_time
    return insertion_time

def query_testing(lsm, df, sample_num):
    random_samples = df.sample(n=sample_num)
    start_time = time.time()
    for row in random_samples.iterrows():   
        lsm.db_get(str(row[1][-1]))   
    end_time = time.time()
    query_time = end_time - start_time
    return query_time

def delete_testing(lsm, df, sample_num):
    random_samples = df.sample(n=sample_num)
    start_time = time.time()
    for row in random_samples.iterrows():   
        lsm.db_del(str(row[1][-1]))   
    end_time = time.time()
    deletion_time = end_time - start_time
    return deletion_time, random_samples

def query_testing_2(lsm, df):
    random_samples = df
    start_time = time.time()
    for row in random_samples.iterrows():   
        lsm.db_get(str(row[1][-1]))   
    end_time = time.time()
    query_time = end_time - start_time
    return query_time

def reset_lsm(batch_size, lvl_list, fpp, time_th):
    
    !rm segments/cuckoo/*
    !rm segments/bloom/*
    !rm segments/lsm_original/*
    
    lsm_ckf = LSMTreeCuckoo(SEGMENT_BASENAME_CKF, SEGMENTS_DIRECTORY_CKF, WAL_BASENAME_CKF)
    lsm_ckf.set_size_threshold(batch_size)
    lsm_ckf.set_ckf_fpp(fpp)
    lsm_ckf.set_time_threshold(time_th)
    lsm_ckf.set_levels_threshold(lvl_list[0], lvl_list[1])

    lsm_bf = LSMTreeBloom(SEGMENT_BASENAME_BF, SEGMENTS_DIRECTORY_BF, WAL_BASENAME_BF)
    lsm_bf.set_size_threshold(batch_size)
    lsm_bf.set_bf_fpp(fpp)
    lsm_bf.set_time_threshold(time_th)
    lsm_bf.set_levels_threshold(lvl_list[0], lvl_list[1])
    
    lsm_original = LSMTree()
    lsm_original.set_time_threshold(time_th)
    
    return lsm_ckf, lsm_bf, lsm_original

# Query, Deletion Testing

## Test Configurations

In [None]:
# insert_range        = [200000,  300000, 400000,  500000,  600000,  700000]
# query_del_test_list = [5000,     10000,  15000,   20000,   25000,   30000] 
# query_del_ranges    = ['5k',     '10k',  '15k',   '20k',   '25k',   '30k'] 
# batch_size          = [5000,    50000,  50000,   50000,   50000,   50000] 
# lvl_list            = [[10,20],[10,20],[10,20], [10,20], [10,20], [10,20]]


insert_range        = [100000,  150000, 200000, 250000,  300000,   350000]
query_del_test_list = [10000,     15000,  20000,  25000,   30000,   35000] 
query_del_ranges    = ['10k',     '15k',  '20k',  '25k',   '30k',   '35k'] 
batch_size          = [25000,     30000,  35000,  40000,   45000,   50000] 
lvl_list            = [[10,20],[20,40],[40,80],[50,100],[60,120],[70,140]]

In [None]:
fpp = 0.1
df = df_parquet
time_th = 0.05

insertion_results = {}
query_bef_results = {}
deletion_results  = {}
query_aft_results = {}

## Test Conduction (Run ONLY if Needed)

In [9]:
for insert_num in tqdm(insert_range[:3]):
    _, _, lsm_original = reset_lsm(batch_size[insert_range.index(insert_num)], lvl_list[insert_range.index(insert_num)], fpp, time_th)
    test_numb = insert_range.index(insert_num)
    sample_num = query_del_test_list[test_numb]
    data = df_parquet.iloc[:insert_num]
    key = str(insert_num)
    
    #Insertion Testing
    print(f"Starting Insertion Test #{test_numb}")
    orgl_insertion_time = insert_testing(lsm_original, data)

    #Query Before Testing
    print(f"Starting Query Before Test #{test_numb}")
    orgl_befquery_time = query_testing(lsm_original, data, sample_num)

    #Deletion Testing
    print(f"Starting Deletion Test #{test_numb}")
    orgl_deletion_time, random_samples = delete_testing(lsm_original, data, sample_num)
    orgl_deletion_time = orgl_deletion_time
    
    #Query After Testing
    print(f"Starting Query After Test #{test_numb}")
    orgl_aftquery_time = query_testing_2(lsm_original, random_samples)
    
    #Result Organization
    insertion_results[key] = (orgl_insertion_time, )
    query_bef_results[key] = (orgl_befquery_time, )
    deletion_results[key]  = (orgl_deletion_time, )
    query_aft_results[key] = (orgl_aftquery_time, )

  0%|          | 0/6 [00:00<?, ?it/s]

Starting Insertion Test #0
Starting Query Before Test #0
Starting Deletion Test #0
Starting Query After Test #0


 17%|█▋        | 1/6 [05:19<26:39, 319.82s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
Starting Insertion Test #1
Starting Query Before Test #1
Starting Deletion Test #1
Starting Query After Test #1


 33%|███▎      | 2/6 [20:42<44:58, 674.65s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
Starting Insertion Test #2
Starting Query Before Test #2
Starting Deletion Test #2
Starting Query After Test #2


 50%|█████     | 3/6 [50:40<59:22, 1187.46s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
Starting Insertion Test #3
Starting Query Before Test #3
Starting Deletion Test #3
Starting Query After Test #3


 67%|██████▋   | 4/6 [1:40:30<1:03:18, 1899.08s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
Starting Insertion Test #4
Starting Query Before Test #4
Starting Deletion Test #4
Starting Query After Test #4


 83%|████████▎ | 5/6 [2:55:47<47:23, 2843.10s/it]  

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
Starting Insertion Test #5
Starting Query Before Test #5
Starting Deletion Test #5
Starting Query After Test #5


100%|██████████| 6/6 [4:44:16<00:00, 2842.72s/it]


In [None]:
for insert_num in tqdm(insert_range[:]):
    
    lsm_ckf , lsm_bf, _ = reset_lsm(batch_size[insert_range.index(insert_num)], lvl_list[insert_range.index(insert_num)], fpp, time_th)
    test_numb = insert_range.index(insert_num)
    sample_num = query_del_test_list[test_numb]
    data = df_parquet.iloc[:insert_num]
    key = str(insert_num)
    
    #Insertion Testing
    print(f"Starting Insertion Test #{test_numb}")
    bf_insertion_time  = insert_testing(lsm_bf      , data)
    ckf_insertion_time = insert_testing(lsm_ckf     , data)

    #Query Before Testing
    print(f"Starting Query Before Test #{test_numb}")
    bf_befquery_time = query_testing(lsm_bf      , data, sample_num)
    ckf_befquery_time = query_testing(lsm_ckf     , data, sample_num)

    #Deletion Testing
    print(f"Starting Deletion Test #{test_numb}")
    bf_deletion_time, random_samples  = delete_testing(lsm_bf      , data, sample_num)
    ckf_deletion_time, random_samples = delete_testing(lsm_ckf     , data, sample_num)
    bf_deletion_time = bf_deletion_time
    ckf_deletion_time = ckf_deletion_time
    
    #Query After Testing
    print(f"Starting Query After Test #{test_numb}")
    bf_aftquery_time  = query_testing_2(lsm_bf      , random_samples)
    ckf_aftquery_time = query_testing_2(lsm_ckf     , random_samples)
    
    #Result Organization
    insertion_results[key] = (bf_insertion_time, ckf_insertion_time) + insertion_results[key]
    query_bef_results[key] = (bf_befquery_time ,  ckf_befquery_time) + query_bef_results[key]
    deletion_results[key]  = (bf_deletion_time ,  ckf_deletion_time) + deletion_results[key]
    query_aft_results[key] = (bf_aftquery_time ,  ckf_aftquery_time) + query_aft_results[key]

  0%|          | 0/6 [00:00<?, ?it/s]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
Starting Insertion Test #0
Starting Query Before Test #0
Starting Deletion Test #0
Starting Query After Test #0


 17%|█▋        | 1/6 [05:47<28:55, 347.18s/it]

Starting Insertion Test #1
Starting Query Before Test #1
Starting Deletion Test #1
Starting Query After Test #1


 33%|███▎      | 2/6 [16:50<35:33, 533.35s/it]

Starting Insertion Test #2
Starting Query Before Test #2
Starting Deletion Test #2
Starting Query After Test #2


 50%|█████     | 3/6 [36:00<40:43, 814.65s/it]

Starting Insertion Test #3
Starting Query Before Test #3
Starting Deletion Test #3
Starting Query After Test #3


 67%|██████▋   | 4/6 [1:06:34<40:34, 1217.37s/it]

Starting Insertion Test #4
Starting Query Before Test #4
Starting Deletion Test #4
Starting Query After Test #4


 83%|████████▎ | 5/6 [1:46:07<27:13, 1633.95s/it]

Starting Insertion Test #5
Starting Query Before Test #5
Starting Deletion Test #5
Starting Query After Test #5


100%|██████████| 6/6 [2:37:05<00:00, 1570.93s/it]


# Insertion Testion 

## Test Configurations 

In [7]:
fpp = 0.001
df = df_parquet
time_th = 0.05

insert_ranges       = [25000  ,  50000,  100000,  200000,   400000,   800000,  1600000] #,  3200000]
insert_ranges_ticks = ['25k'  ,'50k'  ,'100k'  ,'200k'  ,'400k'   , '800k'  ,  '1600k'] #  ,  '3200k']
batch_size          = [2500   ,   5000,   10000,   20000,    40000,    80000,   160000] # ,   360000] 
lvl_list            = [[20,40],[40,80],[60,120],[80,160],[100,200],[120,240],[140,300]] # ,[150,310]]

len(insert_ranges), len(insert_ranges_ticks), len(batch_size), len(lvl_list)

(7, 7, 7, 7)

## Running The Test 

In [10]:
inserts_results = {}

for batch_num, insert_num, lvl in tqdm(zip(batch_size, insert_ranges, lvl_list)):
    
    _, _, lsm_original = reset_lsm(batch_num, lvl, fpp, time_th)
    
    print(f'{insert_num} Insertion Test Starts!!')
    insertion_org = insert_testing(lsm_original, df.iloc[:insert_num])
    inserts_results[str(insert_num)] = (insertion_org,)

0it [00:00, ?it/s]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
25000 Insertion Test Starts!!


1it [00:02,  2.75s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
rm: cannot remove 'segments/lsm_original/*': No such file or directory
50000 Insertion Test Starts!!


2it [00:07,  3.92s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
rm: cannot remove 'segments/lsm_original/*': No such file or directory
100000 Insertion Test Starts!!


3it [00:16,  6.12s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
200000 Insertion Test Starts!!


4it [00:32, 10.32s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
400000 Insertion Test Starts!!


5it [01:05, 18.46s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
800000 Insertion Test Starts!!


6it [02:10, 34.32s/it]

rm: cannot remove 'segments/cuckoo/*': No such file or directory
rm: cannot remove 'segments/bloom/*': No such file or directory
1600000 Insertion Test Starts!!


7it [04:20, 37.19s/it]


In [11]:
inserts_results

{'25000': (1.863973617553711,),
 '50000': (3.925391674041748,),
 '100000': (7.90558385848999,),
 '200000': (15.935945749282837,),
 '400000': (32.04067420959473,),
 '800000': (64.25781059265137,),
 '1600000': (128.47691226005554,)}

In [None]:
with open('insert_results_org', 'wb') as b:
    pickle.dump(insertion_results, b)

In [None]:
inserts_results = {}

for batch_num, insert_num, lvl in tqdm(zip(batch_size, insert_ranges, lvl_list)):
    
    lsm_ckf , lsm_bf, _ = reset_lsm(batch_num, lvl, fpp, time_th)
    
    print(f'{insert_num} Insertion Test Starts!!')
    insertion_bf = insert_testing(lsm_bf, df.iloc[:insert_num])
    inserts_results[str(insert_num)] = (insertion_bf,)
    insertion_ckf = insert_testing(lsm_ckf, df.iloc[:insert_num])
    inserts_results[str(insert_num)] += (insertion_ckf,)

In [None]:
with open('insert_results_bfckf', 'wb') as b:
    pickle.dump(insertion_results, b)

## Test Data Vizualization

### To save data

In [9]:
# with open('insert_results_org', 'wb') as b:
#     pickle.dump(insertion_results, b)

# with open('query_bef_results_bckor_10', 'wb') as b:
#     pickle.dump(query_bef_results, b)
    
# with open('deletion_results_bckor_10', 'wb') as b:
#     pickle.dump(deletion_results, b)
    
# with open('query_aft_results_bckor_10', 'wb') as b:
#     pickle.dump(query_aft_results, b)