# Dask 대용량 데이터 처리

In [19]:
import time
import pandas as pd
import numpy as np
import dask.dataframe as dd

In [20]:
def create_datesets(nrows: int, ncols: int) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Build a random main table and a smaller reference table.

    Both frames have columns ``col_0`` .. ``col_{ncols - 1}``, each filled
    with values drawn from ``np.random.rand``.

    Args:
        nrows: Number of rows in the main frame.
        ncols: Number of columns in each frame.

    Returns:
        ``(main_df, ref_df)``, both pandas DataFrames. ``ref_df`` has
        ``nrows // 10`` rows.
    """
    ref_rows = nrows // 10
    # Main columns are generated before reference columns, matching the
    # original order of draws from NumPy's global random state.
    main_data = {f"col_{i}": np.random.rand(nrows) for i in range(ncols)}
    ref_data = {f"col_{i}": np.random.rand(ref_rows) for i in range(ncols)}
    return pd.DataFrame(main_data), pd.DataFrame(ref_data)

In [21]:
def pandas_operations(main_df: pd.DataFrame, ref_df: pd.DataFrame) -> tuple[float, float]:
    """Time a groupby-mean and a left merge performed with pandas.

    Args:
        main_df: Frame that is grouped by ``col_0`` and used as the left
            side of the merge.
        ref_df: Frame merged onto ``main_df`` on ``col_0``.

    Returns:
        ``(aggregation_seconds, join_seconds)``: elapsed time of the
        groupby-mean and of the left merge, in that order.
    """
    # perf_counter is the clock intended for measuring short durations;
    # unlike time.time() it is unaffected by system clock adjustments.
    agg_start = time.perf_counter()
    # The result is bound to a name so it stays alive until the function
    # returns, keeping its deallocation out of the timed interval.
    _grouped = main_df.groupby("col_0").mean()
    agg_elapsed = time.perf_counter() - agg_start

    join_start = time.perf_counter()
    _joined = main_df.merge(ref_df, on="col_0", how="left")
    join_elapsed = time.perf_counter() - join_start

    return agg_elapsed, join_elapsed

In [22]:
def dask_operations(
        main_df: pd.DataFrame,
        ref_df: pd.DataFrame,
        npartitions: int
) -> tuple[float, float]:
    """Time a groupby-mean and a left merge performed with Dask.

    Both pandas frames are first converted with ``dd.from_pandas``; that
    conversion is not included in either timing. After each timed
    operation the task graph is passed to ``visualize`` with the file
    names ``grouped_task.svg`` and ``joined_task.svg``; those calls are
    also outside the timed intervals.
    NOTE(review): ``visualize`` presumably needs graphviz installed;
    confirm in the target environment.

    Args:
        main_df: pandas frame that is grouped by ``col_0`` and used as the
            left side of the merge.
        ref_df: pandas frame merged onto ``main_df`` on ``col_0``.
        npartitions: Partition count passed to ``dd.from_pandas`` for both
            frames.

    Returns:
        ``(aggregation_seconds, join_seconds)``: elapsed time, including
        ``compute()``, of the groupby-mean and of the left merge.
    """
    dmain_df = dd.from_pandas(main_df, npartitions=npartitions)
    dref_df = dd.from_pandas(ref_df, npartitions=npartitions)

    # perf_counter is the clock intended for measuring short durations;
    # unlike time.time() it is unaffected by system clock adjustments.
    agg_start = time.perf_counter()
    grouped_task = dmain_df.groupby("col_0").mean()
    # Graph construction is lazy, so compute() is inside the timed region.
    # The result stays bound so its deallocation is not timed.
    _grouped = grouped_task.compute()
    agg_elapsed = time.perf_counter() - agg_start
    grouped_task.visualize("grouped_task.svg")

    join_start = time.perf_counter()
    joined_task = dmain_df.merge(dref_df, on="col_0", how="left")
    _joined = joined_task.compute()
    join_elapsed = time.perf_counter() - join_start
    joined_task.visualize("joined_task.svg")

    return agg_elapsed, join_elapsed

In [23]:
# 10M-row main frame with 5 random columns; the reference frame returned
# alongside it has one tenth as many rows.
main_df, ref_df = create_datesets(nrows=10_000_000, ncols=5)

In [24]:
# Time the same groupby-mean and left merge, first with pandas and then
# with Dask using 10 partitions.
pandas_agg_time, pandas_join_time = pandas_operations(
    main_df=main_df,
    ref_df=ref_df,
)
dask_agg_time, dask_join_time = dask_operations(
    main_df=main_df,
    ref_df=ref_df,
    npartitions=10,
)

In [25]:
# Report the pandas timings (elapsed seconds) measured above.
print("Pandas Aggregation Time: ", pandas_agg_time)
print("Pandas Join Time: ", pandas_join_time)

# Report the Dask timings (elapsed seconds) for the same two operations.
print("Dask Aggregation Time: ", dask_agg_time)
print("Dask Join Time: ", dask_join_time)

Pandas Aggregation Time:  4.432865142822266
Pandas Join Time:  1.8928120136260986
Dask Aggregation Time:  5.052574872970581
Dask Join Time:  1.7457740306854248
