# Merge Dask DataFrames

This post demonstrates how to merge Dask DataFrames and discusses important considerations when making large joins. 

You’ll learn:
1. Join a Dask DataFrame to a pandas DataFrame
2. Join two large Dask DataFrames


## Join Dask DataFrame to pandas DataFrame

Use the df.merge() API:

`join = dask_df.merge(pandas_df, how="left", on=["Name"])`

In [48]:
import dask.dataframe as dd
import pandas as pd

In [49]:
df = pd.DataFrame(
    {
        "Name": ["Azza", "Brandon", "Cedric", "Devonte", "Eli", "Fabio"], 
        "Age": [29, 30, 21, 57, 32, 19]
    }
)
dask_df = dd.from_pandas(df, npartitions=2)

In [50]:
dask_df.head()

Unnamed: 0,Name,Age
0,Azza,29
1,Brandon,30
2,Cedric,21


In [51]:
pandas_df = pd.DataFrame(
    {
        "Name": ["Azza", "Cedric", "Fabio"], 
        "City": ["Beirut", "Dublin", "Rosario"]
    }
)

In [52]:
pandas_df.head()

Unnamed: 0,Name,City
0,Azza,Beirut
1,Cedric,Dublin
2,Fabio,Rosario


In [53]:
join = dask_df.merge(pandas_df, how="left", on=["Name"])

In [54]:
join.compute()

Unnamed: 0,Name,Age,City
0,Azza,29,Beirut
1,Brandon,30,
2,Cedric,21,Dublin
0,Devonte,57,
1,Eli,32,
2,Fabio,19,Rosario


## 5. Merge two large Dask DataFrames

Use the same df.merge() API

```
large_join = large.merge(
    also_large, 
    how="left", 
    left_index=True, 
    right_index=True
)
```

Make sure to **set the index** if you're going to join against a large Dask DataFrame more than once.

In [55]:
large = dask.datasets.timeseries(
    start="1990-01-01", 
    end="2020-01-01", 
    freq="1s", 
    partition_freq="1M"
)

also_large = dask.datasets.timeseries(
    start="1990-01-01", 
    end="2020-01-01", 
    freq="1s", 
    partition_freq="1M", 
    dtypes={"foo": int}
)

In [56]:
large.head()

Unnamed: 0_level_0,id,name,x,y
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1990-01-31 00:00:00,1030,Sarah,0.386822,0.541511
1990-01-31 00:00:01,993,Patricia,0.653083,-0.599545
1990-01-31 00:00:02,963,Sarah,-0.490853,-0.801122
1990-01-31 00:00:03,980,Quinn,0.933497,0.538505
1990-01-31 00:00:04,974,Edith,-0.380639,-0.591807


In [57]:
also_large.head()

Unnamed: 0_level_0,foo
timestamp,Unnamed: 1_level_1
1990-01-31 00:00:00,1033
1990-01-31 00:00:01,992
1990-01-31 00:00:02,1009
1990-01-31 00:00:03,1015
1990-01-31 00:00:04,1001


In [58]:
large_join = large.merge(
    also_large, 
    how="left", 
    left_index=True, 
    right_index=True
)

In [59]:
large_join.head()

Unnamed: 0_level_0,id,name,x,y,foo
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1990-01-31 00:00:00,1030,Sarah,0.386822,0.541511,1033
1990-01-31 00:00:01,993,Patricia,0.653083,-0.599545,992
1990-01-31 00:00:02,963,Sarah,-0.490853,-0.801122,1009
1990-01-31 00:00:03,980,Quinn,0.933497,0.538505,1015
1990-01-31 00:00:04,974,Edith,-0.380639,-0.591807,1001


## 1. Launch Coiled Cluster

In [None]:
import coiled
import pandas as pd
import dask.dataframe as dd
import dask
import distributed

In [None]:
cluster = coiled.Cluster(
    name="dask-merge",
    n_workers=50,
    worker_memory='16Gib',
    backend_options={'spot':'True'},
    software='coiled-examples/numpy-zarr',
    shutdown_on_close=False,
    scheduler_options={'idle_timeout':'1 hour'}
)

In [None]:
from distributed import Client
client = Client(cluster)
client

## 2. Create Datasets

In [None]:
# create large dataset as parquet to S3
def submit_jobs():
    from distributed import get_client
    
    with get_client() as client:        
        large = dask.datasets.timeseries(start="1990-01-01", end="2020-01-01", freq="1s", partition_freq="1M")
        
        large.to_parquet(
            's3://coiled-datasets/dask-merge/large.parquet',
            engine="fastparquet"
        )

client.submit(submit_jobs).result()

In [None]:
# write small dataset as parquet to S3
def submit_jobs():
    from distributed import get_client
    
    with get_client() as client:        
        small = dask.datasets.timeseries(start="1990-01-31", end="2020-01-01", freq="1D", dtypes={"z": int}).compute()
        
        small.to_parquet(
            's3://coiled-datasets/dask-merge/small.parquet'
        )

client.submit(submit_jobs).result()

## 3. Merge Large Dask DataFrame to small pandas Dataframe

In [None]:
large = dd.read_parquet('s3://coiled-datasets/dask-merge/large.parquet')
small = pd.read_parquet('s3://coiled-datasets/dask-merge/small.parquet')

In [None]:
large.npartitions

In [None]:
small.npartitions

In [None]:
join = large.merge(
    small, 
    how="left", 
    on=["timestamp"]
)

In [None]:
join.head()

In [None]:
join.loc["1990-02-01"].compute()

## 5. Merge two large Dask DataFrames

In [None]:
large = dask.datasets.timeseries(start="1990-01-01", end="2020-01-01", freq="1s", partition_freq="1M")
also_large = dask.datasets.timeseries(start="1990-01-01", end="2020-01-01", freq="1s", partition_freq="1M", dtypes={"foo": int})

In [None]:
large.head()

In [None]:
large_join = large.merge(
    also_large, 
    how="left", 
    left_index=True, 
    right_index=True
)

In [None]:
import distributed

In [None]:
%%time
joined = large_join.persist()
distributed.wait(joined);

In [None]:
result2 = result1.merge(
    right_two, how="left", left_index=True, right_index=True)

In [None]:
left.head()

In [None]:
result1.head()

In [None]:
result2.head()

### 4.1. Benchmark with persisted dataframes

#### Join Large Dask to Small pandas

In [None]:
# benchmark join1

times_join1 = []

for i in range(0,5):
    client.restart()
    
    large = dd.read_parquet('s3://coiled-datasets/dask-merge/large.parquet').persist()
    small_df = pd.read_parquet('s3://coiled-datasets/dask-merge/small.parquet')
    small_ddf_1p = dd.from_pandas(small_df, npartitions=1).persist()
    small_ddf_10p = dd.from_pandas(small_df, npartitions=10).persist()
    
    distributed.wait([large, small_ddf_1p, small_ddf_10p])
    
    start = time.time()
    
    # merge dask dataframe with small pandas dataframe
    join1 = large.merge(
        small_df, 
        how="left", 
        on=["timestamp"]
    ).persist()

    distributed.wait(join1)
    
    elapsed_time = (time.time() - start)
    times_join1.append(elapsed_time)


In [None]:
#persisted
times_join1

In [None]:
df_join1 = pd.DataFrame(data=times_join1, columns=["time"])

In [None]:
fig,ax = plt.subplots(figsize=[10,10])
plt.scatter(x=df_join1.index, y=df_join1.time, s=100)
plt.axhline(y=df_join1.time.mean(), c="red", label=f"Mean Time: {df_join1.time.mean()}")
plt.xticks(ticks=df_join1.index)
plt.yticks(ticks=np.linspace(6, 13, num=15))
plt.title("Join Large Dask to Small Pandas", fontsize=20)
plt.xlabel("Run")
plt.ylabel("Time (s)")
plt.legend();

#### Join Large Dask to Small Dask (1 partition)

In [None]:
# benchmark join2

times_join2 = []

for i in range(0,5):
    client.restart()
    
    large = dd.read_parquet('s3://coiled-datasets/dask-merge/large.parquet').persist()
    small_df = pd.read_parquet('s3://coiled-datasets/dask-merge/small.parquet')
    small_ddf_1p = dd.from_pandas(small_df, npartitions=1).persist()
    small_ddf_10p = dd.from_pandas(small_df, npartitions=10).persist()
    
    distributed.wait([large, small_ddf_1p, small_ddf_10p])
    
    start = time.time()
    
    # merge dask dataframe with small dask dataframe (1 partition)
    join2 = large.merge(
        small_ddf_1p, 
        how="left", 
        on=["timestamp"]
    ).persist()

    distributed.wait(join2)
    
    elapsed_time = (time.time() - start)
    times_join2.append(elapsed_time)


In [None]:
times_join2

In [None]:
df_join2 = pd.DataFrame(times_join2, columns=["time"])

In [None]:
fig,ax = plt.subplots(figsize=[10,10])
plt.scatter(x=df_join2.index, y=df_join2.time, s=100)
plt.axhline(y=df_join2.time.mean(), c="red", label=f"Mean Time: {df_join2.time.mean()}")
plt.xticks(ticks=df_join2.index)
plt.yticks(ticks=np.linspace(6, 13, num=15))
plt.title("Join Large Dask to Small Dask, 1 partition", fontsize=20)
plt.xlabel("Run")
plt.ylabel("Time (s)")
plt.legend();

#### Join Large Dask to Small Dask (10 partitions)

In [None]:
# benchmark join3

times_join3 = []

for i in range(0,5):
    client.restart()
    
    large = dd.read_parquet('s3://coiled-datasets/dask-merge/large.parquet').persist()
    small_df = pd.read_parquet('s3://coiled-datasets/dask-merge/small.parquet')
    small_ddf_1p = dd.from_pandas(small_df, npartitions=1).persist()
    small_ddf_10p = dd.from_pandas(small_df, npartitions=10).persist()
    
    distributed.wait([large, small_ddf_1p, small_ddf_10p])
    
    start = time.time()
    
    # merge dask dataframe with small dask dataframe (10 partitions)
    join3 = large.merge(
        small_ddf_10p, 
        how="left", 
        on=["timestamp"]
    ).persist()

    distributed.wait(join3)
    
    elapsed_time = (time.time() - start)
    times_join3.append(elapsed_time)

In [None]:
times_join3

In [None]:
df_join3 = pd.DataFrame(times_join3, columns=["time"])

In [None]:
fig,ax = plt.subplots(figsize=[10,10])
plt.scatter(x=df_join3.index, y=df_join3.time, s=100)
plt.axhline(y=df_join3.time.mean(), c="red", label=f"Mean Time: {df_join3.time.mean()}")
plt.xticks(ticks=df_join3.index)
plt.yticks(ticks=np.linspace(6, 13, num=15))
plt.title("Join Large Dask to Small Dask, 10 partitions", fontsize=20)
plt.xlabel("Run")
plt.ylabel("Time (s)")
plt.legend();

### 4.2 Benchmark without persisting dataframes

#### Join Large Dask to Small Pandas

In [None]:
times_join1_unper = []

for i in range(0,5):
    client.restart()
    
    large = dd.read_parquet('s3://coiled-datasets/dask-merge/large.parquet')
    small_df = pd.read_parquet('s3://coiled-datasets/dask-merge/small.parquet')
    small_ddf_1p = dd.from_pandas(small_df, npartitions=1)
    small_ddf_10p = dd.from_pandas(small_df, npartitions=10)
    
    distributed.wait([large, small_ddf_1p, small_ddf_10p])
    
    start = time.time()
    
    # merge dask dataframe with small pandas dataframe
    join1 = large.merge(
        small_df, 
        how="left", 
        on=["timestamp"]
    ).persist()

    distributed.wait(join1)
    
    elapsed_time = (time.time() - start)
    times_join1_unper.append(elapsed_time)


In [None]:
times_join1_unper

In [None]:
df_join1_unper = pd.DataFrame(times_join1_unper, columns=["time"])

In [None]:
fig,ax = plt.subplots(figsize=[10,10])
plt.scatter(x=df_join1_unper.index, y=df_join1_unper.time, s=100)
plt.axhline(y=df_join1_unper.time.mean(), c="red", label=f"Mean Time: {df_join1_unper.time.mean()}")
plt.xticks(ticks=df_join1_unper.index)
plt.yticks(ticks=np.linspace(30, 45, num=31))
plt.title("Join Large Dask to Small Pandas, unpersisted", fontsize=20)
plt.xlabel("Run")
plt.ylabel("Time (s)")
plt.legend();

#### Join Large Dask to Small Dask (1 partition)

In [None]:
# benchmark join2

times_join2_unper = []

for i in range(0,5):
    client.restart()
    
    large = dd.read_parquet('s3://coiled-datasets/dask-merge/large.parquet')
    small_df = pd.read_parquet('s3://coiled-datasets/dask-merge/small.parquet')
    small_ddf_1p = dd.from_pandas(small_df, npartitions=1)
    small_ddf_10p = dd.from_pandas(small_df, npartitions=10)
    
    distributed.wait([large, small_ddf_1p, small_ddf_10p])
    
    start = time.time()
    
    # merge dask dataframe with small dask dataframe (1 partition)
    join2 = large.merge(
        small_ddf_1p, 
        how="left", 
        on=["timestamp"]
    ).persist()

    distributed.wait(join2)
    
    elapsed_time = (time.time() - start)
    times_join2_unper.append(elapsed_time)


In [None]:
times_join2_unper

In [None]:
df_join2_unper = pd.DataFrame(times_join2_unper, columns=["time"])

In [None]:
fig,ax = plt.subplots(figsize=[10,10])
plt.scatter(x=df_join2_unper.index, y=df_join2_unper.time, s=100)
plt.axhline(y=df_join2_unper.time.mean(), c="red", label=f"Mean Time: {df_join2_unper.time.mean()}")
plt.xticks(ticks=df_join2_unper.index)
plt.yticks(ticks=np.linspace(30, 45, num=31))
plt.title("Join Large Dask to Small Dask, 1 partition, unpersisted", fontsize=20)
plt.xlabel("Run")
plt.ylabel("Time (s)")
plt.legend();

#### Join Large Dask to Small Dask (10 partitions)

In [None]:
# benchmark join3

times_join3_unper = []

for i in range(0,5):
    client.restart()
    
    large = dd.read_parquet('s3://coiled-datasets/dask-merge/large.parquet')
    small_df = pd.read_parquet('s3://coiled-datasets/dask-merge/small.parquet')
    small_ddf_1p = dd.from_pandas(small_df, npartitions=1)
    small_ddf_10p = dd.from_pandas(small_df, npartitions=10)
    
    distributed.wait([large, small_ddf_1p, small_ddf_10p])
    
    start = time.time()
    
    # merge dask dataframe with small dask dataframe (10 partitions)
    join3 = large.merge(
        small_ddf_10p, 
        how="left", 
        on=["timestamp"]
    ).persist()

    distributed.wait(join3)
    
    elapsed_time = (time.time() - start)
    times_join3_unper.append(elapsed_time)


In [None]:
times_join3_unper

In [None]:
df_join3_unper = pd.DataFrame(times_join3_unper, columns=["time"])

In [None]:
fig,ax = plt.subplots(figsize=[10,10])
plt.scatter(x=df_join3_unper.index, y=df_join3_unper.time, s=100)
plt.axhline(y=df_join3_unper.time.mean(), c="red", label=f"Mean Time: {df_join3_unper.time.mean()}")
plt.xticks(ticks=df_join3_unper.index)
plt.yticks(ticks=np.linspace(30, 45, num=31))
plt.title("Join Large Dask to Small Dask, 10 partitions, unpersisted", fontsize=20)
plt.xlabel("Run")
plt.ylabel("Time (s)")
plt.legend();

### Plot Means

In [None]:
df_means_persisted = pd.DataFrame(
    data=[df_join1.time.mean(), df_join2.time.mean(), df_join3.time.mean()], 
    columns=['mean_time_per_join']
)

In [None]:
df_means_persisted['join_type'] = ['dask_to_pandas', 'dask_to_dask_1partition', 'dask_to_dask_10partitions']

In [None]:
df_means_persisted

In [None]:
df_means_unpersisted = pd.DataFrame(
    data=[df_join1_unper.time.mean(), df_join2_unper.time.mean(), df_join3_unper.time.mean()], 
    columns=['mean_time_per_join']
)

In [None]:
df_means_unpersisted['join_type'] = ['dask_to_pandas', 'dask_to_dask_1partition', 'dask_to_dask_10partitions']

In [None]:
df_means_unpersisted

In [None]:
fig,axs = plt.subplots(1,2, figsize=(10,10))

axs[0].scatter(
    x=df_means_persisted.join_type, 
    y=df_means_persisted.mean_time_per_join,
    s=100,
)
axs[0].tick_params(labelrotation=90)

axs[1].scatter(
    x=df_means_unpersisted.join_type, 
    y=df_means_unpersisted.mean_time_per_join,
    s=100,
)
axs[1].tick_params(labelrotation=90)


fig.suptitle("Persisted (left) vs Unpersisted (right)", fontsize=20);

    