# Rainfall Analysis - 525 Group 22

## Usage

For easiest readability, please view the notebook as a webpage [here](https://ubc-mds.github.io/rainfall_group22/), or download and open the raw [html](https://github.com/UBC-MDS/rainfall_group22/blob/main/notebooks/milestone1.html).

This notebook is meant to be run in the 525 conda environment. I ran into some issues with `rpy2`, but was able to solve them by adding `python=3.8.6` to the yaml file and recreating the environment.

We have used some helper functions, so before running please make sure to clone the repo: [https://github.com/UBC-MDS/rainfall_group22](https://github.com/UBC-MDS/rainfall_group22).

Below we will install some extra dependencies:

In [12]:
# Installations
import sys

# used for pretty printing summary table colours
%conda install --yes --prefix {sys.prefix} seaborn

# our package from 524, used to pretty-print file sizes during download. Not super necessary but fun to use.
%pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple nicenumber

# used to show file download progress bar
%pip install tqdm

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 4.9.2
  latest version: 4.10.0

Please update conda by running

    $ conda update -n base conda



# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.
Looking in indexes: https://test.pypi.org/simple/, https://pypi.org/simple
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [52]:
from src import download as dl
from src import functions as f
from nicenumber import nicenumber as nn

import pandas as pd
import numpy as np
from collections import defaultdict

import dask.dataframe as dd

# pyarrow install instructions: https://arrow.apache.org/docs/python/install.html
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.feather as feather
import pyarrow.parquet as pq

# rpy2 install instructions: https://anaconda.org/conda-forge/rpy2
import rpy2.rinterface

# rpy2-arrow (https://pypi.org/project/rpy2-arrow/#description): pip install rpy2-arrow
# also requires r-arrow: conda install -c conda-forge r-arrow
import rpy2_arrow.pyarrow_rarrow as pyra

In [27]:
%load_ext rpy2.ipython
%load_ext memory_profiler

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler


## 1. Download Data

In [4]:
%%time
%%memit

# download and unzip data files
files = dl.download_files('data.zip', chunk_size=10)

if files:
    dl.unzip(p=files[0], p_dst='csv', delete=False)

INFO    58   src.download               Downloading 814MB file in 10MB chunks.
100%|██████████| 814M/814M [04:19<00:00, 3.14MiB/s]
INFO    69   src.download               File downloaded to: /Users/Jayme/OneDrive/MDS/525/rainfall_group22/data/data.zip
INFO    126  src.download               Unpacking zip to: /Users/Jayme/OneDrive/MDS/525/rainfall_group22/data/csv


peak memory: 936.94 MiB, increment: 660.67 MiB
CPU times: user 15.8 s, sys: 7.58 s, total: 23.4 s
Wall time: 4min 40s


## 2. Combine CSVs

In [3]:
# set download directories
p_data = dl.p_data # top level data dir
p_csv = p_data / 'csv' # sub dir for saving loose csvs
p_combined = p_data / 'rainfall.csv' # main csv file to use

In [5]:
%%time
%%memit

# combine csvs with pandas
csvs = [p for p in p_csv.glob('*.csv')]
dfs = []

# load individual dfs and save to list
for p in csvs:
    model_name = p.name.split('_')[0]

    df = pd.read_csv(p) \
        .assign(model=model_name)

    dfs.append(df)

# concat all dfs
df = pd.concat(dfs) \
    .rename(columns={'rain (mm/day)': 'rain'})

peak memory: 13064.04 MiB, increment: 12683.50 MiB
CPU times: user 54.9 s, sys: 6.91 s, total: 1min 1s
Wall time: 1min 2s
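
The `pd.concat` approach above holds every dataframe in memory at once, which is where the ~13 GB peak comes from. A minimal sketch of an alternative, assuming all files share the same columns: append each file to the combined csv as soon as it is read, so only one file is in memory at a time. The function name `combine_csvs_streaming` and the tiny demo files are hypothetical, not part of `src`.

```python
import tempfile
from pathlib import Path

import pandas as pd


def combine_csvs_streaming(p_csv: Path, p_out: Path) -> int:
    """Append each csv in p_csv to p_out one file at a time; return rows written."""
    n_rows = 0
    for i, p in enumerate(sorted(p_csv.glob('*.csv'))):
        model_name = p.name.split('_')[0]
        df = pd.read_csv(p) \
            .assign(model=model_name) \
            .rename(columns={'rain (mm/day)': 'rain'})

        # first file creates p_out with a header, later files append without one
        df.to_csv(p_out, mode='w' if i == 0 else 'a', header=(i == 0), index=False)
        n_rows += len(df)

    return n_rows


# tiny made-up files, only to show the call
with tempfile.TemporaryDirectory() as tmp:
    p_in = Path(tmp) / 'csv'
    p_in.mkdir()
    for name in ('modelA_daily.csv', 'modelB_daily.csv'):
        pd.DataFrame({'time': ['1889-01-01', '1889-01-02'], 'rain (mm/day)': [0.1, 0.2]}) \
            .to_csv(p_in / name, index=False)

    p_out = Path(tmp) / 'rainfall.csv'
    n_rows = combine_csvs_streaming(p_in, p_out)
    df_check = pd.read_csv(p_out)
```

The trade-off is that the combined dataframe is never available in memory, so any follow-up inspection (`df.shape`, `df.info()`) would need to read the file back.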


#### Runtimes

Times to combine dataframe csvs for each team member:

|User|OS|Processor|RAM|Load Time (s)|
|:--|:--|:--|:--|--:|
|Jayme|Mac OS|2.4 GHz 8-Core Intel Core i9|32 GB 2667 MHz DDR4| 62|
|Zhiyong| | | | |
|Marc| | | | |

In [6]:
print(df.shape)
df.head()

(62513863, 7)


| |time|lat_min|lat_max|lon_min|lon_max|rain|model|
|--:|:--|--:|--:|--:|--:|--:|:--|
|0|1889-01-01 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.244226e-13|MPI-ESM-1-2-HAM|
|1|1889-01-02 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.217326e-13|MPI-ESM-1-2-HAM|
|2|1889-01-03 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.498125e-13|MPI-ESM-1-2-HAM|
|3|1889-01-04 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.251282e-13|MPI-ESM-1-2-HAM|
|4|1889-01-05 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.270161e-13|MPI-ESM-1-2-HAM|


In [7]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 62513863 entries, 0 to 3541152
Data columns (total 7 columns):
 #   Column   Dtype  
---  ------   -----  
 0   time     object 
 1   lat_min  float64
 2   lat_max  float64
 3   lon_min  float64
 4   lon_max  float64
 5   rain     float64
 6   model    object 
dtypes: float64(5), object(2)
memory usage: 3.7+ GB


In [19]:
# save combined data back to csv
df.to_csv(p_combined, index=False)

In [98]:
# check size of full csv on disk
nn.to_human(p_combined.stat().st_size, prec=1, family='filesize')

'6.0GB'

In [22]:
%%time
%%memit
# Load csv with dask

ddf = dd.read_csv(p_combined, assume_missing=True)

peak memory: 16101.11 MiB, increment: 0.73 MiB
CPU times: user 79.4 ms, sys: 66.1 ms, total: 145 ms
Wall time: 1.63 s


In [23]:
ddf.head()

| |time|lat_min|lat_max|lon_min|lon_max|rain|model|
|--:|:--|--:|--:|--:|--:|--:|:--|
|0|1889-01-01 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.244226e-13|MPI-ESM-1-2-HAM|
|1|1889-01-02 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.217326e-13|MPI-ESM-1-2-HAM|
|2|1889-01-03 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.498125e-13|MPI-ESM-1-2-HAM|
|3|1889-01-04 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.251282e-13|MPI-ESM-1-2-HAM|
|4|1889-01-05 12:00:00|-35.439867|-33.574619|141.5625|143.4375|4.270161e-13|MPI-ESM-1-2-HAM|


## 3. Python EDA

Here we will investigate and summarize the following approaches to reduce memory usage while performing a simple EDA (find maximum rainfall):

1. Baseline
2. Load data in chunks
3. Load only columns of interest
4. Dask

### 3.1 Baseline

Naive approach: read all columns with pandas `read_csv`.

In [4]:
print_max_rain = lambda x: print(f'Max rainfall: {x:.2f} mm/day')

In [28]:
%%time
%%memit

max_rain_baseline = pd.read_csv(p_combined).rain.max()
print_max_rain(max_rain_baseline)

Max rainfall: 432.94 mm/day
peak memory: 14973.23 MiB, increment: 2598.79 MiB
CPU times: user 54.6 s, sys: 4.27 s, total: 58.9 s
Wall time: 59.7 s


### 3.2 Load data in chunks

In [5]:
%%time
%%memit

max_rain_chunks = np.finfo('float64').min

for df_chunk in pd.read_csv(p_combined, chunksize=1_000_000):
    cur_max = df_chunk.rain.max()
    if cur_max > max_rain_chunks:
        max_rain_chunks = cur_max

print_max_rain(max_rain_chunks)

Max rainfall: 432.94 mm/day
peak memory: 1322.71 MiB, increment: 1035.25 MiB
CPU times: user 52.1 s, sys: 2.77 s, total: 54.8 s
Wall time: 55.3 s
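
The chunked loop above can also be written as a running maximum over a generator. The sketch below uses a small synthetic csv held in memory (values are made up) to show that the chunked result matches a full read; `df_demo` and `csv_text` are illustrative names only.

```python
import io

import numpy as np
import pandas as pd

# small synthetic stand-in for rainfall.csv (values are made up)
rng = np.random.default_rng(0)
df_demo = pd.DataFrame({'rain': rng.random(1_000), 'model': 'demo'})
csv_text = df_demo.to_csv(index=False)

# each chunk is a regular DataFrame holding at most `chunksize` rows
chunks = pd.read_csv(io.StringIO(csv_text), chunksize=100)
max_rain_chunked = max(chunk.rain.max() for chunk in chunks)

# same answer as reading everything at once
max_rain_full = pd.read_csv(io.StringIO(csv_text)).rain.max()
print(max_rain_chunked == max_rain_full)
```

Peak memory then scales with `chunksize` rather than with the file size, which is consistent with the drop from ~15 GB to ~1.3 GB measured above. One caveat: if a whole chunk were missing values, its `max()` would be `nan`, so a more defensive version would drop `nan` results before comparing.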


### 3.3 Load only columns of interest

In [9]:
%%time
%%memit

df_one_col = pd.read_csv(p_combined, usecols=['rain'])
max_rain_one = df_one_col.rain.max()
print_max_rain(max_rain_one)

Max rainfall: 432.94 mm/day
peak memory: 5452.88 MiB, increment: 890.59 MiB
CPU times: user 25.8 s, sys: 1.14 s, total: 27 s
Wall time: 27.4 s
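
To see why `usecols` helps, the sketch below builds a small synthetic frame with the same column names as the combined csv (all values are made up) and compares the in-memory size of a full read against a `rain`-only read.

```python
import io

import numpy as np
import pandas as pd

# synthetic frame with the notebook's column names (values are made up)
n = 10_000
df_demo = pd.DataFrame({
    'time': '1889-01-01 12:00:00',
    'lat_min': np.zeros(n),
    'lat_max': np.zeros(n),
    'lon_min': np.zeros(n),
    'lon_max': np.zeros(n),
    'rain': np.random.default_rng(0).random(n),
    'model': 'demo'})
csv_text = df_demo.to_csv(index=False)

df_all = pd.read_csv(io.StringIO(csv_text))
df_rain = pd.read_csv(io.StringIO(csv_text), usecols=['rain'])

# deep=True also counts the Python strings stored in object columns
mem_all = df_all.memory_usage(deep=True).sum()
mem_rain = df_rain.memory_usage(deep=True).sum()
print(f'all columns: {mem_all:,} bytes, rain only: {mem_rain:,} bytes')
```

The two string columns (`time`, `model`) account for most of the difference, since each value is stored as a separate Python object unless a more compact dtype is used.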


### 3.4 Dask

In [None]:
%%time
%%memit

ddf = dd.read_csv(p_combined)
max_rain_dask = ddf.rain.max().compute()

print_max_rain(max_rain_dask)

Max rainfall: 432.94 mm/day
peak memory: 5555.89 MiB, increment: 920.82 MiB
CPU times: user 1min 15s, sys: 12.1 s, total: 1min 28s
Wall time: 24.8 s
Max rainfall: 432.94 mm/day
peak memory: 5518.82 MiB, increment: 995.95 MiB
CPU times: user 1min 16s, sys: 12.5 s, total: 1min 28s
Wall time: 25 s


#### Summary
The following table summarizes memory usage and execution time for the simple EDA:

In [19]:
m_results = dict(
    baseline=[14973, 65],
    chunks=[1323, 55],
    single_column=[5452, 27],
    dask=[5529, 25])

pd.DataFrame \
    .from_dict(
        m_results,
        orient='index',
        columns=['Peak Memory Usage (MB)', 'Execution Time (S)']) \
    .rename_axis('Method') \
    .style.pipe(f.bg, rev=False)

|Method|Peak Memory Usage (MB)|Execution Time (S)|
|:--|--:|--:|
|baseline|14973|65|
|chunks|1323|55|
|single_column|5452|27|
|dask|5529|25|


#### Observations
- To find the maximum rainfall we only needed one column of the table, so loading all columns was unnecessary.
- Loading data in chunks reduced our execution time slightly (65 s to 55 s) and greatly reduced peak memory usage (14,973 MB to 1,323 MB).
- Loading only a single column and using Dask had similar memory usage, with Dask executing slightly faster (25 s vs 27 s).
- Overall, Dask reduced our peak memory usage by ~2/3 (to roughly one third of the baseline) and execution time by ~60%.
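
The relative savings against the baseline can be recomputed directly from the summary table. The sketch below only reuses the figures already recorded in `m_results`; the `reductions` dict is an illustrative name.

```python
# figures copied from the summary table: [peak memory (MB), execution time (s)]
m_results = dict(
    baseline=[14973, 65],
    chunks=[1323, 55],
    single_column=[5452, 27],
    dask=[5529, 25])

base_mem, base_time = m_results['baseline']

# percent reduction relative to the baseline, per method
reductions = {}
for method, (mem, time_s) in m_results.items():
    if method == 'baseline':
        continue

    reductions[method] = (
        round(100 * (1 - mem / base_mem), 1),
        round(100 * (1 - time_s / base_time), 1))

    mem_pct, time_pct = reductions[method]
    print(f'{method}: memory -{mem_pct}%, time -{time_pct}%')
```

For Dask this works out to roughly 63% less peak memory and 62% less time than the baseline, while chunking saves the most memory (about 91%) but only about 15% of the time.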

## 4. R EDA

### 4.1 Save data to multiple formats
Here we will compare different methods to make the dataframe available in R:

1. Feather
2. Parquet

In [20]:
# load csv and create arrow table
dataset = ds.dataset(p_combined, format='csv')
arrow_table = dataset.to_table()

In [22]:
%%time
%%memit

# save feather file

p_feather = p_data / 'data.feather'
feather.write_feather(arrow_table, p_feather)

peak memory: 9417.54 MiB, increment: 0.36 MiB
CPU times: user 3.24 s, sys: 5.83 s, total: 9.07 s
Wall time: 5.82 s


In [46]:
%%time
%%memit

# save parquet file
# NOTE parquet saves the file in a directory, need to get file path after

p_parquet_dir = p_data / 'parquet'
pq.write_to_dataset(arrow_table, p_parquet_dir)

# get parquet file path
p_parquet = list(p_parquet_dir.glob('*'))[0]

peak memory: 12929.43 MiB, increment: 132.63 MiB
CPU times: user 8.69 s, sys: 1.48 s, total: 10.2 s
Wall time: 10.9 s


In [92]:
# check filesizes on disk
m_results = defaultdict(dict)
m_files = dict(
    feather=p_feather,
    parquet=p_parquet)

for name, p in m_files.items():
    size = p.stat().st_size
    size_human = nn.to_human(size, prec=1, family='filesize')

    m = m_results[name]
    m['size_disk'] = size

    print(f'{name}: {size_human}')


feather: 1.1GB
parquet: 566.1MB
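
The sizes printed above appear to use decimal units (1 GB = 1000 MB), judging by the raw byte counts from `stat().st_size` shown in the summary table further down. A minimal stand-in for that formatting, not `nicenumber`'s actual implementation (the helper name `to_human_size` is hypothetical):

```python
def to_human_size(n_bytes: int, prec: int = 1) -> str:
    """Format a byte count with decimal (SI) units, e.g. 1_500_000 -> '1.5MB'."""
    units = ['B', 'KB', 'MB', 'GB', 'TB']
    size = float(n_bytes)

    for unit in units:
        # stop at the first unit that keeps the number below 1000
        if size < 1000 or unit == units[-1]:
            return f'{size:.{prec}f}{unit}'
        size /= 1000


# byte counts as recorded in the notebook's summary table
size_feather = to_human_size(1096622306)
size_parquet = to_human_size(566053909)
print(size_feather, size_parquet)
```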


### 4.2 Load data and perform EDA

In [62]:
%%R

library(arrow)
library(dplyr)

# set filepaths in R
p_feather = "data/data.feather"
p_parquet = Sys.glob("data/parquet/*")

In [63]:
%%time
%%R

# load feather
df_feather <- arrow::read_feather(p_feather)

CPU times: user 8.61 s, sys: 22.2 s, total: 30.8 s
Wall time: 12.4 s


In [64]:
%%time
%%R

# load parquet
df_parquet <- arrow::read_parquet(p_parquet)

CPU times: user 9.9 s, sys: 7.23 s, total: 17.1 s
Wall time: 8.9 s


In [66]:
%%R

print(class(df_feather))

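# NOTE: max() returns NA here, most likely because `rain` contains missing values;
# pass na.rm = TRUE to max() to skip them
max_rainfall <- df_feather %>% select('rain') %>% max()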

print(max_rainfall)

[1] "tbl_df"     "tbl"        "data.frame"
[1] NA
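
R's `max()` returns `NA` when its input contains missing values, unless `na.rm = TRUE` is passed, whereas pandas skips missing values by default. That difference would explain why the Python cells reported 432.94 mm/day while the R cell printed `NA`. A small pandas illustration of the two behaviours, using made-up values:

```python
import numpy as np
import pandas as pd

# illustrative values only, with one missing entry
rain = pd.Series([0.5, np.nan, 432.94])

max_skip_na = rain.max()              # pandas skips NaN by default
max_keep_na = rain.max(skipna=False)  # propagates the missing value, like R's default

print(max_skip_na, max_keep_na)
```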


#### Summary

In [95]:
# compare results metrics

m_feather = dict(
    peak_memory=9417,
    time_write=5.82,
    time_load=12.4)

m_parquet = dict(
    peak_memory=12929,
    time_write=10.9,
    time_load=8.9)
    
m_results['feather'].update(m_feather)
m_results['parquet'].update(m_parquet)

pd.DataFrame \
    .from_dict(m_results).T \
    .rename_axis('Method') \
    .sort_index(axis=1) \
    .assign(time_total=lambda x: x.time_write + x.time_load) \
    .style \
    .pipe(f.bg, rev=False) \
    .format('{:,.0f}') \
    .format('{:.2f}', subset=['time_write', 'time_load', 'time_total'])

|Method|peak_memory|size_disk|time_load|time_write|time_total|
|:--|--:|--:|--:|--:|--:|
|feather|9417|1096622306|12.4|5.82|18.22|
|parquet|12929|566053909|8.9|10.9|19.8|


Our team chose to use the `Feather` file format over `Parquet` to transfer data between Python and R.

Both file types had similar write, load, and total times to transfer data, and Parquet even used about half the space on disk (566 MB vs 1.1 GB). However, the Parquet file type is a bit more involved to deal with (it is written to a directory, so we have to handle more complex file paths), as it is primarily meant for long-term storage. If we were optimizing for storage size rather than ease of completing a simple EDA, we would have chosen Parquet.