In [1]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import random
import string
import gc

In [2]:
# Number of partitions each benchmark table is split into (passed to
# dd.from_pandas by createTable).
NROFCORES = 8

In [3]:
def createTable(rowCount, seed=None, npartitions=None):
    """Build a random benchmark table and return it as a Dask DataFrame.

    One value per row is generated for each column:
      * 'bucket' - two random lowercase ASCII letters (the grouping key)
      * 'weight' - float from random.uniform(0, 2)
      * 'qty'    - int from random.randint(0, 100)
      * 'risk'   - int from random.randint(0, 10)

    Parameters
    ----------
    rowCount : int
        Number of rows to generate.
    seed : optional
        When given, values are drawn from a private ``random.Random(seed)``
        so repeated calls with the same seed yield the same table. With the
        default ``None`` the module-level ``random`` generator is used,
        exactly as before.
    npartitions : int, optional
        Number of Dask partitions; defaults to ``NROFCORES``.

    Returns
    -------
    dask.dataframe.DataFrame
        The generated pandas frame split into ``npartitions`` partitions.
    """
    # Force a garbage-collection pass before allocating the new table.
    gc.collect()

    # A private generator keeps seeded runs from disturbing global random state.
    rng = random if seed is None else random.Random(seed)
    if npartitions is None:
        npartitions = NROFCORES

    # Columns are generated in this fixed order so a seeded run is repeatable.
    frame = pd.DataFrame({
        'bucket': [''.join(rng.choices(string.ascii_lowercase, k=2)) for _ in range(rowCount)],
        'weight': [rng.uniform(0, 2) for _ in range(rowCount)],
        'qty': [rng.randint(0, 100) for _ in range(rowCount)],
        'risk': [rng.randint(0, 10) for _ in range(rowCount)],
    })
    return dd.from_pandas(frame, npartitions=npartitions)

In [4]:
def fn(t):
    """Per-bucket summary of `t` built from three separate Dask computations.

    First a groupby-agg yields the row count plus sum and mean of 'qty' and
    'risk'; then two groupby-apply passes add the 'weight'-weighted averages
    of 'qty' and 'risk'. Each piece is computed to pandas and joined on the
    bucket index.

    Returns a pandas DataFrame with columns NR, TOTAL_QTY, AVG_QTY,
    TOTAL_RISK, AVG_RISK, W_AVG_QTY and W_AVG_RISK.
    """
    def weighted_mean(column, label):
        # Weighted average of `column` per bucket, as a one-column pandas frame.
        per_bucket = t.groupby('bucket').apply(
            lambda g: np.average(g[column], weights=g.weight), meta=('x', 'f8'))
        return per_bucket.to_frame(label).compute()

    stats = t.groupby('bucket').agg(
        {'bucket': 'count', 'qty': [sum, np.mean], 'risk': [sum, np.mean]}).compute()
    # Flatten the (column, aggregation) MultiIndex into 'column_aggregation'.
    stats.columns = stats.columns.map('_'.join)
    stats = stats.rename(columns={
        'bucket_count': 'NR',
        'qty_sum': 'TOTAL_QTY',
        'qty_mean': 'AVG_QTY',
        'risk_sum': 'TOTAL_RISK',
        'risk_mean': 'AVG_RISK',
    })
    stats = stats.join(weighted_mean('qty', 'W_AVG_QTY'))
    return stats.join(weighted_mean('risk', 'W_AVG_RISK'))


In [5]:
def my_agg(x):
    """Summarise one bucket group as a labelled pandas Series.

    `x` is a frame with 'bucket', 'qty', 'risk' and 'weight' columns. The
    result holds, in this order: NR (non-null bucket count), TOTAL_QTY,
    AVG_QTY, TOTAL_RISK, AVG_RISK, and the weight-weighted averages
    W_AVG_QTY and W_AVG_RISK.
    """
    qty, risk = x.qty, x.risk
    weights = x.weight
    order = ['NR', 'TOTAL_QTY', 'AVG_QTY', 'TOTAL_RISK',
             'AVG_RISK', 'W_AVG_QTY', 'W_AVG_RISK']
    stats = dict(
        NR=x.bucket.count(),
        TOTAL_QTY=qty.sum(),
        AVG_QTY=qty.mean(),
        TOTAL_RISK=risk.sum(),
        AVG_RISK=risk.mean(),
        W_AVG_QTY=np.average(qty, weights=weights),
        W_AVG_RISK=np.average(risk, weights=weights),
    )
    return pd.Series(stats, index=order)

## Row count: 10k

In [6]:
# Benchmark input: 10 thousand random rows.
t = createTable(10_000)

In [7]:
# Time the multi-pass variant: fn runs one groupby-agg and two groupby-apply computations.
%timeit fn(t)

906 ms ± 4.87 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
# Time the single groupby-apply variant: my_agg builds all seven statistics per bucket.
# `meta` declares the output columns and dtypes; the count and total columns
# are cast to int64 after compute().
%timeit t.groupby('bucket').apply(my_agg, \
    meta={'NR': 'i8', 'TOTAL_QTY': 'i8', 'AVG_QTY': 'f8', 'TOTAL_RISK': 'i8', 'AVG_RISK': 'f8', 'W_AVG_QTY': 'f8', 'W_AVG_RISK': 'f8'}) \
    .compute().astype({'NR': 'int64', 'TOTAL_QTY': 'int64', 'TOTAL_RISK': 'int64'})

1.38 s ± 8.33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Row count: 100k

In [9]:
# Benchmark input: 100 thousand random rows.
t = createTable(100_000)

In [10]:
# Time the multi-pass variant: fn runs one groupby-agg and two groupby-apply computations.
%timeit fn(t)

1.12 s ± 6.13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
# Time the single groupby-apply variant: my_agg builds all seven statistics per bucket.
# `meta` declares the output columns and dtypes; the count and total columns
# are cast to int64 after compute().
%timeit t.groupby('bucket').apply(my_agg, \
    meta={'NR': 'i8', 'TOTAL_QTY': 'i8', 'AVG_QTY': 'f8', 'TOTAL_RISK': 'i8', 'AVG_RISK': 'f8', 'W_AVG_QTY': 'f8', 'W_AVG_RISK': 'f8'}) \
    .compute().astype({'NR': 'int64', 'TOTAL_QTY': 'int64', 'TOTAL_RISK': 'int64'})

1.5 s ± 8.84 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Row count: 1M

In [12]:
# Benchmark input: 1 million random rows.
t = createTable(1_000_000)

In [13]:
# Time the multi-pass variant: fn runs one groupby-agg and two groupby-apply computations.
%timeit fn(t)

2.79 s ± 31.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
# Time the single groupby-apply variant: my_agg builds all seven statistics per bucket.
# `meta` declares the output columns and dtypes; the count and total columns
# are cast to int64 after compute().
%timeit t.groupby('bucket').apply(my_agg, \
    meta={'NR': 'i8', 'TOTAL_QTY': 'i8', 'AVG_QTY': 'f8', 'TOTAL_RISK': 'i8', 'AVG_RISK': 'f8', 'W_AVG_QTY': 'f8', 'W_AVG_RISK': 'f8'}) \
    .compute().astype({'NR': 'int64', 'TOTAL_QTY': 'int64', 'TOTAL_RISK': 'int64'})

2.62 s ± 13.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Row count: 10M

In [15]:
# Benchmark input: 10 million random rows.
t = createTable(10_000_000)

In [16]:
# Time the multi-pass variant: fn runs one groupby-agg and two groupby-apply computations.
%timeit fn(t)

24 s ± 959 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [17]:
# Time the single groupby-apply variant: my_agg builds all seven statistics per bucket.
# `meta` declares the output columns and dtypes; the count and total columns
# are cast to int64 after compute().
%timeit t.groupby('bucket').apply(my_agg, \
    meta={'NR': 'i8', 'TOTAL_QTY': 'i8', 'AVG_QTY': 'f8', 'TOTAL_RISK': 'i8', 'AVG_RISK': 'f8', 'W_AVG_QTY': 'f8', 'W_AVG_RISK': 'f8'}) \
    .compute().astype({'NR': 'int64', 'TOTAL_QTY': 'int64', 'TOTAL_RISK': 'int64'})

14.4 s ± 247 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
