# Milestone 1 - Tackling big data on a laptop

#### Yuanzhe Marco Ma, Matthew Pin, Mark Wang, Tingyu Zhang

* [Download data](#1.-Download-the-data)
* [Combine data](#2.-Combine-the-data)
* [EDA with Python](#3.-EDA-with-python)
* [EDA with R](#4.-EDA-with-R)
* [Reflection](#5.-Reflection)

In [2]:
import re
import os
import glob
import zipfile
import requests
from urllib.request import urlretrieve
import json
import pandas as pd

In [3]:
%load_ext rpy2.ipython
%load_ext memory_profiler

# 1. Download the data 

In [3]:
article_id = 14096681
url = f"https://api.figshare.com/v2/articles/{article_id}"
headers = {"Content-Type": "application/json"}
output_directory = "../data/" # This notebook should be ran mannually

response = requests.request("GET", url, headers=headers)
data = json.loads(response.text)
files = data["files"]

for file in files:
    if file["name"] == "data.zip":
        os.makedirs(output_directory, exist_ok=True)
        urlretrieve(file["download_url"], output_directory + file["name"])

with zipfile.ZipFile(os.path.join(output_directory, "data.zip"), 'r') as f:
    f.extractall(output_directory)

# 2. Combine the data

A peek of the data:

In [4]:
import dask.dataframe as dd

see = dd.read_csv(
    "../data/ACCESS-CM2_daily_rainfall_NSW.csv",
    assume_missing=True,
)

see

Unnamed: 0_level_0,time,lat_min,lat_max,lon_min,lon_max,rain (mm/day)
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,object,float64,float64,float64,float64,float64
,...,...,...,...,...,...
,...,...,...,...,...,...


In [5]:
%%time
%%memit

combined_data = dd.from_pandas(pd.DataFrame({"time": [], "lat_min": [], 
                                             "lat_max": [], "lon_min": [], 
                                             "lon_max": [], "rain (mm/day)": [], 
                                             "model": []}), npartitions=1)

for filename in os.listdir(output_directory):
    if filename[-4: ] == ".csv":
        model = filename.partition('_daily_rainfall')[0]
        ddf = dd.read_csv(output_directory + filename, assume_missing=True)
        if len(ddf.columns) == 2:
            ddf['lat_min'] = None
            ddf['lat_max'] = None
            ddf['lon_min'] = None
            ddf['lon_max'] = None
            ddf = ddf[['time', 'lat_min', 'lat_max', 'lon_min', 'lon_max', 'rain (mm/day)']]
        ddf["model"] = model
        combined_data = dd.concat([combined_data, ddf], axis=0)

peak memory: 167.07 MiB, increment: 4.51 MiB
CPU times: user 717 ms, sys: 45.1 ms, total: 762 ms
Wall time: 1.34 s


In [6]:
%%time
%%memit

combined_data.to_csv(output_directory + "combined_data.csv")

peak memory: 1882.38 MiB, increment: 1715.30 MiB
CPU times: user 7min 30s, sys: 37.3 s, total: 8min 7s
Wall time: 7min 14s


### Discussion

**Compare run times and memory usages by using DASK on different machines within our team:**

- Macbook Air 13 inch 2019, 8GB RAM, 1.6 GHz Dual-Core Intel Core i5 
    - peak memory: 1122.01 MiB, increment: 1010.62 MiB, CPU times: user 12min 19s, sys: 55.2 s, total: 13min 14s, Wall time: 12min 14s
- Linux Desktop, 7.1G RAM, AMD Ryzen 5 5 46000h with radeon graphics x 12
    - peak memory: 3246.38 MiB, increment: 3074.11 MiB, CPU times: user 13min 40s, sys: 1min 6s, total: 14min 47s, Wall time: 12min 48s
- Macbook Mini 2018, 8GB RAM, 3.6 GHz-Quad-Core Intel Core i3
    - peak memory: 986.83 MiB, increment: 816.16 MiB, CPU times: user 8min 22s, sys: 31 s, total: 8min 53s, Wall time: 8min 2s
- Macbook Pro 2020 16GB RAM, 1.4 GHz Quad-Core Intel Core i5
    - peak memory: 1882.38 MiB, increment: 1715.30 MiB, CPU times: user 7min 30s, sys:  37.3 s, total: 8min 7s, Wall time: 7min 14s


In general, when we run the task of combining the data, the computer's random access memory (RAM) is heavily utilized. As you can see above, our computers took more than 8 minutes to run, and peak memories were all around 1000 MiB, but one of our team members' peak memories was around 3000 MiB (He uses Linux; everyone else in our team uses MAC System). 

# 3. EDA with python

### Load Data

In [4]:
%%timeit -n 1 -r 1
%%memit

import dask.dataframe as dd

combined_data = dd.read_csv("../data/combined_data.csv/*")
combined_data = combined_data.drop(['Unnamed: 0'], axis=1)

peak memory: 164.46 MiB, increment: 22.59 MiB
1.11 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [5]:
print(combined_data.columns)

Index(['time', 'lat_min', 'lat_max', 'lon_min', 'lon_max', 'rain (mm/day)',
       'model', 'Unnamed: 0.1'],
      dtype='object')


### Method 1: Using Dask only

Find geographical range of all records and mean rainfall level

In [6]:
%%timeit -n 1 -r 1
%%memit

print(f"Minimum lat_min is %0.4f" % 
      combined_data['lat_min'].astype('float64').min().compute())
print(f"Maximum lat_max is %0.4f" % 
      combined_data['lat_max'].astype('float64').max().compute())
print(f"Minimum lon_min is %0.4f" % 
      combined_data['lon_min'].astype('float64').min().compute())
print(f"Maximum lon_max is %0.4f" % 
      combined_data['lon_max'].astype('float64').max().compute())
print(f"Mean rainfall is %0.4f" % 
      combined_data['rain (mm/day)'].astype('float64').mean().compute())

Minimum lat_min is -36.4674
Maximum lat_max is -27.9061
Minimum lon_min is 140.6250
Maximum lon_max is 155.6250
Mean rainfall is 1.9018
peak memory: 1413.84 MiB, increment: 1257.12 MiB
6min 34s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


### Method 2: Using Dask and change datatypes

#### Default datatypes:

In [10]:
print(combined_data.dtypes)

time             object
lat_min          object
lat_max          object
lon_min          object
lon_max          object
rain (mm/day)    object
model            object
dtype: object


#### Changing to `float32`:

In [7]:
numeric_columns = ['lat_min', 'lat_max', 'lon_min', 'lon_max', 'rain (mm/day)']

print("Memory usage with 'object' dtype:")
print(combined_data[numeric_columns].memory_usage().compute())

# Change dtype
combined_data[numeric_columns] = combined_data[numeric_columns].astype('float32')
    
print("\nMemory usage after changing to float32:")
print(combined_data[numeric_columns].memory_usage().compute())

Memory usage with 'object' dtype:
Index                22016
lat_max          500110904
lat_min          500110904
lon_max          500110904
lon_min          500110904
rain (mm/day)    500110904
dtype: int64

Memory usage after changing to float32:
Index                22016
lat_max          250055452
lat_min          250055452
lon_max          250055452
lon_min          250055452
rain (mm/day)    250055452
dtype: int64


#### Perform the same EDA with `float32`

Find geographical range of all records and mean rainfall level

In [8]:
%%timeit -n 1 -r 1
%%memit

print(f"Minimum lat_min is %0.4f" % 
      combined_data['lat_min'].min().compute())
print(f"Maximum lat_max is %0.4f" % 
      combined_data['lat_max'].max().compute())
print(f"Minimum lon_min is %0.4f" % 
      combined_data['lon_min'].min().compute())
print(f"Maximum lon_max is %0.4f" % 
      combined_data['lon_max'].max().compute())
print(f"Mean rainfall is %0.4f" % 
      combined_data['rain (mm/day)'].mean().compute())

Minimum lat_min is -36.4674
Maximum lat_max is -27.9061
Minimum lon_min is 140.6250
Maximum lon_max is 155.6250
Mean rainfall is 1.9018
peak memory: 1533.98 MiB, increment: 1347.66 MiB
8min 37s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


### Observation

Loading:
- Since the `combined_data` was stored into partitions with Dask, we had to load them with Dask also. Dask takes care of loading in chunks and in parallel. 

EDA with Dask:
- Using Dask to perform the EDA, the peak memory usage was around 1413MB, and it took around 6.5 minutes to process all 5 columns. 

EDA with Dask and change dtype:
- Originally, Dask saved all columns as `object` datatype. After changing the numeric columns to `float32`, the memory usage went from around 500MB per column to around 250MB per column. <br> 
- Performing the same EDA yields the same result. The peak memory usage is around the same level ~1500MB. This is because `%memit` measures the memory usage of the operation, instead of the data size. So although the data size decreased, the memory stack occupied by this code cell didn't change much. 
The slight increase was probably due to other programs in the background affecting the memory I/O. 

# 4. EDA with R

## Creating feather file

In [13]:
%%time
%%memit

combined_data.compute().reset_index().to_feather('../data/combined_data_feather.feather')

peak memory: 8807.39 MiB, increment: 8536.95 MiB
CPU times: user 2min 37s, sys: 42.4 s, total: 3min 20s
Wall time: 2min 7s


In [14]:
%%sh
du -sh ../data/combined_data.csv
du -sh ../data/combined_data_feather.feather

6.5G	../data/combined_data.csv
1.2G	../data/combined_data_feather.feather


- We can see the file size is much smaller in a feather file

## Setting up R environment + importing feather file

In [15]:
%load_ext rpy2.ipython

The rpy2.ipython extension is already loaded. To reload it, use:
  %reload_ext rpy2.ipython


In [16]:
%%R

library(arrow)
library(dplyr)

R[write to console]: 
Attaching package: ‘arrow’


R[write to console]: The following object is masked from ‘package:utils’:

    timestamp


R[write to console]: 
Attaching package: ‘dplyr’


R[write to console]: The following objects are masked from ‘package:stats’:

    filter, lag


R[write to console]: The following objects are masked from ‘package:base’:

    intersect, setdiff, setequal, union




In [17]:
%%time
%%R
start_time <- Sys.time()

combined_data_r <- arrow::read_feather("../data/combined_data_feather.feather")
print(class(combined_data_r))
result <- combined_data_r %>% count(time)

end_time <- Sys.time()
print(result)
print(end_time - start_time)

[1] "tbl_df"     "tbl"        "data.frame"
[90m# A tibble: 92,040 x 2[39m
   time                    n
   [3m[90m<chr>[39m[23m               [3m[90m<int>[39m[23m
[90m 1[39m 1889-01-01             29
[90m 2[39m 1889-01-01 12:00:00  [4m1[24m330
[90m 3[39m 1889-01-02             29
[90m 4[39m 1889-01-02 12:00:00  [4m1[24m330
[90m 5[39m 1889-01-03             29
[90m 6[39m 1889-01-03 12:00:00  [4m1[24m330
[90m 7[39m 1889-01-04             29
[90m 8[39m 1889-01-04 12:00:00  [4m1[24m330
[90m 9[39m 1889-01-05             29
[90m10[39m 1889-01-05 12:00:00  [4m1[24m330
[90m# … with 92,030 more rows[39m
Time difference of 19.82407 secs
CPU times: user 18.7 s, sys: 8.44 s, total: 27.1 s
Wall time: 20 s


## perform the EDA with R

In [18]:
%%time
%%R

print(paste0("Number of rows:", nrow(combined_data_r)))
print(paste0("Number of cols:", ncol(combined_data_r)))
print("Column names are...")
print(paste0(colnames(combined_data_r)))

[1] "Number of rows:62513863"
[1] "Number of cols:8"
[1] "Column names are..."
[1] "index"         "time"          "lat_min"       "lat_max"      
[5] "lon_min"       "lon_max"       "rain (mm/day)" "model"        
CPU times: user 13 ms, sys: 6.06 ms, total: 19.1 ms
Wall time: 16 ms


In [19]:
%%time
%%R
class(combined_data_r)

[1] "tbl_df"     "tbl"        "data.frame"
CPU times: user 11.4 ms, sys: 4.79 ms, total: 16.2 ms
Wall time: 12.8 ms


In [20]:
%%time
%%R
head(combined_data_r)

[90m# A tibble: 6 x 8[39m
  index time          lat_min lat_max lon_min lon_max `rain (mm/day)` model     
  [3m[90m<int>[39m[23m [3m[90m<chr>[39m[23m           [3m[90m<dbl>[39m[23m   [3m[90m<dbl>[39m[23m   [3m[90m<dbl>[39m[23m   [3m[90m<dbl>[39m[23m           [3m[90m<dbl>[39m[23m [3m[90m<chr>[39m[23m     
[90m1[39m     0 1889-01-01 1…   -[31m35[39m[31m.[39m[31m4[39m   -[31m33[39m[31m.[39m[31m6[39m    142.    143.        4.24[90me[39m[31m-13[39m MPI-ESM-1…
[90m2[39m     1 1889-01-02 1…   -[31m35[39m[31m.[39m[31m4[39m   -[31m33[39m[31m.[39m[31m6[39m    142.    143.        4.22[90me[39m[31m-13[39m MPI-ESM-1…
[90m3[39m     2 1889-01-03 1…   -[31m35[39m[31m.[39m[31m4[39m   -[31m33[39m[31m.[39m[31m6[39m    142.    143.        4.50[90me[39m[31m-13[39m MPI-ESM-1…
[90m4[39m     3 1889-01-04 1…   -[31m35[39m[31m.[39m[31m4[39m   -[31m33[39m[31m.[39m[31m6[39m    142.    143.        4.25[90me[39

In [21]:
%%time
%%R
tail(combined_data_r)

[90m# A tibble: 6 x 8[39m
   index time           lat_min lat_max lon_min lon_max `rain (mm/day)` model   
   [3m[90m<int>[39m[23m [3m[90m<chr>[39m[23m            [3m[90m<dbl>[39m[23m   [3m[90m<dbl>[39m[23m   [3m[90m<dbl>[39m[23m   [3m[90m<dbl>[39m[23m           [3m[90m<dbl>[39m[23m [3m[90m<chr>[39m[23m   
[90m1[39m [4m1[24m[4m4[24m[4m1[24m958 2014-12-26 12…   -[31m30[39m[31m.[39m[31m2[39m   -[31m29[39m[31m.[39m[31m2[39m    153.    154.            4.44 SAM0-UN…
[90m2[39m [4m1[24m[4m4[24m[4m1[24m959 2014-12-27 12…   -[31m30[39m[31m.[39m[31m2[39m   -[31m29[39m[31m.[39m[31m2[39m    153.    154.            6.69 SAM0-UN…
[90m3[39m [4m1[24m[4m4[24m[4m1[24m960 2014-12-28 12…   -[31m30[39m[31m.[39m[31m2[39m   -[31m29[39m[31m.[39m[31m2[39m    153.    154.            7.86 SAM0-UN…
[90m4[39m [4m1[24m[4m4[24m[4m1[24m961 2014-12-29 12…   -[31m30[39m[31m.[39m[31m2[39m   -[31m29[39m[31m.[39

In [22]:
%%time
%%R
print("5 Number Summary lat_min:")
summary(na.omit(combined_data_r$lat_min))

[1] "5 Number Summary lat_min:"
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
 -36.47  -34.87  -33.00  -33.10  -31.40  -29.90 
CPU times: user 2.31 s, sys: 873 ms, total: 3.18 s
Wall time: 3.19 s


In [23]:
%%time
%%R
print("5 Number Summary lat_max:")
summary(na.omit(combined_data_r$lat_max))

[1] "5 Number Summary lat_max:"
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
 -36.00  -33.66  -32.04  -31.98  -30.16  -27.91 
CPU times: user 2.87 s, sys: 1.11 s, total: 3.98 s
Wall time: 3.99 s


In [24]:
%%time
%%R
print("5 Number Summary lon_min:")
summary(na.omit(combined_data_r$lon_min))

[1] "5 Number Summary lon_min:"
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
  140.6   143.4   146.9   146.9   150.2   153.8 
CPU times: user 2.39 s, sys: 1.24 s, total: 3.63 s
Wall time: 3.64 s


In [25]:
%%time
%%R
print("5 Number Summary lon_max:")
summary(na.omit(combined_data_r$lon_max))

[1] "5 Number Summary lon_max:"
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
  141.2   145.0   148.1   148.2   151.3   155.6 
CPU times: user 2.77 s, sys: 1.33 s, total: 4.1 s
Wall time: 4.11 s


In [26]:
%%time
%%R
print("5 Number Summary Rainfall (mm/day):")
summary(na.omit(combined_data_r$'rain (mm/day)'))

[1] "5 Number Summary Rainfall (mm/day):"
    Min.  1st Qu.   Median     Mean  3rd Qu.     Max. 
  0.0000   0.0000   0.0616   1.9018   1.0213 432.9395 
CPU times: user 3.18 s, sys: 1.54 s, total: 4.72 s
Wall time: 4.73 s


## Why we used a .feather file

A .feather file was used mainly due to its lightweight/portable nature.
- The format was quick for saving the data (less than 5 minutes to save the entire dataset) and loading the data (less than 24 seconds).
- The format was to push a very large dataframe into a significantly smaller file size than what was seen in our CSV file (1.4G vs 6.5G). 
- The format is language agnostic so we will be able to read the dataframe into either python.
- The format was something that we were familiar with from previous projects.

We chose feather instead of Parquet because...
- Technically a Parquet file could have compressed the data further but it would have been more computationally expensive and thus slower. 1.4G is a very reasonable size for a dataset this big.

# 5. Reflection

- Challenges with downloading the data...
    - Downloading data from flagshare took very long time due to internet connection reasons (I am using a VPN). Therefore, I let the program run overnight to obtain all the files.


- Challenges with combining the CSVs into one file...
    - (1) We are only able to use Dask  to read and combine the downloaded .csv files. Whenever pandas  is used, the notebook crashes. Therefore, we only choose Dask to handle the data in this milestone.
    - (2) The format of the saved csv  files is confusing. Although I used the to_csv method, it is saved in a folder with ".csv" in its name. There are multiple .part files in the folder. We spent some time to figure out that we need to read from the individual files in the ".csv" folder, instead of the folder per se.
    - (3) In order to see the first few observations in the combined dataset, I used the .head method. However, because datasets are stored in partitions in Dask , and by default there the head() method only obtains data from the first partition, there was initially not enough data to show. We specified npartitions=10 to have enough data. 

- Challenges with python EDA include...
    - Performing EDA with Dask means that we cannot read only the columns we want. So computing statistics on a specific column is still slow.

- Challenges with the R EDA included...
    - Converting the combined data Dask dataframe to the exact format needed to save it as a feather file