In [None]:
import numpy as np
import pandas as pd
from dask.distributed import Client, wait
import dask

In [None]:
import coiled

In [None]:
import zarr as zr
#from dask.sizeof import sizeof

## Save sample_id3 as array in s3

It turns out that for the case of `N=1e10`, `sample_id3 = [f"id{str(x).zfill(10)}" for x in range(1, int(N / K) + 1)]` is a list of ~7GB. Producing this list in the workers is not only slow but also puts a lot of pressure on memory. 

In a quick test, converting this list to a numpy array results in a 4.5GB array, and
sampling from that array is faster than sampling from the list. 

Since this array does not change across partitions, we wrote the array to S3 with zarr and then read it back in the workers. 

### Array creation 

This code was run once to create the array; it is kept here for documentation purposes only.
It needs zarr installed and `dask.array` imported as `da` (`import dask.array as da`).
```python
client = Client()

N = int(1e10)
K = int(1e2)
sample_id3_list = [f"id{str(x).zfill(10)}" for x in range(1, int(N / K) + 1)]
sample_id3_np = np.array(sample_id3_list)
sample_id3_da =  da.from_array(sample_id3_np)

dir_h2o = "s3://coiled-datasets/h2o-benchmark/sample_id3_arr"
da.to_zarr(sample_id3_da, dir_h2o)
```

In [None]:
# Unused alternative, kept for reference only: a delayed task that would load
# the whole sample_id3 zarr array from S3. create_single_df reads the array
# inline instead, so nothing in this cell is executed.
# @dask.delayed
# def create_sample_id3():
#     x = zr.open("s3://coiled-datasets/h2o-benchmark/sample_id3_arr")
#     return x[:]

In [None]:
#@dask.delayed
def create_single_df(N, K, nfiles, dir, i, seed=None):
    """
    Build one pandas dataframe with ``N // nfiles`` rows of random groupby
    data and write it to ``dir`` as a single parquet file.

    Nothing is returned; the only result is the parquet file
    ``{dir}/groupby-N_<N>_K_<K>_file_<i>.parquet``.

    Parameters
    ----------
    N: int,
     Total number of rows
    K: int,
     Number of groups
    nfiles: int,
     Number of output files
    dir: str,
     Output directory
    i: int,
     Integer to assign to the multiple files e.g. range(nfiles)
    seed: int, optional
     Non-negative base seed. When given, the random generator for this file
     is seeded with ``[seed, i]``, so every file gets its own reproducible
     stream. When omitted (default), fresh OS entropy is used and the output
     is not reproducible.

    Notes
    -----
    The ``id3`` values are sampled from a pre-built array stored in S3 (see
    the "Array creation" section of this notebook). That array is fixed, so
    its number of distinct ids does not follow the ``N`` and ``K`` passed here.
    """
    # Floor division keeps the arithmetic in integers; int() also tolerates
    # callers that pass N/K as floats such as 1e10.
    nrows = int(N // nfiles)
    n_id6 = int(N // K)

    # One independent generator per file instead of the process-global
    # np.random state, so results can be reproduced when a seed is supplied.
    rng = np.random.default_rng(None if seed is None else [seed, i])

    sample_id12 = [f"id{str(x).zfill(3)}" for x in range(1, K + 1)]

    # The id3 pool is a multi-GB array. Draw from it first and release it
    # right away so it does not stay in memory while the frame is built and
    # written.
    sample_id3 = zr.open("s3://coiled-datasets/h2o-benchmark/sample_id3_arr")[:]
    id3 = rng.choice(sample_id3, size=nrows, replace=True)
    del sample_id3

    df = pd.DataFrame(
        {
            "id1": rng.choice(sample_id12, size=nrows, replace=True),
            "id2": rng.choice(sample_id12, size=nrows, replace=True),
            "id3": id3,
            # Integer columns are drawn from [0, upper), i.e. 0-based.
            "id4": rng.integers(K, size=nrows),
            "id5": rng.integers(K, size=nrows),
            "id6": rng.integers(n_id6, size=nrows),
            "v1": rng.integers(5, size=nrows),
            "v2": rng.integers(15, size=nrows),
            "v3": np.around(rng.uniform(0, 100, size=nrows), decimals=6),
        }
    )

    df = df.astype({
            "id1": "string[pyarrow]", #this was category before
            "id2": "string[pyarrow]", #this was category before
            "id3": "string[pyarrow]", #this was category before
            "id4": "Int32",
            "id5": "Int32",
            "id6": "Int32",
            "v1": "Int32",
            "v2": "Int32",
            "v3": "float64",
        })

    # Compact scientific notation for the file name, e.g. 1E10 and 1E2.
    N_pretty = ''.join(f"{N:.0E}".split("+"))
    K_pretty = ''.join(f"{K:.0E}".split("+0"))

    df.to_parquet(
        f"{dir}/groupby-N_{N_pretty}_K_{K_pretty}_file_{i}.parquet",
    )

### Cluster details

We use 1 thread per worker and m6i.xlarge instances (16GiB per worker) because each worker needs enough memory to read the sample_id3 array, which is a 4.8GB array. We can get away with just 8GiB, but there is a bit of spilling. 

With 100 workers, we can write the whole dataset in ~5.5 min.  

In [None]:
# Coiled cluster used to write the dataset: 100 single-threaded workers.
cluster = coiled.Cluster(
    name="create_groupby_500",
    n_workers=100,
    worker_vm_types="m6i.xlarge",
    scheduler_vm_types="m6i.xlarge",
    # One thread per worker, so each task has the worker's memory to itself
    # (16GB per worker).
    worker_options={"nthreads": 1},
    scheduler_options={"idle_timeout": "2 hour"},
    package_sync=True,
)

In [None]:
# Connect a distributed client to the cluster created above; later cells
# submit work through `client`.
client = Client(cluster)
# Bare expression so the notebook renders the client summary.
client

In [None]:
# Dataset size: N total rows spread over K groups (cast to int in a later cell).
N, K = 1e10, 1e2

In [None]:
# Base S3 prefix for the benchmark data; the trailing slash matters because
# the output directory is built from it by plain string concatenation.
s3_dir = "s3://coiled-datasets/h2o-benchmark/"

In [None]:
# create_single_df needs integer sizes (K is used in range()).
N, K = int(N), int(K)
nfiles = 5000  # this creates pq files that in memory take 109MB
# NOTE: `dir` shadows the builtin of the same name; the name is kept because
# the submission cell refers to it.
dir = f"{s3_dir}pyarrow_strings/N_1e10_K_1e2"

In [None]:
%%time
futures = client.map(
            lambda i: create_single_df(N, K, nfiles, dir, i), range(nfiles), 
            pure=False
        )
wait(futures)