# Australian Rainfall Data Acquisition and EDA

This notebook will be a quick run through of data acquisition and exploratory analysis to evaluate the impact of big data tools. These tools include Dask for out of core processing and Feather/Arrow/Parquet file formats to minimize memory usage and file size.

To run this notebook, use the `environment.yml` file in the root of the project to setup a Conda environment. You'll also need R > 4.0.0 installed and need to use:

```r
install.packages("arrow")
```
For using advanced file formats.

In [1]:
import re
import io
import os
import sys
import glob
import gc
import requests
import json
from urllib.request import urlretrieve
import zipfile
import pandas as pd
import numpy as np
import dask.dataframe as dd
# For different file types
import pyarrow.parquet as pq
import rpy2.rinterface
import rpy2_arrow.pyarrow_rarrow as pyra
import pyarrow.feather as feather

# Get helper functions
from scripts.utils import combine_australia_rainfall

%load_ext rpy2.ipython
%load_ext memory_profiler
%load_ext autoreload


# Constants
raw_data_directory_path = os.path.join("..", "data", "raw")
processed_data_directory_path = os.path.join("..", "data", "processed")

# Setup folders if not existing
os.makedirs(processed_data_directory_path, exist_ok=True)

First, download the zipped archive folder of all rainfall data - NOTE: THIS CAN TAKE ~ 45 minutes to run:

In [2]:
def download_progress(block_num, block_size, total_size):
    progress_size = int(block_num * block_size)
    percent = int(block_num * block_size * 100 / total_size)
    sys.stdout.write("\rDownloading... %d%%, %d MB of %d MB" %
                    (percent, progress_size / (1024 * 1024), total_size / (1024 * 1024)))
    sys.stdout.flush()


def download_from_figshare(article_id, files_to_dl, output_dir):
    url = f"https://api.figshare.com/v2/articles/{article_id}"
    response = requests.request("GET", url, headers={"Content-Type": "application/json"})
    data = json.loads(response.text)
    files = data["files"]
    for file in files:
        if file["name"] in files_to_dl:
            os.makedirs(output_dir, exist_ok=True)
            urlretrieve(file["download_url"], os.path.join(output_dir, file["name"]), download_progress)


download_from_figshare("14096681", ["data.zip"], raw_data_directory_path)

Downloading... 100%, 776 MB of 776 MB

Next, iterate through the zipped files, extracting the raw csv files:

In [3]:
with zipfile.ZipFile(os.path.join(raw_data_directory_path, "data.zip"), "r") as zf:
    file_list = zf.namelist()
    for f in file_list:
        if "__MACOSX" not in f:
            print(f"Processing File: {f}")
            zf.extract(f, path = raw_data_directory_path)


Processing File: MPI-ESM-1-2-HAM_daily_rainfall_NSW.csv
Processing File: AWI-ESM-1-1-LR_daily_rainfall_NSW.csv
Processing File: NorESM2-LM_daily_rainfall_NSW.csv
Processing File: ACCESS-CM2_daily_rainfall_NSW.csv
Processing File: FGOALS-f3-L_daily_rainfall_NSW.csv
Processing File: CMCC-CM2-HR4_daily_rainfall_NSW.csv
Processing File: MRI-ESM2-0_daily_rainfall_NSW.csv
Processing File: GFDL-CM4_daily_rainfall_NSW.csv
Processing File: BCC-CSM2-MR_daily_rainfall_NSW.csv
Processing File: EC-Earth3-Veg-LR_daily_rainfall_NSW.csv
Processing File: CMCC-ESM2_daily_rainfall_NSW.csv
Processing File: NESM3_daily_rainfall_NSW.csv
Processing File: MPI-ESM1-2-LR_daily_rainfall_NSW.csv
Processing File: ACCESS-ESM1-5_daily_rainfall_NSW.csv
Processing File: FGOALS-g3_daily_rainfall_NSW.csv
Processing File: INM-CM4-8_daily_rainfall_NSW.csv
Processing File: MPI-ESM1-2-HR_daily_rainfall_NSW.csv
Processing File: TaiESM1_daily_rainfall_NSW.csv
Processing File: NorESM2-MM_daily_rainfall_NSW.csv
Processing File:

Next we'll evaluate the impact of combining multiple large CSV files using Pandas vs. Dask. We'll check how fast Dask is when creating the task graph as well as materializing to a Pandas dataframe.

In [4]:
%%time
%%memit
df_rainfall_pandas = combine_australia_rainfall(base_folder_path=raw_data_directory_path)

peak memory: 5417.12 MiB, increment: 5220.13 MiB
CPU times: user 8min 58s, sys: 2min 28s, total: 11min 27s
Wall time: 15min 3s


In [5]:
%%time
%%memit
combine_australia_rainfall(base_folder_path=raw_data_directory_path, method="dask", delay_dask_compute=False)

peak memory: 4459.15 MiB, increment: 543.48 MiB
CPU times: user 5min 57s, sys: 3min 42s, total: 9min 40s
Wall time: 12min 28s


In [6]:
%%time
%%memit
df_rainfall_pandas.to_csv(os.path.join(processed_data_directory_path, "df_rainfall.csv"), index=False)

peak memory: 3453.01 MiB, increment: 3068.04 MiB
CPU times: user 11min 46s, sys: 1min 28s, total: 13min 14s
Wall time: 16min 14s


In [7]:
%%time
%%memit
feather.write_feather(df_rainfall_pandas, os.path.join(processed_data_directory_path, "df_rainfall.feather") )

peak memory: 2792.86 MiB, increment: 0.03 MiB
CPU times: user 34.9 s, sys: 1min 43s, total: 2min 18s
Wall time: 4min 36s


In [8]:
%%time
%%memit
df_rainfall_pandas.to_parquet(os.path.join(processed_data_directory_path, "df_rainfall.parquet"),engine='pyarrow', compression='gzip')

peak memory: 2364.34 MiB, increment: 1768.98 MiB
CPU times: user 4min 4s, sys: 1min 43s, total: 5min 48s
Wall time: 9min 40s


Looks like using Dask is ~ 30% faster to read and concatentate multiple large CSV files. We also noted that allowing Dask to parallelize and chunk reading CSV files allowed the memory usage to stay lower as well.


# Reading Large Files

We'll read in the combined dataset now looking at different file types/chunk options/columns required.

In [4]:
%%time
%%memit
df_rainfall_pandas = pd.read_parquet(os.path.join(processed_data_directory_path, "df_rainfall.parquet"),engine='pyarrow')

peak memory: 4231.67 MiB, increment: 3996.91 MiB
CPU times: user 42.5 s, sys: 20.9 s, total: 1min 3s
Wall time: 48.8 s


In [5]:
%%time
%%memit
df_rainfall_pandas = pd.read_parquet(os.path.join(processed_data_directory_path, "df_rainfall.parquet"),columns=["time", "rain (mm/day)", "model"],engine='pyarrow')

peak memory: 3700.44 MiB, increment: 3156.72 MiB
CPU times: user 37.7 s, sys: 13.5 s, total: 51.2 s
Wall time: 44 s


In [6]:
%%time
%%memit
df_rainfall_pandas = pd.read_feather(os.path.join(processed_data_directory_path, "df_rainfall.feather"))

peak memory: 3214.44 MiB, increment: 2989.17 MiB
CPU times: user 21.7 s, sys: 17 s, total: 38.7 s
Wall time: 41.3 s


In [7]:
%%time
%%memit
df_rainfall_pandas = pd.read_feather(os.path.join(processed_data_directory_path, "df_rainfall.feather"), columns=["time", "rain (mm/day)", "model"])

peak memory: 3344.96 MiB, increment: 3010.81 MiB
CPU times: user 17.8 s, sys: 7.73 s, total: 25.5 s
Wall time: 33.3 s


In [8]:
%%time
%%memit
df_rainfall_pandas = pd.read_csv(os.path.join(processed_data_directory_path, "df_rainfall.csv"), index_col=False)

peak memory: 2690.53 MiB, increment: 2351.20 MiB
CPU times: user 1min 31s, sys: 33.1 s, total: 2min 4s
Wall time: 3min 3s


In [9]:
%%time
%%memit
df_rainfall_pandas = dd.read_csv(os.path.join(processed_data_directory_path, "df_rainfall.csv")).compute()

peak memory: 3269.80 MiB, increment: 1459.37 MiB
CPU times: user 2min 13s, sys: 43.4 s, total: 2min 56s
Wall time: 2min 11s


In [10]:
# For delayed calculations
df_rainfall_dask_delay = dd.read_csv(os.path.join(processed_data_directory_path, "df_rainfall.csv"))

# EDA

Next, we'll look at the speed of determining counts of values by model from the raw dataframe.


In [11]:
%%time
%%memit
df_rainfall_pandas["model"].value_counts()


peak memory: 2561.77 MiB, increment: 292.16 MiB
CPU times: user 8.2 s, sys: 504 ms, total: 8.71 s
Wall time: 10.7 s


In [12]:
%%time
%%memit
model_counts = df_rainfall_dask_delay["model"].value_counts().compute()

peak memory: 3244.85 MiB, increment: 682.98 MiB
CPU times: user 2min 10s, sys: 24.6 s, total: 2min 35s
Wall time: 1min 32s


In [13]:
model_counts 

MPI-ESM1-2-HR       5154240
TaiESM1             3541230
NorESM2-MM          3541230
CMCC-CM2-HR4        3541230
CMCC-CM2-SR5        3541230
CMCC-ESM2           3541230
SAM0-UNICON         3541153
FGOALS-f3-L         3219300
GFDL-CM4            3219300
GFDL-ESM4           3219300
EC-Earth3-Veg-LR    3037320
MRI-ESM2-0          3037320
BCC-CSM2-MR         3035340
MIROC6              2070900
ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
INM-CM5-0           1609650
INM-CM4-8           1609650
KIOST-ESM           1287720
FGOALS-g3           1287720
MPI-ESM1-2-LR        966420
NESM3                966420
AWI-ESM-1-1-LR       966420
MPI-ESM-1-2-HAM      966420
NorESM2-LM           919800
BCC-ESM1             551880
CanESM5              551880
Name: model, dtype: int64

In [14]:
df_rainfall_pandas.shape

(62467843, 7)

In [15]:
df_rainfall_pandas.head()

Unnamed: 0,time,lat_min,lat_max,lon_min,lon_max,rain (mm/day),model
0,1889-01-01 12:00:00,-35.439867,-33.574619,141.5625,143.4375,4.244226e-13,MPI-ESM-1-2-HAM
1,1889-01-02 12:00:00,-35.439867,-33.574619,141.5625,143.4375,4.217326e-13,MPI-ESM-1-2-HAM
2,1889-01-03 12:00:00,-35.439867,-33.574619,141.5625,143.4375,4.498125e-13,MPI-ESM-1-2-HAM
3,1889-01-04 12:00:00,-35.439867,-33.574619,141.5625,143.4375,4.251282e-13,MPI-ESM-1-2-HAM
4,1889-01-05 12:00:00,-35.439867,-33.574619,141.5625,143.4375,4.270161e-13,MPI-ESM-1-2-HAM


We can see that allowing Dask to optimize the task graph speeds up getting counts of values by model fairly significantly. To load the CSV with Pandas, then compute the value counts by group it took ~ (1 min 42 seconds + 15 seconds = 1 min 57 seconds) while for Dask delaying the computation it only took 1 min 19 seconds, an ~ 30% improvement.

# Transfer between Python and R and EDA in R

> We first begin by loading neccessary libraries.

In [2]:
%%R
library(arrow)
library(dplyr)

R[write to console]: 
Attaching package: ‘dplyr’


R[write to console]: The following objects are masked from ‘package:stats’:

    filter, lag


R[write to console]: The following objects are masked from ‘package:base’:

    intersect, setdiff, setequal, union




> Next we attempt to transfer the data using feather and carry out a simple EDA.

In [5]:
%%time
%%R
r_feather <- arrow::read_feather("../data/processed/df_rainfall.feather")

CPU times: user 26.3 s, sys: 24.3 s, total: 50.6 s
Wall time: 57 s


In [9]:
%%time
%%R

r_feather %>%
group_by(model) %>%
count()

[90m# A tibble: 27 x 2[39m
[90m# Groups:   model [27][39m
   model                  n
   [3m[90m<chr>[39m[23m              [3m[90m<int>[39m[23m
[90m 1[39m ACCESS-CM2       1[4m9[24m[4m3[24m[4m2[24m840
[90m 2[39m ACCESS-ESM1-5    1[4m6[24m[4m1[24m[4m0[24m700
[90m 3[39m AWI-ESM-1-1-LR    [4m9[24m[4m6[24m[4m6[24m420
[90m 4[39m BCC-CSM2-MR      3[4m0[24m[4m3[24m[4m5[24m340
[90m 5[39m BCC-ESM1          [4m5[24m[4m5[24m[4m1[24m880
[90m 6[39m CMCC-CM2-HR4     3[4m5[24m[4m4[24m[4m1[24m230
[90m 7[39m CMCC-CM2-SR5     3[4m5[24m[4m4[24m[4m1[24m230
[90m 8[39m CMCC-ESM2        3[4m5[24m[4m4[24m[4m1[24m230
[90m 9[39m CanESM5           [4m5[24m[4m5[24m[4m1[24m880
[90m10[39m EC-Earth3-Veg-LR 3[4m0[24m[4m3[24m[4m7[24m320
[90m# … with 17 more rows[39m
CPU times: user 4.83 s, sys: 3.48 s, total: 8.3 s
Wall time: 11.7 s


> We also attempt to transfer the data using parquet and conducting a simple EDA.

In [10]:
%%time
%%R
r_parquet <- arrow::read_parquet("../data/processed/df_rainfall.parquet")


CPU times: user 53.2 s, sys: 35.4 s, total: 1min 28s
Wall time: 1min 26s


In [11]:
%%time
%%R

r_parquet %>%
group_by(model) %>%
count()

[90m# A tibble: 27 x 2[39m
[90m# Groups:   model [27][39m
   model                  n
   [3m[90m<chr>[39m[23m              [3m[90m<int>[39m[23m
[90m 1[39m ACCESS-CM2       1[4m9[24m[4m3[24m[4m2[24m840
[90m 2[39m ACCESS-ESM1-5    1[4m6[24m[4m1[24m[4m0[24m700
[90m 3[39m AWI-ESM-1-1-LR    [4m9[24m[4m6[24m[4m6[24m420
[90m 4[39m BCC-CSM2-MR      3[4m0[24m[4m3[24m[4m5[24m340
[90m 5[39m BCC-ESM1          [4m5[24m[4m5[24m[4m1[24m880
[90m 6[39m CMCC-CM2-HR4     3[4m5[24m[4m4[24m[4m1[24m230
[90m 7[39m CMCC-CM2-SR5     3[4m5[24m[4m4[24m[4m1[24m230
[90m 8[39m CMCC-ESM2        3[4m5[24m[4m4[24m[4m1[24m230
[90m 9[39m CanESM5           [4m5[24m[4m5[24m[4m1[24m880
[90m10[39m EC-Earth3-Veg-LR 3[4m0[24m[4m3[24m[4m7[24m320
[90m# … with 17 more rows[39m
CPU times: user 4.55 s, sys: 5.73 s, total: 10.3 s
Wall time: 17.4 s


### Final choice

> We decided to use feather as the preferred method of making the transfer between Python and R. We made this decision based on our reading of [the material](https://ursalabs.org/blog/2020-feather-v2/) provided in the course notes as well as our empirical results using the `%%time` cell magic. Both parquet and feather are binary file formats, however, feather outperforms parquet on solid state drives due to its simpler compression scheme. Empirically, it was about 65% faster to read the feather file compared to the parquet file. Given the aims and priorities of this milestone, we decided to choose feather as the means for file transfer.

# Challenges Encountered

Working with larger data sets on laptop can be difficult due to memory restrictions when loading a dataset. In the exercises in this lab we ran into difficulties with:

- Downloading large zip archives:
    - Getting the initial data downloaded was difficult as we were downloading one zip archive, and we couldn't parallelize this process. If there had been multiple files we could have used `multiprocessing` to download much quicker.
- Combining the dataset / EDA:
    - When combining the files with `Pandas` we were loading the entire dataset to memory and this caused issues for some group members with lower RAM. 
    - When saving the dataset, run times and memory usage were highly dependent on file type. Writing the dataset to CSV is memory intensive and takes > 3x as long as writing compressed Parquet files and  ~ 30x as long as writing a Feather file. 
    - Reading a single large file is much quicker with Feather or parquet file types - even parallelizing reading a CSV with Dask is slower than these more optimized file types.
    - If we knew in advance what calculations we wanted to carry out, we could have delayed execution and used Dask to build the graph and then run an optimized `compute()` step at the end. This can be seen above when combining all files and calculating counts of readings per model using lazy evaluation. If our calculation can be done by chunk - we could also read the CSV using Pandas in chunks and calculate our summary statistic. 