# Milestone 1
## DSCI 525 Web and Cloud Computing
## Group 16
### Authors: Jared Splinter, Steffen Pentelow, Chen Zhao, Ifeanyi Anene

This notebook downloads various observed and simulated rainfall data sets from New South Wales, Australia over the period of 1889 - 2014.  The data are then combined and basic exporatory data analyses are conducted using both Python and R programming languages.

## Import packages

In [1]:
import re
import os
import zipfile
import requests
from urllib.request import urlretrieve
import json
import rpy2.rinterface
import dask.dataframe as dd
import pandas as pd
from memory_profiler import memory_usage
import pyarrow.dataset as ds
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.feather as feather
import rpy2.rinterface
import rpy2_arrow.pyarrow_rarrow as pyra

In [2]:
%load_ext rpy2.ipython
%load_ext memory_profiler



In [3]:
%%R
library(dplyr)
library(arrow)

R[write to console]: 
Attaching package: 'dplyr'


R[write to console]: The following objects are masked from 'package:stats':

    filter, lag


R[write to console]: The following objects are masked from 'package:base':

    intersect, setdiff, setequal, union




## Data Download
The following code chunk downloads the data used in the subsequent analyses.  The data are downloaded from 'figshare.com'.  The file 'data.zip' is saved to a local directory called 'data'.

In [4]:
%%time
%%memit
# Print out time and memory taken for downloading data

# This code is adapted from DSCI 525 lecture demonstration notebook (Gittu George, 2021,
# https://github.ubc.ca/MDS-2020-21/DSCI_525_web-cloud-comp_students/blob/master/Lectures/Lecture_1_2.ipynb)
url = f"https://api.figshare.com/v2/articles/14096681"
headers = {"Content-Type": "application/json"}
output_directory = "../data/"

response = requests.request("GET", url, headers=headers)
data = json.loads(response.text)
files = data["files"]

for file in files:
    if file["name"] in "data.zip":
        os.makedirs(output_directory, exist_ok=True)
        urlretrieve(file["download_url"], output_directory + file["name"])

peak memory: 292.42 MiB, increment: 3.66 MiB
Wall time: 1min 11s


After it has been downloaded locally, 'data.zip' is extracted and stored in the 'data' directory.

In [5]:
%%time
%%memit
# Print out time and memory taken to extract data

with zipfile.ZipFile(os.path.join(output_directory, "data.zip"), "r") as f:
    f.extractall(output_directory)

peak memory: 297.81 MiB, increment: 5.44 MiB
Wall time: 37.5 s


### Discussion: 

Loading each .csv file into memory, concatenating it with the other .csv files, then writing back to disc is computationally and memory intensive. To load each .csv file, it must be deserialized (a process that takes some time with large files), then stored in RAM while being manipulated. Assuming a computer has sufficient RAM to hold all of the files, they can be concatenated together (e.g., using Pandas or Numpy), but then must be serialized again to be saved as a large .csv file. It would be better to be able to concatenate the files directly on disk or at least have them saved in a format where they could be read and written directly to and from disk without serialization/deserialization.

## Combining Data
The following code chunk combines all of the unzipped rainfall data .csv files into a single file called 'combined_data.csv'.  This process is accomplished by creating a pandas dataframe called `full_df`, then one by one loading each .csv file and concatenating it with `full_df`.  This requires that all of the .csv files be read into a pandas dataframe variable and held in RAM at once.  In this case, this requires that almost 7 GB of data be held in RAM and manipulated.  Some computers will not be able to perform this data combining operation because they do not have sufficient RAM.  Even for systems which have sufficient RAM, performing simple operations (such as concatenation) on on a variable of this size are time consuming.  To demonstrate this, below the code chunk, we have included screen shots of the time and memory usage for the execution of this data combining operation.  To summarize, the time taken to complete this operation on each system are listed below (along with some general hardware specifications):
1. Wall time: 7min 9s; Peak memory: 6891.53 MiB
    - Processor: i7-10510U (4 cores, up to 4.90 GHz)
    - RAM: 16 GB
2. Wall time: 9min 46s; Peak memory: 3097.45 MiB
    - Processor: i5
    - RAM: 8 GB
3. Wall time: 6min 5s; Peak memory: 7265.16 MiB
    - Processor: i7-8700K (6 cores, up to 3.70 GHz)
    - RAM: 16 GB
4. Wall time: 23min 43s; Peak memory: 2541.36 MiB
    - Current Computer
    - Processor i5-5300U (4 cores)
    - RAM: 8 GB

In [6]:
%%time
%%memit
# Print out time and memory taken to merge and save csv files

file_names = os.listdir(output_directory)
file_names = [file for file in file_names if file[-4:] == ".csv"]

cols = ["lat_min", "lat_max", "lon_min", "lon_max", "rain (mm/day)"]
full_df = pd.DataFrame(columns=["model"] + cols)
full_df.index.rename("time", inplace=True)

for file in file_names:
    result = re.search("^.*(?=_daily)", file)
    if result:
        model_name = result.group(0)
        full_df = pd.concat(
            [
                full_df,
                pd.read_csv(output_directory + file, index_col=0).assign(
                    model=model_name
                ),
            ]
        )

full_df.to_csv(output_directory + "combined_data.csv")

peak memory: 2541.36 MiB, increment: 2249.50 MiB
Wall time: 23min 43s


> **NOTE:** This notebook was last run with a i5-5300U CPU @ 2.30 GHz with 4 CPUS and 8GB of RAM and thus the run times in this notebook are worse. The runs for the above code cell on other machines are shown below.

1. Processor: i7-10510U (4 cores, up to 4.90 GHz); RAM: 16 GB

![](../img/i7-10510_16GB-SP.png)

2. Processor: 2.3 GHz Quad-Core Intel Core i5; RAM: 8GB

![](../img/i5_8GB.png)

3. Processor: i7-8700K (6 cores, up to 3.70 GHz); RAM: 16 GB

![](../img/i7-8700K_16GB_CZ.png)

## Task 5. Load the combined CSV to memory and perform a simple EDA

### 1. Investigate at least 2 approaches and perform a simple EDA

In [7]:
full_df.head()

Unnamed: 0_level_0,model,lat_min,lat_max,lon_min,lon_max,rain (mm/day)
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1889-01-01 12:00:00,ACCESS-CM2,-36.25,-35.0,140.625,142.5,3.293256e-13
1889-01-02 12:00:00,ACCESS-CM2,-36.25,-35.0,140.625,142.5,0.0
1889-01-03 12:00:00,ACCESS-CM2,-36.25,-35.0,140.625,142.5,0.0
1889-01-04 12:00:00,ACCESS-CM2,-36.25,-35.0,140.625,142.5,0.0
1889-01-05 12:00:00,ACCESS-CM2,-36.25,-35.0,140.625,142.5,0.01047658


In [8]:
full_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 62513863 entries, 1889-01-01 12:00:00 to 2014-12-31 12:00:00
Data columns (total 6 columns):
 #   Column         Dtype  
---  ------         -----  
 0   model          object 
 1   lat_min        float64
 2   lat_max        float64
 3   lon_min        float64
 4   lon_max        float64
 5   rain (mm/day)  float64
dtypes: float64(5), object(1)
memory usage: 3.3+ GB


In [9]:
full_df.dtypes

model             object
lat_min          float64
lat_max          float64
lon_min          float64
lon_max          float64
rain (mm/day)    float64
dtype: object

#### Method 1: Loading in Chunks

In [10]:
%%time
%%memit
import dask.dataframe as dd

### Code adapted from DSCI 525 Lecture ipynb notebook (Gittu George, 2021)
counts = pd.Series(dtype=int)
for chunk in pd.read_csv("../data/combined_data.csv", chunksize=10_000_000):
    counts = counts.add(chunk["model"].value_counts(), fill_value=0)
print(counts.astype(int))

ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
AWI-ESM-1-1-LR       966420
BCC-CSM2-MR         3035340
BCC-ESM1             551880
CMCC-CM2-HR4        3541230
CMCC-CM2-SR5        3541230
CMCC-ESM2           3541230
CanESM5              551880
EC-Earth3-Veg-LR    3037320
FGOALS-f3-L         3219300
FGOALS-g3           1287720
GFDL-CM4            3219300
GFDL-ESM4           3219300
INM-CM4-8           1609650
INM-CM5-0           1609650
KIOST-ESM           1287720
MIROC6              2070900
MPI-ESM-1-2-HAM      966420
MPI-ESM1-2-HR       5154240
MPI-ESM1-2-LR        966420
MRI-ESM2-0          3037320
NESM3                966420
NorESM2-LM           919800
NorESM2-MM          3541230
SAM0-UNICON         3541153
TaiESM1             3541230
observed              46020
dtype: int32
peak memory: 1660.71 MiB, increment: 1372.91 MiB
Wall time: 3min 12s


#### Method 2: Using Dask

In [11]:
%%time
%%memit

### Code adapted from DSCI 525 Lecture ipynb notebook (Gittu George, 2021)

dask_df = dd.read_csv("../data/combined_data.csv")
print(dask_df["model"].value_counts().compute())

MPI-ESM1-2-HR       5154240
TaiESM1             3541230
CMCC-CM2-HR4        3541230
CMCC-CM2-SR5        3541230
CMCC-ESM2           3541230
NorESM2-MM          3541230
SAM0-UNICON         3541153
FGOALS-f3-L         3219300
GFDL-CM4            3219300
GFDL-ESM4           3219300
EC-Earth3-Veg-LR    3037320
MRI-ESM2-0          3037320
BCC-CSM2-MR         3035340
MIROC6              2070900
ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
INM-CM4-8           1609650
INM-CM5-0           1609650
KIOST-ESM           1287720
FGOALS-g3           1287720
MPI-ESM-1-2-HAM      966420
MPI-ESM1-2-LR        966420
NESM3                966420
AWI-ESM-1-1-LR       966420
NorESM2-LM           919800
CanESM5              551880
BCC-ESM1             551880
observed              46020
Name: model, dtype: int64
peak memory: 1144.84 MiB, increment: 853.45 MiB
Wall time: 1min 43s


#### Method 3: Loading just columns what we want

In [12]:
%%time
%%memit

# The only column we want is the model column
model_df = pd.read_csv("../data/combined_data.csv", usecols=["model"])
print(model_df["model"].value_counts())

MPI-ESM1-2-HR       5154240
CMCC-CM2-SR5        3541230
CMCC-CM2-HR4        3541230
TaiESM1             3541230
CMCC-ESM2           3541230
NorESM2-MM          3541230
SAM0-UNICON         3541153
FGOALS-f3-L         3219300
GFDL-CM4            3219300
GFDL-ESM4           3219300
EC-Earth3-Veg-LR    3037320
MRI-ESM2-0          3037320
BCC-CSM2-MR         3035340
MIROC6              2070900
ACCESS-CM2          1932840
ACCESS-ESM1-5       1610700
INM-CM4-8           1609650
INM-CM5-0           1609650
FGOALS-g3           1287720
KIOST-ESM           1287720
AWI-ESM-1-1-LR       966420
NESM3                966420
MPI-ESM-1-2-HAM      966420
MPI-ESM1-2-LR        966420
NorESM2-LM           919800
BCC-ESM1             551880
CanESM5              551880
observed              46020
Name: model, dtype: int64
peak memory: 1023.29 MiB, increment: 719.91 MiB
Wall time: 1min 46s


### 2. Observations discussion.

- Loading just the column we want seems to have the shortest CPU times (user 30.9 s, sys: 2.3 s, total: 33.2 s) and wall time (33.9 s). 
    - **NOTE:** These numbers come from a previous run, for this run Dask had the shortest run time with wall time of 1min 43s, 3 seconds faster than loading in the column


- Loading the combined data using Dask has a shorter wall time (40.6 s) than loading in chunks, however, it has longer CPU times (user 1min 24s, sys: 18.6 s, total: 1min 43s) than loading in chunks (user 59.6 s, sys: 7.12 s, total: 1min 6s).

- Loading just the column we want has the minimum peak memory and increment used (1166.77 MiB and 780.00 MiB), whilst loading in chunks has the maximum peak memory and increment (1873.48 MiB, increment: 1458.30 MiB). 

- It is also worth noting that the memory usage from full_df.info() was memory usage: 3.3+ GB. Thus, using these methods to load the data all saved us considerable memory space. 

- In conclusion, loading just the column we want gives us the optimal time and space savings. 
    - **NOTE:** In the last run, Dask had a slightly faster wall time, however loading in the column we want is still better considering space. Furthermore, as the wall times between Dask and loading in column method, we can conclude that loading in the column may be preferred.
    

## Task 6. Perform a simple EDA in R

### 1. Store data in different format

Here we will write the data in 2 more different formats to compare the running time and ocuppied storage between different formats. All formats of data in this section including:
- csv format
- feather format
- parquet format

In [13]:
%%time
dataset = ds.dataset("../data/combined_data.csv", format="csv")
table = dataset.to_table()

Wall time: 1min 21s


#### Method 1: Feather Format

In [14]:
%%time
feather.write_feather(table, "../data/example.feather")

Wall time: 39.7 s


#### Method 2: Parquet format

In [15]:
%%time
pq.write_table(table, "../data/example.parquet")

Wall time: 1min 9s


#### Check the size of data in all different formats

In [16]:
%%sh
du -sh ../data/combined_data.csv
du -sh ../data/example.feather
du -sh ../data/example.parquet

5.7G	../data/combined_data.csv
1.1G	../data/example.feather
542M	../data/example.parquet


### Discussion:

- The parquet file format of data takes the least storage, while feather file is twice as big as the parquest file. The csv file is the largest, which we may not want to store on local machines

- It takes less time to write the feather file than it takes to write the parquet file.

- The parquet file takes more time to write but uses less memory when loaded. This is because the data needs more layers of encoding and compressing during the saving process. If we are limited by the storage capability, the parquet file may be ideal. However, if we are more concerned about time, storing data as feather format may be more suitable.

### 2. Transfer the Dataframe from Python to R and perform EDA

Here we will experiment 3 exchange approaches to transfer the loaded dataset from python to R and perform EDA. In the end, we will pick one appropriate approach over others. All exchange approaches in this section including:
- Arrow exchange
- feather file exchange
- parquet file exchange

#### Arrow exchange and EDA

In [17]:
%%time
r_table = pyra.converter.py2rpy(table)

5756
rarrow.ChunkedArray: 0.08099365234375
5756
rarrow.ChunkedArray: 0.08300495147705078
5756
rarrow.ChunkedArray: 0.05200505256652832
5756
rarrow.ChunkedArray: 0.04401063919067383
5756
rarrow.ChunkedArray: 0.05300617218017578
5756
rarrow.ChunkedArray: 0.07300305366516113
5756
rarrow.ChunkedArray: 0.06899619102478027
Wall time: 1min 22s


In [18]:
%%time
%%R -i r_table
start_time <- Sys.time()
head_df <- head(r_table)
glimpse_df <- glimpse(r_table)
model_count <- r_table %>% collect() %>% count(model)
end_time <- Sys.time()
print(class(r_table))
print(head_df)
print(glimpse_df)
print(model_count)
print(end_time - start_time)

Classes 'Table', 'ArrowObject', 'R6' <Table>
  Inherits from: <ArrowObject>
  Public:
    .:xp:.: externalptr
    AddColumn: function (i, new_field, value) 
    cast: function (target_schema, safe = TRUE, options = cast_options(safe)) 
    clone: function (deep = FALSE) 
    column: function (i) 
    ColumnNames: function () 
    columns: active binding
    Equals: function (other, check_metadata = FALSE, ...) 
    field: function (i) 
    Filter: function (i, keep_na = TRUE) 
    GetColumnByName: function (name) 
    initialize: function (xp) 
    invalidate: function () 
    metadata: active binding
    num_columns: active binding
    num_rows: active binding
    pointer: function () 
    print: function (...) 
    RemoveColumn: function (i) 
    RenameColumns: function (value) 
    schema: active binding
    SelectColumns: function (indices) 
    serialize: function (output_stream, ...) 
    set_pointer: function (xp) 
    SetColumn: function (i, new_field, value) 
    Slice: functi

#### Feather file exchange and EDA

In [21]:
%%time
%%R
start_time <- Sys.time()
r_table <- arrow::read_feather("../data/example.feather")
head_df <- head(r_table)
glimpse_df <- glimpse(r_table)
model_count <- r_table %>% count(model)
end_time <- Sys.time()
print(class(r_table))
print(head_df)
print(glimpse_df)
print(model_count)
print(end_time - start_time)

Rows: 62,513,863
Columns: 7
$ time            <dttm> 1889-01-01 04:00:00, 1889-01-02 04:00:00, 1889-01-...
$ model           <chr> "ACCESS-CM2", "ACCESS-CM2", "ACCESS-CM2", "ACCESS-C...
$ lat_min         <dbl> -36.25, -36.25, -36.25, -36.25, -36.25, -36.25, -36...
$ lat_max         <dbl> -35, -35, -35, -35, -35, -35, -35, -35, -35, -35, -...
$ lon_min         <dbl> 140.625, 140.625, 140.625, 140.625, 140.625, 140.62...
$ lon_max         <dbl> 142.5, 142.5, 142.5, 142.5, 142.5, 142.5, 142.5, 14...
$ `rain (mm/day)` <dbl> 3.293256e-13, 0.000000e+00, 0.000000e+00, 0.000000e...
[1] "tbl_df"     "tbl"        "data.frame"
# A tibble: 6 x 7
  time                model      lat_min lat_max lon_min lon_max `rain (mm/day)`
  <dttm>              <chr>        <dbl>   <dbl>   <dbl>   <dbl>           <dbl>
1 1889-01-01 04:00:00 ACCESS-CM2   -36.2     -35    141.    142.        3.29e-13
2 1889-01-02 04:00:00 ACCESS-CM2   -36.2     -35    141.    142.        0.      
3 1889-01-03 04:00:00 ACCESS-CM2  

#### Parquet file exchange and EDA

In [22]:
%%time
%%R
start_time <- Sys.time()
r_table <- arrow::read_parquet("../data/example.parquet")
head_df <- head(r_table)
glimpse_df <- glimpse(r_table)
model_count <- r_table %>% count(model)
end_time <- Sys.time()
print(class(r_table))
print(head_df)
print(glimpse_df)
print(model_count)
print(end_time - start_time)

Rows: 62,513,863
Columns: 7
$ time            <dttm> 1889-01-01 04:00:00, 1889-01-02 04:00:00, 1889-01-...
$ model           <chr> "ACCESS-CM2", "ACCESS-CM2", "ACCESS-CM2", "ACCESS-C...
$ lat_min         <dbl> -36.25, -36.25, -36.25, -36.25, -36.25, -36.25, -36...
$ lat_max         <dbl> -35, -35, -35, -35, -35, -35, -35, -35, -35, -35, -...
$ lon_min         <dbl> 140.625, 140.625, 140.625, 140.625, 140.625, 140.62...
$ lon_max         <dbl> 142.5, 142.5, 142.5, 142.5, 142.5, 142.5, 142.5, 14...
$ `rain (mm/day)` <dbl> 3.293256e-13, 0.000000e+00, 0.000000e+00, 0.000000e...
[1] "tbl_df"     "tbl"        "data.frame"
# A tibble: 6 x 7
  time                model      lat_min lat_max lon_min lon_max `rain (mm/day)`
  <dttm>              <chr>        <dbl>   <dbl>   <dbl>   <dbl>           <dbl>
1 1889-01-01 04:00:00 ACCESS-CM2   -36.2     -35    141.    142.        3.29e-13
2 1889-01-02 04:00:00 ACCESS-CM2   -36.2     -35    141.    142.        0.      
3 1889-01-03 04:00:00 ACCESS-CM2  

### Discussion:

- Although the running time for arrow exchange and EDA seems to be the shortest, we need to count in the time of loading the arrow dataframe as well

- The total time consumption for arrow exhange and EDA, feather file loading and EDA, and parquet file loading and EDA is roughly the same
    - Considering the size of the data we are working with all 3 methods tested run quickly  


- Arrow exchange and the functions associated are still in development. Some functions and applications are limited and may be unstable. Further development is likely needed.

- Exchanging data by writing it to parquet from Python and then reading in R is an efficiently optimized way to deal with large size of data, and it is widely used within industry.

### 3. Summary



Based on our observations, we will pick parquet file format to transfer data from Python to R for the following reasons:

1. The Arrow memory format is an unified way to represent memory for efficient analytic operations. It saves the time and storage for serialization and deserialization. Parquet is a columnar based file format which can write arrow to disk and deal with big data efficiently

2. Earlier we concluded a parquet file can store the data using much less space and memory. This can speed up the process largely when working with the data.

3. Although it takes more time to write a parquet file compared to a feather file, the parquet file features more layers of encoding and compression. When dealing with data size as in this excercise, it is not too much. However, as the parquet file takes the least amount of space and memory compared to other methods it is the fastest when wrangling data.

4. Earlier we concluded that the total time consumption for arrow exhange and EDA, feather file loading and EDA, and parquet file loading and EDA is roughly the same considering the size of data we are working with. However, parquet still manages the be the quickest. When working with large data, these small changes could potentially be really important.

5. Although feather is another good option to work with large data and is very fast, it has only recently been developed. The package is not mature enough and some functions may be unstable.

In conclusion, when dealing with big data, and we want to storage the data for long-term, we will pick parquet file format, as it saves storage and is very efficient when analyzing big data due to its columnar based format. 


# Final Thoughts and Conclusions

> The challenge of working with large data is apparent. Loading in such a large data set proved to be a challenge among every laptop; taking up incredible amounts of space, pushing our laptop CPU's to the limit, and causing long code run times. 
>
>In particular, in one situation, while re-running this notebook the memory on my laptop was completely used up and the kernel crashed. I needed to delete every picture from my laptop to overcome the storage requirements. A few times a process would crash and progress would be lost. In another occassion, an error kept persisting in a code cell one for not enough memory allocation and one for a process being used elsewhere. After some searching online it became apparent that my laptop was not capable of handling everything at once and a restart was needed. There is not much more demoralizing than realizing you need to load in a massive data set again and sit through and wait for the code to run because there is not enough memory, or a windows error. Perhaps that is what I deserve for having a laptop that uses Windows. 
>
> It also became clear throughout this milestone that with large data a simple csv file and traditional data wrangling methods become very long and ineffective. Fortunately, there are other methods such as Dask, Feather/Parquet files, and Arrow exchange that can make working with such a large data set much easier and vastly improve run times. However, the run times when using the most efficient methods explored here can still take some time and memory to run. 
>
> Finally, the importance of a good computer increases when working with large data. Among the machines tested in our group, those with better processors and more RAM were able to run code much faster. If one is working with large data frequently it may be a good investment to get a powerful computer with lots of RAM.