In [None]:
from datetime import datetime
from pathlib import Path

import matplotlib
import matplotlib.pyplot as plt
# from cos_backend import CosBackend

In [None]:
import time
from math import ceil
import yaml, laspy
import lithops
import pandas as pd
import numpy as np
import neilpy
from lithops.storage.cloud_proxy import open, os as os_cl
from lithops import ServerlessExecutor
from lidarpartitioner import lidarutils, utils
from lidarpartitioner.las_partitioner import Partitioner
from lidarpartitioner.lidarutils import *
import lidarpartitioner
from lithops.worker.utils import get_memory_usage
from sklearn.neighbors import KDTree

In [None]:
# Moment at which this notebook run started.
now = datetime.now()

# Human-readable stamp in dd/mm/YYYY HH:MM:SS form.
dt_string = now.strftime("%d/%m/%Y %H:%M:%S")

# Same stamp with every '/', ':' and ' ' turned into '_', so it can be
# embedded in a file name.
datet = dt_string.translate(str.maketrans("/: ", "___"))

In [None]:
def partition_file(fname, out_dir, num_points, sufix, buffer):
    """Run ``Partitioner.make_partition`` for one input file.

    Parameters
    ----------
    fname, sufix
        Forwarded unchanged to the ``Partitioner`` constructor.
    out_dir
        First positional argument of ``make_partition``.
    num_points
        Passed to ``make_partition`` as ``capacity``.
        NOTE(review): presumably the maximum number of points per
        partition - confirm against ``Partitioner``.
    buffer
        Passed to ``make_partition`` as ``buffer``.

    Returns
    -------
    None
        The result of ``make_partition`` is not returned.
    """
    Partitioner(fname, sufix).make_partition(
        out_dir, capacity=num_points, buffer=buffer
    )

In [None]:
def outlier_filter(id, file, smr_folder, buffer, call_time):
    """Drop radius outliers from one LAS object and time every stage.

    A point whose ``withheld`` flag is 0 is kept when at least ``min_k``
    points of the cloud (the point itself and withheld points count too,
    because the tree is built on every point) lie within ``radius`` of it.
    The indices of the kept points are handed to
    ``lidarutils.writer_lasfile``.

    Parameters
    ----------
    id
        Identifier of this call; stored under ``'Map_ID'``.
        NOTE(review): presumably injected by Lithops, where ``id`` is a
        reserved parameter name - so it must keep this name even though it
        shadows the builtin. Confirm against the Lithops docs.
    file : str
        Key of the LAS object to read; the last ``/``-separated component
        is reported as ``'Object_name'``.
    smr_folder
        Passed to ``writer_lasfile`` as ``out_dir``.
    buffer
        Not used by the filter; only echoed back under ``'Buffer'``.
    call_time : float
        ``time.time()`` value taken by the caller; the delay until this
        function started is reported as ``'Time_Mapstart'``.

    Returns
    -------
    tuple
        ``(timings, data)``: a dict of rounded timings plus the raw
        ``'sendResult_time'`` timestamp, and the second value returned by
        ``writer_lasfile``.

    Raises
    ------
    Exception
        Any error raised while filtering or writing is printed and then
        re-raised unchanged, so the caller sees the real cause.
    """
    st_map_process = time.time()
    Allprocessing_time = {
        'Object_name': file.split('/')[-1],
        'Map_ID': id,
        'Buffer': buffer,
        'Time_Mapstart': round(st_map_process - call_time, 3),
    }
    print("ID of the function: %s" % str(id))

    # Read the object and stack x, y, z and the withheld flag column-wise.
    st_read = time.time()
    with open(file, 'rb') as f:
        inF = laspy.read(f)
    point_cloud = np.vstack((inF.x, inF.y, inF.z, inF.withheld)).T
    Allprocessing_time['Downloading_Time'] = round(time.time() - st_read, 3)

    # Radius outlier filter (see https://pdal.io/PDAL.pdf).
    st_filter = time.time()
    radius = 2
    min_k = 4
    out_bucket = 'smrfresult'
    print(len(point_cloud))
    try:
        tree = KDTree(point_cloud[:, :3], leaf_size=40)
        # Only points that are not flagged as withheld are candidates.
        b = point_cloud[:, 3] == 0
        # np.unique keeps this debug print cheap on large clouds.
        print(set(np.unique(b).tolist()), len(b))
        k = tree.query_radius(point_cloud[b][:, :3], r=radius, count_only=True)
        # Map the "enough neighbours" mask back to indices into the full cloud.
        indices = np.flatnonzero(b)[np.asarray(k) >= min_k]
        filtering_time = time.time() - st_filter

        st_wres = time.time()
        _, data = lidarutils.writer_lasfile(inF, indices, file, out_bucket,
                                            out_dir=smr_folder,
                                            reduce_stream=True, is_bool=False)
        en_wres = time.time() - st_wres
    except Exception as e:
        print('Failed!!. Reason: %s' % (e))
        # Without re-raising, the code below would hit names that were never
        # bound and hide the real error behind an UnboundLocalError.
        raise

    Allprocessing_time['Filtering_time'] = round(filtering_time, 3)
    Allprocessing_time['Save_points'] = round(en_wres, 3)
    Allprocessing_time['Maping_time'] = round(time.time() - st_map_process, 3)
    # Raw timestamp: the reducer subtracts it from its own start time.
    Allprocessing_time['sendResult_time'] = time.time()

    return Allprocessing_time, data


def merg_res(results):
    """Merge the outputs of all ``outlier_filter`` calls and summarise timings.

    Parameters
    ----------
    results : sequence
        One ``(timings, data)`` pair per map call, as returned by
        ``outlier_filter``.

    Returns
    -------
    dict
        Summary of the run. Per-stage map timings are reduced with ``max``;
        ``'Time_Mapstart'`` and ``'Time_Redstart'`` (delay between a map
        call finishing and this function starting) are reduced with ``min``.
        File name, point count and merge timings are copied from the dict
        returned by ``merg_files``.

    Raises
    ------
    ValueError
        If ``results`` is empty.

    Notes
    -----
    ``merg_files`` is not defined in this notebook.
    NOTE(review): presumably it arrives through
    ``from lidarpartitioner.lidarutils import *`` - confirm.
    """
    st_reducer = time.time()
    if not results:
        raise ValueError('merg_res needs at least one map result')

    num_part = len(results)
    # Every map call echoes the same buffer value; take it from the first.
    buffer = results[0][0]['Buffer']
    print(num_part)

    map_stats = [res[0] for res in results]
    res_mapdata = [
        {'Object_name': res[0]['Object_name'], 'data_stream': res[1]}
        for res in results
    ]

    # Merge all partial results into a single output.
    m_res = merg_files(res_mapdata, num_part, buffer)

    def slowest(key):
        """Largest value of ``key`` across all map calls."""
        return max(stats[key] for stats in map_stats)

    pr_time = {
        'Funcs_numbers': num_part,
        'File_name': m_res['File_name'],
        'Points_number': m_res['Points_number'],
        'Time_Mapstart': min(stats['Time_Mapstart'] for stats in map_stats),
        'Time_Redstart': round(
            min(st_reducer - stats['sendResult_time'] for stats in map_stats), 3),
        'Downloading_Time': slowest('Downloading_Time'),
        'Filtering_time': slowest('Filtering_time'),
        'Save_points': slowest('Save_points'),
        'Maping_time': slowest('Maping_time'),
        'Saving_data_time': m_res['Saving_data_time'],
        'Merging_files_time': m_res['Merging_files_time'],
        'Merged_uploading_time': m_res['Merged_uploading_time'],
        'Merging_process': m_res['Merging_process'],
    }
    # Taken last so it covers the merge as well.
    pr_time['Reduction_time'] = round(time.time() - st_reducer, 3)

    return pr_time

In [None]:
%%time
# Benchmark driver: run the outlier filter once on the whole file (i == 0)
# and then on progressively smaller partitions, collecting one summary dict
# per run in `processing_time`.
processing_time = []
data_folder = 'data'
chip_folder = 'chipres'
res_map_folder = 'removed_outliers'
res_red_folder = 'merged_files'
fname = data_folder + '/' + "lasfile.las"
# Other input names that have been used with this notebook:
#   "Coloreado (RGB) 2016 - PNOA-2016-CAT-352-4556-ORT-CLA-COL.las"
#   "Coloreado (RGB) 2016 - PNOA-2016-CAT-352-4554-ORT-CLA-COL.las"
#   "Coloreado (RGB) 2016 - PNOA-2016-CAT-338-4556-ORT-CLA-COL.las"
folders = [chip_folder, res_map_folder]

# Point count used to size the partitions.
# NOTE(review): presumably the number of points in lasfile.las; the values
# 5492898, 3791954 and 1373224 were listed next to it, likely for the other
# inputs above - confirm before switching files.
total_points = 343306

for i in range(3):

    if i > 0:
        # Start from empty partition / result folders.
        for folder in folders:
            lidarutils.clean_folder(folder)

        # Halve the partition capacity on every iteration.
        partitions = 2**i
        buffer = 0
        num_points = ceil(total_points/partitions)
        print(num_points)

        # A single partitioning task whose suffix is 0; the comprehension
        # variable is named so it cannot be confused with the loop counter.
        partition_args = [(fname, chip_folder, num_points, sufix, buffer)
                          for sufix in range(1)]
        st_split = time.time()
        # `fexec` rather than `exec`, so the builtin is not shadowed for the
        # rest of the kernel session.
        with ServerlessExecutor(runtime_memory=3072) as fexec:
            fexec.map(partition_file, partition_args)
            fexec.get_result()
        end_split = time.time() - st_split
        print(f'Total partition time: {end_split} s')

    # Process the files produced by the partition stage (or the whole file
    # for the baseline run).
    st_processing = time.time()
    if i == 0:
        map_runtime_memory = 2048
    elif i > 4:
        map_runtime_memory = 512
    else:
        map_runtime_memory = 1024

    if i > 0:
        files = os_cl.listdir(chip_folder)
        process_args = [(chip_folder + '/' + file, res_map_folder, buffer, st_processing)
                        for file in files]
    else:
        # Baseline: one function call on the unpartitioned file; no storage
        # listing is needed, which keeps it out of the timed region.
        process_args = [(fname, res_map_folder, 0, st_processing)]

    with ServerlessExecutor(runtime="ammarokran/new-smrf-conda36:1.0.6") as fexec:
        fexec.map_reduce(outlier_filter, process_args, merg_res,
                         map_runtime_memory=map_runtime_memory,
                         include_modules=['lidarutils', 'lidarpartitioner', 'utils'])
        st_get = time.time()
        res = fexec.get_result()
        end_get = time.time() - st_get
    end_processing = time.time() - st_processing
    res['GettingResult_time'] = round(end_get, 3)
    res['Processing_time'] = round(end_processing, 3)
    processing_time.append(res)
    print(f'Total processing time: {end_processing} s')

In [None]:
# Speed-up of every partitioned run relative to the first (baseline) run:
# baseline processing time divided by the run's processing time.
df = pd.DataFrame(processing_time)
run_times = df['Processing_time']
seq_time = run_times.iloc[0]
par_times = [run_times.iloc[pos] for pos in range(1, len(run_times))]

# The baseline row carries a fixed label instead of a ratio.
SpUp = ['0 (base)']
for elapsed in par_times:
    SpUp.append(str(round(seq_time / elapsed, 5)) + 'x')

print('SpeedUp %s' % (SpUp,))
df_spup = pd.DataFrame({'SpeedUp': SpUp})
# df_spup

In [None]:
# Attach the function count and processing time of each run to the speed-up
# labels, then fix the column order for display.
df_spup = df_spup.assign(
    Funcs_num=df['Funcs_numbers'],
    Processing_time=df['Processing_time'],
)[['Funcs_num', 'Processing_time', 'SpeedUp']]

In [None]:
# Show the speed-up summary table as this cell's output.
df_spup

In [None]:
# Bar chart of the total processing time of each run, labelled with the
# number of functions that run used.
df2 = pd.DataFrame(processing_time)
A = list(df2['Processing_time'])

ind = np.arange(len(A))    # the x locations for the groups
width = 0.7                # the width of the bars

# savefig does not create missing directories, so make sure the target exists.
Path('./figures_outlier').mkdir(parents=True, exist_ok=True)

fig, ax = plt.subplots()
p1 = ax.bar(ind, A, edgecolor='white', width=width)

ax.grid(axis='y', alpha=0.5, ls='--', zorder=0)
ax.set_xlabel('Number of Functions')
ax.set_ylabel('Time by Second (s)')
ax.set_xticks(ind)
ax.set_xticklabels(list(df2['Funcs_numbers']))
fig.savefig('./figures_outlier/Processing Time.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Per-run processing time: filtering time plus the time spent saving the
# kept points.
df2['Proc_time'] = df2['Filtering_time'].add(df2['Save_points'])

In [None]:
# Stacked bar chart splitting each run into download, processing,
# wait-for-reducer and reduction time.
A = list(df2['Downloading_Time'])
B = list(df2['Proc_time'])
C = list(df2['Time_Redstart'])
D = list(df2['Reduction_time'])

# Running totals: the height at which the third and fourth segments start.
bar1 = np.add(A, B).tolist()
bar2 = np.add(A, np.add(B, C)).tolist()
ind = np.arange(len(B))
width = 0.7

# savefig does not create missing directories, so make sure the target exists.
Path('./figures_outlier').mkdir(parents=True, exist_ok=True)

fig, ax = plt.subplots()
p1 = ax.bar(ind, A, edgecolor='white', width=width, color='purple')
p2 = ax.bar(ind, B, edgecolor='white', width=width, bottom=A)
p3 = ax.bar(ind, C, edgecolor='white', width=width, bottom=bar1, color='grey')
p4 = ax.bar(ind, D, edgecolor='white', width=width, bottom=bar2)

ax.grid(axis='y', alpha=0.5, ls='--', zorder=0)
ax.set_xlabel('Number of Functions')
ax.set_ylabel('Time by Second (s)')
ax.set_xticks(ind)
ax.set_xticklabels(list(df2['Funcs_numbers']))
ax.legend((p1[0], p2[0], p3[0], p4[0]),
          ('Downloading Time(C1)', 'Processing Time(C2)', 'Obtaining Results Time(C3)',
           'Reduction Time(C4)'),
          loc='upper left', bbox_to_anchor=(0.41, 1.02))
# The timestamp in the name keeps figures from different runs apart.
fig.savefig('./figures_outlier/Processing Time plot at time %s.png'% (datet), dpi=300, bbox_inches='tight')
plt.show()