In [None]:
import dask.dataframe as dd
from dask.highlevelgraph import HighLevelGraph
from dask.dataframe.core import new_dd_object
import pandas as pd
import datetime
import os
from pathlib import Path
import glob

In [None]:
# Directory that receives the pickled metadata and the per-partition parquet files.
output_path = './out'
# Create it up front; an already existing directory is not an error.
Path(output_path).mkdir(parents=True, exist_ok=True)

In [None]:
# Build a small demo frame: the iris dataset plus one derived column,
# re-labelled with a daily DatetimeIndex and two-level column labels.
iris = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
iris['petal_product'] = iris['petal_length'] * iris['petal_width']

# One calendar day per row, starting 2000-01-01.
trading_dates = pd.date_range(start=datetime.date(2000, 1, 1), periods=150)
iris.index = pd.DatetimeIndex(trading_dates, name='trading_date')

# The six columns are relabelled as (grp1|grp2) x (a|b|c).
iris.columns = pd.MultiIndex.from_product(
    [['grp1', 'grp2'], ['a', 'b', 'c']],
    names=['toto', 'tata'],
)
iris

In [None]:
# Wrap the in-memory pandas frame in a dask DataFrame, requesting 5 partitions.
dask_df = dd.from_pandas(iris, npartitions=5)
# Display the lazy dask frame.
dask_df

In [None]:
def to_partitioned_parquet(dask_df, path):
    """Persist a dask DataFrame as one parquet file per partition.

    Two things happen at different times:

    * Immediately (when this function is called) the frame's ``_meta`` and
      ``divisions`` are pickled to ``<path>/meta.pickle`` and
      ``<path>/divisions.pickle``.
    * Lazily (when the returned frame is computed) every partition ``n`` is
      written to ``<path>/df_<n>.parquet``.

    Parameters
    ----------
    dask_df : dask.dataframe.DataFrame
        The frame to persist.
    path : str or os.PathLike
        Target directory. It is created if it does not exist yet.

    Returns
    -------
    dask.dataframe.DataFrame
        A lazy frame with the same partitions as ``dask_df``; computing it
        triggers the parquet writes and yields the data unchanged.
    """
    out_dir = Path(path)
    # The pickles below are written right away, so the directory must exist now.
    out_dir.mkdir(parents=True, exist_ok=True)

    pd.to_pickle(dask_df._meta, out_dir / 'meta.pickle')
    pd.to_pickle(dask_df.divisions, out_dir / 'divisions.pickle')

    def dump_partition(df, partition_info=None):
        """Write one partition to disk and pass it through untouched."""
        # Skip calls that do not correspond to a real partition: no
        # partition_info at all, or the -1 sentinel the original code already
        # guarded against (presumably dask's metadata-inference call).
        if partition_info is not None:
            partition_number = partition_info['number']
            if partition_number != -1:
                df.to_parquet(out_dir / f'df_{partition_number}.parquet')
        return df

    # The output has the same schema as the input, so pass meta explicitly
    # instead of letting dask infer it by running dump_partition on dummy data.
    return dd.map_partitions(dump_partition, dask_df, meta=dask_df._meta)

In [None]:
# to_partitioned_parquet returns a lazy frame; .compute() runs it, which is
# when each partition is written to df_<n>.parquet inside output_path.
res = to_partitioned_parquet(dask_df, output_path).compute()

In [None]:
# Display the computed result (the partitions are returned unchanged by the writer).
res

In [None]:
def from_partitioned_parquet(path):
    """Rebuild the dask DataFrame written by ``to_partitioned_parquet``.

    Parameters
    ----------
    path : str or os.PathLike
        Directory holding ``meta.pickle``, ``divisions.pickle`` and the
        partition files ``df_0.parquet`` ... ``df_<n-1>.parquet``.

    Returns
    -------
    dask.dataframe.DataFrame
        Lazy frame with one ``pd.read_parquet`` task per partition, using the
        stored meta and divisions.

    Raises
    ------
    FileNotFoundError
        If a partition file implied by the stored divisions is missing.
    """
    # Local import: only this function needs it.
    from dask.base import tokenize

    root = Path(path)

    # NOTE(security): unpickling can execute arbitrary code; only load
    # directories that were produced by a trusted writer.
    meta = pd.read_pickle(root / 'meta.pickle')
    divisions = pd.read_pickle(root / 'divisions.pickle')

    # divisions has one more entry than there are partitions.
    npartitions = len(divisions) - 1

    # Derive the file list from the partition numbers rather than from a sorted
    # glob: lexicographic order puts df_10 before df_2, and a glob would also
    # pick up unrelated or stale parquet files in the directory.
    partition_files = [root / f'df_{number}.parquet' for number in range(npartitions)]
    missing = [str(file) for file in partition_files if not file.is_file()]
    if missing:
        raise FileNotFoundError(f'missing partition file(s): {missing}')

    # Include a token of the source location in the name so that frames loaded
    # from different directories do not share task keys.
    name = 'read-parquet-' + tokenize(str(root.resolve()), npartitions)

    layer = {(name, number): (pd.read_parquet, str(file))
             for number, file in enumerate(partition_files)}

    # Single layer with no upstream dependencies.
    graph = HighLevelGraph({name: layer}, {name: set()})
    return new_dd_object(graph, name=name, meta=meta, divisions=divisions)

In [None]:
# Read back from the same configured directory the writer used, instead of
# repeating the literal path, so the two cannot drift apart.
toto = from_partitioned_parquet(output_path)
toto

In [None]:
# Materialise the reloaded frame; this is when the pd.read_parquet tasks run.
toto.compute()