In [2]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from numpy.testing import assert_array_almost_equal
import statsmodels.api as sm
from line_profiler import LineProfiler


## Generating some data

In [68]:
def generate_random_data(n_rows, n_cols, seed=1):
    """ Generate random gaussian data with a given seed """
    np.random.seed(seed)
    random_data = np.random.normal(size=n_cols * n_rows, scale=4)
    random_data = random_data.reshape(n_rows, n_cols)
    random_df = pd.DataFrame(random_data)
    return random_df

df = generate_random_data(n_rows=64*50, n_cols=1)
df['group'] = np.arange(len(df)) % 64

In [4]:
def ols_groupby_lstsq(series):
    lenght_x = series.shape[0]
    X = np.arange(lenght_x)
    ones = np.ones(lenght_x)
    X = np.vstack((X, ones)).T
    slope, intercept = np.linalg.lstsq(X, series, rcond=-1)[0]
    return slope

In [5]:
%%timeit
slopes_by_group = df.groupby('group')[0].apply(ols_groupby_lstsq)

13.7 ms ± 497 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [6]:
%%timeit
slopes_by_group = df.groupby('group')[0].agg(ols_groupby_lstsq)

7.35 ms ± 67.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [7]:
%%timeit
slopes_by_group = df.groupby('group')[0].transform(ols_groupby_lstsq)

24.1 ms ± 500 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [70]:
# from collections import defaultdict
import numba
from numba.typed import Dict
from numba import types

def get_group_ixs0(ids):
    id_hash = defaultdict(list)
    for j, key in enumerate(ids):
        id_hash[key].append(j)
    id_hash = {k: np.array(v) for k, v in id_hash.items()}
    return id_hash

# @numba.jit(nopython=False)
def get_group_ixs(group_ids):
    result = dict()
    for key in set(group_ids):
        result[key] = []

    for i, val in enumerate(group_ids):
        result[val].append(i)
    return result

# @numba.jit(nopython=True)
def group_apply(values, group_ids, func):
    output = np.repeat(np.nan, len(values))
    ixs = get_group_ixs(group_ids)
    for ix in ixs.values():
        output[ix] = func(values[list(ix)])
    return output

In [71]:
def ols_group_ixs_numba(array):
    lenght_x = array.shape[0]
    X = np.arange(lenght_x)
    ones = np.ones(lenght_x)
    X = np.vstack((X, ones)).T
    slope = np.linalg.lstsq(X, array, rcond=-1)[0][0]
    return slope

In [72]:
%%timeit
group_apply(df[0].values[0:1], df['group'].values[0:1], ols_lstsq_raw)

NameError: name 'ols_lstsq_raw' is not defined

In [62]:
@numba.jit(nopython=True)
def ols_lstsq_raw_numba(row):
    """ """
    lenght_x = row.shape[0]
    X = np.arange(lenght_x)
    ones = np.ones(lenght_x)
    X = np.vstack((X, ones)).T
    slope, intercept = np.linalg.lstsq(X, row, rcond=-1)[0]
    return slope

group_apply(df[0].values[0:1], df['group'].values[0:1], ols_lstsq_raw_numba)

In [63]:
%%timeit
group_apply(df[0].values, df['group'].values, ols_lstsq_raw_numba)

2.47 ms ± 34.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [38]:
group_ids = [1, 1, 2, 2, 3]
get_group_ixs(group_ids)
aaa = set(group_ids)

In [45]:
result = dict()
for key in set(group_ids):
    result[key] = np.array([], dtype='int64')

for i, val in enumerate(group_ids):
    result[val] = np.append(result[val], i)
result

{1: array([0, 1]), 2: array([2, 3]), 3: array([4])}

In [46]:
results_pandas = df.groupby('group')[0].transform(ols_groupby_lstsq)
results_group_apply = group_apply(df[0].values, df['group'].values, ols_group_ixs)
assert_array_almost_equal(results_pandas, results_group_apply)

### Dask

- Pandas and Numpy distributed computing
- Bag (standard Python collections), Array(NumPy) and Distributed DataFrame (Pandas)
- Super-easy parallelised Pandas functions

Dask official documentation: https://docs.dask.org/en/latest/dataframe.html

In [None]:
import dask.dataframe as dd

In [None]:
%%timeit
N_PARTITIONS = 4
SCHEDULER = "processes"
ddf = dd.from_pandas(df, npartitions=N_PARTITIONS, sort=False)
slopes = ddf.groupby("group")[0].transform(
    ols_groupby_lstsq,
    # axis=1,
    # meta=(None, 'float64'),
    # raw=True,
).compute(scheduler=SCHEDULER)

In [None]:
%%timeit
N_PARTITIONS = 16
SCHEDULER = "processes"
ddf = dd.from_pandas(df, npartitions=N_PARTITIONS, sort=False)
slopes = ddf.groupby("group")[0].apply(
    ols_groupby_lstsq,
).compute(scheduler=SCHEDULER)