# Spark Lab 1

In this lab, we will use Spark to further dig into the Bay Area Bike Share data.

### Environment Option 1 - BigData VM

You will need to run this lab on the provided VM. As usual, connect to your VM using:

    vagrant up
    vagrant ssh

And then, once inside, run:

    spark_local_start.sh

**Important:** If your machine is already running and you've started the Hadoop services with `bigdata_start.sh`, you may want to run `bigdata_stop.sh` first to stop all services and free up some memory.

Once you've started Spark in local mode, you should be able to access Jupyter at this address:

http://10.211.55.101:18888

We will work in there.


### Environment Option 2 - Local Spark Environment

To work on this lab on your local machine, start your local Jupyter notebook with Spark, using the configuration we set up previously, via the following command alias:

```bash
jupyter-spark
```


## Overview


Our goal is to calculate the average number of trips per hour for trips that start at the Caltrain station.

Check that your Spark context is available:

> If you are using your BigData VM, create a new notebook on the instance running at http://10.211.55.101:18888, using the content in this notebook as a reference point.

In [None]:
sc

Load the Bay Area Bike Share trip data:

Data has been pre-loaded on your BigData VM:

In [None]:
trips = sc.textFile('file:///home/vagrant/data/201408_babs_open_data/201408_trip_data.csv')

If you want to use your local Spark instance (not the BigData VM), you should be able to set up `trips` from the local dataset in the "bay_area_bike_share" directory. The dataset is too large to fit on GitHub uncompressed, so it is stored in zip format. You will need to unzip it before loading it.

In [13]:
csv_file = "../../../../../datasets/bay_area_bike_share/201408_babs_open_data/201408_trip_data.csv"
trips = sc.textFile(csv_file)

**Check:** What kind of object is `trips`?

### Exercise 1: split csv lines
In Spark, we can build complex pipelines that are only executed when we ask to collect their results.

Remember how we built pipelines in scikit learn as a composition of transformations?

The process here is very similar, with one big difference:

While in a Python pipeline the calculation is immediately executed, with Spark the pipeline definition and execution are separate steps.

In other words, we can define the pipeline with all its steps, and only when we call `collect` will the data flow through it. In order to get familiar with this new workflow, we will start with small steps to build our pipeline.
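
To make the define-then-execute idea concrete, here is a small plain-Python analogy that needs no Spark at all. A generator expression stands in for the lazy pipeline and `list(...)` stands in for `collect`. The names `split_line`, `calls` and `lines` are made up for this illustration.

```python
# Plain-Python analogy for lazy evaluation (no Spark required).
calls = []  # records every line that actually gets processed


def split_line(line):
    calls.append(line)
    return line.split(",")


lines = ["a,b", "c,d", "e,f"]

# Defining the pipeline: a generator expression is lazy, so nothing runs yet.
pipeline = (split_line(line) for line in lines)
calls_before = len(calls)  # no line has been processed at this point

# Triggering the pipeline, comparable to calling collect() on an RDD.
result = list(pipeline)
calls_after = len(calls)  # every line has now flowed through split_line
```

In Spark the same separation applies: transformations such as `map` and `filter` only describe the work, and an action such as `collect` makes it happen.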

First step:
- apply a map to `trips` that splits each line at commas, and save the result to an RDD

**Hint:** if you want to check that you're doing things right, you can collect the result and display the first few lines.
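
One possible shape for this step is sketched below. The helper name `split_line`, the RDD name `trip_fields` and the sample row are assumptions made for illustration. The sample row is invented, not taken from the dataset. A plain `split(",")` also assumes that no field contains a quoted comma.

```python
def split_line(line):
    """Split one raw CSV line into a list of string fields."""
    return line.split(",")


# Hypothetical row laid out like the trip data (11 comma-separated fields).
sample = ("4576,63,8/29/2014 14:13,South Van Ness at Market,66,"
          "8/29/2014 14:14,South Van Ness at Market,66,520,Subscriber,94127")
fields = split_line(sample)

# With a SparkContext available, the same function could be passed to map:
#   trip_fields = trips.map(split_line)
#   trip_fields.take(3)   # fetch only a few rows instead of collecting everything
```

Using `take` with a small number is a cheaper way to peek at the result than collecting the whole RDD.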

### Exercise 2: filter for Caltrain station
In Spark we can also create filters using the `filter` method.
Let's select station number 70 by filtering on the 5th column. We will do all of the following analysis on this station only, since it is the most popular starting point. Save the result to a variable called `station_70`.
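
As a minimal sketch, the filter can be written as a small predicate over one split record. The names `starts_at_station_70` and `trip_fields` (the RDD of split lines from Exercise 1) are hypothetical, and the rows below are invented. Note that the fields are still strings at this point, so the comparison is against `"70"` rather than the integer `70`.

```python
START_TERMINAL_INDEX = 4  # fifth column, zero-based


def starts_at_station_70(fields):
    """Return True when the fifth field of a split record is the string '70'."""
    return fields[START_TERMINAL_INDEX] == "70"


# Invented rows, truncated to the first five columns for readability.
rows = [
    ["Trip ID", "Duration", "Start Date", "Start Station", "Start Terminal"],
    ["1", "300", "8/1/2014 08:05", "Station A", "70"],
    ["2", "450", "8/1/2014 09:10", "Station B", "55"],
]
kept = [row for row in rows if starts_at_station_70(row)]

# With Spark, the same predicate could be passed to filter:
#   station_70 = trip_fields.filter(starts_at_station_70)
```

A side effect of this predicate is that a header row is dropped as well, because its fifth field is a column name rather than `"70"`.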

### Exercise 3: trips by day - hour (mapper)
Let's analyse the trips by the hour. We can do this by performing a map reduce job in Spark. First we will need to emit tuples with a count of 1 for each (date, hour) key, and then we will sum the counts by key.

- Emit tuple of ((date, hour), 1), applying a map to `station_70` that extracts the relevant data from each line
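
One way to write the mapper is sketched below. It assumes the start date sits in the third column and looks like `8/29/2014 14:13` (date, a space, then `HH:MM`). Check a few rows of your data before relying on that format. The function name `to_date_hour_pair` and the sample record are made up for this example.

```python
START_DATE_INDEX = 2  # third column, assumed to look like "8/29/2014 14:13"


def to_date_hour_pair(fields):
    """Turn one split trip record into ((date, hour), 1)."""
    date, time = fields[START_DATE_INDEX].split(" ")
    hour = int(time.split(":")[0])
    return ((date, hour), 1)


# Invented record, truncated to the first five columns.
pair = to_date_hour_pair(["1", "300", "8/29/2014 14:13", "Station A", "70"])

# With Spark, the function could be applied to the filtered RDD:
#   trips_by_date_hour = station_70.map(to_date_hour_pair)
```

Converting the hour to an `int` keeps later sorting numeric, so hour 9 comes before hour 10.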

In [None]:
# Emit tuple of ((date, hour), 1)


### Exercise 4:  trips by day - hour (reducer)

Use the `reduceByKey` method to obtain the number of trips per (date, hour).
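
To see what `reduceByKey` does with the emitted pairs, here is a plain-Python stand-in that folds together all values sharing a key. The helper `reduce_by_key` is written for this illustration and is not part of Spark. The pairs are invented, and `trips_by_date_hour` in the comment is a hypothetical name for the RDD produced in Exercise 3.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Plain-Python stand-in for reduceByKey: fold values that share a key."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals


# Invented ((date, hour), 1) pairs: two trips at 8 o'clock, one at 9 o'clock.
pairs = [(("8/29/2014", 8), 1), (("8/29/2014", 8), 1), (("8/29/2014", 9), 1)]
counts = reduce_by_key(pairs, add)

# With Spark, the equivalent call on an RDD of pairs would be:
#   trips_per_date_hour = trips_by_date_hour.reduceByKey(add)
```

The function passed to `reduceByKey` should be associative and commutative, because Spark applies it within each partition and then again across partitions.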

### Exercise 5: trips by hour (mapper)
Let's further group the trips by hour. We'll do this with a second Map Reduce job.
First we will discard the date and emit tuples of (hour, count). You can achieve this with a map.

In [None]:
# Emit tuple of (hour, count)


### Exercise 6: trips by hour (reducer)
Then let's calculate the average number of trips by hour using the `combineByKey` method.

You can find a suggestion on how to do it [here](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/).
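
As a rough sketch of the idea, `combineByKey` takes three functions: one that starts an accumulator from the first value seen for a key, one that folds another value into it, and one that merges accumulators from different partitions. For an average, the accumulator can be a `(sum, count)` pair. The function names and numbers below are invented, and the two partitions are simulated by hand so the example runs without Spark.

```python
def create_combiner(count):
    """Start a (sum, n) accumulator from the first value seen for a key."""
    return (count, 1)


def merge_value(acc, count):
    """Fold one more value into an existing (sum, n) accumulator."""
    return (acc[0] + count, acc[1] + 1)


def merge_combiners(a, b):
    """Merge two (sum, n) accumulators built on different partitions."""
    return (a[0] + b[0], a[1] + b[1])


# Simulate one hour whose daily counts 10 and 20 sit on one partition
# and whose count 30 sits on another.
partition_1 = merge_value(create_combiner(10), 20)
partition_2 = create_combiner(30)
total, n = merge_combiners(partition_1, partition_2)
average = total / float(n)

# With Spark, on a hypothetical RDD `trips_by_hour` of (hour, count) pairs:
#   sums_counts = trips_by_hour.combineByKey(create_combiner, merge_value, merge_combiners)
#   averages = sums_counts.mapValues(lambda p: p[0] / float(p[1]))
```

Carrying the count alongside the sum is what makes the average correct: averaging partial averaged values per partition would give the wrong answer when partitions hold different numbers of records.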

### Exercise 7: collect
Finally, we can collect our results and sort them.
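
Once the results are collected into a regular Python list, sorting is ordinary Python. The sketch below uses invented (hour, average) pairs, and `averages_rdd` in the comment is a hypothetical name for the RDD from Exercise 6.

```python
# Invented (hour, average) pairs standing in for collected results.
averages = [(17, 42.5), (8, 61.0), (12, 20.25)]

# Sort chronologically by hour (tuples sort by their first element).
by_hour = sorted(averages)

# Or sort by the average, busiest hour first.
by_average = sorted(averages, key=lambda pair: pair[1], reverse=True)

# With Spark, either sort after collecting or let Spark sort by key first:
#   sorted(averages_rdd.collect())
#   averages_rdd.sortByKey().collect()
```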

### Bonus:

Besides the SparkContext, Spark also exposes a `sqlContext` that allows us to run SQL queries on our data once it has been loaded as a DataFrame and registered as a table.

A SQLContext has already been created for you. Do not create another one, or unspecified behavior may occur. As you can see below, the `sqlContext` provided is a HiveContext.

- Run the same query we performed in Hue/Hive to obtain the average duration of a trip originating from the Caltrain station

In [None]:
sqlContext

In [None]:
tripsSql = (sqlContext.read
            .format('com.databricks.spark.csv')
            .options(header='true', inferschema='true')
            .load('file:///home/vagrant/data/201408_babs_open_data/201408_trip_data.csv'))

In [None]:
tripsSql.printSchema()

In [None]:
# Register this DataFrame as a table.
tripsSql.registerTempTable("tripsSql")

In [None]:
sqlContext.sql("""
YOUR QUERY HERE
""")