<img src="http://imgur.com/1ZcRyrc.png" style="float: left; margin: 20px; height: 55px">

# Practice Spark Lab

_Authors: David Yerrington (SF)_

---

> *Note: This lab may be best administered as a walk through with the instructor.*

**In this lab, we will use Spark to dig into the Bay Area Bike Share data.**

You'll need to run this lab on the virtual machine (VM) provided. As usual, connect to your VM using:

    vagrant up
    vagrant ssh

Once inside, run:

    spark_local_start.sh

**Important:** If your machine is already running and you've started the Hadoop services with `bigdata_start.sh`, you may want to first run `bigdata_stop.sh` to stop all services and free up some memory.

Once you've started Spark in local mode, you should be able to access Jupyter at this address:

http://10.211.55.101:18888

This is where we'll work.

Our goal is to calculate the average number of trips per hour that start at the Caltrain station.

**Check that your SparkContext is available.**

In [None]:
# sc

### 1) Load the Bay Area Bike Share trip data.

> **Note:** The data have been pre-loaded onto your VM at `/home/vagrant/data/201408_babs_open_data/201408_trip_data.csv`.

In [None]:
trips = sc.textFile('file:///home/vagrant/data/201408_babs_open_data/201408_trip_data.csv')

### 2) What kind of object are the data loaded as?

In [1]:
# As a resilient distributed data set (RDD). You can confirm this with type(trips).

### 3) Split CSV lines.

In Spark, we can build complex pipelines that are only executed when we ask for their results.

In ordinary Python code, each step runs as soon as it is called. In Spark, defining the pipeline and executing it are separate steps: transformations such as `map()` and `filter()` are lazy, and only an action such as `collect()` triggers the computation.

In other words, we can define the pipeline with all of its steps, and the data will only flow through it when we call an action such as `collect()`. To get familiar with this workflow, we'll build our pipeline in small steps.
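
As a rough analogy only (this is plain Python, not Spark), a generator expression also separates defining a pipeline from running it. The sketch below uses invented sample lines and a hypothetical `split_line` helper that records each call, so we can see that no work happens until the pipeline is consumed.

```python
calls = []  # records every line that actually gets processed

def split_line(line):
    """Split one CSV line and remember that we were called."""
    calls.append(line)
    return line.split(",")

# Hypothetical sample lines, not taken from the real data set.
lines = ["1,100,a", "2,200,b"]

# Defining the pipeline does no work yet (loosely like calling rdd.map).
pipeline = (split_line(line) for line in lines)
processed_before = len(calls)

# Consuming the pipeline triggers the work (loosely like rdd.collect()).
result = list(pipeline)
processed_after = len(calls)
```

Here `processed_before` is 0 and `processed_after` is 2: the splitting only runs once the results are requested.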

**Apply a map to `trips` that splits each line at commas, and save the result as a resilient distributed data set (RDD).**

> **Hint:** If you want to check that you're doing things correctly, you can collect the result and display the first few lines, or use `take(5)` to fetch only a few rows.

In [None]:
trips = trips.map(lambda line: line.split(","))

### 4) Filter for the Caltrain station.

We can also create filters using the `filter()` method in Spark.

**Select station number 70 by filtering on the fifth column (index 4).**

We'll perform all of the following analysis on just this station, which is the most popular starting point. Save the filtered RDD to a variable called `station_70`.

In [None]:
station_70 = trips.filter(lambda x: x[4] == '70')
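
After `split(",")`, every field is a string, which is why the filter compares against `'70'` and not the integer `70`. A small plain-Python sketch with invented rows (not real trip records) shows the difference:

```python
# Hypothetical rows shaped like the split trip lines; all values are invented.
rows = [
    ["1", "300", "8/29/2014 20:43", "Station A", "70"],
    ["2", "450", "8/29/2014 21:10", "Station B", "50"],
]

# Same predicate as the Spark filter: the fifth column holds the station number.
as_string = [r for r in rows if r[4] == '70']

# Comparing against an integer silently matches nothing.
as_int = [r for r in rows if r[4] == 70]
```

Only the string comparison keeps the first row; the integer comparison returns an empty list without raising an error.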

### 5) Trips by day and hour (mapper).

Let's analyze the trips by the hour. We can do this by performing a MapReduce job in Spark. First, we'll need to emit tuples with a count of one for each key (day and hour), then we'll sum the counts by key.

**Emit a tuple of ((date, hour), 1), applying a map to `station_70` that extracts the relevant data from each line.**

In [None]:
# Emit a tuple of ((date, hour), 1).
trips_by_day_hour = station_70.map(lambda x: ((x[2].split()[0], x[2].split()[1].split(':')[0]), 1))
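
To see what the mapper's lambda does to a single record, here is a plain-Python sketch. It assumes the start date in the third column looks like `8/29/2014 20:43` (a date, a space, then `HH:MM`). The sample row is invented, and `day_hour_key` is a hypothetical helper name.

```python
def day_hour_key(row):
    """Return ((date, hour), 1) for one split trip row."""
    date, time = row[2].split()   # "8/29/2014 20:43" -> "8/29/2014", "20:43"
    hour = time.split(':')[0]     # "20:43" -> "20"
    return ((date, hour), 1)

# Hypothetical row; the date format is an assumption about the data.
sample_row = ["1", "300", "8/29/2014 20:43", "Station A", "70"]
emitted = day_hour_key(sample_row)
```

Note that the hour is still a string at this point; it can be cast to an integer in a later step.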

### 6) Trips by day and hour (reducer).

Use the `reduceByKey()` method to obtain the number of trips per day and hour.

In [None]:
trips_by_day_hour = trips_by_day_hour.reduceByKey(lambda a, b: a+b)
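
Conceptually, `reduceByKey()` groups pairs by key and folds the values of each group with the supplied function. The following single-process sketch imitates that behavior on invented pairs; `reduce_by_key` is a hypothetical helper, and the real Spark method also distributes the work across partitions.

```python
def reduce_by_key(pairs, func):
    """Combine values that share a key using func (local imitation only)."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return result

# Hypothetical ((date, hour), 1) tuples as emitted by the mapper.
pairs = [
    (("8/29/2014", "8"), 1),
    (("8/29/2014", "8"), 1),
    (("8/29/2014", "9"), 1),
    (("8/30/2014", "8"), 1),
]
counts = reduce_by_key(pairs, lambda a, b: a + b)
```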

### 7) Trips by hour (mapper).

Let's further group the trips by hour. We'll do this with a second MapReduce job.

First, we will discard the day and emit a tuple for hour and count. You can achieve this with a map.

In [None]:
# Emit a tuple of (hour, count).
trips_by_hour = trips_by_day_hour.map(lambda x: (int(x[0][1]), x[1]))

### 8) Trips by hour (reducer).

Now, calculate the average number of trips by hour using the `combineByKey()` method.

> You can find a suggestion for how to do this [here](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/).

In [None]:
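avg_trips_by_hour = trips_by_hour.combineByKey(
    lambda x: (x, 1),                         # createCombiner: start a (sum, count) pair
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # mergeValue: fold a value into the pair
    lambda a, b: (a[0] + b[0], a[1] + b[1])   # mergeCombiners: merge pairs across partitions
)
# Use float division so the average is not truncated under Python 2.
avg_trips_by_hour = avg_trips_by_hour.mapValues(lambda v: v[0] / float(v[1]))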
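
The three functions passed to `combineByKey()` can be hard to picture, so here is a single-process sketch of how they cooperate. It assumes the data arrive in two partitions of invented `(hour, count)` pairs; `combine_by_key` is a hypothetical helper that imitates the idea and is not Spark's implementation.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Local imitation of combineByKey over a list of partitions."""
    per_partition = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key in combiners:
                # Key already seen in this partition: fold the value in.
                combiners[key] = merge_value(combiners[key], value)
            else:
                # First time this key appears in this partition.
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    # Merge the per-partition results into one dictionary.
    merged = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged

# Hypothetical (hour, trips_on_some_day) pairs split across two partitions.
partitions = [
    [(8, 10), (8, 20), (9, 4)],
    [(8, 30), (9, 6)],
]

sums_and_counts = combine_by_key(
    partitions,
    lambda x: (x, 1),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
averages = {hour: total / float(n) for hour, (total, n) in sums_and_counts.items()}
```

Each key ends up with a `(sum, count)` pair, and dividing the two gives the average per hour.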

### 9) `collect()` the results.
Finally, we can collect our results and sort them.

In [None]:
avg_trips_sorted = sorted(avg_trips_by_hour.collect())

### 10) [Bonus] Using the Spark SQLContext.

Besides the SparkContext, Spark also exposes a SQLContext that allows us to run SQL queries against structured data loaded as DataFrames.

A SQLContext has already been created for you. Don’t create another one, or unspecified behavior may occur. As you can see below, the SQLContext provided is a HiveContext.

**Run a query using the SQLContext to obtain the average duration of a trip originating from the Caltrain station.**

In [None]:
sqlContext

In [None]:
tripsSql = (sqlContext.read
            .format('com.databricks.spark.csv')
            .options(header='true', inferschema='true')
            .load('file:///home/vagrant/data/201408_babs_open_data/201408_trip_data.csv'))

In [None]:
tripsSql.printSchema()

In [None]:
# Register this DataFrame as a table.
tripsSql.registerTempTable("tripsSql")

In [None]:
sqlContext.sql("""
SELECT
    hour,
    COUNT(1) AS c,
    ROUND(AVG(duration) / 60) AS avg_duration
FROM (
    SELECT
        CAST(SPLIT(SPLIT(t.startdate, ' ')[1], ':')[0] AS INT) AS hour,
        t.duration AS duration
    FROM tripsSql t
    WHERE
        t.startterminal = 70
        AND
        t.duration IS NOT NULL
    ) r
GROUP BY hour
ORDER BY hour ASC
""").show(24)