In [30]:
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

In [1]:
import os

from dask.distributed import Client

# NOTE(review): the literal port looks session-specific, so it goes stale once
# the scheduler is restarted. Allow overriding the address from the environment
# and keep the original literal as the fallback so existing runs are unaffected.
SCHEDULER_ADDRESS = os.environ.get("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:40985")

# Connect to the already-running scheduler; the bare name renders the summary.
client = Client(SCHEDULER_ADDRESS)
client

0,1
Client  Scheduler: tcp://127.0.0.1:40985  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 16  Memory: 31.01 GiB


# Generate increment across partitions

In [19]:
# Four-row example frame split into two partitions.
d = {'col1': ["a", "b", "c", "d"]}
df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
# Replace the index with a fresh default one (the old index is dropped).
df = df.reset_index(drop=True)

In [20]:
# Materialize the frame; per the output, the index restarts at 0 in each partition.
df.compute()

Unnamed: 0,col1
0,a
1,b
0,c
1,d


In [26]:
# Row count of every partition, computed eagerly and converted to a plain list.
chunks = df.map_partitions(len).compute().tolist()
chunks

[2, 2]

In [28]:
# Total number of rows across all partitions.
len_df = sum(chunks)
len_df

4

In [31]:
# Attach a 1-based running counter. The array is chunked with the per-partition
# row counts so that each chunk has as many elements as its partition has rows.
df["new_idx"] = da.arange(1, len_df + 1, chunks=chunks)

In [33]:
# Per the output, new_idx keeps counting across partitions while the index still restarts.
df.compute()

Unnamed: 0,col1,new_idx
0,a,1
1,b,2
0,c,3
1,d,4


# Cull empty partitions

In [60]:
def cull_empty_partitions(df):
    """Return ``df`` without its zero-length partitions.

    The length of every partition is computed eagerly via
    ``df.map_partitions(len).compute()``, so calling this triggers a
    computation. One ``culling partition <ix>`` line is printed for each
    partition that is removed.

    Parameters
    ----------
    df : dask DataFrame
        Collection whose empty partitions should be removed.

    Returns
    -------
    dask DataFrame
        * ``df`` itself when no partition is empty;
        * a collection rebuilt with ``dd.from_delayed`` from the non-empty
          partitions when some, but not all, partitions are empty;
        * the first partition alone when *every* partition is empty, since a
          collection cannot be rebuilt from an empty list of delayed objects.
    """
    partition_lengths = list(df.map_partitions(len).compute())
    empty_ixs = [ix for ix, n in enumerate(partition_lengths) if n == 0]

    if not empty_ixs:
        return df

    if len(empty_ixs) == len(partition_lengths):
        # Nothing non-empty to rebuild from: keep a single (empty) partition so
        # the result is still a valid collection with the right schema.
        for ix in empty_ixs[1:]:
            print(f'culling partition {ix}')
        return df.get_partition(0)

    for ix in empty_ixs:
        print(f'culling partition {ix}')

    delayed_parts = df.to_delayed()
    kept = [delayed_parts[ix] for ix, n in enumerate(partition_lengths) if n != 0]
    # Pass the collection's own metadata (an empty pandas frame with the right
    # columns/dtypes) rather than a lazy handle on one of the empty partitions.
    return dd.from_delayed(kept, meta=df._meta)

In [50]:
# Two-partition example frame; the "c" rows are filtered out in a later cell.
d = {'col1': ["a", "b", "c", "c"]}
df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
df.compute()

Unnamed: 0,col1
0,a
1,b
2,c
3,c


In [52]:
# Inspect the first partition on its own.
df.partitions[0].compute()

Unnamed: 0,col1
0,a
1,b


In [55]:
# Keep only the rows whose col1 is not "c".
df_filtered = df[df["col1"]!="c"]
df_filtered.compute()

Unnamed: 0,col1
0,a
1,b


In [56]:
# The partition count is unchanged by the filter (still 2 per the output).
df_filtered.npartitions

2

In [61]:
# Partition count after removing the empty partition left behind by the filter.
cull_empty_partitions(df_filtered).npartitions

culling partition 1


1

# Merge tiny partitions

In [None]:
def merge_tiny_partitions(df, index_length_threshold=100):
    """Coalesce partitions whose index span is at most ``index_length_threshold``.

    Intended for use after concatenating with ``interleave_partitions=True``.

    ``df.divisions`` is walked left to right and a boundary is kept only when
    it lies more than ``index_length_threshold`` past the previously kept one,
    so a narrow partition is absorbed into the partition that follows it. The
    last division is always kept, which folds a narrow trailing partition into
    the one before it.

    Parameters
    ----------
    df : dask DataFrame
        Collection with known, integer divisions.
    index_length_threshold : int, default 100
        Minimum distance (exclusive) between two kept division boundaries.

    Returns
    -------
    dask DataFrame
        ``df`` itself when it has a single partition or nothing needs merging,
        otherwise ``df.repartition(divisions=...)`` with the reduced divisions.

    Raises
    ------
    AssertionError
        If the first division is not an integer (only integer divisions are
        implemented).
    """
    divisions = df.divisions
    # Requires numpy to be imported as `np` at notebook level.
    assert isinstance(divisions[0], (int, np.integer)), 'Not implemented'
    if len(divisions) <= 2:
        # A single partition cannot be merged with anything.
        return df

    new_divisions = [divisions[0]]
    for boundary in divisions[1:]:
        if boundary - new_divisions[-1] > index_length_threshold:
            new_divisions.append(boundary)

    if len(new_divisions) == 1:
        # Every boundary was too close to the start: collapse to one partition.
        new_divisions.append(divisions[-1])
    else:
        # The final division must be preserved; overwriting the last kept
        # boundary merges any narrow tail into the preceding partition.
        new_divisions[-1] = divisions[-1]

    if len(new_divisions) < len(divisions):
        df = df.repartition(divisions=new_divisions)
    return df