### <center>Data Munging with Apache Spark: Using Apache Parquet, Pivot Tables and Some Visualization, _using R_</center>
<center>@geraudster</center>

![TDS Logo](http://photos1.meetupstatic.com/photos/event/8/a/5/2/global_434255410.jpeg)


Apache Parquet is designed to bring efficient columnar storage to Hadoop and has the following characteristics:

* Self-describing: the schema is stored in the file's own metadata
* Columnar format
* Efficient encoding of nested structures and sparsely populated data, based on the Google Dremel definition/repetition levels
* Language-independent

Parquet arranges data in columns, putting related values close to each other to optimize query performance, minimize I/O, and facilitate compression. Within each column it applies encodings such as dictionary encoding and run-length encoding to repeated or similar values, and then compresses the encoded column chunks (for example with Snappy or Gzip).
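To make the encoding idea concrete, here is a toy illustration in base R of two techniques Parquet applies per column. It is our own sketch, not Parquet's actual encoder, and the `season` vector is made up. Dictionary encoding stores each distinct value once and replaces every row by a small integer code; run-length encoding then stores runs of identical codes as (value, length) pairs.

```r
# Made-up column of values, sorted so that equal values are adjacent
season <- c(rep("spring", 4), rep("summer", 3), rep("fall", 2))

# Dictionary encoding: keep each distinct value once, plus an integer code per row
dict  <- unique(season)          # "spring" "summer" "fall"
codes <- match(season, dict)     # 1 1 1 1 2 2 2 3 3

# Run-length encoding of the codes: runs of identical codes become (value, length)
runs <- rle(codes)
runs$values    # 1 2 3
runs$lengths   # 4 3 2

# Decoding reverses both steps and restores the original column
decoded <- dict[inverse.rle(runs)]
stopifnot(identical(decoded, season))
```

Nine strings end up stored as three dictionary entries plus three (value, length) pairs. This is why sorted or low-cardinality columns, like `season` in the bike data, compress well in a columnar layout.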

First, we will initialize the Spark context (sc) by setting some environment variables and loading the _SparkR_ package:

In [1]:
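# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/opt/spark/spark-1.6.1-bin-hadoop2.6")
# Extra spark-submit arguments: pull the spark-csv package (not needed to read Parquet).
# Note: prebuilt Spark 1.6.x binaries use Scala 2.10, so spark-csv_2.10 is the matching artifact
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.3.0" "sparkr-shell"')

# Add SparkR's library directory (under SPARK_HOME) to the R library search path
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

# Load the SparkR library and start a local Spark context using all available cores
library(SparkR)
sc <- sparkR.init(master="local[*]")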


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var

The following objects are masked from ‘package:base’:

    colnames, colnames<-, endsWith, intersect, rank, rbind, sample,
    startsWith, subset, summary, table, transform



Launching java with spark-submit command /opt/spark/spark-1.6.1-bin-hadoop2.6/bin/spark-submit   "--packages" "com.databricks:spark-csv_2.11:1.3.0" "sparkr-shell" /tmp/RtmpbQN1Gw/backend_port3245fbb49b0 


### Loading Bike and Weather data in Parquet format

In [2]:
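# Create the SQL context, then load the Parquet file as a SparkR DataFrame
# (the schema is read from the file itself)
sqlContext <- sparkRSQL.init(sc)
bikes_weather <- read.parquet(sqlContext, 'data/bike_sharing/bike_weather.parquet')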

In [3]:
printSchema(bikes_weather)

root
 |-- datetime: timestamp (nullable = true)
 |-- season: string (nullable = true)
 |-- holiday: boolean (nullable = true)
 |-- workingday: boolean (nullable = true)
 |-- weather: string (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)


In [4]:
head(bikes_weather)

| | datetime | season | holiday | workingday | weather | temp | atemp | humidity | windspeed | casual | registered | count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 2011-01-15 19:00:00 | springer | False | False | Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist | 13.12 | 15.15 | 39 | 16.998 | 14 | 60 | 74 |
| 2 | 2011-01-23 18:00:00 | springer | False | False | Clear, Few clouds, Partly cloudy, Partly cloudy | 4.92 | 6.06 | 30 | 16.998 | 5 | 44 | 49 |
| 3 | 2011-01-24 04:00:00 | springer | False | True | Clear, Few clouds, Partly cloudy, Partly cloudy | 0.82 | 3.03 | 48 | 8.998 | 0 | 1 | 1 |
| 4 | 2011-02-13 11:00:00 | springer | False | False | Clear, Few clouds, Partly cloudy, Partly cloudy | 13.12 | 14.395 | 39 | 30.003 | 26 | 86 | 112 |
| 5 | 2011-03-04 22:00:00 | springer | False | True | Clear, Few clouds, Partly cloudy, Partly cloudy | 12.3 | 14.395 | 70 | 12.998 | 4 | 40 | 44 |
| 6 | 2011-03-20 10:00:00 | springer | False | False | Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist | 13.12 | 17.425 | 45 | 0.0 | 55 | 81 | 136 |


In [5]:
?read.parquet

### Heatmap of Bike Rentals by time and day of week

A heatmap is a graphical representation of data in which the individual values of a matrix are represented as colors. See an example here: https://stanford.edu/~mwaskom/software/seaborn/examples/heatmap_annotation.html
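As a tiny illustration of "matrix values as colors" in R, the sketch below uses base R's `heatmap()` from the stats package on a small made-up matrix (the values and the row/column labels are hypothetical). `Rowv = NA` and `Colv = NA` turn off the clustering dendrograms so rows and columns keep their original order, and `scale = "none"` colors the raw values instead of row-standardized ones.

```r
# A small made-up matrix: 3 rows x 4 columns (illustrative values only)
m <- matrix(c(1, 4, 9, 2,
              3, 8, 5, 0,
              7, 6, 2, 4),
            nrow = 3, byrow = TRUE,
            dimnames = list(c("row1", "row2", "row3"),
                            c("c1", "c2", "c3", "c4")))

# One colored tile per cell; no reordering, no rescaling of the values
heatmap(m, Rowv = NA, Colv = NA, scale = "none", col = heat.colors(16))
```

The same call works for the weekday-by-hour matrix described below, as long as it is a numeric matrix with at least two rows and two columns.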

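Seaborn, used in the example linked above, is a Python visualization library based on matplotlib that provides a high-level interface for drawing attractive statistical graphics. Since this notebook uses R, comparable plots can be drawn with base R's `heatmap()` or `image()` functions, or with ggplot2's `geom_tile()`.

Here are the necessary imports: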

In [6]:
# TODO

#### Prepare data as matrix

To plot the heatmap, we need the data as a matrix with one row per weekday, one column per hour of the day (0 to 23), and the number of rentals in each cell:
<pre>
  Weekday
|----------|-----|-----|-------|-----|-----|
|          |  0  |  1  |  ...  |  22 |  23 | -- Hour
|----------|-----|-----|-------|-----|-----|
|  Monday  |  4  |  5  |  ...  |  47 | 22  |
|  Tuesday |  6  |  9  |  ...  |  88 | 44  |
|  ...     |  .  |  .  |  ...  |  .  |  .  |
|  Sunday  |  0  |  2  |  ...  |  4  |  6  |
|----------|-----|-----|-------|-----|-----|
</pre>
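This reshaping can be sketched in base R on a local data frame, here using the six rows printed by `head(bikes_weather)` above. On the full dataset you would presumably aggregate in Spark first and only collect the 7 x 24 result; that workflow is our assumption, and the names `rides`, `weekday`, `hour` and `heat` are hypothetical. The weekday and hour come from `POSIXlt` fields, which avoids the locale-dependent names returned by `weekdays()`.

```r
# The six rows printed by head(bikes_weather) above: timestamp and total count
rides <- data.frame(
  datetime = as.POSIXct(c("2011-01-15 19:00:00", "2011-01-23 18:00:00",
                          "2011-01-24 04:00:00", "2011-02-13 11:00:00",
                          "2011-03-04 22:00:00", "2011-03-20 10:00:00"),
                        tz = "UTC"),
  count = c(74L, 49L, 1L, 112L, 44L, 136L)
)

# POSIXlt exposes wday (0 = Sunday ... 6 = Saturday) and hour (0-23)
lt <- as.POSIXlt(rides$datetime)
day_names <- c("Sunday", "Monday", "Tuesday", "Wednesday",
               "Thursday", "Friday", "Saturday")
rides$weekday <- factor(day_names[lt$wday + 1],
                        levels = c(day_names[-1], day_names[1]))  # Monday first
rides$hour <- factor(lt$hour, levels = 0:23)

# Sum counts per (weekday, hour) cell; xtabs keeps every factor level,
# so empty cells become 0 and the result is always 7 x 24
heat <- xtabs(count ~ weekday + hour, data = rides)

# Sunday at 10h, 11h and 18h: 136, 112 and 49 rentals
heat["Sunday", c("10", "11", "18")]
```

Declaring the factor levels explicitly is what guarantees the full 7 x 24 grid even when some weekday/hour combinations never occur in the data.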


### Pivoting Data in Spark SQL
Pivot tables are an essential part of data analysis and reporting. A pivot can be thought of as translating rows into columns while applying one or more aggregations.

Here is an example:
<pre>
+---+-----+---+
|cat| type|qty|
+---+-----+---+
|one|small|  1|
|one|large|  2|                  Groups by "cat", pivots the "type" column and averages "qty".
|one|large|  2|                  +---+-------+-----+
|two|small|  3|                  |cat|  large|small|
|two|small|  3|          ===>    +---+-------+-----+
|one|large|  4|                  |two|    7.0|  4.0|
|one|small|  5|                  |one|   2.67|  3.0|
|two|small|  6|                  +---+-------+-----+
|two|large|  7|
+---+-----+---+
</pre>
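The same pivot can be reproduced locally in base R with `tapply()`, which groups a value by every combination of its index factors and applies a function to each group. This is a local sketch on the nine example rows, not Spark's implementation; the data frame name `toy` is hypothetical.

```r
# The nine rows of the example table above
toy <- data.frame(
  cat  = c("one", "one", "one", "two", "two", "one", "one", "two", "two"),
  type = c("small", "large", "large", "small", "small", "large", "small", "small", "large"),
  qty  = c(1, 2, 2, 3, 3, 4, 5, 6, 7),
  stringsAsFactors = FALSE
)

# Group qty by (cat, type) and average it: the first index becomes the rows,
# the second becomes the columns (levels sorted alphabetically)
pivoted <- with(toy, tapply(qty, list(cat, type), mean))

# one/large = 2.67, one/small = 3, two/large = 7, two/small = 4
round(pivoted, 2)
```

Note that `one/large` averages 2, 2 and 4, giving 8/3, which rounds to 2.67.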

In [7]:
# Local R data.frame holding the rows of the pivot example above
fake_data <- data.frame(cat = c('one', 'one', 'one', 'two', 'two', 'one', 'one', 'two', 'two'),
                        type = c('small', 'large', 'large', 'small', 'small', 'large', 'small', 'small', 'large'),
                        qty = c(1, 2, 2, 3, 3, 4, 5, 6, 7))

# Distribute it as a SparkR DataFrame, then collect it back to display it
df_fake <- createDataFrame(sqlContext, fake_data)
collect(df_fake)

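| | cat | type | qty |
|---|---|---|---|
| 1 | one | small | 1 |
| 2 | one | large | 2 |
| 3 | one | large | 2 |
| 4 | two | small | 3 |
| 5 | two | small | 3 |
| 6 | one | large | 4 |
| 7 | one | small | 5 |
| 8 | two | small | 6 |
| 9 | two | large | 7 |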


__Note__: the `pivot` function is not yet implemented in the SparkR API (Spark 1.6); there is a pull request in progress (https://github.com/apache/spark/pull/13295).
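Until then, one possible workaround (our suggestion, not an official SparkR recipe) is to aggregate on both keys in Spark, for example with `groupBy()` and `agg()`, `collect()` the small aggregated result, and widen it locally. The local widening step can be sketched in base R; here `aggregate()` stands in for the Spark aggregation so the block runs without a cluster, and the names `long`, `agg_long` and `wide` are hypothetical.

```r
# Long-format rows of the pivot example
long <- data.frame(
  cat  = c("one", "one", "one", "two", "two", "one", "one", "two", "two"),
  type = c("small", "large", "large", "small", "small", "large", "small", "small", "large"),
  qty  = c(1, 2, 2, 3, 3, 4, 5, 6, 7),
  stringsAsFactors = FALSE
)

# Step 1 (done in Spark on real data): one row per (cat, type) with the mean qty
agg_long <- aggregate(qty ~ cat + type, data = long, FUN = mean)

# Step 2 (local): turn the distinct "type" values into columns, one row per cat
wide <- reshape(agg_long, idvar = "cat", timevar = "type", direction = "wide")
wide
```

`reshape()` names the new columns `qty.large` and `qty.small` by combining the value column with each `type` value. Only step 2 needs the data on the driver, and after aggregation it is just one row per (cat, type) pair.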

In [8]:
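library(reshape2)
# dcast() works on local R data.frames, not on SparkR DataFrames, so collect()
# the (small) data to the driver first, then pivot locally:
# rows = cat, columns = type, values = mean of qty (as in the example above)
dcast(collect(df_fake), cat ~ type, value.var = "qty", fun.aggregate = mean)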
