<a href="https://colab.research.google.com/github/adrianmrozo/BigData/blob/master/BigData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
install.packages("RMySQL")
install.packages("knitr")
install.packages("bookdown")
install.packages("SparkR") #At the moment SparkR does not work 
#Package ‘SparkR’ was removed from the CRAN repository.
#Formerly available versions can be obtained from the archive.
#Archived on 2020-07-10 as check problems were not corrected im time.
#A summary of the most recent check results can be obtained from the check results archive.
#Please use the canonical form https://CRAN.R-project.org/package=SparkR to link to this page.

Installing package into ‘/usr/local/lib/R/site-library’
(as ‘lib’ is unspecified)

Installing package into ‘/usr/local/lib/R/site-library’
(as ‘lib’ is unspecified)

Installing package into ‘/usr/local/lib/R/site-library’
(as ‘lib’ is unspecified)

“package ‘SparkR’ is not available for this version of R

A version of this package for your version of R might be available elsewhere,
see the ideas at
https://cran.r-project.org/doc/manuals/r-patched/R-admin.html#Installing-packages”


Check here R version, in case some packages cannot be loaded:

In [8]:
version

               _                           
platform       x86_64-pc-linux-gnu         
arch           x86_64                      
os             linux-gnu                   
system         x86_64, linux-gnu           
status                                     
major          4                           
minor          0.4                         
year           2021                        
month          02                          
day            15                          
svn rev        80002                       
language       R                           
version.string R version 4.0.4 (2021-02-15)
nickname       Lost Library Book           

In [None]:
---
title: "Big Data Analytics"
subtitle: 'Lecture 8: Cloud Computing, Distributed Systems'
author: |
     | Prof. Dr. Ulrich Matter
     | (University of St. Gallen)
date: "29/04/2021"
output:
  html_document:
    highlight: tango
    theme: cerulean
    mathjax: "http://mathjax.rstudio.com/latest/MathJax.js?config=TeX-MML-AM_CHTML"
  pdf_document:
    pandoc_args:
    - --filter
    - ../../code/math.py
header-includes:
- \usepackage[T1]{fontenc}
- \usepackage{hyperref}
css: ../../style/notes_hsg.css
bibliography: ../references/bigdata.bib
---


```{r set-options, echo=FALSE, cache=FALSE, purl=FALSE}
options(width = 100)
library(knitr)
library(bookdown)
knitr::opts_chunk$set(fig.pos = 'htb!')

```


# Cloud Services for Big Data Analytics

So far we have focused on how to deal with large amounts of data and/or computationally demanding tasks on our local machines (desktop/laptop). A key aspect of this course has thus been in a first step to understand why our local machine is struggling with a data analysis task when there is a large amount of data to be processed. In a second step we have looked into practical solutions to these challenges. These solutions are in essence tools (in this course particularly tools provided in the R environment) to use the key components of our computing environment (CPU, RAM, mass storage) most efficiently:

 - Computationally intense tasks (but not pushing RAM to the limit): parallelization, using several CPU cores (nodes) in parallel.
 - Memory-intense tasks (data still fits into RAM): efficient memory allocation (`data.table`-package).
 - Memory-intense tasks (data does not fit into RAM): efficient use of virtual memory (use parts of mass storage device as virtual memory).
 - Big Data storage: efficient storage (avoid redundancies) and efficient access (speed) with RDBMSs (here: SQLite).
 
In practice, data sets might be too large for our local machine even if we take all of the techniques listed above into account. That is, a parallelized task might still take ages to complete because our local machine has too few cores available, a task involving virtual memory would use up way too much space on our hard-disk, running a large SQL database locally would use up too much resources, etc.

In such situations, we have to think about horizontal and vertical scaling beyond our local machine. That is, we outsource tasks to a bigger machine (or a cluster of machines) to which our local computer is connected over the Internet (or over a local network). While a decade or two ago most organizations had their own large centrally hosted machines (database servers, cluster computers) for such tasks, today they often rely on third-party solutions *'in the cloud'*. That is, specialized companies provide flexible access to computing resources that can be easily accessed via a broadband Internet-connection and rented on an hourly (or even minutes and seconds) basis. Given the obvious economies of scale in this line of business, a few large players have emerged who practically dominate most of the global market:

 - [Amazon Web Services (AWS)](https://aws.amazon.com/).
 - [Microsoft Azure](https://azure.microsoft.com/en-us/)
 - [Google Cloud Platform](https://cloud.google.com/)
 - [IBM Cloud](https://www.ibm.com/cloud/)
 - [Alibaba Cloud](https://www.alibabacloud.com/)
 - [Tencent Cloud](https://intl.cloud.tencent.com/)
 - ...
 
For details on what some of the largest platforms provide, see the overview in the online chapter to @walkowiak_2016 ['Pushing R Further'](https://www.packtpub.com/sites/default/files/downloads/5396_6457OS_PushingRFurther.pdf).

When we use such cloud services to *scale up* (vertical scaling) the computing resources, the transition from our local implementation of a data analytics task to the cloud implementation is often rather simple. Once we have set up a cloud instance and figured out how to communicate with it, we typically can even run the exact same R-script locally or in the cloud. This is usually the case for parallelized tasks (simply run the same script on a machine with more cores), in-memory tasks (rent a machine with more RAM but still use `data.table()` etc.), and working with an SQL database (simply set up the database in the cloud instead of locally).

However, for really memory-intense tasks, the cloud provides options to *scale out* (horizontal scaling). Meaning, a task is distributed among a cluster of computing instances/servers. The implementation of such data analytics tasks is based on a  paradigm that we rather do not encounter when working locally: *Map/Reduce* (implemented in the *Hadoop* framework).

In the following we look first at scaling up more familiar approaches with the help of the cloud and then look at the Map/Reduce concept and how it can be applied in *Hadoop* running on cloud instances.

# Scaling up in the Cloud

In the following examples we use different cloud services provided by AWS. See the online chapter to @walkowiak_2016 ['Pushing R Further'](https://www.packtpub.com/sites/default/files/downloads/5396_6457OS_PushingRFurther.pdf) for how to set up an AWS account and the basics for how to set up AWS instances. The examples below are based on the assumption that the EC2 instance and RStudio Server have been set up essentially as explained in ['Pushing R Further'](https://www.packtpub.com/sites/default/files/downloads/5396_6457OS_PushingRFurther.pdf), pages 22-38. However, AWS changed a few things regarding the way their linux machines are set up since the online chapter was published first. In order to install core R on your ec2 instance, use the following command in the terminal (instead of `sudo yum install R`):

```{bash eval=FALSE}
sudo amazon-linux-extras install R3.4
```

and confirm the installation of additional dependencies with `y`. In a second step, we install the latest CentOS 6-7 version of RStudio Server with the following commands.

```{bash eval=FALSE}
# April 2020
wget https://download2.rstudio.org/server/centos6/x86_64/rstudio-server-rhel-1.2.5033-x86_64.rpm
sudo yum install rstudio-server-rhel-1.2.5033-x86_64.rpm
```



## Parallelization with an EC2 instance

This short tutorial illustrates how to scale the computation of clustered standard errors shown in Lecture 3 up by running it on an AWS EC2 instance. Below we use the same source code as in the original example (see [`03_computation_memory.Rmd`](https://github.com/umatter/BigData/blob/master/materials/notes/03_computation_memory.Rmd)). Note that there are a few things that we need to keep in mind in order to make the script run on an AWS EC2 instance in RStudio Server. 

First, our EC2 instance is a Linux machine. Most of you are probably rather used to running R on a Mac or Windows PC. When running R on a Linux machine, there is an additional step to install R packages (at least for most of the packages): R packages need to be compiled before they can be installed. The command to install packages is exactly the same (`install.packages()`) and normally you only notice a slight difference in the output shown in the R console during installation (and the installation process takes a little longer than what you are used to). Apart from that, using R via RStudio Server in the cloud looks/feels very similar if not identical as when using R/RStudio locally.

Now, let's go through the bootstrap example. First, let's run the non-parallel implementation of the script. When executing the code below line-by-line, you will notice that essentially all parts of the script work exactly as on your local machine. This is one of the great advantages of running R/RStudio Server in the cloud. You can implement your entire data analysis locally (based on a small sample), test it locally, and then move it to the cloud and run it on a larger scale in exactly the same way (even with the same GUI).

```{r eval=FALSE}
# CASE STUDY: PARALLEL ---------------------------

# install packages
install.packages("data.table")
install.packages("doSNOW")

# load packages
library(data.table)


## ------------------------------------------------------------------------
stopdata <- read.csv("https://vincentarelbundock.github.io/Rdatasets/csv/carData/MplsStops.csv")

## ------------------------------------------------------------------------
# remove incomplete obs
stopdata <- na.omit(stopdata)
# code dependent var
stopdata$vsearch <- 0
stopdata$vsearch[stopdata$vehicleSearch=="YES"] <- 1
# code explanatory var
stopdata$white <- 0
stopdata$white[stopdata$race=="White"] <- 1

## ------------------------------------------------------------------------
model <- vsearch ~ white + factor(policePrecinct)

## ------------------------------------------------------------------------
fit <- lm(model, stopdata)
summary(fit)


# bootstrapping: normal approach

## ----message=FALSE-------------------------------------------------------

# set the 'seed' for random numbers (makes the example reproducible)
set.seed(2)

# set number of bootstrap iterations
B <- 50
# get selection of precincts
precincts <- unique(stopdata$policePrecinct)
# container for coefficients
boot_coefs <- matrix(NA, nrow = B, ncol = 2)
# draw bootstrap samples, estimate model for each sample
for (i in 1:B) {
  
  # draw sample of precincts (cluster level)
  precincts_i <- sample(precincts, size = 5, replace = TRUE)
  # get observations
  bs_i <- lapply(precincts_i, function(x) stopdata[stopdata$policePrecinct==x,])
  bs_i <- rbindlist(bs_i)
  
  # estimate model and record coefficients
  boot_coefs[i,] <- coef(lm(model, bs_i))[1:2] # ignore FE-coefficients
}

## ------------------------------------------------------------------------
se_boot <- apply(boot_coefs, 
                 MARGIN = 2,
                 FUN = sd)
se_boot



```


So far, we have only demonstrated that the simple implementation (non-parallel) works both locally and in the cloud. The real purpose of using an EC2 instance in this example is to make use of the fact that we can scale up our instance to have more CPU cores available for the parallel implementation of our bootstrap procedure. Recall that running the script below on our local machine will employ all cores available to an compute the bootstrap resampling in parallel on all these cores. Exactly the same thing happens when running the code below on our simple `t2.micro` instance. However this type of EC2 instance only has one core. You can check this when running the following line of code in RStudio Server (assuming the `doSNOW` package is installed and loaded):

```{r eval=FALSE}
parallel::detectCores()
```

When running the entire parallel implementation below, you will thus notice that it won't compute the bootstrap SE any faster than with the non-parallel version above. However, by simply initiating another EC2 type with more cores, we can distribute the workload across many CPU cores, using exactly the same R-script.

```{r eval=FALSE}

# bootstrapping: parallel approaach

## ----message=FALSE-------------------------------------------------------
# install.packages("doSNOW", "parallel")
# load packages for parallel processing
library(doSNOW)

# get the number of cores available
ncores <- parallel::detectCores()
# set cores for parallel processing
ctemp <- makeCluster(ncores) # 
registerDoSNOW(ctemp)


# set number of bootstrap iterations
B <- 50
# get selection of precincts
precincts <- unique(stopdata$policePrecinct)
# container for coefficients
boot_coefs <- matrix(NA, nrow = B, ncol = 2)

# bootstrapping in parallel
boot_coefs <- 
  foreach(i = 1:B, .combine = rbind, .packages="data.table") %dopar% {
    
    # draw sample of precincts (cluster level)
    precincts_i <- sample(precincts, size = 5, replace = TRUE)
    # get observations
    bs_i <- lapply(precincts_i, function(x) stopdata[stopdata$policePrecinct==x,])
    bs_i <- rbindlist(bs_i)
    
    # estimate model and record coefficients
    coef(lm(model, bs_i))[1:2] # ignore FE-coefficients
    
  }


# be a good citizen and stop the snow clusters
stopCluster(cl = ctemp)



## ------------------------------------------------------------------------
se_boot <- apply(boot_coefs, 
                 MARGIN = 2,
                 FUN = sd)
se_boot



```













### Mass Storage: MariaDB on an EC2 instance

Once we have set up RStudio Server on an EC2 instance, we can run the SQLite examples demonstrated locally in Lecture 7 on it. There are no additional steps needed to install SQLite. However, when using RDBMSs in the cloud, we typically have a more sophisticated implementation than SQLite in mind. Particularly, we want to set up an actual RDBMS-server running in the cloud to which several clients can connect (via RStudio Server). The following example, based on @walkowiak_2016, guides you through the first step to set up such a database in the cloud. To keep things simple, the example sets up a database of the same data set as shown in the first SQLite example in Lecture 7, but this time with MariaDB on an EC2 instance in the cloud. For most of the installation steps you are referred to the respective pages in @walkowiak_2016 (Chapter 5: 'MariaDB with R on a Amazon EC2 instance, pages 255ff). However, since some of the steps shown in the book are outdated, the example below hints to some alternative/additional steps needed to make the database run on an Ubuntu 18.04 machine.

After launching the EC2 instance on AWS, use the following terminal commands to install R:

```{bash eval=FALSE}
# update ubuntu packages
 sudo apt-get update
 sudo apt-get upgrade
```

```{bash eval=FALSE}
sudo apt-get install r-base
```

and to install RStudio Server (on Ubuntu 18.04, as of April 2020):

```{bash eval=FALSE}
sudo apt-get install gdebi-core
wget https://download2.rstudio.org/server/bionic/amd64/rstudio-server-1.2.5033-amd64.deb
sudo gdebi rstudio-server-1.2.5033-amd64.deb
```

Following @walkowiak_2016 (pages 257f), we first set up a new user and give it permissions to `ssh` directly to the EC2 instance (this way we can then more easily upload data 'for this user').

```{bash eval=FALSE}
# create user
sudo adduser umatter 
```

When prompted for additional information just hit enter (for default). Now we can grant the user the permissions

```{bash eval=FALSE}
sudo cp -r /home/ubuntu/.ssh /home/umatter/
cd /home/umatter/
sudo chown -R umatter:umatter .ssh
```

Then install MariaDB as follows.

```{bash eval= FALSE}
sudo apt update
sudo apt install mariadb-server
sudo apt install libmariadbclient-dev
sudo apt install libxml2-dev # needed later (dependency for some R packages)
```

If prompted to set a password for the root database user (user with all database priviledges), type in and confirm the chosen password.^[Below it is shown how to do this 'manually', if not promted at this step.]


#### Data import

With the permissions set above, we can send data from the local machine directly to the instance via `ssh`. We use this to first transfer the raw data to the instance and then import it to the database. 

The aim is to import the same simple data set `economics.csv` used in the local SQLite examples of Lecture 7. Following the instructions of @walkowiak_2016, pages 252 to 254, we upload the `economics.csv` file (instead of the example data used in @walkowiak_2016). Note that in all the code examples below, the username is `umatter`, and the IP-address will have to be replaced with the public IP-address of your EC2 instance.

Open a new terminal window and send the `economics.csv` data as follows to the instance.

```{bash eval= FALSE}
# from the directory where the key-file is stored...
scp -r -i "mariadb_ec2.pem" ~/Desktop/economics.csv umatter@ec2-184-72-202-166.compute-1.amazonaws.com:~/
```

Then switch back to the terminal connected to the instance and start the MariaDB server. 

```{bash eval=FALSE}
# start the MariaDB server
sudo service mysql start
# log into the MariaDB client as root 
sudo mysql -uroot 
```

If not prompted to do so when installing MariaDB (see above), add a new root user in order to login to MariaDB without the `sudo` (here we simply set the password to 'Password1'). 

```{sql eval = FALSE, purl=FALSE}
GRANT ALL PRIVILEGES on *.* to 'root'@'localhost' IDENTIFIED BY 'Password1';
FLUSH PRIVILEGES;
```

Restart the mysql server and log in with the database root user.

```{bash eval=FALSE}
# start the MariaDB server
sudo service mysql restart
# log into the MariaDB client as root 
mysql -uroot -p
```

Now we can initiate a new database called `data1`.

```{sql eval = FALSE, purl=FALSE}
CREATE database data1;
```

To work with the newly created database, we have to 'select' it.

```{sql eval = FALSE, purl=FALSE}
USE data1;
```

Then, we create the first table of our database and import data into it. Note that we only have to slightly adjust the former SQLite syntax to make this work (remove double quotes for field names). In addition, note that we can use the same field types as in the SQLite DB.^[However, MariaDB is a much more sophisticated RDBMS than SQLite and comes with many more field types, see the official [list of supported data types](https://mariadb.com/kb/en/library/data-types/).]

```{sql eval = FALSE, purl=FALSE}
-- Create the new table
CREATE TABLE econ(
date DATE,
pce REAL,
pop INTEGER,
psavert REAL,
uempmed REAL,
unemploy INTEGER
);

```
After following the steps in @walkowiak_2016, pages 259-262, we can import the `economics.csv`-file to the `econ` table in MariaDB (again, assuming the username is `umatter`). Note that the syntax to import data to a table is quite different from the SQLite example in Lecture 7.

```{sql eval = FALSE, purl=FALSE}
LOAD DATA LOCAL INFILE
'/home/umatter/economics.csv' 
INTO TABLE econ
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 ROWS;
```

Now we can start using the newly created database from within RStudio Server running on our EC2 instance (following @walkowiak_2016, pages 263ff). 

As in the SQLite examples in Lecture 7, we can now query the database from within the R console (this time using `RMySQL` instead of `RSQLite`, and using R from within RStudio Server in the cloud!). 

First, we need to connect to the newly created MariaDB database.


In [None]:
# install package
#install.packages("RMySQL")
# load packages
library(RMySQL)

# connect to the db
con <- dbConnect(RMySQL::MySQL(), 
                 user = "root",
                 password = "Password1",
                 host = "localhost",
                 dbname = "data1")


In our first query, we select all (`*`) variable values of the observation of January 1968.

In [None]:
# define the query
query1 <- 
"
SELECT * FROM econ
WHERE date = '1968-01-01';
"
# send the query to the db and get the result
jan <- dbGetQuery(con, query1)
jan


```{}
#        date   pce    pop psavert uempmed unemploy
# 1 1968-01-01 531.5 199808    11.7     5.1     2878
```

Now let's select all year/months in which there were more than 15 million unemployed, ordered by date.

In [None]:
query2 <-
"
SELECT date FROM econ 
WHERE unemploy > 15000
ORDER BY date;
"

# send the query to the db and get the result
unemp <- dbGetQuery(con, query2)
head(unemp)

```{}
#         date
# 1 2009-09-01
# 2 2009-10-01
# 3 2009-11-01
# 4 2009-12-01
# 5 2010-01-01
# 6 2010-02-01

```


When done working with the database, we close the connection to the MariaDB database with `dbDisconnect(con)`.

## Distributed Systems/MapReduce

### Map/Reduce Concept: Illustration in R

In order to better understand the basic concept behind the MapReduce-Framework on a distributed system, let's look at how we can combine the basic functions `map()` and `reduce()` in R to implement the basic MapReduce example shown in @walkowiak_2016, Chapter 4, pages 132-134 (this is just to illustrate the underlying idea, *not* to suggest that MapReduce actually is simply an application of the classical `map` and `reduce (fold)` functions in functional programming).^[For a more detailed discussion of what `map` and `reduce` have *actually* to do with MapReduce see [this post](https://medium.com/@jkff/mapreduce-is-not-functional-programming-39109a4ba7b2).] The overall aim of the program is to count the number of times each word is repeated in a given text. The input to the program is thus a text, the output is a list of key-value pairs with the unique words occurring in the text as keys and their respective number of occurrences as values.

In the code example, we will use the following text as input.



In [None]:
input_text <-
"Simon is a friend of Becky.
Becky is a friend of Ann.
Ann is not a friend of Simon."

#### Mapper

The Mapper first splits the text into lines, and then splits the lines into key-value pairs, assigning to each key the value `1`. For the first step we use `strsplit()` that takes a character string as input and splits it into a list of substrings according to the matches of a substring (here `"\n"`, indicating the end of a line).


In [None]:
# Mapper splits input into lines
lines <- as.list(strsplit(input_text, "\n")[[1]])
lines


In a second step, we apply our own function (`map_fun()`) to each line of text via `Map()`. `map_fun()` splits each line into words (keys) and assigns a value of `1` to each key.

In [None]:
# Mapper splits lines into Key-Value pairs
map_fun <-
     function(x){
          
          # remove special characters
          x_clean <- gsub("[[:punct:]]", "", x)
          # split line into words
          keys <- unlist(strsplit(x_clean, " "))
          # initiate key-value pairs
          key_values <- rep(1, length(keys))
          names(key_values) <- keys
          
          return(key_values)
     }

kv_pairs <- Map(map_fun, lines)

# look at the result
kv_pairs

### Reducer

The Reducer first sorts and shuffles the input from the Mapper and then reduces the key-value pairs by summing up the values for each key.

In [None]:
# order and shuffle
kv_pairs <- unlist(kv_pairs)
keys <- unique(names(kv_pairs))
keys <- keys[order(keys)]
shuffled <- lapply(keys,
                    function(x) kv_pairs[x == names(kv_pairs)])
shuffled

Now we can sum up the keys in order to get the word count for the entire input.

In [None]:
sums <- sapply(shuffled, sum)
names(sums) <- keys
sums

### Simpler example: Compute the total number of words


In [None]:
# assigns the number of words per line as value
map_fun2 <- 
     function(x){
          # remove special characters
          x_clean <- gsub("[[:punct:]]", "", x)
          # split line into words, count no. of words per line
          values <- length(unlist(strsplit(x_clean, " ")))
          return(values)
     }
# Mapper
mapped <- Map(map_fun2, lines)
mapped

# Reducer
reduced <- Reduce(sum, mapped)
reduced

## Hadoop Word Count

Example adapted from [this tutorial](https://www.digitalocean.com/community/tutorials/how-to-install-hadoop-in-stand-alone-mode-on-ubuntu-18-04#step-2-%E2%80%94-installing-hadoop) by Melissa Anderson and Hanif Jetha.


### Install Hadoop (on Ubuntu/Pop!_OS Linux)

```{bash eval=FALSE}
# download binary
wget https://downloads.apache.org/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz
# download checksum
wget https://www.apache.org/dist/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz.sha512

# run the verification
shasum -a 512 hadoop-2.10.0.tar.gz
# compare with value in mds file
cat hadoop-2.10.0.tar.gz.sha512

# if all is fine, unpack
tar -xzvf hadoop-2.10.0.tar.gz
# move to proper place
sudo mv hadoop-2.10.0 /usr/local/hadoop


# then point to this version from hadoop
# open the file /usr/local/hadoop/etc/hadoop/hadoop-env.sh
# in a text editor and add (where export JAVA_HOME=...)
export JAVA_HOME=$(readlink -f /usr/bin/java | sed "s:bin/java::")

# clean up
rm hadoop-2.10.0.tar.gz
rm hadoop-2.10.0.tar.gz.sha512
```


## Run Hadoop

```{bash eval=FALSE}
# check installation
/usr/local/hadoop/bin/hadoop
```

## Run example

The basic Hadoop installation comes with a few examples for very typical map/reduce programs.^[More sophisticated programs need to be custom made, written in Java.] Below we replicate the same word-count example as shown in simple R code above. 

In a first step, we create an input directory where we store the input file(s) to feed to Hadoop.

```{bash eval=FALSE}
# create directory for input files (typically text files)
mkdir ~/input
```

Then we add a textfile containing the same text as in the example as above (to make things simpler, we already remove special characters).

```{bash eval=FALSE}
echo "Simon is a friend of Becky
Becky is a friend of Ann
Ann is not a friend of Simon" >>  ~/input/text.txt

```

Now we can run the MapReduce/Hadoop word count as follows, storing the results in a new directory called `wordcount_example`.

```{bash eval=FALSE}
# run mapreduce word count
/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.10.0.jar wordcount ~/input ~/wc_example
```


Show the output:

```{bash }
cat ~/wc_example/*
```

In [None]:
cat ~/wc_example/*

(Commented out code)
<!-- Next, we can use the following command to run the MapReduce hadoop-mapreduce-examples program, a Java archive with several options. We’ll invoke its grep program, one of the many examples included in hadoop-mapreduce-examples, followed by the input directory, input and the output directory grep_example. The MapReduce grep program will count the matches of a literal word or regular expression. Finally, we’ll supply the regular expression allowed[.]* to find occurrences of the word allowed within or at the end of a declarative sentence. The expression is case-sensitive, so we wouldn’t find the word if it were capitalized at the beginning of a sentence: -->

<!-- ```{bash} -->
<!-- # create directory for input files (typically text files) -->
<!-- mkdir ~/input -->
<!-- # copy example data to input directory -->
<!-- cp /usr/local/hadoop/etc/hadoop/*.xml ~/input -->
<!-- # run mapreduce grep  -->
<!-- # to find occurrences of the word 'allowed' (witin or at the end of a sentence) -->
<!-- /usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.10.0.jar grep ~/input ~/grep_example 'allowed[.]*' -->
<!-- ``` -->



<!-- ## Notes on Deploying Hortonworks Sandbox on Azure -->

<!-- The instructions in @walkowiak_2016 Chapter 4, pages 138ff are partly outdated. You will notice that the screenshots do not anymore correspond with the current Azure layout. The Hortonworks tutorial ['Deploying Hortonworks Sandbox on Microsoft Azure'](https://hortonworks.com/tutorial/sandbox-deployment-and-install-guide/section/4/#creating-the-sandbox) is more up to date.^[For a short description of how to set up Hortonworks Sandbox on AWS, see [here](https://community.hortonworks.com/articles/103754/hdp-sandbox-on-aws-1.html).] The layout of Microsoft Azure's dashboard has slightly changed. The settings are essentially the same but might occur an a different order than what you see in the Hortonworks tutorial. There are also small changes in the default settings. You might have to add an additional inbound rule in the 'Networking' settings (either when setting up the virtual machine or after starting it) that allows ssh (port 22) and HTTP (port 80) inbound traffic. -->

<!-- Once you've gone through all the steps in the ['Deploying Hortonworks Sandbox on Microsoft Azure'](https://hortonworks.com/tutorial/sandbox-deployment-and-install-guide/section/4/#creating-the-sandbox) tutorial, open a terminal window and connect to the virtual machine as suggested in the tutorial: -->

<!-- ```{bash eval=FALSE} -->
<!-- ssh azureSandbox -->
<!-- ``` -->

<!-- Open another terminal window and run the following command to connect with the hadoop sandbox. You will be asked for the password. The default password is `hadoop`. After entering the password you will be prompted to change the password (`Changing password for root. (current) UNIX password:`). In order to change it you have to re-enter the current password (`hadoop`) and then enter your new password for root. -->

<!-- ```{bash eval=FALSE} -->
<!-- ssh root@localhost -p 2222  -->
<!-- ``` -->


<!-- ## Notes on working with Hortonworks Sandbox/Hadoop on Azure -->
<!-- *(@walkowiak_2016: A word count example in Hadoop using Java)* -->

<!-- Now everything should be set up and running and you can continue with the tutorial in @walkowiak_2016, pages 152 (unlike in the example in the book, the prompt will read `[root@sandbox-hdp ~]`) with the line `hadoop version`. If you do not want to go through the tutorial as `root`, you can add a user as follows (replacing `<username>` with a user name of your choice). -->

<!-- ```{bash eval=FALSE} -->
<!-- useradd <username> -->
<!-- passwd <username> -->
<!-- ``` -->

<!-- Note that you will have to grant the user additional rights for some of the commands in the book's tutorial. Follow [these instructions](https://superuser.com/a/120342) to do so. -->

<!-- In the following I use the user name `umatter` (with `sudo` permission). In order to connect to the sandbox with the new user, open a new terminal window and type the following command. -->

<!-- ```{bash eval=FALSE} -->
<!-- ssh umatter@localhost -p 2222 -->
<!-- ``` -->


<!-- To transfer the data set as instructed on page 160 in @walkowiak_2016, use the following command in a new Terminal -->

<!-- ```{bash eval=FALSE} -->
<!-- scp -P 2222 -r ~/Desktop/twain_data.txt umatter@localhost:~/data -->
<!-- ``` -->


<!-- Similarly, for uploading the Java classes as instructed on page 162: -->

<!-- ```{bash eval=FALSE} -->
<!--  scp -P 2222 -r ~/Desktop/wordcount umatter@localhost:~/wordcount -->
<!-- ``` -->

<!-- And finally to download the final output of the word count exercise: -->

<!-- ```{bash eval=FALSE} -->
<!-- scp -P 2222 -r umatter@localhost:~/wordcount/wordcount.txt ~/Desktop/wordcount_final.txt -->
<!-- ``` -->


## References


#Lecture 9: Applied Econometrics with Spark; Machine Learning and GPUs'

---
title: "Big Data Analytics"
subtitle: 'Lecture 9: Applied Econometrics with Spark; Machine Learning and GPUs'
author: |
     | Prof. Dr. Ulrich Matter
     | (University of St. Gallen)
date: "06/05/2021"
output:
  bookdown::html_document2:
    highlight: tango
    theme: cerulean
    mathjax: "http://mathjax.rstudio.com/latest/MathJax.js?config=TeX-MML-AM_CHTML"
  pdf_document:
    pandoc_args:
    - --filter
    - ../../code/math.py
header-includes:
  - \usepackage[T1]{fontenc}
  - \usepackage{hyperref}
  - \usepackage{caption}
css: ../../style/notes_hsg.css
bibliography: ../references/bigdata.bib
---


In [3]:
options(width = 100)
library(knitr)
library(bookdown)
knitr::opts_chunk$set(fig.pos = 'htb!')


## Applied Econometrics with Apache Spark
(Commented out code)
<!-- TODO: -->
<!-- - cover http://localhost:4040 briefly -->
<!-- - cover more of sparklyr -->




### Spark basics

Building on the MapReduce model discussed in the previous lecture, [Apache Spark](https://spark.apache.org/) is a cluster computing platform particularly made for data analytics. From the technical perspective (in very simple terms), Spark improves some shortcomings of the older [Hadoop](https://hadoop.apache.org/) platform, further improving efficiency/speed. More importantly for our purposes, in contrast to Hadoop, Spark is specifically made for analytics tasks and therefore more easily accessible for people with an applied econometrics background but no substantial knowledge in MapReduce cluster computing. In particular, it comes with several high-level operators that make it rather easy to implement analytics tasks. Moreover (in contrast to basic Hadoop), its very easy to use interactively from R, Python, Scala, and SQL. This makes the platform much more accessible and worth the while for empirical economic research, even for relatively simple econometric analyses. 

The following figure illustrates the basic components of Spark. The main functionality, including memory management, task scheduling, and the implementation of Spark's capabilities to handle and manipulate data distributed across many nodes in parallel. Several built-in libraries extend the core implementation, covering specific domains of practical data analytics tasks (querying structured data via SQL, processing streams of data, machine learning, and network/graph analysis). The latter two provide various common functions/algorithms frequently used in data analytics/applied econometrics, such as generalized linear regression, summary statistics, and principal component analysis.

```{r sparkstack, echo=FALSE, out.width = "60%", fig.align='center', fig.cap= "(ref:sparkstack)", purl=FALSE}
include_graphics("../img/spark-stack.png")
```


(ref:sparkstack) Basic Spark stack (source: https://spark.apache.org/images/spark-stack.png)

At the heart of big data analytics with Spark is the fundamental data structure called 'resilient distributed dataset' (RDD). When loading/importing data into Spark, the data is automatically distributed across the cluster in RDDs (~ as distributed collections of elements) and manipulations are then executed in parallel in these RDDs. However, the entire Spark framework also works locally on a simple laptop or desktop computer. This is of great advantage when learning Spark and testing/debugging an analytics script on a small sample of the real dataset. 

### Spark in R 

There are two prominent packages to use Spark in connection to R: `SparkR` and RStudio's `sparklyr`, the former is in some ways closer to Spark's Python API, the latter is closer to the `dplyr`-type of data handling (and is 'compatible' with the '`tidyverse`').^[See [this blog post](https://cosminsanda.com/posts/a-compelling-case-for-sparkr/) for a more detailed comparison and discussion of advantages of either package.] For the very simple introductory examples below, either package could have been used equally well. For the general introduction we focus on `SparkR` and later have a look at a simple regression example based on `sparklyr`.

To install and use Spark from the R shell, only a few preparatory steps are needed. The following examples are based on installing/running Spark on a Linux machine with  the `SparkR` package. `SparkR` depends on Java (version 8). Thus, we first should make sure the right Java version is installed. If several Java versions are installed, we might have to select version 8 manually via the following terminal command (Linux). 

(Commented out code)

<!-- <!-- in Mac OS (after doing this: https://stackoverflow.com/questions/21964709/how-to-set-or-change-the-default-java-jdk-version-on-os-x) -->
<!-- ```{bash eval = FALSE} -->
<!-- cd  -->
<!-- source .bash_profile -->
<!-- j8 -->
<!-- ``` -->

<!-- ```{r} -->
<!-- system("cd -->
<!--        source .bash_profile -->
<!--        j8") -->

<!-- ``` -->



In [4]:
#might have to switch to java version 8 first
#sudo update-alternatives --config java (This does not work in Google Colab)


ERROR: ignored

With the right version of Java running, we can install `SparkR` as usual with `install.packages("SparkR")`. After installing `SparkR`, the call `SparkR::install.spark()` will download and install Apache Spark to a local directory. No we can start an interactive Spark session from within R.


In [7]:
# install.packages("SparkR")
# or, if temporarily not available on CRAN:
#if (!require('devtools')) install.packages('devtools')
#devtools::install_github('apache/spark@v2.x.x', subdir='R/pkg') # replace x.x with the version of your spark installation

# load packages
library(SparkR)

# start session
sparkR.session()


ERROR: ignored

By default this starts a local standalone session (no connection to a cluster computer needed). While the examples below are all intended to run on a local machine, it is straightforward to connect to a remote Spark cluster and run the same examples there.^[Simply set the `master` argument of `sparkR.session()` to the URL of the Spark master node of the remote cluster. Importantly, the local Spark and Hadoop versions should match the corresponding versions on the remote cluster.]

#### Data import and summary statistics

First, we want to have a brief look at how to perform the first few steps of a typical econometric analysis: import data and compute summary statistics. We analyze the already familiar `flights.csv` dataset. The basic Spark installation provides direct support to import common data formats such as CSV and JSON  via the `read.df()` function (for many additional formats, specific Spark libraries are available). To import`flights.csv`, we set the `source`-argument to `"csv"`.

In [None]:
# Import data and create a SparkDataFrame (a distributed collection of data, RDD)
flights <- read.df(path="../data/flights.csv", source = "csv", header="true")


# inspect the object
class(flights)
head(flights)


By default, all variables have been imported as type `character`. For several variables this is, of course, not the optimal data type to compute summary statistics. We thus first have to convert some columns to other data types with the `cast` function.


In [None]:
flights$dep_delay <- cast(flights$dep_delay, "double")
flights$dep_time <- cast(flights$dep_time, "double")
flights$arr_time <- cast(flights$arr_time, "double")
flights$arr_delay <- cast(flights$arr_delay, "double")
flights$air_time <- cast(flights$air_time, "double")
flights$distance <- cast(flights$distance, "double")

Suppose we only want to compute average  arrival delays per carrier for flights with a distance over 1000 miles. Variable selection and filtering of observations is implemented in `select()` and `filter()` (as in the `dplyr` package). 

In [None]:
# filter
long_flights <- select(flights, "carrier", "year", "arr_delay", "distance")
long_flights <- filter(long_flights, long_flights$distance >= 1000)
head(long_flights)

Now we summarize the arrival delays for the subset of long flights by carrier. This is the 'split-apply-combine' approach applied in `SparkR`.


In [None]:
# aggregation: mean delay per carrier
long_flights_delays<- summarize(groupBy(long_flights, long_flights$carrier),
                      avg_delay = mean(long_flights$arr_delay))
head(long_flights_delays)

Finally, we want to convert the result back into a usual `data.frame` (loaded in our current R session) in order to further process the summary statistics (output to LaTex table, plot, etc.). Note that as in the previous aggregation exercises with the `ff` package, the computed summary statistics (in the form of a table/df) are obviously much smaller than the raw data. However, note that converting a `SparkDataFrame` back into a native R object generally means all the data stored in the RDDs constituting the `SparkDataFrame` object are loaded into local RAM. Hence, when working with actual big data on a Spark cluster, this type of operation can quickly overflow local RAM.


In [None]:
# Convert result back into native R object
delays <- collect(long_flights_delays)
class(delays)
delays


#### Regression analysis with `sparklyr`

Suppose we want to conduct a correlation study of what factors are associated with more or less arrival delay. Spark provides via its built-in 'MLib' library several high-level functions to conduct regression analyses. When calling these functions via `sparklyr` (or `SparkR`), their usage is actually very similar to the usual R packages/functions commonly used to run regressions in R. 

As a simple point of reference, we first estimate a linear model with the usual R approach (all computed in the R environment). First, we load the data as a common `data.table`. We could also convert a copy of the entire `SparkDataFrame` object to a `data.frame` or `data.table` and get essentially the same outcome. However, collecting the data from the RDD structure would take much longer than parsing the csv with `fread`. In addition, we only import the first 300 rows. Running regression analysis with relatively large datasets in Spark on a small local machine might fail or be rather slow.^[Again, it is important to keep in mind that running Spark on a small local machine is only optimal for learning and testing code (based on relatively small samples). The whole framework is not optimized to be run on a small machine but for cluster computers.]


In [None]:
# flights_r <- collect(flights) # very slow!
flights_r <- data.table::fread("../data/flights.csv", nrows = 300) 


Now we run a simple linear regression (OLS) and show the summary output.


In [None]:
# specify the linear model
model1 <- arr_delay ~ dep_delay + distance
# fit the model with ols
fit1 <- lm(model1, flights_r)
# compute t-tests etc.
summary(fit1)

Now we aim to compute essentially the same model estimate in `sparklyr`.^[Most regression models commonly used in traditional applied econometrics are in some form provided in `sparklyr` or `SparkR`. See the package documentations for more details.] In order to use Spark via the `sparklyr` package we need to first load the package and establish a connection with Spark (similar to `SparkR::sparkR.session()`).


In [None]:
library(sparklyr)

# connect with default configuration
sc <- spark_connect(master = "local", 
                    version = "2.4.5")

We then copy the data.table `flights_r` (previously loaded into our R session) to Spark. Again, working on a normal laptop this seems trivial, but the exact same command would allow us (when connected with Spark on a cluster computer in the cloud) to properly load and distribute the data.table on the cluster. Finally, we then fit the model with `ml_linear_regression()` and compute 

In [None]:
# load data to spark
flights3 <- copy_to(sc, flights_r, "flights3")
# fit the model
fit1_spark <- ml_linear_regression(flights3, formula = model1)
# compute summary stats
summary(fit1_spark)

Alternatively, we can do essentially the same with the `SparkR` package:

In [None]:
# create SparkDataFrame
flights3 <- createDataFrame(flights_r)
# fit the model
fit2_spark <- spark.glm( formula = model1, data = flights3 , family="gaussian")
# compute t-tests etc.
summary(fit2_spark)

## GPUs for Scientific Computing

The success of the computer games industry in the late 1990s/early 2000s led to an interesting positive externality for scientific computing. The ever more demanding graphics of modern computer games and the huge economic success of the computer games industry set incentives for hardware producers to invest in research and development of more powerful 'graphic cards', extending a normal PC/computing environment with additional computing power solely dedicated to graphics. At the heart of these graphic cards are so-called GPUs (Graphic Processing Units), microprocessors specifically optimized for graphics processing. The image below depicts a modern graphics card with NVIDIA GPUs, which is quite common in today's 'gaming' PCs. 


```{r nvidiagpu, echo=FALSE, out.width = "60%", fig.align='center', purl=FALSE}
include_graphics("../img/nvidia_geeforce.png")
```


Why did the hardware industry not simply invest in the development of more powerful CPUs to deal with the more demanding PC games? The main reason is that the architecture of CPUs is designed not only for efficiency but also flexibility. That is, a CPU needs to perform well in all kind of computations, some parallel, some sequential, etc. Computing graphics is a comparatively narrow domain of computation and desining a processing unit architecture that is custom-made to excell just at this one task is thus much more cost efficient. Interestingly, this graphics-specific architecture  (specialized on highly parallel numerical [floating point] workloads) turns out to be also very usefull in some core scientific computing tasks. In particular, matrix multiplications (see @fatahalian_etal2004 for a detailed discussion of why that is the case). A key aspect of GPUs is that they are composed of several multiprocessor units, of which each has in turn several cores. GPUS thus can perform computations with hundreds or even thousands of threads in parallel. The figure below illustrates this point.


```{r nvidia_architecture, echo=FALSE, out.width = "60%", fig.align='center', fig.cap= "(ref:nvidiaarchitecture)", purl=FALSE}
include_graphics("../img/nvidia_gpu.png")
```



(ref:nvidiaarchitecture) Typical NVIDIA GPU architecture (illustration and notes by @hernandez_etal2013): The GPU is comprised of a set of Streaming MultiProcessors (SM). Each SM is comprised of several Stream Processor (SP) cores, as shown for the NVIDIA’s Fermi architecture (a). The GPU resources are controlled by the programmer through the CUDA programming model, shown in (b).

While, intially, programming GPUs for scientific computing required a very good understanding of the hardware. Graphics card producers have realized that there is an additional market for their products (in particular with the recent rise of deep learning), and provide several high-level APIs to use GPUs for other tasks than graphics processing. Over the last few years more  high-level software has been developed, which makes it much easier to use GPUs in parallel computing tasks. The following subsections shows some examples of such software in the R environment.^[Note that while these examples are easy to implement and run, setting up a GPU for scientific computing still can involve many steps and some knowledge of your computer's system. The examples presuppose that all installation and configuration steps (GPU drivers, CUDA, etc.) have already been completed successfully.]




### GPUs in R
(Commented out code)

<!-- ## Installation -->

<!-- This is for pop OS machines. Install drivers etc. for NVIDIA card -->
<!-- ```{bash eval=FALSE} -->
<!-- # sudo apt install tensorflow-cuda-latest -->
<!-- ``` -->

<!-- Install OpenCL -->

<!-- ```{bash eval=FALSE} -->
<!-- # sudo apt install tensorflow-cuda-latest -->
<!-- ``` -->


<!-- Install `gpuR` in R (`install.packages("gpuR")`). -->






### Example I: Matrix multiplication comparison (`gpuR`)

The `gpuR` package provides basic R functions to compute with GPUs from  within the R environmnent. In the following example we compare the performance of the CPU with the GPU based on a matrix multiplication exercise. For a large $N\times P$ matrix $X$, we want to compute $X^tX$.


In a first step, we load the `gpuR`-package.^[As with the setting up of GPUs on your machine in general, installing all prerequisites to make `gpuR` work on your local machine can be a bit of work and can depend a lot on your system.] Note the output to the console. It shows the type of GPU  identified by `gpuR`. This is the platform on which `gpuR` will compute the GPU examples. In order to compare the performances, we also load the `bench` package used in previous lectures.

In [None]:
# load package
library(bench)
library(gpuR)

Next, we initiate a large matrix filled with pseudo random numbers, representing a dataset with $N$ observations and $P$ variables.

In [None]:
# initiate dataset with pseudo random numbers
N <- 10000  # number of observations
P <- 100 # number of variables
X <- matrix(rnorm(N * P, 0, 1), nrow = N, ncol =P)

For the GPU examples to work, we need one more preparatory step. GPUs have their own memory, which they can access faster than they can access RAM. However, this GPU memory is typically not very large compared to the memory CPUs have access to. Hence, there is a potential trade-off between losing some efficiency but working with more data or vice versa.^[If we instruct the GPU to use the own memory, but the data does not fit in it, the program will result in an error.] Here, we show both variants. With `gpuMatrix()` we create an object representing matrix `X` for computation on the GPU. However, this only points the GPU to the matrix and does not actually transfer data to the GPU's memory. The latter is done in the other variant with `vclMatrix()`.


In [None]:
# prepare GPU-specific objects/settings
gpuX <- gpuMatrix(X, type = "float")  # point GPU to matrix (matrix stored in non-GPU memory)
vclX <- vclMatrix(X, type = "float")  # transfer matrix to GPU (matrix stored in GPU memory)


Now we run the three examples: first, based on standard R, using the CPU. Then, computing on the GPU but using CPU memory. And finally, computing on the GPU and using GPU memory. In order to make the comparison fair, we force `bench::mark()` to run at least 20 iterations per benchmarked variant.

In [None]:
# compare three approaches
(gpu_cpu <- bench::mark(
  
  # compute with CPU 
  cpu <- t(X) %*% X,
  
  # GPU version, GPU pointer to CPU memory (gpuMatrix is simply a pointer)
  gpu1_pointer <- t(gpuX) %*% gpuX,
  
  # GPU version, in GPU memory (vclMatrix formation is a memory transfer)
  gpu2_memory <- t(vclX) %*% vclX,
 
check = FALSE, memory = FALSE, min_iterations = 20))

The performance comparison is visualized with boxplots.

In [None]:
plot(gpu_cpu, type = "boxplot")

### Example II: Basic statistics (`Rth`)
(Commented out code here)


<!-- Install package  -->
<!-- - needs cuda and thrust (in latest cuda versions included). -->
<!-- - gcc version 8 or younger (use ` sudo update-alternatives --config gcc` to switch to version 8 if necessary). -->
<!-- - point directly to thrust when installing -->
<!--  STILL DOES NOT WORK -->
<!-- ```{r} -->
<!-- devtools::install_github("matloff/Rth", configure.args = "--with-thrust-home=/usr/lib/cuda-10.2/targets/x86_64-linux/include/thrust/ --with-backend=CUDA") -->
<!-- ``` -->



<!-- Load package -->
<!-- ```{r} -->
<!-- # load packages -->
<!-- library(Rth) -->
<!-- ``` -->

<!-- Compute histogram and correlation of a 100 million row dataset. -->

<!-- Create the dataset (based on pseudo random numbers) -->
<!-- ```{r} -->
<!-- # set fix vars -->
<!-- N <- 100000000 # number of observations -->
<!-- P <- 2 # number of variables -->

<!-- # draw random dataset -->
<!-- randat <- matrix(runif(N*P), ncol = P) -->

<!-- ``` -->


<!-- Compute histogram of first var -->
<!-- ```{r} -->
<!-- var1 <- randat[,1] -->
<!-- rthhist("var1") -->
<!-- Rth::rthhist("var1") -->
<!-- ``` -->



<!-- Micro benchmarking of correlation computation -->



<!-- # Extended Example: OLS -->
<!-- Here, the CPU is faster! -->

<!-- ```{r} -->

<!-- # Example taken from https://www.arc.vt.edu/wp-content/uploads/2017/04/GPU_R_Workshop_2017_slidy.html#13 -->

<!-- set.seed(123456) -->
<!-- np <- 40  #number of predictors -->
<!-- nr <- 1e+05  #number of observations -->
<!-- X <- cbind(5, 1:nr, matrix(rnorm((np - 1) * nr, 0, 0.01), nrow = nr, ncol = (np - -->
<!--     1))) -->
<!-- beta <- matrix(c(1, 3, runif(np - 1, 0, 0.2)), ncol = 1) -->
<!-- y <- X %*% beta + matrix(rnorm(nr, 0, 1), nrow = nr, ncol = 1) -->
<!-- # CPU bound version, slight optimize via crossprod but otherwise vanilla -->
<!-- time2 <- system.time({ -->
<!--     ms2 <- solve(crossprod(X), crossprod(X, y)) -->
<!-- }) -->
<!-- # GPU version, GPU pointer to CPU memory!! (gpuMatrix is simply a pointer) -->
<!-- gpuX = gpuMatrix(X, type = "float")  #point GPU to matrix -->
<!-- gpuy = gpuMatrix(y, type = "float") -->
<!-- time4 <- system.time({ -->
<!--     ms4 <- gpuR::solve(gpuR::crossprod(gpuX), gpuR::crossprod(gpuX, gpuy)) -->
<!-- }) -->
<!-- # GPU version, in GPU memory!! (vclMatrix formation is a memory transfer) -->
<!-- vclX = vclMatrix(X, type = "float")  #push matrix to GPU -->
<!-- vcly = vclMatrix(y, type = "float") -->
<!-- time5 <- system.time({ -->
<!--     ms5 <- gpuR::solve(gpuR::crossprod(vclX), gpuR::crossprod(vclX, vcly)) -->
<!-- }) -->

<!-- data.frame(CPU=time3[3], -->
<!--            GPU_CPUmem=time4[3], -->
<!--            GPU_GPUmem=time5[3]) -->
<!-- ``` -->

## GPUs and Machine Learning

A most common application of GPUs for scientific computing is machine learning, in particular deep learning (machine learning based on artificial neural networks). Training deep learning models can be very computationally intense and to an important part depends on tensor (matrix) multiplications. This is also an area where you might come across highly parallelized computing based on GPUs without even noticing it, as the now commonly used software to build and train deep neural nets ([tensorflow](https://www.tensorflow.org/), and the high-level [Keras](https://keras.io/) API) can easily be run on a CPU or GPU without any further configuration/preparation (apart from the initial installation of these programs). The example below is a simple illustration of how such techniques can be used in an econometrics context.

(Commented out code here)

<!-- ```{r warning=FALSE, message=FALSE} -->

<!-- library(keras) -->
<!-- mnist <- dataset_mnist() -->
<!-- x_train <- mnist$train$x -->
<!-- y_train <- mnist$train$y -->
<!-- x_test <- mnist$test$x -->
<!-- y_test <- mnist$test$y -->


<!-- # reshape -->
<!-- x_train <- array_reshape(x_train, c(nrow(x_train), 784)) -->
<!-- x_test <- array_reshape(x_test, c(nrow(x_test), 784)) -->
<!-- # rescale -->
<!-- x_train <- x_train / 255 -->
<!-- x_test <- x_test / 255 -->


<!-- y_train <- to_categorical(y_train, 10) -->
<!-- y_test <- to_categorical(y_test, 10) -->



<!-- model <- keras_model_sequential() -->
<!-- model %>% -->
<!--   layer_dense(units = 256, activation = 'relu', input_shape = c(784)) %>% -->
<!--   layer_dropout(rate = 0.4) %>% -->
<!--   layer_dense(units = 128, activation = 'relu') %>% -->
<!--   layer_dropout(rate = 0.3) %>% -->
<!--   layer_dense(units = 10, activation = 'softmax') -->


<!-- summary(model) -->


<!-- model %>% compile( -->
<!--   loss = 'categorical_crossentropy', -->
<!--   optimizer = optimizer_rmsprop(), -->
<!--   metrics = c('accuracy') -->
<!-- ) -->


<!-- history <- model %>% fit( -->
<!--   x_train, y_train, -->
<!--   epochs = 30, batch_size = 128, -->
<!--   validation_split = 0.2 -->
<!-- ) -->


<!-- plot(history) -->
<!-- ``` -->


###  Tensorflow/Keras example: predict housing prices

In this example we train a simple sequential model with two hidden layers in order to predict the median value of owner-occupied homes (in USD 1,000) in the Boston area (data are from the 1970s). The original data and a detailed description can be found [here](https://www.cs.toronto.edu/~delve/data/boston/bostonDetail.html). The example follows closely [this keras tutorial](https://keras.rstudio.com/articles/tutorial_basic_regression.html#the-boston-housing-prices-dataset) published by RStudio. See [RStudio's keras installation guide](https://keras.rstudio.com/index.html) for how to install keras (and tensorflow) and the corresponding R package `keras`.^[This might involve the installation of additional packages and software outside the R environment.] While the purpose of the example here is to demonstrate a typical (but very simple!) usage case of GPUs in machine learning, the same code should also run on a normal machine (without using GPUs) with a default installation of keras.

Apart from `keras`, we load packages to prepare the data and visualize the output. Via `dataset_boston_housing()`, we load the dataset (shipped with the keras installation) in the format preferred by the `keras` library.



In [None]:
if (Sys.info()["sysname"]=="Darwin"){ # run on mac os machine
     
        use_python("/Users/umatter/opt/anaconda3/bin/python") # IMPORTANT: keras/tensorflow is set up to run in this environment on this machine!
}

In [None]:
# load packages
library(keras)
library(tibble)
library(ggplot2)
library(tfdatasets)


# load data
boston_housing <- dataset_boston_housing()
str(boston_housing)


In a first step, we split the data into a training set and a test set. The latter is used to monitor the out-of-sample performance of the model fit. Testing the validity of an estimated model by looking at how it performs out-of-sample is of particular relevance when working with (deep) neural networks, as they can easily lead to over-fitting. Validity checks based on the test sample are, therefore, often an integral part of modelling with tensorflow/keras.

In [None]:
# assign training and test data/labels
c(train_data, train_labels) %<-% boston_housing$train
c(test_data, test_labels) %<-% boston_housing$test

In order to better understand and interpret the dataset we add the original variable names, and convert it to a `tibble`. 

In [None]:
library(dplyr)

column_names <- c('CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 
                  'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT')

train_df <- train_data %>% 
  as_tibble(.name_repair = "minimal") %>% 
  setNames(column_names) %>% 
  mutate(label = train_labels)

test_df <- test_data %>% 
  as_tibble(.name_repair = "minimal") %>% 
  setNames(column_names) %>% 
  mutate(label = test_labels)

Next, we have a close look at the data. Note the usage of the term 'label' for what is usually called the 'dependent variable' in econometrics.^[Typical textbook examples in machine learning deal with classification (e.g. a logit model), while in microeconometrics the typical example is usually a linear model (continuous dependent variable).] As the aim of the exercise is to predict median prices of homes, the output of the model will be a continuous value ('labels').

In [None]:
# check example data dimensions and content
paste0("Training entries: ", length(train_data), ", labels: ", length(train_labels))
summary(train_data)
summary(train_labels) # Display first 10 entries

As the dataset contains variables ranging from per capita crime rate to indicators for highway access, the variables are obviously measured in different units and hence displayed on different scales. This is not per se a problem for the fitting procedure. However, fitting is more efficient when all features (variables) are normalized.

In [None]:
spec <- feature_spec(train_df, label ~ . ) %>% 
  step_numeric_column(all_numeric(), normalizer_fn = scaler_standard()) %>% 
  fit()

layer <- layer_dense_features(
  feature_columns = dense_features(spec), 
  dtype = tf$float32
)
layer(train_df)

We specify the model as a linear stack of layers: The input (all 13 explanatory variables), two densely connected hidden layers (each with a 64-dimensional output space), and finally the one-dimensional output layer (the 'dependent variable').


In [None]:
# Create the model
# model specification
input <- layer_input_from_dataset(train_df %>% select(-label))

output <- input %>% 
  layer_dense_features(dense_features(spec)) %>% 
  layer_dense(units = 64, activation = "relu") %>%
  layer_dense(units = 64, activation = "relu") %>%
  layer_dense(units = 1) 

model <- keras_model(input, output)

In order to fit the model, we first have to compile it (configure it for training). At this step we set the configuration parameters that will guide the training/optimization procedure. We use the mean squared errors loss function (`mse`) typically used for regressions. We chose the [RMSProp](http://www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf) optimizer to find the minimum loss.

In [None]:
# compile the model  
model %>% 
  compile(
    loss = "mse",
    optimizer = optimizer_rmsprop(),
    metrics = list("mean_absolute_error")
  )


Now we can get a summary of the model we are about to fit to the data.

In [None]:
# get a summary of the model
model

Given the relatively simple model and small dataset, we set the maximum number of epochs to 500 and allow for early stopping in case the validation loss (based on test data) is not improving for a while.

In [None]:
# Set max. number of epochs
epochs <- 500

Finally, we fit the model while preserving the training history, and visualize the training progress.

In [None]:
# Fit the model and store training stats

history <- model %>% fit(
  x = train_df %>% select(-label),
  y = train_df$label,
  epochs = epochs,
  validation_split = 0.2,
  verbose = 0
)


plot(history)

## A word of caution

From just comparing the number of threads of a modern CPU with the number of threads of a modern GPU, one might get the impression that parallel tasks should always be implemented for GPU computing. However, whether one approach or the other is faster can depend a lot on the overall task and the data at hand. Moreover, the parallel implementation of tasks can be done more or less well on either system. Really efficient parallel implementation of tasks can take a lot of coding time (particularly when done for GPUs).^[For a more detailed discussion of the relevant factors for well done parallelization (either on CPUs or GPUs), see @matloff_2015].



## References
