# Vaex Run

In [1]:
import os
import vaex as vx
import time
import numpy as np
import shutil
import gc
import joblib

In [4]:
PATH_DATA = '../../df-showdown-data/'
TEMP_PATH = PATH_DATA+'temp/'
PATH_OUTPUT = '../output/'

SLA_NAME = 'vaex-sla.pkl'

In [5]:
all_sla = {}

##     Task1 & Task2- I/O

In [6]:
all_sla['task1'] = {}
all_sla['task1']['csv'] = {}
all_sla['task1']['prq'] = {}

all_sla['task2'] = {}
all_sla['task2']['csv'] = {}
all_sla['task2']['prq'] = {}


In [7]:
for i in range(9):
    fn = 'data_{}.csv'.format(i)
    start = time.perf_counter()
    dtemp1 = vx.open(PATH_DATA+fn)
    end = time.perf_counter() - start
    nrows = str(int(dtemp1.shape[0]/1000000))
    all_sla['task1']['csv'][nrows]=end

    start = time.perf_counter()
    dtemp1.export(TEMP_PATH + 'temp.csv',index=False)
    end = time.perf_counter() - start
    all_sla['task2']['csv'][nrows]=end

    del dtemp1
    gc.collect()
    os.remove(TEMP_PATH + 'temp.csv')
    time.sleep(2)

    fn = 'data_{}.parquet'.format(i)
    start = time.perf_counter()
    dtemp1 = vx.open(PATH_DATA+fn)
    end = time.perf_counter() - start
    nrows = str(int(dtemp1.shape[0]/1000000))
    all_sla['task1']['prq'][nrows]=end
    

    start = time.perf_counter()
    dtemp1.export(TEMP_PATH + 'temp.parquet',index=False)
    end = time.perf_counter() - start
    all_sla['task2']['prq'][nrows]=end

    del dtemp1
    gc.collect()
    os.remove(TEMP_PATH + 'temp.parquet')

In [None]:
all_sla

{'task1': {'csv': {'0': 0.2829878000000008,
   '1': 2.218104699999998,
   '3': 4.497733400000001,
   '9': 10.768187100000006,
   '19': 21.651349699999997,
   '34': 40.695483800000034,
   '45': 50.86708269999997,
   '58': 66.61110070000007,
   '71': 85.8427071000001},
  'prq': {'0': 0.1353975000000105,
   '1': 0.7144555999999938,
   '3': 1.2158343000000116,
   '9': 4.565950799999996,
   '19': 7.757134000000008,
   '34': 13.594654099999957,
   '45': 18.150944200000026,
   '58': 22.409480099999996,
   '71': 28.027638700000125}},
 'task2': {'csv': {'0': 0.48640100000000075,
   '1': 4.362415499999997,
   '3': 8.332460199999986,
   '9': 24.607668099999984,
   '19': 49.68972739999998,
   '34': 86.75615580000004,
   '45': 111.79567409999993,
   '58': 145.78824739999993,
   '71': 183.58592249999992},
  'prq': {'0': 0.1621298999999965,
   '1': 1.707086600000011,
   '3': 2.352528299999989,
   '9': 6.430279799999994,
   '19': 12.849858099999949,
   '34': 23.022432500000036,
   '45': 30.08414089999

## Get Data for other task

In [10]:
all_df = [vx.open(PATH_DATA+'data_{}.parquet'.format(i)) for i in [2,3,4,5,6]]
df_right = vx.open(PATH_DATA+'data_to_join.parquet')

## Task3 - Sorting

In [11]:
n_task = 'task3'
all_sla[n_task] = {}

In [12]:
for dtemp1 in all_df:
    start = time.perf_counter()
    dtemp2 = dtemp1.sort_values(['Date','Amount'],ascending=[True,False])
    end = time.perf_counter() - start
    nrows = str(int(dtemp1.shape[0]/1000000))
    all_sla[n_task][nrows]=end

    del dtemp1
    del dtemp2
    gc.collect()
    time.sleep(1)


In [13]:
all_sla[n_task]

{'3': 0.63470099999995,
 '9': 2.022421900000154,
 '19': 4.136806799999931,
 '34': 10.888667199999873,
 '45': 16.219327799999974}

## Task4 - Filtering

In [14]:
n_task = 'task4'
all_sla[n_task] = {}

In [15]:
# dtemp1 = all_df[0]
filt1 = ['Shipped - Delivered to Buyer', 'Cancelled']
for dtemp1 in all_df:
    start = time.perf_counter()
    dtemp2 = dtemp1[(dtemp1['Amount']>300) & (~dtemp1['Status'].isin(filt1))]
    end = time.perf_counter() - start
    nrows = str(int(dtemp1.shape[0]/1000000))
    all_sla[n_task][nrows]=end

    del dtemp1
    del dtemp2
    gc.collect()
    time.sleep(1)


In [16]:
all_sla[n_task]

{'3': 0.21421140000006744,
 '9': 0.5953160999999909,
 '19': 1.2111443000001145,
 '34': 2.2449423999999,
 '45': 2.9474488999999267}

## Task5 - Merging

In [35]:
n_task = 'task5'
all_sla[n_task] = {}

In [36]:
for dtemp1 in all_df:
    start = time.perf_counter()
    dtemp2 = pd.merge(dtemp1,df_right,on=['Date','ship-service-level'],how='left')
    end = time.perf_counter() - start
    nrows = str(int(dtemp1.shape[0]/1000000))
    all_sla[n_task][nrows]=end

    del dtemp1
    del dtemp2
    gc.collect()
    time.sleep(1)

In [37]:
all_sla[n_task]

{'3': 0.5124172000000726,
 '9': 1.4778619999997318,
 '19': 2.9784222000002956,
 '34': 5.352248900000177,
 '45': 7.045791199999712}

## Task6 - udf apply

In [38]:
n_task = 'task6'
all_sla[n_task] = {}

In [39]:
def fun(x):
    x0 = x + 1
    for i in range(25):
        if x0<800:
            x0 += i
            x0 = (x0/3.0)*2.5
            x0 = x0*1.2
        else:
            x0 += i/2.0
            x0 = (x0/4.0)*3.8

    return x0

In [40]:
# dtemp1=all_df[0]
# dtemp1['Amount2'] = dtemp1['Amount'].apply(fun)
for dtemp1 in all_df:
    start = time.perf_counter()
    dtemp1['Amount2'] = dtemp1['Amount'].apply(fun)
    end = time.perf_counter() - start
    nrows = str(int(dtemp1.shape[0]/1000000))
    all_sla[n_task][nrows]=end
    
    del dtemp1
    gc.collect()
    time.sleep(1)

In [41]:
all_sla[n_task]

{'3': 15.072074999999586,
 '9': 43.90379790000043,
 '19': 86.94100830000025,
 '34': 156.61809700000003,
 '45': 202.96429179999996}

## Task7 - aggregation

In [42]:
n_task = 'task7'
all_sla[n_task] = {}

In [43]:
def p25(x):
    return np.percentile(x,25)

def p75(x):
    return np.percentile(x,75)

In [44]:
# dtemp1=all_df[0]
for dtemp1 in all_df:
    start = time.perf_counter()
    dtemp2 = dtemp1.groupby(['Date','Status']).agg({'Amount':[np.mean, np.size, p25, p75]})
    end = time.perf_counter() - start
    nrows = str(int(dtemp1.shape[0]/1000000))
    all_sla[n_task][nrows]=end

    del dtemp1
    del dtemp2
    gc.collect()
    time.sleep(1)

In [45]:
all_sla[n_task]

{'3': 0.787891700000273,
 '9': 2.2168736999997236,
 '19': 4.148283800000172,
 '34': 7.475733299999774,
 '45': 9.774048400000083}

## Save results

In [46]:
joblib.dump(all_sla,PATH_OUTPUT+SLA_NAME)

['../output/pandas-sla.pkl']

In [47]:
all_sla

{'task1': {'csv': {'0': 0.2829878000000008,
   '1': 2.218104699999998,
   '3': 4.497733400000001,
   '9': 10.768187100000006,
   '19': 21.651349699999997,
   '34': 40.695483800000034,
   '45': 50.86708269999997,
   '58': 66.61110070000007,
   '71': 85.8427071000001},
  'prq': {'0': 0.1353975000000105,
   '1': 0.7144555999999938,
   '3': 1.2158343000000116,
   '9': 4.565950799999996,
   '19': 7.757134000000008,
   '34': 13.594654099999957,
   '45': 18.150944200000026,
   '58': 22.409480099999996,
   '71': 28.027638700000125}},
 'task2': {'csv': {'0': 0.48640100000000075,
   '1': 4.362415499999997,
   '3': 8.332460199999986,
   '9': 24.607668099999984,
   '19': 49.68972739999998,
   '34': 86.75615580000004,
   '45': 111.79567409999993,
   '58': 145.78824739999993,
   '71': 183.58592249999992},
  'prq': {'0': 0.1621298999999965,
   '1': 1.707086600000011,
   '3': 2.352528299999989,
   '9': 6.430279799999994,
   '19': 12.849858099999949,
   '34': 23.022432500000036,
   '45': 30.08414089999

In [52]:
# all_sla['task7']