In [10]:
import pandas as pd
import dask.dataframe as dd
from typing import Callable, Type, Union, List, Tuple, Dict

# Sample long-format data: one row per (id, column label, value) triple.
df = pd.DataFrame(
    [
        (1, "col1", "a"),
        (2, "col2", "b"),
        (4, "col1", "c"),
        (4, "col1", "d"),
        (5, "col3", "e"),
    ],
    columns=["id", "columns", "values"],
)
ddf = dd.from_pandas(df, npartitions=1)
ddf.head()

Unnamed: 0,id,columns,values
0,1,col1,a
1,2,col2,b
2,4,col1,c
3,4,col1,d
4,5,col3,e


# Basic version

In [7]:
def dask_pivot_table(
    ddf: dd.DataFrame,
    index: str,
    columns: str,
    values: str,
    aggfunc: Callable,
    value_type: str = "str") -> dd.DataFrame:
    """Pivot ``ddf`` so that each distinct label in ``columns`` becomes an output column.

    For every label, the matching rows are grouped by ``index``. Their ``values``
    are collected into a Python list and ``aggfunc`` is applied to that list.
    The per-label results are then outer-merged on ``index``.

    :param ddf: The dask dataframe to pivot.
    :param index: Name of the column whose values become the row index.
    :param columns: Name of the column whose distinct values become the output columns.
    :param values: Name of the column whose values are aggregated.
    :param aggfunc: Called with the list of ``values`` for one (index, label) cell.
    :param value_type: dtype of ``aggfunc``'s result, passed to dask as ``meta``.
    :return: The pivoted dask dataframe, indexed by ``index``.
    :raises ValueError: If ``ddf[columns]`` contains no non-null labels.
    """
    # NaN labels are dropped, as pandas.pivot_table does by default. The equality
    # filter below can never match NaN, so such a label would only ever select
    # an empty frame (previously this hit an undefined name).
    columns_set = ddf[columns].dropna().unique().compute()
    if len(columns_set) == 0:
        raise ValueError("column %r has no non-null values to pivot on" % (columns,))
    series = []
    for column in columns_set:
        # Every remaining label occurs at least once, so this subset is never
        # empty. That also avoids an eager len() compute per label.
        subset = ddf[ddf[columns] == column]
        grouped = subset[[index, values]].groupby(index).aggregate(list)
        s = grouped[values].apply(aggfunc, meta=(values, value_type))
        s.name = column
        series.append(s)
    pivoted = series[0].to_frame()
    for s in series[1:]:
        pivoted = dd.merge(pivoted, s.to_frame(), on=index, how="outer")
    return pivoted

In [8]:
dask_pivot_table(
    ddf,
    index="id",
    columns="columns",
    values="values",
    aggfunc=",".join,
    value_type="str",
).compute()

Unnamed: 0_level_0,col1,col2,col3
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,a,,
4,"c,d",,
2,,b,
5,,,e


# Advanced version with support for multiple aggregation functions

In [11]:
def _dask_pivot_table_aggregate_series(
        df: dd.DataFrame,
        name: str,
        index: str,
        values: str,
        aggfunc: Union[Callable, Dict[str, Callable]],
        value_type: str = "str") -> Union[dd.Series, Tuple[dd.Series, ...]]:
    """Aggregate ``values`` per ``index`` for the rows belonging to one pivot label.

    Rows are grouped by ``index``, and their ``values`` are collected into a list
    before each aggregation function is applied to that list.

    :param df: Rows of the source frame that carry this pivot label.
    :param name: The pivot label. It is used as the output series name and may
        be any hashable value taken from the data, not only a str.
    :param index: Name of the grouping column.
    :param values: Name of the column whose values are aggregated.
    :param aggfunc: A single callable, or a dict mapping a suffix to a callable.
        With a dict, each result series is named ``"<name>_<suffix>"``, or just
        ``name`` when the suffix is ``''``.
    :param value_type: dtype passed to dask as ``meta``. It is shared by every function.
    :return: One series for a callable, or a tuple of series in dict order.
    """
    grouped = df[[index, values]].groupby(index).aggregate(list)
    if isinstance(aggfunc, dict):
        ret = []
        for func_name, func in aggfunc.items():
            s = grouped[values].apply(func, meta=(values, value_type))
            # Format instead of using ``+=``: ``name`` comes from the data and
            # may be an int (or another non-str), which would raise TypeError.
            s.name = '%s_%s' % (name, func_name) if func_name != '' else name
            ret.append(s)
        return tuple(ret)
    s = grouped[values].apply(aggfunc, meta=(values, value_type))
    s.name = name
    return s


def _dask_pivot_table_concat_series(
        series: List[Union[dd.Series, Tuple[dd.Series, ...]]],
        index: str) -> dd.DataFrame:
    """Outer-merge pivot columns on ``index`` into a single dataframe.

    :param series: Non-empty sequence. Each item is either a dask Series or a
        tuple/list of dask Series, one per aggregation function. Items are
        flattened one by one, so plain and tuple items may be mixed.
    :param index: Name of the index shared by every series; used as the merge key.
    :return: A dask dataframe with one column per series, in input order.
    """
    # Flatten each item on its own. Previously the decision was taken from the
    # first item only, which broke when plain Series and tuples were mixed.
    flat = []
    for item in series:
        if isinstance(item, (list, tuple)):
            flat.extend(item)
        else:
            flat.append(item)
    ret = flat[0].to_frame()
    for serie in flat[1:]:
        ret = dd.merge(ret, serie.to_frame(), on=index, how="outer")
    return ret


def dask_pivot_table(df: dd.DataFrame,
                     index: str,
                     columns: str,
                     values: str,
                     aggfunc: Union[Callable, Dict[str, Callable]],
                     value_type: str = "str") -> dd.DataFrame:
    """A dd.pivot_table() replacement that accepts any aggfunc (dask's own supports only mean, sum, count, first and last).

    :param df: The dask dataframe to pivot.
    :param index: The name of the column containing the indices.
    :param columns: The name of the column containing the columns.
    :param values: The name of the column containing the values.
    :param aggfunc: The aggregation function, called with the list of values
        of one cell (e.g. ``lambda x: ','.join(x)``). It can also be a dict
        mapping a column-name suffix to an aggregation function, e.g.
        ``{"count": len, "concat": lambda x: ','.join(x)}``. The suffix ``''``
        keeps the bare label as the column name.
    :param value_type: The type of the value ("str", "int64", ...).
    :return: The pivoted dataframe, indexed by ``index``.
    :raises ValueError: If ``df[columns]`` contains no non-null labels.
    """
    # Drop NaN labels, as pandas.pivot_table does by default. ``==`` never
    # matches NaN, so such a label could only produce an empty, all-NaN column.
    column_set = df[columns].dropna().unique().compute()
    if len(column_set) == 0:
        raise ValueError("column %r has no non-null values to pivot on" % (columns,))
    series = []
    for column in column_set:
        # Every remaining label occurs at least once, so this subset is never
        # empty. The old emptiness guard tested the unfiltered frame and could
        # never fire; dropping it also avoids an eager len() compute per label.
        subset = df[df[columns] == column]
        series.append(_dask_pivot_table_aggregate_series(
            subset, column, index, values, aggfunc, value_type
        ))
    return _dask_pivot_table_concat_series(series, index)

In [12]:
# Recreate the same sample data so the advanced section can run on its own.
df = pd.DataFrame(
    [
        (1, "col1", "a"),
        (2, "col2", "b"),
        (4, "col1", "c"),
        (4, "col1", "d"),
        (5, "col3", "e"),
    ],
    columns=["id", "columns", "values"],
)
ddf = dd.from_pandas(df, npartitions=1)
ddf.head()

Unnamed: 0,id,columns,values
0,1,col1,a
1,2,col2,b
2,4,col1,c
3,4,col1,d
4,5,col3,e


In [14]:
# '' keeps the bare label as the column name; other keys become "<label>_<key>".
dask_pivot_table(
    ddf,
    index="id",
    columns="columns",
    values="values",
    aggfunc={"": ",".join, "multiple_value": lambda x: len(x) > 1},
    value_type="str",
).compute()

Unnamed: 0_level_0,col1,col1_multiple_value,col2,col2_multiple_value,col3,col3_multiple_value
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,a,False,,,,
4,"c,d",True,,,,
2,,,b,False,,
5,,,,,e,False
