# Imbalanced Data Join

## Background

We've run into some applied situations in the wild where joins on highly imbalanced data behave poorly in Dask. In some situations the run time is longer than desired, and in others workers may die unexpectedly. The purpose of this notebook is to create two simulated datasets that exhibit the type of problematic imbalance we've encountered in the wild. A few parameters control the dataset sizes, so we can create datasets of different sizes that all exhibit the gnarly imbalance and use them as practical test cases.



In [None]:
import pandas as pd
import scipy.stats as st
import numpy as np

np.random.seed(42)

## Constants

These are constants used below; they are described in the text.

In [None]:
# df1 constants:

bucket_n =  40_000   
key_n    =  7000  
group1_n = 10
group2_n = 20
group3_n = 30
group4_n = 40

P_maj1 = (160, 125125)       # (loc, scale) of the majority exponential distribution
P_min1 = (1006402, 8200726)  # (loc, scale) of the minority exponential distribution

# df2 constants:

df2_groupings_n = 11578  

P_maj2 = (1.0, 1.5 )      # parameters for the majority exp dist
P_min2 = (101.0, 554.0 )  # parameters for the minority exp dist

## `df1` - data with large record count

The first dataframe is `df1`, the larger table in terms of record count. The fields in `df1` are as follows:

`key`: the key used to join with `df2` later. The number of unique keys is low compared to the number of records in each key

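`bucket`: Each key cycles through bucket values 1 to `bucket_n` (set by the constant above). A key with at least `bucket_n` records therefore uses every bucket, while a key with fewer records uses only the first buckets in that range.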

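`group1`: One of 4 grouping columns (`group1` through `group4`). Each record gets a random integer label in every grouping column, with the number of possible labels governed by `group1_n` through `group4_n`. These columns are used later in a groupby.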

`group2`: see above

`group3`: see above

`group4`: see above

`value`: This is the value we will sum later. It's random and has no impact on the process in any way. 

***side note:*** it's not a bad idea to allow an arbitrary number of groupings, set by a parameter, for testing. May iterate on that later.
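A minimal sketch of that idea: the hypothetical helper `make_group_columns` below takes a list of label counts (one entry per grouping) and returns one random label column per grouping, so the number of groupings becomes a parameter. The helper's name, its signature, and the use of NumPy's `Generator` API are assumptions for illustration, not part of this notebook.

```python
import numpy as np


def make_group_columns(n_records, group_sizes, rng):
    """Hypothetical helper: one random label column per grouping.

    group_sizes[i] is the number of distinct labels in grouping i + 1;
    labels run from 1 to group_sizes[i] inclusive.
    """
    return {
        # Generator.integers excludes `high`, so size + 1 makes `size` reachable
        f"group{i}": rng.integers(1, size + 1, size=n_records)
        for i, size in enumerate(group_sizes, start=1)
    }


rng = np.random.default_rng(42)
cols = make_group_columns(1_000, [10, 20, 30, 40], rng)
print(sorted(cols))  # ['group1', 'group2', 'group3', 'group4']
```

Adding a fifth grouping is then just a matter of appending another label count to the list.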



## `df2` - smaller dataset that puts `df1` into some groupings


`df2_grouping`: every key belongs to at least one, and often many, groupings.

`key`: Key to join to `df1`. It is not unique in *either* `df1` or `df2`, so this is a many-to-many join on highly imbalanced groupings.
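To see why a many-to-many join on imbalanced keys gets expensive, consider a toy example (the frames below are made up for illustration). Each key contributes `n1 * n2` rows to the join output, where `n1` and `n2` are that key's record counts in the left and right tables, so a single key that is heavy on both sides dominates the result.

```python
import pandas as pd

# Toy left table: key "a" is heavy (6 rows), key "b" is light (1 row)
left = pd.DataFrame({"key": ["a"] * 6 + ["b"], "value": range(7)})
# Toy right table: key "a" appears in 4 groupings, key "b" in 1
right = pd.DataFrame({"key": ["a"] * 4 + ["b"], "grouping": [1, 2, 3, 4, 5]})

joined = left.merge(right, on="key", how="inner")

# Output size per key is the product of that key's counts on each side
expected = (left["key"].value_counts() * right["key"].value_counts()).sum()
print(len(joined), expected)  # 25 25
```

Here key `"a"` alone produces 24 of the 25 output rows, which is the same kind of skew that concentrates work (and memory) on a few partitions in a distributed join.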


## Simulate `df1`

Let's build `df1` first:




In the original problematic data, the number of records per key is distributed like a mixture of two exponential distributions. 95% of the keys draw their record count from a shorter-tailed distribution, and 5% draw theirs from a much longer-tailed distribution.

With the exponential parameters set above (`P_maj1` and `P_min1`), this gives roughly half a million records per key on average, with each key cycling through up to 40,000 buckets.
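The "roughly half a million" figure can be checked by hand. A shifted exponential in SciPy's `loc`/`scale` parameterization has mean `loc + scale`, so the expected number of records per key is the 95/5 weighted average of the two component means. The sketch below does that arithmetic in plain Python; it ignores the small downward bias introduced by the `.astype(int)` truncation.

```python
# Parameters copied from the constants cell: (loc, scale) for each component
P_maj1 = (160, 125125)
P_min1 = (1006402, 8200726)
key_n = 7000

# Mean of scipy.stats.expon(loc=loc, scale=scale) is loc + scale
mean_maj = sum(P_maj1)  # 125285
mean_min = sum(P_min1)  # 9207128

# Mixture mean: 95% of keys from the majority component, 5% from the minority
mean_per_key = 0.95 * mean_maj + 0.05 * mean_min
expected_total = key_n * mean_per_key

print(f"{mean_per_key:,.0f} records per key")   # about 579,377
print(f"{expected_total:,.0f} records in df1")  # about 4.06 billion
```

So `df1` is expected to land around four billion records at these settings, with most of them coming from the 5% of keys in the long-tailed component.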

95% first:


In [None]:

draws_per_key_majority = st.expon.rvs(*P_maj1, size=round(0.95 * key_n)).astype(int)

Then the 5%:

In [None]:

draws_per_key_minority = st.expon.rvs(*P_min1, size=round(0.05 * key_n)).astype(int)

Note that rounding each fraction separately can, in general, leave the total number of simulated keys off by one from `key_n`: we might ask for 10 keys but only end up simulating 9. The relative impact shrinks as the number of keys grows, and with `key_n = 7000` the two counts (6,650 and 350) add up exactly.
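One way to rule out the off-by-one is to round only the majority count and give the minority whatever is left, so the two always sum to `key_n`. In the sketch below, the hypothetical helper `split_counts` does the split, and the draws use NumPy's `Generator` (`loc + rng.exponential(scale)`, which has the same distribution as `scipy.stats.expon(loc=loc, scale=scale)`) instead of `st.expon.rvs` with the global seed; both choices are assumptions, not the notebook's code.

```python
import numpy as np


def split_counts(n_keys, frac_major=0.95):
    """Hypothetical helper: majority/minority key counts that always sum to n_keys."""
    n_major = round(frac_major * n_keys)
    return n_major, n_keys - n_major


key_n = 7000
P_maj1 = (160, 125125)       # (loc, scale)
P_min1 = (1006402, 8200726)  # (loc, scale)

n_maj, n_min = split_counts(key_n)
rng = np.random.default_rng(42)

# loc + Exponential(scale) matches scipy.stats.expon(loc=loc, scale=scale)
draws_maj = (P_maj1[0] + rng.exponential(P_maj1[1], size=n_maj)).astype(int)
draws_min = (P_min1[0] + rng.exponential(P_min1[1], size=n_min)).astype(int)

draws_per_key = np.concatenate((draws_maj, draws_min))
print(n_maj, n_min, len(draws_per_key))  # 6650 350 7000
```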


In [None]:
draws_per_key = np.concatenate((draws_per_key_majority, draws_per_key_minority))

Total number of records that will be in `df1`:

In [None]:
draws_per_key.sum()

Now we know how many records each key gets. The simulation of `df1` loops over `draws_per_key` and, for each key, generates that many records with random groups and writes them to one Parquet file. This could all be vectorized, but I'm keeping it as a per-key function to keep it readable.
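For reference, the `key` and `bucket` columns can be built for many keys at once without a loop. This is a minimal sketch with tiny made-up sizes, not the notebook's code: `np.repeat` expands each key by its record count, and the bucket id restarts at 1 for every key and cycles through `1..bucket_n`, mirroring the `np.resize` call inside `create_df1_files`.

```python
import numpy as np

bucket_n = 3                      # tiny value for illustration
draws_per_key = np.array([5, 2])  # made-up record counts for keys 1 and 2
keys = np.arange(1, len(draws_per_key) + 1)

# key column: each key repeated once per record
key_col = np.repeat(keys, draws_per_key)

# position of each record within its own key: 0, 1, 2, ... restarting per key
starts = np.repeat(np.cumsum(draws_per_key) - draws_per_key, draws_per_key)
pos_in_key = np.arange(draws_per_key.sum()) - starts

# bucket column: cycle through 1..bucket_n within each key
bucket_col = pos_in_key % bucket_n + 1

print(key_col)     # [1 1 1 1 1 2 2]
print(bucket_col)  # [1 2 3 1 2 1 2]
```

At the notebook's full scale (billions of records) these arrays would not fit in memory on one machine, so generating and writing one file per key is a reasonable way to keep each task small.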

In [None]:
def create_df1_files(draws, key):
    """Simulate `draws` records for one `key` and write them to one Parquet file."""
    # Constants are repeated here so the function is self-contained on the workers
    bucket_n = 40_000
    group1_n = 10
    group2_n = 20
    group3_n = 30
    group4_n = 40

    df = pd.DataFrame(
        {
            # every record in this file belongs to the same key
            "key": np.full(draws, key),
            # cycle through bucket ids 1..bucket_n
            "bucket": np.resize(np.arange(1, bucket_n + 1), draws),
            # random group labels; `high` is exclusive, so +1 yields labels 1..groupN_n
            "group1": np.random.randint(low=1, high=group1_n + 1, size=draws),
            "group2": np.random.randint(low=1, high=group2_n + 1, size=draws),
            "group3": np.random.randint(low=1, high=group3_n + 1, size=draws),
            "group4": np.random.randint(low=1, high=group4_n + 1, size=draws),
            # value to be summed later; it has no effect on the imbalance
            "value": np.random.random(size=draws),
        }
    )

    # Writing to S3 requires s3fs and write access to the bucket
    df.to_parquet(f"s3://test-imbalanced-join/df1/key_{key:05d}.parquet")
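
`create_df1_files` draws its group labels with `np.random.randint`, whose `high` argument is exclusive: `randint(low=1, high=n)` produces labels 1 through `n - 1`, and `high=n + 1` is needed to include `n`. A quick check, using an arbitrary seed and sample size chosen for illustration:

```python
import numpy as np

np.random.seed(0)
group1_n = 10

labels_excl = np.random.randint(low=1, high=group1_n, size=100_000)
labels_incl = np.random.randint(low=1, high=group1_n + 1, size=100_000)

print(labels_excl.min(), labels_excl.max())  # 1 9
print(labels_incl.min(), labels_incl.max())  # 1 10
```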
    

In [None]:
from dask.distributed import Client

In [None]:
from coiled import Cluster

In [None]:
cluster = Cluster(name="imbalanced_data",
                  n_workers=50, 
                  package_sync=True,
                  scheduler_options={"idle_timeout": "2 hour"})

In [None]:
client = Client(cluster)

In [None]:
client

In [None]:
futures = client.map(create_df1_files, draws_per_key, range(1, key_n + 1))