In [1]:
import re
import os
import glob
import zipfile
import requests
from urllib.request import urlretrieve
import json
import pandas as pd
from memory_profiler import memory_usage


import dask.dataframe as dd


import pyarrow.dataset as ds
import rpy2_arrow.pyarrow_rarrow as pyra
import pyarrow.feather as feather
import pyarrow.parquet as pq

In [2]:
%load_ext rpy2.ipython
%load_ext memory_profiler

# Milestone 1: Tackling big data on your laptop


---

## 1. Downloading the data

1). Download the data from [figshare](https://figshare.com/articles/dataset/Daily_rainfall_over_NSW_Australia/14096681) to the local computer using the [figshare API](https://docs.figshare.com) using the `requests` library.

In [3]:
article_id = 14096681
url = f"https://api.figshare.com/v2/articles/{article_id}"
headers = {"Content-Type": "application/json"}
output_directory = "figshare_rainfall"

In [4]:
response = requests.request("GET", url, headers=headers)
data = json.loads(response.text)
files = data["files"]
files

[{'is_link_only': False,
  'name': 'daily_rainfall_2014.png',
  'supplied_md5': 'fd32a2ffde300a31f8d63b1825d47e5e',
  'computed_md5': 'fd32a2ffde300a31f8d63b1825d47e5e',
  'id': 26579150,
  'download_url': 'https://ndownloader.figshare.com/files/26579150',
  'size': 58863},
 {'is_link_only': False,
  'name': 'environment.yml',
  'supplied_md5': '060b2020017eed93a1ee7dd8c65b2f34',
  'computed_md5': '060b2020017eed93a1ee7dd8c65b2f34',
  'id': 26579171,
  'download_url': 'https://ndownloader.figshare.com/files/26579171',
  'size': 192},
 {'is_link_only': False,
  'name': 'README.md',
  'supplied_md5': '61858c6cc0e6a6d6663a7e4c75bbd88c',
  'computed_md5': '61858c6cc0e6a6d6663a7e4c75bbd88c',
  'id': 26586554,
  'download_url': 'https://ndownloader.figshare.com/files/26586554',
  'size': 5422},
 {'is_link_only': False,
  'name': 'data.zip',
  'supplied_md5': 'b517383f76e77bd03755a63a8ff83ee9',
  'computed_md5': 'b517383f76e77bd03755a63a8ff83ee9',
  'id': 26766812,
  'download_url': 'https://

In [5]:
%%time
# all the CSV files we are interested in are located in data.zip file
files_to_dl = ["data.zip"]
for file in files:
    if file["name"] in files_to_dl:
        os.makedirs(output_directory, exist_ok=True)
        urlretrieve(file["download_url"], os.path.join(output_directory, file["name"]))

CPU times: user 1.42 s, sys: 2.56 s, total: 3.97 s
Wall time: 1min 8s


2). Extract the zip file programmatically

In [6]:
%%time
with zipfile.ZipFile(os.path.join(output_directory, "data.zip"), 'r') as f:
    f.extractall(output_directory)

CPU times: user 14 s, sys: 2.67 s, total: 16.7 s
Wall time: 27.2 s


---

## 2. Combining data CSVs

- We used `pandas` approach to combine multiple CSVs into a single CSV.
- An extra column called "model" is added which identifies the model eg: for file name "SAM0-UNICON_daily_rainfall_NSW.csv", the model name is labelled as `SAM0-UNICON`
- The comparison of run times and memory usages using `pandas` approach on different machines within our team is documented in the GitHub [issue](](https://github.com/UBC-MDS/grp15_rainfall_analysis/issues/5).
- We observed that memory increment is quite low on all the machines, which means combining multiple CSVs into a single CSV file requires fairly small memory.
- Run times for combining CSV files range between 5~7 minutes.  
    - *Junghoo*
        - peak memory: 359.83 MiB, increment: 0.09 MiB
        - CPU times: user 4min 52s, sys: 9.55 s, total: 5min 22s
        - Wall time: 5min 4s
    - *Micah*
         - peak memory: 347.55 MiB, increment: 0.18 MiB
         - CPU times: user 7min 14s, sys: 26 s, total: 7min 40s
         - Wall time: 7min 57s
    - *Chuang*
         - peak memory: 464.66 MiB, increment: 0.10 MiB
         - CPU times: user 6min 19s, sys: 17 s, total: 6min 36s
         - Wall time: 6min 51s 
    - *Pan*
         - peak memory: 328.88 MiB, increment: 0.25 MiB
         - CPU times: user 5min 31s, sys: 21.3 s, total: 5min 52s
         - Wall time: 5min 59s  



In [7]:
### just listing to get an idea how individual file looks like 
sample_df = pd.read_csv(os.path.join(output_directory, "ACCESS-CM2_daily_rainfall_NSW.csv"))
sample_df.columns

Index(['time', 'lat_min', 'lat_max', 'lon_min', 'lon_max', 'rain (mm/day)'], dtype='object')

In [8]:
### making sure that all files have the same columns
files = glob.glob(os.path.join(output_directory, "*.csv"))
use_cols = list(sample_df.columns)
for file in files:
    try:
        pd.read_csv(file, index_col=0, usecols=use_cols)
    except:
        df = pd.read_csv(file, index_col=0)
        print(f"{os.path.basename(file)} does not have all columns. {os.path.basename(file)} only has {df.columns.to_list()} columns.")

observed_daily_rainfall_SYD.csv does not have all columns. observed_daily_rainfall_SYD.csv only has ['rain (mm/day)'] columns.


In [9]:
%%time
%%memit
# "figshare_rainfall/observed_daily_rainfall_SYD.csv" is missing 'lat_min', 'lat_max', 'lon_min', 'lon_max' columns
files = glob.glob(os.path.join(output_directory, "*_NSW.csv"))

# combining using pandas method
df = pd.concat((pd.read_csv(file, index_col=0, usecols=use_cols)
                .assign(model = os.path.basename(file).split("_")[0])
                for file in files)
              )
df.to_csv(os.path.join(output_directory, "combined_data.csv"))

peak memory: 7293.44 MiB, increment: 6857.14 MiB
CPU times: user 4min 41s, sys: 8.12 s, total: 4min 49s
Wall time: 4min 54s


In [10]:
# free up memory
df = None

---

## 3. Load the combined CSV to memory and perform a simple EDA


#### 1). simple pandas - loading the entire data to the memory


#### ***Observation***
- We can see that the CPU time and Wall time are really close to each other, which might mean that there might be only one CPU processing the work.
- We can also observe that the memory increment 6049.38 MiB is quite high, which means loading the entire dataset consumes a lot of memory.

In [11]:
%%time
%%memit
df = pd.read_csv("figshare_rainfall/combined_data.csv")
print(df["model"].value_counts())

MPI-ESM1-2-HR       5154240
NorESM2-MM          3541230
CMCC-CM2-SR5        3541230
TaiESM1             3541230
CMCC-CM2-HR4        3541230
CMCC-ESM2           3541230
SAM0-UNICON         3541153
GFDL-ESM4           3219300
GFDL-CM4            3219300
FGOALS-f3-L         3219300
EC-Earth3-Veg-LR    3037320
MRI-ESM2-0          3037320
BCC-CSM2-MR         3035340
MIROC6              2070900
ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
INM-CM4-8           1609650
INM-CM5-0           1609650
KIOST-ESM           1287720
FGOALS-g3           1287720
NESM3                966420
MPI-ESM1-2-LR        966420
AWI-ESM-1-1-LR       966420
MPI-ESM-1-2-HAM      966420
NorESM2-LM           919800
BCC-ESM1             551880
CanESM5              551880
Name: model, dtype: int64
peak memory: 9968.23 MiB, increment: 8463.94 MiB
CPU times: user 46 s, sys: 5.23 s, total: 51.2 s
Wall time: 52.2 s


In [12]:
# see how many rows in the dataset
len(df.index)

62467843

In [13]:
# have a look at the first couple of rows of the data
df.head()

Unnamed: 0,time,lat_min,lat_max,lon_min,lon_max,rain (mm/day),model
0,1889-01-01 12:00:00,-35.439867,-33.574619,141.5625,143.4375,0.03129635,AWI-ESM-1-1-LR
1,1889-01-02 12:00:00,-35.439867,-33.574619,141.5625,143.4375,1.083881e-13,AWI-ESM-1-1-LR
2,1889-01-03 12:00:00,-35.439867,-33.574619,141.5625,143.4375,1.056313e-13,AWI-ESM-1-1-LR
3,1889-01-04 12:00:00,-35.439867,-33.574619,141.5625,143.4375,1.08051e-13,AWI-ESM-1-1-LR
4,1889-01-05 12:00:00,-35.439867,-33.574619,141.5625,143.4375,9.914916e-14,AWI-ESM-1-1-LR


In [14]:
df.dtypes

time              object
lat_min          float64
lat_max          float64
lon_min          float64
lon_max          float64
rain (mm/day)    float64
model             object
dtype: object

#### 2). Changing dtype of the data to reduce memory usage while performing

#### ***Observation***
- By changing the dtype of the 5 out of 7 columns from `float64` to `float32`, we saved half space.

In [15]:
df_reduced_dtype = df[['lat_min', 'lat_max', 'lon_min', 'lon_max', 'rain (mm/day)']].astype('float32', errors='ignore')

In [16]:
print(f"Memory usage with float64: {df[['lat_min', 'lat_max', 'lon_min', 'lon_max', 'rain (mm/day)']].memory_usage().sum() / 1e6:.2f} MB")
print(f"Memory usage with float32: {df_reduced_dtype.memory_usage().sum() / 1e6:.2f} MB")

Memory usage with float64: 2498.71 MB
Memory usage with float32: 1249.36 MB


#### 3). Loading in chunks
#### ***Observation***
- memory increment: 1116.05 MiB is dramatically decreased (it was 6049.38 MiB using pandas) when we load data in small chunks.
- But the CPU time and Wall time are still close to each other, which means the work is still ***not*** executed parallely.

In [17]:
%%time
%%memit
counts = pd.Series(dtype=int)
for chunk in pd.read_csv("figshare_rainfall/combined_data.csv", chunksize=10_000_000):
    counts = counts.add(chunk["model"].value_counts(), fill_value=0)
print(counts.astype(int))

ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
AWI-ESM-1-1-LR       966420
BCC-CSM2-MR         3035340
BCC-ESM1             551880
CMCC-CM2-HR4        3541230
CMCC-CM2-SR5        3541230
CMCC-ESM2           3541230
CanESM5              551880
EC-Earth3-Veg-LR    3037320
FGOALS-f3-L         3219300
FGOALS-g3           1287720
GFDL-CM4            3219300
GFDL-ESM4           3219300
INM-CM4-8           1609650
INM-CM5-0           1609650
KIOST-ESM           1287720
MIROC6              2070900
MPI-ESM-1-2-HAM      966420
MPI-ESM1-2-HR       5154240
MPI-ESM1-2-LR        966420
MRI-ESM2-0          3037320
NESM3                966420
NorESM2-LM           919800
NorESM2-MM          3541230
SAM0-UNICON         3541153
TaiESM1             3541230
dtype: int64
peak memory: 9242.20 MiB, increment: 1543.43 MiB
CPU times: user 46.8 s, sys: 3.76 s, total: 50.6 s
Wall time: 55.3 s


#### 4). Dask Way

> using `dask` to read that csv file. Internally its loading chunks and doing it parallely.

#### ***Observation***
- memory increment: 1291.08 MiB is dramatically decreased (it was 1116.05 MiB using pandas) when we load data in small chunks.
- However, the CPU time is much greater than the Wall time now, which means the work was done by several processors concurrently.

In [18]:
%%time
%%memit
ddf = dd.read_csv('figshare_rainfall/combined_data.csv')
print(ddf["model"].value_counts().compute())

MPI-ESM1-2-HR       5154240
TaiESM1             3541230
NorESM2-MM          3541230
CMCC-CM2-HR4        3541230
CMCC-CM2-SR5        3541230
CMCC-ESM2           3541230
SAM0-UNICON         3541153
FGOALS-f3-L         3219300
GFDL-CM4            3219300
GFDL-ESM4           3219300
EC-Earth3-Veg-LR    3037320
MRI-ESM2-0          3037320
BCC-CSM2-MR         3035340
MIROC6              2070900
ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
INM-CM5-0           1609650
INM-CM4-8           1609650
KIOST-ESM           1287720
FGOALS-g3           1287720
MPI-ESM1-2-LR        966420
NESM3                966420
AWI-ESM-1-1-LR       966420
MPI-ESM-1-2-HAM      966420
NorESM2-LM           919800
BCC-ESM1             551880
CanESM5              551880
Name: model, dtype: int64
peak memory: 10653.23 MiB, increment: 2904.00 MiB
CPU times: user 1min 6s, sys: 9.24 s, total: 1min 15s
Wall time: 29.4 s


In [19]:
# free up  memory
df = None
chunks = None
ddf = None

---

## 4. Perform a simple EDA in R

### 1). Loading Datasets into Different Formats

#### a). pandas

In [20]:
%%time
%%memit
df = pd.read_csv("figshare_rainfall/combined_data.csv", nrows=1_000_000)

peak memory: 8511.69 MiB, increment: 7.84 MiB
CPU times: user 795 ms, sys: 165 ms, total: 960 ms
Wall time: 1.25 s


#### b). arrow table format

In [21]:
%%time
%%memit

#loading datasets

dataset = ds.dataset("figshare_rainfall/combined_data.csv", format="csv")
table = dataset.to_table()

peak memory: 12589.49 MiB, increment: 4077.18 MiB
CPU times: user 22.8 s, sys: 2.82 s, total: 25.7 s
Wall time: 23.9 s


#### c). feather format

In [22]:
%%time
%%memit
# writing to feather format
feather.write_feather(table, 'figshare_rainfall/combined.feather')

peak memory: 12592.06 MiB, increment: 0.45 MiB
CPU times: user 2.72 s, sys: 1.12 s, total: 3.83 s
Wall time: 3.32 s


#### d). parquet format

In [23]:
%%time
%%memit
## writing as a single parquet 
pq.write_table(table, 'figshare_rainfall/combined.parquet')

peak memory: 12714.81 MiB, increment: 138.79 MiB
CPU times: user 6.86 s, sys: 477 ms, total: 7.33 s
Wall time: 7.84 s


#### Compare the size of the data in 3 different format.

In [24]:
%%sh
# I am just seeing the size of the csv data
du -sh figshare_rainfall/combined_data.csv
# I am just seeing the size of the feather data
du -sh figshare_rainfall/combined.feather
# I am just seeing the size of the parquet data
du -sh figshare_rainfall/combined.parquet

5.6G	figshare_rainfall/combined_data.csv
1.1G	figshare_rainfall/combined.feather
539M	figshare_rainfall/combined.parquet


### 2). Transfering to R and trying to do simple EDA - `count(model)`

#### a). pandas

> Note: we tried to load the entire dataset into R from python pandas object, but it takes ages :| Therefore, we decided to load just 1 million rows of the data, which is just 1.6% of the whole dataset. However, loading the 1.6% of the whole dataset and performing `count(model)` took same amount of time as it does for the whole dataset in arrow table format. This indicates that the exchange of data between python pandas object and R is a really expensive operation because of the serialization and deserialization.

In [25]:
%%time
%%R -i df
## transferring the python dataframe to R
start_time <- Sys.time()
library(dplyr)
print(class(df))
result <- df %>% count(model)
print(result)
end_time <- Sys.time()
print(end_time - start_time)

R[write to console]: 
Attaching package: ‘dplyr’


R[write to console]: The following objects are masked from ‘package:stats’:

    filter, lag


R[write to console]: The following objects are masked from ‘package:base’:

    intersect, setdiff, setequal, union




[1] "data.frame"
           model      n
1 AWI-ESM-1-1-LR 966420
2        TaiESM1  33580
Time difference of 0.2850685 secs
CPU times: user 22.7 s, sys: 414 ms, total: 23.1 s
Wall time: 25.2 s


#### b). in arrow table format

In [26]:
%%time
%%memit
## Here we are loading the arrow dataframe that we have loaded previously

r_table = pyra.converter.py2rpy(table)

5695
rarrow.ChunkedArray: 0.020232439041137695
5695
rarrow.ChunkedArray: 0.01874542236328125
5695
rarrow.ChunkedArray: 0.019854068756103516
5695
rarrow.ChunkedArray: 0.025022268295288086
5695
rarrow.ChunkedArray: 0.028842687606811523
5695
rarrow.ChunkedArray: 0.026859045028686523
5695
rarrow.ChunkedArray: 0.022825002670288086
peak memory: 12679.73 MiB, increment: 1.15 MiB
CPU times: user 22.3 s, sys: 270 ms, total: 22.5 s
Wall time: 22.7 s


In [None]:
%%time
%%R -i r_table

## arrow Speed
start_time <- Sys.time()
print(class(r_table))
##add details on collect here
library(dplyr)
# Arrow speed
result <- r_table %>% collect() %>% count(model)
print(class(r_table %>% collect()))
end_time <- Sys.time()
print(result)
print(end_time - start_time)

[1] "Table"       "ArrowObject" "R6"         


#### c). in Feather format

In [None]:
%%time
%%R

### Feather speed

library(arrow)
start_time <- Sys.time()
r_table <- arrow::read_feather("figshare_rainfall/combined.feather")
print(class(r_table))
library(dplyr)
result <- r_table %>% count(model)
end_time <- Sys.time()
print(result)
print(end_time - start_time)

#### d). in Parquet format

In [None]:
%%time
%%R

### Parquet speed

library(arrow)
start_time <- Sys.time()
r_table <- arrow::read_parquet("figshare_rainfall/combined.parquet")
print(class(r_table))
library(dplyr)
result <- r_table %>% count(model)
end_time <- Sys.time()
print(result)
print(end_time - start_time)

### 3). Transfering to R and trying to do simple EDA - `summary()`

#### a). pandas

> Note: Again, we tried to load the entire dataset into R from python pandas object, but it takes ages :| Therefore, we decided to load just 1 million rows of the data, which is just 1.6% of the whole dataset. However, loading the 1.6% of the whole dataset and performing `summary()` took same amount of time as it does for the whole dataset in arrow table format. This indicates that the exchange of data between python pandas object and R is a really expensive operation because of the serialization and deserialization.

In [None]:
%%time
%%R -i df

start_time <- Sys.time()
library(dplyr)
print(class(df))
result <- df %>% collect() %>% summary()
end_time <- Sys.time()
print(result)
print(end_time - start_time)

#### b). in arrow table format

In [None]:
%%time
%%R -i r_table

## arrow Speed - running summary()
start_time <- Sys.time()
print(class(r_table))
##add details on collect here
library(dplyr)
# Arrow speed
result <- r_table %>% collect() %>% summary()
print(class(r_table %>% collect()))
end_time <- Sys.time()
print(result)
print(end_time - start_time)

#### c). in Feather format

In [None]:
%%time
%%R

### Feather speed - running summary()

library(arrow)
start_time <- Sys.time()
r_table <- arrow::read_feather("figshare_rainfall/combined.feather")
print(class(r_table))
library(dplyr)
result <- r_table %>% summary()
end_time <- Sys.time()
print(result)
print(end_time - start_time)

#### d). in Parquet format

In [None]:
%%time
%%R

### Parquet speed - running summary()

library(arrow)
start_time <- Sys.time()
r_table <- arrow::read_parquet("figshare_rainfall/combined.parquet")
print(class(r_table))
library(dplyr)
result <- r_table %>% summary()
end_time <- Sys.time()
print(result)
print(end_time - start_time)

### **4). Decision**

Our group chose to go with parquet to transfer the dataframe from Python to R. The file size for our combined data in parquet format was 544mb compared to 5.6GB and 1.0GB for csv and feather formats respectively. Meanwhile in the task of reading the file in R and running a simple `count(model)`, in parquet it had a wall time of 39 seconds which was compared to 40.6 seconds for arrow format and 1 minute and 23 seconds with feather format. 

While speed wise it seems that parquet and arrow do not differ too much, when we consider the file size of a parquet file compared to a csv it becomes the parquet format seems to be the better option.  Thus we decided to go with the parquet format to transfer the dataframe in R. 

We did not consider simply importing the pandas dataframe through `%%R -i` because of the amount of serialization and deserialization that would have to be done when transferring it from Python to R.

### 5) Reflection

#### Challenges or difficulties faced when dealing with this large data:

1. When trying to run this Jupyter notebook without freeing up the memory used by different instances of the dataframe, the notebook kernel kept crashing due to memory issues. This required a workaround by occasionally reassigning `None` to the variables to free up memories taken by unused dataframes, e.g. 

```python
df = None
ddf = None
chunks = None
```

2. Trying to open `combined_data.csv` file using LibreOffice to manually inspect the saved file resulted in the following error message: 
> `The data could not be loaded completely because the maximum number of columns per sheet was exceeded.`