# Big Data in Python with Dask DataFrames
### by [Jason DeBacker](https://jasondebacker.com), October 2024

In this notebook, we explore how to use Dask DataFrames to work with big data in Python. Dask is a flexible library for parallel computing in analytics. It scales from a single machine to large clusters and provides a familiar DataFrame interface for working with big data.

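A Dask DataFrame is a large parallel DataFrame made up of many smaller Pandas DataFrames, called partitions, split along the index. These partitions may live on disk (for larger-than-memory computing on one machine) or on many machines in a cluster, so Dask can handle datasets that don't fit into memory. Because Dask DataFrames are built on Pandas DataFrames and provide a similar API, it is easy to switch between the two.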

Note that Dask is not a panacea, and one should be aware of [when to use and when not to use Dask DataFrames](https://docs.dask.org/en/stable/dataframe.html#when-not-to-use-dask-dataframes).

Files used in this notebook can be found in my Dropbox [here](https://www.dropbox.com/scl/fo/w24rc5qw87n8g6d1cm3nr/AFqZZELatCOF4Bryg6g7KSo?rlkey=f1hb8chhbd30cz6zdo48n90h9&st=b6f87a0o&dl=0). Note that the files are up to 10 GB in size.

In [1]:
# imports
import pandas as pd
import numpy as np
import time
import dask.dataframe as dd
import os
# from memory_profiler import profile
import memory_profiler


In [2]:
# to allow use of memit for memory profiling
%load_ext memory_profiler

In [3]:
# set paths to data
CUR_DIR = os.getcwd()
DATA_DIR = os.path.join(CUR_DIR, 'data')
DATA_FILE = os.path.join(DATA_DIR, 'trade_data_1962_to_2022.csv')
DATA_FILE2 = os.path.join(DATA_DIR, 'sitc_country_country_product_year_4_2022.dta')


In [None]:
# Read large data file into Pandas DataFrame
start_time = time.time()
trade_data_df = pd.read_csv(DATA_FILE)
print(f"Time taken to read data into Pandas DataFrame: {time.time() - start_time}")
# Read large data file into Dask DataFrame (lazy: the full data are not loaded yet)
start_time = time.time()
trade_data_dd = dd.read_csv(DATA_FILE)
print(f"Time taken to read data into Dask DataFrame: {time.time() - start_time}")

In [None]:
# How big are these data?
print(f"Size of trade_data_df: {trade_data_df.shape}")

In [None]:
trade_data_dd.head()

While the cell below runs, check out the Dask DataFrame documentation: [https://docs.dask.org/en/stable/dataframe.html](https://docs.dask.org/en/stable/dataframe.html)

In [None]:
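# Set up a Dask client backed by a local cluster of 8 workers;
# once created, it becomes the default scheduler for .compute()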
from dask.distributed import Client
client = Client(n_workers=8)

In [None]:
# Time how long to describe data with Pandas and Dask
start_time = time.time()
trade_data_df.describe()
print(f"Time taken to describe data with Pandas: {time.time() - start_time}")
start_time = time.time()
trade_data_dd.describe().compute(client=client)
print(f"Time taken to describe data with Dask: {time.time() - start_time}")

In [None]:
trade_data_dd.npartitions

## Group data

Here we apply the `groupby` operation to Dask and Pandas DataFrames to produce some plots.

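Note that Pandas is faster than Dask in this example. The Pandas DataFrame is already in memory, whereas each Dask computation re-reads the CSV from disk, and passing intermediate results between workers adds overhead that only pays off when the data are too large for a single machine.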
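
A simple aggregation such as `groupby(...).sum()` does not require Dask to shuffle all rows: Dask sums within each partition and then combines the partial sums. The pure Pandas sketch below mimics that two-step pattern on hypothetical toy data split into three chunks; it illustrates the idea and is not Dask's actual implementation.

```python
import pandas as pd

# Hypothetical toy data: export values by country
df = pd.DataFrame({
    "country_id": [4, 8, 4, 12, 8, 4, 12, 8, 4],
    "export_value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0],
})

# Split the rows into three "partitions" of three rows each
partitions = [df.iloc[i:i + 3] for i in range(0, len(df), 3)]

# Step 1: aggregate within each partition independently (parallelizable)
partial_sums = [part.groupby("country_id")["export_value"].sum() for part in partitions]

# Step 2: concatenate the small partial results and aggregate them again
combined = pd.concat(partial_sums).groupby(level=0).sum()

# Same answer as a single in-memory groupby
direct = df.groupby("country_id")["export_value"].sum()
print(combined)
print(combined.equals(direct))
```

Only the small per-partition results move between steps, which is why this kind of aggregation scales well; operations such as `groupby(...).apply(...)` generally need all rows of a group together and are more expensive.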

In [None]:
# bar chart of total export value (summed over all years) for the 10 largest exporters
start_time = time.time()
fig = trade_data_dd.groupby(['country_id'])['export_value'].sum().nlargest(10).compute().plot.bar()
# read in csv to get country names
iso_codes = pd.read_csv("https://gist.githubusercontent.com/radcliff/f09c0f88344a7fcef373/raw/2753c482ad091c54b1822288ad2e4811c021d8ec/wikipedia-iso-country-codes.csv")
# map numeric ISO country codes to country names
country_dict = iso_codes.set_index('Numeric code')['English short name lower case'].to_dict()
# update x labels with country names
fig.set_xticklabels(labels=[country_dict[int(x.get_text())] for x in fig.get_xticklabels()])
print(f"Time taken to plot data with Dask: {time.time() - start_time}")


In [None]:
# bar chart of total export value (summed over all years) for the 10 largest exporters
start_time = time.time()
fig = trade_data_df.groupby(['country_id'])['export_value'].sum().nlargest(10).plot.bar()
# read in csv to get country names
iso_codes = pd.read_csv("https://gist.githubusercontent.com/radcliff/f09c0f88344a7fcef373/raw/2753c482ad091c54b1822288ad2e4811c021d8ec/wikipedia-iso-country-codes.csv")
# map numeric ISO country codes to country names
country_dict = iso_codes.set_index('Numeric code')['English short name lower case'].to_dict()
# update x labels with country names
fig.set_xticklabels(labels=[country_dict[int(x.get_text())] for x in fig.get_xticklabels()])
print(f"Time taken to plot data with Pandas: {time.time() - start_time}")


## Memory profiling

The cells below profile how much memory is used to read in and merge data with Pandas and Dask.

In [4]:
#########################################
# Merge example using pandas
#########################################

def pd_profile_func(trade_data):

    # the data are passed in as an argument; alternatively, read them here:
    # trade_data = pd.read_stata(DATA_FILE2)

    # create partner data with eci, coi for partner
    partner_data = trade_data[['country_id', 'product_id', 'year', 'eci', 'coi']]

    # merge in pandas
    merged = trade_data.merge(right=partner_data, left_on="partner_country_id", right_on="country_id")

    # write the merged data out
    merged.to_csv(os.path.join(DATA_DIR, "trade_merged_pd.csv"))
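
The merge in `pd_profile_func` matches rows only on the partner's country, while `partner_data` has many rows per country (one per product and year). Each row of `trade_data` is therefore paired with every row for its partner country, which is why the merged data, and the memory they use, are so much larger than the inputs. The toy sketch below (hypothetical data) shows this row explosion and how Pandas' `validate="many_to_one"` option raises a `MergeError` instead of silently multiplying rows.

```python
import pandas as pd

# Hypothetical trade rows: 2 countries x 3 products
trade = pd.DataFrame({
    "country_id": [1, 1, 1, 2, 2, 2],
    "partner_country_id": [2, 2, 2, 1, 1, 1],
    "product_id": [10, 11, 12, 10, 11, 12],
    "eci": [0.5, 0.5, 0.5, -0.3, -0.3, -0.3],
})
partner = trade[["country_id", "product_id", "eci"]]

# Merging on country alone: each row matches all 3 rows of its partner country
merged = trade.merge(partner, left_on="partner_country_id", right_on="country_id")
print(len(trade), len(merged))  # 6 rows in, 18 rows out

# Keep one row per country so the right-hand keys are unique
partner_unique = trade[["country_id", "eci"]].drop_duplicates()
merged_ok = trade.merge(
    partner_unique,
    left_on="partner_country_id",
    right_on="country_id",
    suffixes=("", "_partner"),
    validate="many_to_one",  # raises MergeError if right-hand keys are not unique
)
print(len(merged_ok))  # 6

# validate catches the original, non-unique right-hand side
caught_error = False
try:
    trade.merge(partner, left_on="partner_country_id", right_on="country_id",
                validate="many_to_one")
except pd.errors.MergeError as err:
    caught_error = True
    print("MergeError:", err)
```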

In [None]:
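# pd.read_stata has no nrows argument; read the first 300,000 rows through an iterator
with pd.read_stata(DATA_FILE2, iterator=True) as reader:
    df = reader.read(300_000)
%memit pd_profile_func(df)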

In [59]:
#########################################
# Merge example using dask
#########################################

def dd_profile_func(trade_data):

    # the data are passed in as an argument; Dask cannot read Stata files directly,
    # so the .dta file is read with Pandas and converted with dd.from_pandas (see below)

    # create partner data with eci, coi for partner
    partner_data = trade_data[['country_id', 'product_id', 'year', 'eci', 'coi']]
    print(type(trade_data))
    print(type(partner_data))

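    # merge in dask (lazy: this only builds a task graph)
    merged_dd = trade_data.merge(right=partner_data, left_on="partner_country_id", right_on="country_id")

    # write the merged data out, which triggers the computation;
    # single_file=True writes one CSV file instead of a directory of part files
    merged_dd.to_csv(os.path.join(DATA_DIR, "trade_merged_dd.csv"), single_file=True)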


In [None]:
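# read the first 300,000 rows with Pandas (read_stata has no nrows argument), then convert to Dask
with pd.read_stata(DATA_FILE2, iterator=True) as reader:
    ddf = dd.from_pandas(reader.read(300_000))
%memit dd_profile_func(ddf)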

In [None]:
!python pandas-mem-profile.py

The cell above may take a while to run. Here is its output:
```
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     8    122.5 MiB    122.5 MiB           1   @profile
     9                                         def profile_func():
    10                                         
    11                                             # packages
    12    122.5 MiB      0.0 MiB           1       CUR_DIR = os.getcwd()
    13    122.5 MiB      0.0 MiB           1       DATA_DIR = os.path.join(CUR_DIR, 'data')
    14                                         
    15                                             # read in two data set to merge with pandas
    16                                             # trade_data = pd.read_csv(os.path.join(DATA_DIR, "trade_data_1962_to_2022.csv"))
    17    497.0 MiB    374.4 MiB           1       trade_data = pd.read_stata(os.path.join(DATA_DIR, "sitc_country_country_product_year_4_2022.dta"))
    18                                             # # keep if year > 2021
    19                                             # trade_data = trade_data[trade_data['year'] > 2021]
    20                                             # keep just first 1000 rows
    21    497.0 MiB      0.0 MiB           1       trade_data = trade_data.head(100_000)
    22    497.0 MiB      0.0 MiB           1       print("Num obs = ", len(trade_data))
    23                                         
    24                                             # create partner data with eci, coi for partner
    25    498.9 MiB      1.9 MiB           1       partner_data = trade_data[['country_id', 'product_id', 'year', 'eci', 'coi']]
    26                                         
    27                                             # merge in pandas
    28   6258.9 MiB   5760.0 MiB           1       merged = trade_data.merge(right=partner_data, left_on="partner_country_id", right_on="country_id")
    29                                         
    30                                             # write the merged data out
    31   6295.3 MiB     36.4 MiB           1       merged.to_csv(os.path.join(DATA_DIR, "trade_merged_pd.csv"))
```

In [None]:
!python dask-mem-profile.py

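The cell above may take a while to run. Here is its output. Note that calling `head()` on a Dask DataFrame returns a Pandas DataFrame, so from line 19 of this script onward the merge and the write run in Pandas rather than Dask: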
```
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     9    163.8 MiB    163.8 MiB           1   @profile
    10                                         def profile_func():
    11
    12                                             # packages
    13    163.8 MiB      0.0 MiB           1       CUR_DIR = os.getcwd()
    14    163.8 MiB      0.0 MiB           1       DATA_DIR = os.path.join(CUR_DIR, 'data')
    15
    16                                             # read in two data set to merge with pandas
    17    166.3 MiB      2.4 MiB           1       trade_data = dd.read_csv(os.path.join(DATA_DIR, "trade_data_2022.csv"))
    18                                             # keep just first rows
    19    584.5 MiB    418.2 MiB           1       trade_data = trade_data.head(100_000)
    20    584.5 MiB      0.0 MiB           1       print("Num obs = ", len(trade_data))
    21
    22                                             # create partner data with eci, coi for partner
    23    584.7 MiB      0.2 MiB           1       partner_data = trade_data[['country_id', 'product_id', 'year', 'eci', 'coi']]
    24
    25                                             # merge in pandas
    26  10126.3 MiB   9541.6 MiB           1       merged = trade_data.merge(right=partner_data, left_on="partner_country_id", right_on="country_id")
    27
    28                                             # write the merged data out
    29  10158.1 MiB     31.8 MiB           1       merged.to_csv(os.path.join(DATA_DIR, "trade_merged_pd.csv"))
```

## Merge on index

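Merges on the index are often faster than merges on columns, although the timings below show that this is not guaranteed in Pandas. When possible, make the merge key the index of both DataFrames and merge on the index. In Dask, the payoff can be larger: merging two DataFrames on indexes with known divisions (for example, after `set_index`) avoids a shuffle at merge time.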

In [27]:
trade_data = pd.read_csv(os.path.join(DATA_DIR, "trade_data_2022.csv"))
# keep one row per country: the data are by country-product, but eci varies only by country
trade_data.drop_duplicates(subset="country_id", inplace=True)
# create partner data with eci, coi for partner
partner_data = trade_data[['country_id', 'eci']].copy()
# drop duplicates
partner_data.drop_duplicates(inplace=True)
# merge in pandas
start_time = time.time()
merged = trade_data.merge(right=partner_data, left_on="partner_country_id", right_on="country_id", how="left")
print(f"Time taken to merge data with Pandas merge: {time.time() - start_time}")

# set the merge keys as the index (partner_country_id for trade data, country_id for partner data)
trade_data2 = trade_data.copy()
partner_data2 = partner_data.copy()
trade_data2.set_index("partner_country_id", inplace=True)
partner_data2.set_index("country_id", inplace=True)
start_time = time.time()
merged = trade_data2.merge(partner_data2, left_index=True, right_index=True, how="left")
print(f"Time taken to merge data with Pandas merge on index: {time.time() - start_time}")

Time taken to merge data with Pandas merge: 0.0056188106536865234
Time taken to merge data with Pandas merge on index: 0.0007998943328857422


In [25]:
%timeit merged = trade_data.merge(right=partner_data, left_on="partner_country_id", right_on="country_id", how="left")
%timeit merged = trade_data2.merge(partner_data2, left_index=True, right_index=True, how="left")

144 ms ± 638 μs per loop (mean ± std. dev. of 7 runs, 10 loops each)
176 ms ± 1.13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [None]:
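# clean up files saved for this exercise (Dask's to_csv writes a directory of part files unless single_file=True)
import shutil
for name in ("trade_merged_pd.csv", "trade_merged_dd.csv"):
    path = os.path.join(DATA_DIR, name)
    remove = shutil.rmtree if os.path.isdir(path) else os.remove
    remove(path)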