In [1]:
import dask.array as da
import dask.dataframe as dd
import pandas
import numpy
from sklearn.datasets import make_classification
# Number of rows and feature columns of the synthetic dataset built by make_dataset.
N_SAMPLES = 50_000_000
N_FEATURES = 20
# Consecutive, non-overlapping calendar ranges (ISO "YYYY-MM-DD" strings) used to
# split the dataset on its "date" column; each split's "end" is the day before the
# next split's "start".
DATE_SPLITS = dict(
    train=dict(start="2013-01-01", end="2016-12-31"),
    test1=dict(start="2017-01-01", end="2018-12-31"),
    test2=dict(start="2019-01-01", end="2020-12-31"),
)


def random_date(start, end, random_state=None):
    """Return a random timestamp drawn uniformly from the half-open range ``[start, end)``.

    The result has whole-second resolution (the offset from ``start`` is an integer
    number of seconds).

    Parameters
    ----------
    start, end : str or timestamp-like
        Range bounds; anything ``pandas.Timestamp`` accepts.
    random_state : numpy.random.RandomState, optional
        Source of randomness. ``None`` (the default) uses numpy's global legacy RNG
        (``numpy.random``), as before; pass a seeded ``RandomState`` for
        reproducible draws.

    Returns
    -------
    pandas.Timestamp

    Raises
    ------
    ValueError
        If ``end`` is not at least one whole second after ``start``.
    """
    start = pandas.Timestamp(start)
    end = pandas.Timestamp(end)
    # Whole seconds in the range; floor division drops any sub-second remainder
    # (same value as days * 86400 + seconds for a positive delta).
    span_seconds = (end - start) // pandas.Timedelta(seconds=1)
    if span_seconds <= 0:
        # numpy would otherwise fail with an opaque "high <= 0" message.
        raise ValueError(
            f"end ({end}) must be at least one second after start ({start})"
        )
    rng = numpy.random if random_state is None else random_state
    return start + pandas.Timedelta(seconds=int(rng.randint(span_seconds)))


def make_dataset(chunks="100MiB"):
    """Build a large synthetic classification dask DataFrame by resampling a small seed.

    A seed frame with as many rows as the largest row chunk is generated once with
    ``make_classification`` and persisted on the cluster. Every chunk of the result
    is then a bootstrap sample (with replacement) of that seed. The full dataset of
    roughly ``N_SAMPLES`` rows is therefore never generated eagerly.

    Parameters
    ----------
    chunks : default "100MiB"
        Target chunk size. It is passed to ``dask.array.core.normalize_chunks`` to
        derive the row chunking and to ``repartition(partition_size=...)`` for the
        final partitioning.

    Returns
    -------
    dask.dataframe.DataFrame
        Columns ``class`` (label), ``0`` .. ``N_FEATURES - 1`` (features) and
        ``date`` (random whole-second timestamps in [2013-01-01, 2020-12-31)).
    """
    # Row chunking comes from a float32 estimate of an (N_SAMPLES, N_FEATURES) array.
    # NOTE(review): the real frame holds float64 features plus int64/datetime columns,
    # so actual rows are wider than this estimate; the final
    # repartition(partition_size=chunks) re-sizes the partitions.
    row_chunks = da.core.normalize_chunks(
        chunks,
        shape=(N_SAMPLES, N_FEATURES),
        dtype="float32",
    )[0]
    max_chunk = max(row_chunks)

    print("Constructing seed dataset...", end="")
    X, y = make_classification(
        n_samples=max_chunk,
        n_features=N_FEATURES,
    )
    seed_df = pandas.concat(
        [pandas.Series(y, name="class"), pandas.DataFrame(X)],
        axis=1,
    )
    # Vectorised equivalent of one random_date("2013-01-01", "2020-12-31") call per
    # row: uniform whole-second offsets in [start, end). This avoids building one
    # pandas.Timestamp per row in a Python loop.
    date_start = pandas.Timestamp("2013-01-01")
    date_end = pandas.Timestamp("2020-12-31")
    span_seconds = (date_end - date_start) // pandas.Timedelta(seconds=1)
    offsets = numpy.random.randint(span_seconds, size=len(seed_df))
    seed_df["date"] = date_start + pandas.to_timedelta(offsets, unit="s")
    print("Done")

    # Keep the seed in cluster memory; every output chunk is resampled from it.
    seed_ddf = dd.from_pandas(seed_df, npartitions=1).persist()
    parts = [
        # About n_rows rows drawn with replacement; a distinct random_state per
        # chunk makes the chunks differ from each other.
        seed_ddf.sample(frac=n_rows / max_chunk, replace=True, random_state=i)
        for i, n_rows in enumerate(row_chunks)
    ]
    # With a single chunk, the seed already has max_chunk (== that chunk's) rows.
    ddf = dd.concat(parts, axis=0) if len(parts) > 1 else seed_ddf
    return ddf.repartition(partition_size=chunks)

In [2]:
# Start a local dask.distributed cluster and connect a client to it. The bare
# `client` on the last line renders its summary (scheduler, workers, threads,
# memory) as the cell output.
from dask.distributed import Client
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Status: running,Using processes: True
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads:  8,Total memory:  16.00 GiB

0,1
Comm: tcp://127.0.0.1:54677,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads:  8
Started:  Just now,Total memory:  16.00 GiB

0,1
Comm: tcp://127.0.0.1:54749,Total threads: 2
Dashboard: http://127.0.0.1:54750/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54680,
Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-7et5xh0o,Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-7et5xh0o

0,1
Comm: tcp://127.0.0.1:54740,Total threads: 2
Dashboard: http://127.0.0.1:54744/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54682,
Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-83xjix3t,Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-83xjix3t

0,1
Comm: tcp://127.0.0.1:54741,Total threads: 2
Dashboard: http://127.0.0.1:54743/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54679,
Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-s8kbh253,Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-s8kbh253

0,1
Comm: tcp://127.0.0.1:54739,Total threads: 2
Dashboard: http://127.0.0.1:54742/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54681,
Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-edj4qxy8,Local directory: /Users/fjetter/workspace/distributed/dask-worker-space/worker-edj4qxy8


In [3]:
# Build the lazy synthetic dataset with ~50 MiB partitions and display its lazy repr
# (column dtypes and partition count; no data is computed here).
ddf = make_dataset(chunks="50MiB")
ddf

Constructing seed dataset...Done


Unnamed: 0_level_0,class,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,date
npartitions=240,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,datetime64[ns]
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [4]:

# Scratch directory that receives the per-split parquet output.
# exist_ok=True makes re-running the cell a no-op. Unlike `except OSError: pass`,
# real failures (permission denied, "tmp" existing as a regular file) still surface.
import os

os.makedirs("tmp", exist_ok=True)

In [5]:
# Build one lazy parquet write per date split; nothing runs until dask.compute later.
outs = []

for k, d in DATE_SPLITS.items():
    print(f"Computing {k}")
    # Each split's "end" is its last calendar day (the next split starts the day
    # after). A bare date string compares as midnight of that day, so
    # `< d["end"]` would silently drop every row timestamped later on the last day.
    # Use an exclusive bound at the start of the following day instead.
    end_exclusive = pandas.Timestamp(d["end"]) + pandas.Timedelta(days=1)
    sub_ddf = ddf[(ddf["date"] >= d["start"]) & (ddf["date"] < end_exclusive)]
    out = sub_ddf.to_parquet(
        f"tmp/{d['start']}/foo.parquet",
        engine="pyarrow",
        overwrite=True,
        compute=False,
    )
    outs.append(out)
# Drop the notebook-level reference to the full dataset; the lazy writes in `outs`
# still carry the graphs they need.
# NOTE(review): this makes the cell non-idempotent (re-running raises NameError).
del ddf

Computing train
Computing test1
Computing test2


In [6]:
# Show the dashboard URL so the computation in the following cells can be monitored.
print(client.dashboard_link)

http://127.0.0.1:8787/status


In [7]:
# Run all pending parquet writes from the split loop together in one compute call.
# NOTE(review): `dask` belongs with the other imports in the first cell.
# NOTE(review): the pyarrow warnings in the saved output are possibly about the
# mixed int/str column names (`class`, 0..19, `date`) — confirm.
import dask
dask.compute(outs)
# Release the lazy write objects once they have run.
# NOTE(review): with `del outs` as the last statement this cell displays nothing;
# the saved tuple output comes from an earlier version of the cell.
del outs

  table = pa.Table.from_pandas(
  table = pa.Table.from_pandas(
  table = pa.Table.from_pandas(
  table = pa.Table.from_pandas(


([None, None, None],)

In [9]:
# List the task keys each worker still tracks. This is presumably meant to spot tasks
# left behind after the writes ran and `outs`/`ddf` were deleted.
# `client.run` fills a parameter named `dask_worker` with the Worker object; the
# result is a dict keyed by worker address.
# NOTE(review): the execution count jumps from 7 to 9, so a cell was run and removed.
client.run(lambda dask_worker: list(dask_worker.tasks.keys()))

{'tcp://127.0.0.1:54739': ["('from_pandas-3691bc9bb48acb6abbe801455ff56154', 0)",
  "('ge-91d4c45f60e980d845e0ca89ecc331a7', 152)",
  "('getitem-d109ffac6f01aeda47924f5294ed58fa', 152)",
  "('ge-1e6b8984933e08ed998bcdd7b8ad830b', 195)",
  "('getitem-d109ffac6f01aeda47924f5294ed58fa', 195)"],
 'tcp://127.0.0.1:54740': ["('from_pandas-3691bc9bb48acb6abbe801455ff56154', 0)",
  "('concat-6b736e34491870e07e3d67e3673eb51f', 26)",
  "('sample-1b7bf85d4ba281916208d61fdd71a9aa', 0)",
  "('concat-6b736e34491870e07e3d67e3673eb51f', 42)",
  "('sample-13f656994a40ac1b51583340822bae8a', 0)",
  "('sample-e7f7c1d566ec63d31a8af32b232ea542', 0)",
  "('concat-6b736e34491870e07e3d67e3673eb51f', 32)",
  "('split-e9e899d5d887ae72c7c7bd9fa5be6c15', 32)",
  "('repartition-split-52428800-bfe6a4ada42af93a4646377d48a025fe', 97)",
  "('repartition-52428800-60f7dad0bb679e52a99fe1c8f7d952bf', 97)",
  "('ge-91d4c45f60e980d845e0ca89ecc331a7', 102)",
  "('getitem-d109ffac6f01aeda47924f5294ed58fa', 102)",
  "('ge-91d4c