# DSCI 525 - Web and Cloud Computing
## Project: Daily Rainfall Over NSW, Australia
## Milestone 1: Tackling Big Data on Your Laptop 
### Authors: Group 24 Huanhuan Li, Nash Makhija and Nicholas Wu

In [6]:
import re
import os
import glob
import zipfile
import requests
from urllib.request import urlretrieve
import json
import pandas as pd
from memory_profiler import memory_usage
import dask.dataframe as dd

In [7]:
## install the packages https://arrow.apache.org/docs/python/install.html
import pyarrow.dataset as ds
import pyarrow as pa
import pyarrow.parquet as pq
## How to install put instructions https://anaconda.org/conda-forge/rpy2
import rpy2.rinterface
# install this https://pypi.org/project/rpy2-arrow/#description  pip install rpy2-arrow
# have to install this as well conda install -c conda-forge r-arrow 
import rpy2_arrow.pyarrow_rarrow as pyra
### instruction
import pyarrow.feather as feather

In [8]:
%load_ext rpy2.ipython
%load_ext memory_profiler

The rpy2.ipython extension is already loaded. To reload it, use:
  %reload_ext rpy2.ipython
The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler


In [9]:
# Code for this notebook was adapted from DSCI 525 course notes

## 1) Downloading the data

In [10]:
# Necessary metadata
article_id = 14096681  # this is the unique identifier of the article on figshare
url = f"https://api.figshare.com/v2/articles/{article_id}"
headers = {"Content-Type": "application/json"}
output_directory = "../data/"

In [11]:
response = requests.request("GET", url, headers=headers)
data = json.loads(response.text)  # this contains all the articles data, feel free to check it out
files = data["files"]             # this is just the data about the files, which is what we want
files

[{'is_link_only': False,
  'name': 'daily_rainfall_2014.png',
  'supplied_md5': 'fd32a2ffde300a31f8d63b1825d47e5e',
  'computed_md5': 'fd32a2ffde300a31f8d63b1825d47e5e',
  'id': 26579150,
  'download_url': 'https://ndownloader.figshare.com/files/26579150',
  'size': 58863},
 {'is_link_only': False,
  'name': 'environment.yml',
  'supplied_md5': '060b2020017eed93a1ee7dd8c65b2f34',
  'computed_md5': '060b2020017eed93a1ee7dd8c65b2f34',
  'id': 26579171,
  'download_url': 'https://ndownloader.figshare.com/files/26579171',
  'size': 192},
 {'is_link_only': False,
  'name': 'README.md',
  'supplied_md5': '61858c6cc0e6a6d6663a7e4c75bbd88c',
  'computed_md5': '61858c6cc0e6a6d6663a7e4c75bbd88c',
  'id': 26586554,
  'download_url': 'https://ndownloader.figshare.com/files/26586554',
  'size': 5422},
 {'is_link_only': False,
  'name': 'data.zip',
  'supplied_md5': 'b517383f76e77bd03755a63a8ff83ee9',
  'computed_md5': 'b517383f76e77bd03755a63a8ff83ee9',
  'id': 26766812,
  'download_url': 'https://

In [12]:
%%time
files_to_dl = ["data.zip"]
for file in files:
    if file["name"] in files_to_dl:
        os.makedirs(output_directory, exist_ok=True)
        urlretrieve(file["download_url"], output_directory + file["name"])

Wall time: 1min 56s


In [13]:
%%time
with zipfile.ZipFile(os.path.join(output_directory, "data.zip"), 'r') as f:
    f.extractall(output_directory)

Wall time: 17.2 s


## 2) Combine data CSVs

CSVs were combined using `Pandas`.

In [14]:
### just listing to get an idea how individual file looks like 
use_cols = ['time', 'lat_min', 'lat_max', 'lon_min', 'lon_max', 'rain (mm/day)']
df = pd.read_csv("../data/ACCESS-CM2_daily_rainfall_NSW.csv", usecols=use_cols)
df

Unnamed: 0,time,lat_min,lat_max,lon_min,lon_max,rain (mm/day)
0,1889-01-01 12:00:00,-36.25,-35.00,140.625,142.50,3.293256e-13
1,1889-01-02 12:00:00,-36.25,-35.00,140.625,142.50,0.000000e+00
2,1889-01-03 12:00:00,-36.25,-35.00,140.625,142.50,0.000000e+00
3,1889-01-04 12:00:00,-36.25,-35.00,140.625,142.50,0.000000e+00
4,1889-01-05 12:00:00,-36.25,-35.00,140.625,142.50,1.047658e-02
...,...,...,...,...,...,...
1932835,2014-12-27 12:00:00,-30.00,-28.75,151.875,153.75,2.951144e-02
1932836,2014-12-28 12:00:00,-30.00,-28.75,151.875,153.75,2.257118e-01
1932837,2014-12-29 12:00:00,-30.00,-28.75,151.875,153.75,1.204670e-01
1932838,2014-12-30 12:00:00,-30.00,-28.75,151.875,153.75,2.632404e-02


In [15]:
%%time
%memit
# Shows time that regular python takes to merge file
# Join all data together
## here we are using a normal python way of merging the data 
files = glob.glob('../data/*NSW.csv')
df = pd.concat((pd.read_csv(file, index_col=0, usecols=use_cols)
                .assign(model=file[8:file.index("_daily")])
                for file in files)
              )
df.to_csv("../data/combined_data.csv")

peak memory: 427.43 MiB, increment: 0.14 MiB
Wall time: 5min 48s


In [16]:
%%time

df_pandas = pd.read_csv("../data/combined_data.csv")

Wall time: 1min 7s


In [17]:
df_pandas.head()

Unnamed: 0,time,lat_min,lat_max,lon_min,lon_max,rain (mm/day),model
0,1889-01-01 12:00:00,-36.25,-35.0,140.625,142.5,3.293256e-13,ACCESS-CM2
1,1889-01-02 12:00:00,-36.25,-35.0,140.625,142.5,0.0,ACCESS-CM2
2,1889-01-03 12:00:00,-36.25,-35.0,140.625,142.5,0.0,ACCESS-CM2
3,1889-01-04 12:00:00,-36.25,-35.0,140.625,142.5,0.0,ACCESS-CM2
4,1889-01-05 12:00:00,-36.25,-35.0,140.625,142.5,0.01047658,ACCESS-CM2


### 2. Summary of Observation on Run Times and Memory Usage Comparison on Different Machines

#### Team member comparison:

Huanhuan: Total time taken to concatenate and create combined_data.csv was 5min 48s with peak memory usage of 427 MiB. Time taken to read combined_data.csv into pandas was 1min 7s.

Nash: Total time taken to concatenate and create combined_data.csv was 6min 7s with peak memory usage of 359.25 MiB. Time taken to read combined_data.csv into pandas was 1min 15s. Initially had storage issues due to hard drive being close to full storage, had to free up space before I was successfully able to create combined_data.csv

Nicholas: Total time taken to concatenate and create combined_data.csv was 4min 44s with peak memory usage of 397 MiB. Time taken to read combined_data.csv into pandas was 50.5s.

## 3) Load the Combined CSV to Memory and Perform a Simple EDA


### Approach 1. Load the Entire Data to Memory

In [18]:
%%time
%%memit
#simple pandas - This is how we do normally ,which means we are loading the entire data to the memory
df_pandas = pd.read_csv("../data/combined_data.csv")
print(df_pandas["model"].value_counts())

MPI-ESM1-2-HR       5154240
CMCC-ESM2           3541230
NorESM2-MM          3541230
CMCC-CM2-SR5        3541230
CMCC-CM2-HR4        3541230
TaiESM1             3541230
SAM0-UNICON         3541153
FGOALS-f3-L         3219300
GFDL-CM4            3219300
GFDL-ESM4           3219300
MRI-ESM2-0          3037320
EC-Earth3-Veg-LR    3037320
BCC-CSM2-MR         3035340
MIROC6              2070900
ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
INM-CM5-0           1609650
INM-CM4-8           1609650
KIOST-ESM           1287720
FGOALS-g3           1287720
MPI-ESM1-2-LR        966420
AWI-ESM-1-1-LR       966420
MPI-ESM-1-2-HAM      966420
NESM3                966420
NorESM2-LM           919800
BCC-ESM1             551880
CanESM5              551880
Name: model, dtype: int64
peak memory: 11705.17 MiB, increment: 5155.46 MiB
Wall time: 1min 8s



### Approach 2. Changing `dtype` of the data 

In [19]:
df_pandas.dtypes

time              object
lat_min          float64
lat_max          float64
lon_min          float64
lon_max          float64
rain (mm/day)    float64
model             object
dtype: object

In [20]:
print(f"Memory usage with float64: {df_pandas[['lat_min','lat_max','lon_min', 'lon_max', 'rain (mm/day)']].memory_usage().sum() / 1e6:.2f} MB")
print(f"Memory usage with float32: {df_pandas[['lat_min','lat_max','lon_min', 'lon_max', 'rain (mm/day)']].astype('float32', errors='ignore').memory_usage().sum() / 1e6:.2f} MB")

Memory usage with float64: 2498.71 MB
Memory usage with float32: 1249.36 MB


### Approach 3. Loading in chunks

In [21]:
%%time
%%memit
counts = pd.Series(dtype=int)
for chunk in pd.read_csv("../data/combined_data.csv", chunksize=10_000_000):
    counts = counts.add(chunk["model"].value_counts(), fill_value=0)
print(counts.astype(int))

ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
AWI-ESM-1-1-LR       966420
BCC-CSM2-MR         3035340
BCC-ESM1             551880
CMCC-CM2-HR4        3541230
CMCC-CM2-SR5        3541230
CMCC-ESM2           3541230
CanESM5              551880
EC-Earth3-Veg-LR    3037320
FGOALS-f3-L         3219300
FGOALS-g3           1287720
GFDL-CM4            3219300
GFDL-ESM4           3219300
INM-CM4-8           1609650
INM-CM5-0           1609650
KIOST-ESM           1287720
MIROC6              2070900
MPI-ESM-1-2-HAM      966420
MPI-ESM1-2-HR       5154240
MPI-ESM1-2-LR        966420
MRI-ESM2-0          3037320
NESM3                966420
NorESM2-LM           919800
NorESM2-MM          3541230
SAM0-UNICON         3541153
TaiESM1             3541230
dtype: int32
peak memory: 7283.21 MiB, increment: 2160.17 MiB
Wall time: 1min 3s


### Approach 4. Load using DASK

In [22]:
%%time
%%memit
# dask way

df_dask = dd.read_csv("../data/combined_data.csv")
print(df_dask["model"].value_counts().compute())

MPI-ESM1-2-HR       5154240
TaiESM1             3541230
NorESM2-MM          3541230
CMCC-CM2-HR4        3541230
CMCC-CM2-SR5        3541230
CMCC-ESM2           3541230
SAM0-UNICON         3541153
FGOALS-f3-L         3219300
GFDL-CM4            3219300
GFDL-ESM4           3219300
EC-Earth3-Veg-LR    3037320
MRI-ESM2-0          3037320
BCC-CSM2-MR         3035340
MIROC6              2070900
ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
INM-CM5-0           1609650
INM-CM4-8           1609650
KIOST-ESM           1287720
FGOALS-g3           1287720
MPI-ESM1-2-LR        966420
NESM3                966420
AWI-ESM-1-1-LR       966420
MPI-ESM-1-2-HAM      966420
NorESM2-LM           919800
BCC-ESM1             551880
CanESM5              551880
Name: model, dtype: int64
peak memory: 8528.86 MiB, increment: 3188.64 MiB
Wall time: 40 s


### 3. Discussion on Observations

- Loading the entire data to memory takes the longest in wall time. 
- If we change those columns with float64 data type to float32, the memory usage reduced significantly from 2,498 MB to 1,249 MB.
- Loading in chunks reduced total CPU and sys time as well as wall time. 
- Loading with DASK reduced wall time significantly by almost half, but we also noticed that the CPU and sys time became greater than wall time, which is likely because DASK loads the data parallely. 

## 4) Perform a Simple EDA in R

### 1. Store the data in different formats

In [23]:
%%R
#just seeing if its available
library("arrow")
library("dplyr")

R[write to console]: 
Attaching package: 'dplyr'


R[write to console]: The following objects are masked from 'package:stats':

    filter, lag


R[write to console]: The following objects are masked from 'package:base':

    intersect, setdiff, setequal, union




In [24]:
%%time
%%memit
## read more on the datasets here  https://arrow.apache.org/docs/python/dataset.html
dataset = ds.dataset("../data/combined_data.csv", format="csv")
## this is of arrow table format
table = dataset.to_table()

peak memory: 9313.46 MiB, increment: 3875.98 MiB
Wall time: 25.3 s


#### feather format

In [6]:
%%time
# experiment in writing in feather format 
feather.write_feather(table, '../data/example.feather')

Wall time: 2.37 s


#### parquet format

In [7]:
%%time
## writing as a single parquet 
pq.write_table(table, '../data/example.parquet')

Wall time: 13 s


In [8]:
%%time
## writing as a partitioned parquet 
pq.write_to_dataset(table, '../data/example_partitioned.parquet',partition_cols=['model'])

Wall time: 31.1 s


In [9]:
%%sh
# Check the size of different format
du -sh ../data/combined_data.csv
du -sh ../data/example.feather
du -sh ../data/example.parquet
du -sh ../data/example_partitioned.parquet

5.7G	../data/combined_data.csv
1.1G	../data/example.feather
542M	../data/example.parquet
3.8G	../data/example_partitioned.parquet


### 2. Experimenting different approaches

#### Approach 1. Pandas Exchange

In [25]:
%%time
%%memit
#simple pandas: read the entire dataset into memory
df = pd.read_csv("../data/combined_data.csv")

peak memory: 12008.25 MiB, increment: 2700.09 MiB
Wall time: 1min 8s


In [26]:
##I comment out the pandas exchange due to memory limitation.
#%%time
#%%R -i df
### Transferring the python dataframe to R
#start_time <- Sys.time()
#library(dplyr)
#print(class(df))
#result <- df %>% count(model)
#print(result)
#end_time <- Sys.time()
#print(end_time - start_time)

#### Approach 2. Arrow Exchange

In [27]:
%%time
%%memit
dataset = ds.dataset("../data/combined_data.csv", format="csv")
## this is of arrow table format
table = dataset.to_table()

peak memory: 8697.53 MiB, increment: 3488.50 MiB
Wall time: 28.9 s


In [28]:
%%time
%%memit
## Here we are loading the arrow dataframe that we have loaded previously
r_table = pyra.converter.py2rpy(table)

5754
rarrow.ChunkedArray: 0.04399371147155762
5754
rarrow.ChunkedArray: 0.03220534324645996
5754
rarrow.ChunkedArray: 0.02962040901184082
5754
rarrow.ChunkedArray: 0.029552936553955078
5754
rarrow.ChunkedArray: 0.03757882118225098
5754
rarrow.ChunkedArray: 0.03741955757141113
5754
rarrow.ChunkedArray: 0.02851080894470215
peak memory: 8792.96 MiB, increment: 95.89 MiB
Wall time: 28.1 s


In [29]:
%%time
%%R -i r_table
start_time <- Sys.time()
print(class(r_table))
##add details on collect here
library(dplyr)
# Arrow only support some operations check this out https://arrow.apache.org/docs/r/articles/dataset.html
result <- r_table %>% collect() %>% count(model)
print(class(r_table %>% collect()))
end_time <- Sys.time()
print(result)
print(end_time - start_time)

[1] "Table"       "ArrowObject" "R6"         
[1] "tbl_df"     "tbl"        "data.frame"
# A tibble: 27 x 2
   model                  n
   <chr>              <int>
 1 ACCESS-CM2       1932840
 2 ACCESS-ESM1-5    1610700
 3 AWI-ESM-1-1-LR    966420
 4 BCC-CSM2-MR      3035340
 5 BCC-ESM1          551880
 6 CanESM5           551880
 7 CMCC-CM2-HR4     3541230
 8 CMCC-CM2-SR5     3541230
 9 CMCC-ESM2        3541230
10 EC-Earth3-Veg-LR 3037320
# ... with 17 more rows
Time difference of 8.545663 secs
Wall time: 8.7 s


#### Approach 3. Feather File

In [None]:
%%time
%%R
library(arrow)
start_time <- Sys.time()
r_table <- arrow::read_feather("../data/example.feather")
print(class(r_table))
library(dplyr)
result <- r_table %>% count(model)
end_time <- Sys.time()
print(result)
print(end_time - start_time)

#### Approach 4. Parquet File

In [30]:
%%time
%%R
library(arrow)
start_time <- Sys.time()
r_table <- arrow::read_parquet("../data/example.parquet")
print(class(r_table))
library(dplyr)
result <- r_table %>% count(model)
end_time <- Sys.time()
print(result)
print(end_time - start_time)

[1] "tbl_df"     "tbl"        "data.frame"
# A tibble: 27 x 2
   model                  n
   <chr>              <int>
 1 ACCESS-CM2       1932840
 2 ACCESS-ESM1-5    1610700
 3 AWI-ESM-1-1-LR    966420
 4 BCC-CSM2-MR      3035340
 5 BCC-ESM1          551880
 6 CanESM5           551880
 7 CMCC-CM2-HR4     3541230
 8 CMCC-CM2-SR5     3541230
 9 CMCC-ESM2        3541230
10 EC-Earth3-Veg-LR 3037320
# ... with 17 more rows
Time difference of 12.38835 secs
Wall time: 12.5 s


### 3. Discussion on Observations
- Comparing to 5.7G of csv file, feather file formate takes 1.1G, while parquet file formate only takes 542M. Both feather file and parquet file are more space efficent than csv file.
- Exchanging data to R with Pandas, my computer ran out of memory and failed to exchange the data.
- Exchanging data to R with Arrow Exchange only took 8.7s. 
- Exchanging data to R with Feather File failed to run on my computer. 
- Exchanging data to R with Parquet File took 12.5s.       

In this case, we wound choose Arrow Exchange because it is the fastest approach.      
If we want to store the data in hard disk, Parquet File format would be the best choice because it used the least space. Exchanging data to R using Parquet File is also fast. The Parquet is column-oriented data storage format. We are only counting by the column `model`. Parquet only read the column `model` therefore greatly minimized the processing time.