# Experiment 1

## Setup

* Processor: Intel(R) Core(TM) i5-8250U CPU @ 1.60GHz 1.80 GHz
* Number of cores: 4
* Memory: 16.0 GB

## Dataset

* The r/place Parquet dataset - 12GB (22GB uncompressed) - /datas/2022_place_deephaven.parquet

## Tests

* Reading in the file, speed comparison
* Compute metrics of a column: (mean, std)
* Finding the unique value of a column
* Cumulative sum of a column
* Groupby Aggregation

## Helper Functions

In [3]:
import time
import matplotlib.pyplot as plt

In [4]:
inputpath = '../datas/2022_place_deephaven.parquet'

## Baseline: Pandas

In [5]:
import pandas as pd

In [6]:
start_time = time.time()
df = pd.read_parquet(inputpath)
print(f"1. Pandas - Reading exec time: {time.time() - start_time}")

ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.

In [13]:
df.shape

(160353104, 7)

In [14]:
df.head()

Unnamed: 0,timestamp,user_id,rgb,x1,y1,x2,y2
0,2022-04-01 12:44:10.315000+00:00,4068945,8318294,42,42,-32768,-32768
1,2022-04-01 12:44:22.671000+00:00,4068946,41832,999,999,-32768,-32768
2,2022-04-01 12:44:26.626000+00:00,4068947,3576042,44,42,-32768,-32768
3,2022-04-01 12:44:31.703000+00:00,4068948,13948889,2,2,-32768,-32768
4,2022-04-01 12:44:44.409000+00:00,4068949,3576042,23,23,-32768,-32768


In [16]:
start_time = time.time()
_ = df['x1'].mean()
print(f"2. Pandas - Mean exec time: {time.time() - start_time}")

2. Pandas - Mean exec time: 0.4100003242492676


In [17]:
start_time = time.time()
_ = df['x1'].std()
print(f"3. Pandas - Std exec time: {time.time() - start_time}")

3. Pandas - Std exec time: 2.5820040702819824


In [18]:
start_time = time.time()
_ = df['user_id'].unique()
print(f"4. Pandas - Unique exec time: {time.time() - start_time}")

4. Pandas - Unique exec time: 16.425997495651245


In [19]:
start_time = time.time()
_ = df['y1'].cumsum()
print(f"5. Pandas - Cumsum exec time: {time.time() - start_time}")

5. Pandas - Cumsum exec time: 1.2599973678588867


In [21]:
start_time = time.time()
_ = df.groupby('user_id')['x1'].mean()
print(f"6. Pandas - Groupby Aggregation exec time: {time.time() - start_time}")

6. Pandas - Groupby Aggregation exec time: 65.74299740791321


## Ray

### A low-level framework for parallelizing Python code across processors or clusters

In [35]:
import ray

In [None]:
start_time = time.time()
df = ray.data.read_parquet(inputpath)
print(f"1. Ray - Reading exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
df.mean('x1')
print(f"2. Ray - Mean exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
df.std("x1")
print(f"3. Ray - Std exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
df.unique('user_id')
print(f"4. Ray - Unique exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
df.cumsum('y1')
print(f"5. Ray - Cumsum exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
df.groupby('user_id').mean('x1')
print(f"6. Ray - Groupby Aggregation exec time: {time.time() - start_time}")

## Dask

### A low-level sceduler and a high-level partial Pandas replacement, geared toward running code on compute clusters

In [37]:
import dask
import dask.dataframe as dd

In [38]:
start_time = time.time()
df = dd.read_parquet(inputpath)
print(f"1. Dask - Reading exec time: {time.time() - start_time}")

1. Dask - Reading exec time: 0.15599966049194336


In [39]:
start_time = time.time()
_ = df['x1'].mean().compute(scheduler ="processes")
print(f"2. Dask - Mean exec time: {time.time() - start_time}")

2. Dask - Mean exec time: 51.74338912963867


In [40]:
start_time = time.time()
_ = df['x1'].std().compute(scheduler ="processes")
print(f"3. Dask - Std exec time: {time.time() - start_time}")

3. Dask - Std exec time: 18.715003967285156


In [41]:
start_time = time.time()
_ = df['user_id'].unique().compute(scheduler ="processes")
print(f"4. Dask - Unique exec time: {time.time() - start_time}")

4. Dask - Unique exec time: 29.739998817443848


In [42]:
start_time = time.time()
_ = df['y1'].cumsum().compute(scheduler ="processes")
print(f"5. Dask - Cumsum exec time: {time.time() - start_time}")

5. Dask - Cumsum exec time: 28.987157344818115


In [None]:
start_time = time.time()
_ = df.groupby('user_id')['x1'].mean().compute(scheduler ="processes")
print(f"6. Dask - Groupby Aggregation exec time: {time.time() - start_time}")

## Modin

### A drop-in replacement for Pandas, powered by either Dask or Ray

In [46]:
import ray
import modin.pandas as pd
ray.init()

ImportError: Unsupported pandas version: 1.3.4

In [None]:
start_time = time.time()
df = pd.read_parquet(inputpath)
print(f"1. Modin - Reading exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
_ = df['x1'].mean()
print(f"2. Modin - Mean exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
_ = df['x1'].std()
print(f"3. Modin - Std exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
_ = df['user_id'].unique()
print(f"4. Modin - Unique exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
_ = df['y1'].cumsum()
print(f"5. Modin - Cumsum exec time: {time.time() - start_time}")

In [None]:
start_time = time.time()
_ = df.groupby('user_id')['x1'].mean()
print(f"6. Modin - Groupby Aggregation exec time: {time.time() - start_time}")

## Results

In [54]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

tests = ['read', 'mean', 'std', 'unique', 'cumsum', 'groupby']
results = { 'pandas' : [60 + 46, 0.088, 1.13, 60+17, 1.09, 120 +2],
            'dask' : [17, 0.673, 4.33, 29.5, 240+12, 60+22],
            'modin' : [3*60+45, 0.526, 5.75, 120+47, 0.276, 480+44],
            'vaex': [120+43, 5.72, 5.59, 480+44, 7.26, 6*60 + 48]
        } 

results_df = pd.DataFrame(data=results, index=tests)

for test in results_df.index:
	results_df.loc[test].plot(kind='bar', title=test)
	plt.show()

AttributeError: module 'pandas.plotting._matplotlib' has no attribute 'plot'