# Spark Lab 1

In this lab, we will use Spark to dig further into the Bay Area Bike Share data.

You will need to run this lab on the provided VM. As usual, start it and connect to it with:

    vagrant up
    vagrant ssh

Then, once inside, start Spark in local mode:

    spark_local_start.sh

**Important:** If your VM is already running and you've started the Hadoop services with `bigdata_start.sh`, you may want to run `bigdata_stop.sh` first to stop all services and free up memory.

Once Spark is running in local mode, you should be able to access Jupyter at this address:

http://10.211.55.101:18888

We will do all our work there.

Our goal is to calculate, for each hour of the day, the average number of trips starting from the Caltrain station.

Check that your SparkContext (`sc`) is available:

In [1]:
sc

''

Load the Bay Area Bike Share trip data:

NB: the data has been pre-loaded on your VM.

In [None]:
trips = sc.textFile('file:///home/vagrant/data/201408_babs_open_data/201408_trip_data.csv')

**Check:** What kind of object is `trips`?

### Exercise 1: split CSV lines
In Spark, we can build complex pipelines that are only executed when we ask for their results, for example with `collect`.

Remember how we built pipelines in scikit-learn as a composition of transformations?

The process here is very similar, with one big difference:

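In a plain Python (or scikit-learn) pipeline, each step is computed as soon as it is called. In Spark, the pipeline definition and its execution are separate steps: transformations such as `map` and `filter` are lazy and only record what should be done.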

In other words, we can define the pipeline with all its steps, and the data only flows through it when we call an action such as `collect`. To get familiar with this workflow, we will build our pipeline in small steps.
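Python generator expressions give a rough, plain-Python feel for this lazy behaviour. The sketch below is only an analogy, not Spark code; `double`, `data` and `calls` are made-up names for illustration.

```python
# A plain-Python analogy for lazy evaluation (this is NOT Spark code).
# Generator expressions, like Spark transformations, only describe work;
# nothing runs until the result is consumed.

calls = []  # records every time the "map" function actually runs


def double(x):
    calls.append(x)
    return x * 2


data = [1, 2, 3, 4]

# "Define" a two-step pipeline: a map followed by a filter.
mapped = (double(x) for x in data)
filtered = (y for y in mapped if y > 4)

print(len(calls))  # 0: defining the pipeline computed nothing

# "Collect": consuming the generator makes the data flow through both steps.
result = list(filtered)
print(result)      # [6, 8]
print(len(calls))  # 4
```

Spark behaves the same way at a larger scale: `trips.map(...)` returns a new RDD immediately, and the work only happens when an action such as `collect()` or `take()` runs.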

First step:
- apply a `map` to `trips` that splits each line at commas, and save the result to a new RDD

**Hint:** to check that you're doing things right, use `take(5)` to display the first few lines; this avoids collecting the whole dataset.
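A minimal sketch of the per-line function for this step. It assumes the trip file's columns are Trip ID, Duration, Start Date, Start Station, Start Terminal, End Date, End Station, End Terminal, Bike #, Subscriber Type, Zip Code, and that no field contains an embedded comma. `split_line` is a hypothetical name, and the sample row is made up.

```python
# Hypothetical helper for Exercise 1: split one CSV line into a list of fields.
# Assumes no field contains an embedded comma (a plain str.split does not
# handle quoted CSV fields).


def split_line(line):
    """Split a raw CSV line at commas and return the list of fields."""
    return line.split(',')


# Made-up row in the assumed trip-file layout:
# Trip ID, Duration, Start Date, Start Station, Start Terminal,
# End Date, End Station, End Terminal, Bike #, Subscriber Type, Zip Code
sample = ('900001,540,8/29/2014 17:05,San Francisco Caltrain (Townsend at 4th),'
          '70,8/29/2014 17:14,Example End Station,99,288,Subscriber,94107')

fields = split_line(sample)
print(len(fields))  # 11
print(fields[4])    # 70
```

In the notebook, `trips.map(split_line)` (or the equivalent `lambda line: line.split(',')`) would give the RDD of field lists. Keep in mind that the header line gets split too.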

### Exercise 2: filter for Caltrain station
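In Spark we can also filter an RDD with the `filter` method, which keeps only the elements for which the given function returns `True`.
Let's select station number 70 by filtering on the 5th column (index 4 after splitting). The values are strings, so compare with `'70'`. We will do all of the following analysis on this station only, since it is the most popular starting point. Save the result to a variable called `station_70`.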
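A sketch of a predicate that could be passed to `filter`, assuming each element is a list of fields produced by the split in Exercise 1. `starts_at_station_70` is a hypothetical name. The sample rows are made up and truncated to the first five columns for brevity.

```python
def starts_at_station_70(fields):
    """Keep a split trip row only if its start terminal (index 4) is '70'.

    After str.split the fields are strings, so compare with '70', not 70.
    The header row ('Start Terminal' in that position) is dropped as well.
    """
    return fields[4] == '70'


# Made-up rows, already split, truncated to the first five assumed columns.
header = ['Trip ID', 'Duration', 'Start Date', 'Start Station', 'Start Terminal']
row_70 = ['900001', '540', '8/29/2014 17:05',
          'San Francisco Caltrain (Townsend at 4th)', '70']
row_other = ['900002', '300', '8/29/2014 17:10', 'Example Station', '99']

rows = [header, row_70, row_other]
kept = [r for r in rows if starts_at_station_70(r)]
print(len(kept))  # 1
```

In the notebook this would read `station_70 = split_trips.filter(starts_at_station_70)`, where `split_trips` is a hypothetical name for the RDD from Exercise 1. A side benefit is that the header line is discarded here too.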

### Exercise 3: trips by day - hour (mapper)
Let's analyse the trips by hour. We can do this with a MapReduce-style job in Spark: first we emit a tuple with a count of 1 for each (date, hour) key, and then we sum the counts by key.

- Emit a tuple ((date, hour), 1) for each trip by applying a `map` to `station_70` that extracts the date and hour from the start date (3rd column) of each line
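A sketch of the mapper function, assuming the start date sits at index 2 and looks like `M/D/YYYY HH:MM` (for example `8/29/2014 17:05`). `to_date_hour_pair` is a hypothetical name. The hour is converted to an `int` so that later sorting is numeric.

```python
def to_date_hour_pair(fields):
    """Map a split trip row to ((date, hour), 1).

    Assumes the start date (index 2) looks like 'M/D/YYYY HH:MM',
    e.g. '8/29/2014 17:05' -> ('8/29/2014', 17).
    """
    date, time = fields[2].split(' ')
    hour = int(time.split(':')[0])
    return ((date, hour), 1)


# Made-up split row (first five assumed columns only).
row = ['900001', '540', '8/29/2014 17:05',
       'San Francisco Caltrain (Townsend at 4th)', '70']
print(to_date_hour_pair(row))  # (('8/29/2014', 17), 1)
```

Applied to the RDD, this becomes `station_70.map(to_date_hour_pair)`.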

In [None]:
# Emit tuple of ((date, hour), 1)


### Exercise 4: trips by day - hour (reducer)

Use the `reduceByKey` method to obtain the number of trips per (date, hour) key.
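To make the semantics of `reduceByKey` concrete, here is a plain-Python stand-in (not the Spark implementation). It folds all values that share a key with a two-argument function. `reduce_by_key` is a hypothetical name and the input pairs are made up.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Plain-Python stand-in for RDD.reduceByKey: fold values per key with func."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals


# Made-up ((date, hour), 1) pairs, shaped like the Exercise 3 output.
pairs = [
    (('8/29/2014', 17), 1),
    (('8/29/2014', 17), 1),
    (('8/29/2014', 8), 1),
    (('8/30/2014', 17), 1),
]
counts = reduce_by_key(pairs, add)
print(counts[('8/29/2014', 17)])  # 2
```

In Spark, the equivalent call is `.reduceByKey(add)` or `.reduceByKey(lambda a, b: a + b)`. Spark applies the function both within and across partitions, so it should be associative and commutative, as addition is.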

### Exercise 5: trips by hour (mapper)
Let's further group the trips by hour with a second MapReduce job.
First, discard the date and emit tuples of (hour, count). You can achieve this with a `map`.
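A sketch of the function for this `map`, assuming the elements look like `((date, hour), count)` after the reducer. `drop_date` is a hypothetical name and the counts are made up.

```python
def drop_date(pair):
    """Turn ((date, hour), count) into (hour, count)."""
    (_date, hour), count = pair
    return (hour, count)


# Made-up ((date, hour), count) results from the reducer.
daily_counts = [(('8/29/2014', 17), 2), (('8/29/2014', 8), 1), (('8/30/2014', 17), 1)]
hourly = [drop_date(p) for p in daily_counts]
print(hourly)  # [(17, 2), (8, 1), (17, 1)]
```

On the RDD this would be `.map(drop_date)`. Each hour now appears once per date on which it had trips.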

In [None]:
# Emit tuple of (hour, count)


### Exercise 6: trips by hour (reducer)
Then let's calculate the average number of trips per hour using the `combineByKey` method.

You can find a suggestion on how to do it [here](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/).
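`combineByKey` takes three functions: one that creates an accumulator from a key's first value in a partition, one that folds further values into it, and one that merges accumulators from different partitions. The sketch below uses a `(sum, count)` accumulator to compute averages. It also includes a plain-Python stand-in for the Spark call that runs over hand-made "partitions". All function names and numbers are hypothetical.

```python
# The three functions combineByKey expects, building a (sum, count) per key.

def create_combiner(count):
    """First value seen for a key in a partition -> (running sum, number of values)."""
    return (count, 1)


def merge_value(acc, count):
    """Fold another value into a partition-local accumulator."""
    return (acc[0] + count, acc[1] + 1)


def merge_combiners(acc1, acc2):
    """Merge accumulators for the same key coming from two partitions."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


def combine_by_key(partitions, create, merge_v, merge_c):
    """Plain-Python stand-in for RDD.combineByKey over a list of partitions."""
    per_partition = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = merge_v(local[key], value) if key in local else create(value)
        per_partition.append(local)
    result = {}
    for local in per_partition:
        for key, acc in local.items():
            result[key] = merge_c(result[key], acc) if key in result else acc
    return result


# Made-up (hour, count) pairs split across two partitions.
partitions = [
    [(17, 2), (8, 1)],
    [(17, 1), (8, 3)],
]
sums_counts = combine_by_key(partitions, create_combiner, merge_value, merge_combiners)
averages = {hour: float(s) / n for hour, (s, n) in sums_counts.items()}
print(averages[17])  # 1.5
print(averages[8])   # 2.0
```

In the notebook, the same three functions could be passed as `hourly.combineByKey(create_combiner, merge_value, merge_combiners)`, followed by `.mapValues(lambda acc: float(acc[0]) / acc[1])`. Here `hourly` is a hypothetical name for the (hour, count) RDD. This averages only over (date, hour) slots with at least one trip, because slots with no trips are never emitted in Exercise 3.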

### Exercise 7: collect
Finally, collect the result and sort it by hour.
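One pitfall when sorting: if the hours were kept as strings, a plain sort puts `'10'` before `'8'`. The sketch below sorts made-up `(hour, average)` results numerically. The `int()` key works whether the hours are strings or already integers.

```python
# Made-up (hour, average) results, e.g. from collect(), in arbitrary order.
collected = [('17', 1.5), ('8', 2.0), ('10', 0.5)]

# Sort numerically by hour; int() also works if hours are already ints.
by_hour = sorted(collected, key=lambda kv: int(kv[0]))
print(by_hour)  # [('8', 2.0), ('10', 0.5), ('17', 1.5)]

# A plain string sort would put '10' before '8'.
print(sorted(collected))  # [('10', 0.5), ('17', 1.5), ('8', 2.0)]
```

Alternatively, `.sortByKey().collect()` sorts inside Spark before collecting. The same string-versus-integer caveat applies there.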

### Bonus:

Besides the SparkContext, Spark also exposes a `sqlContext` that lets us run SQL queries on DataFrames, which can be created from files or from RDDs.

A SQLContext has also already been created for you. Do not create another one, or unspecified behavior may occur. You can check its type below: a Spark build with Hive support provides a HiveContext, while the output captured here shows a plain SQLContext.

- Run the same query we performed in Hue/Hive to obtain the average duration of a trip originating from the Caltrain station
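Below is a guess at the shape of that query, to be adjusted to match the one you ran in Hue. Python's built-in `sqlite3` stands in for Spark here so the example runs anywhere. The column names `Duration` and `Start Terminal` are assumed to match the trip CSV header, and the rows are made up. Backticks quote the column name that contains a space; Hive and Spark SQL use the same quoting.

```python
import sqlite3

# In-memory stand-in for the trips table; column names mirror the
# (assumed) CSV header, and the rows are made up.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE trips (`Duration` INTEGER, `Start Terminal` INTEGER)')
conn.executemany('INSERT INTO trips VALUES (?, ?)',
                 [(540, 70), (360, 70), (1000, 99)])

# Average duration of trips starting at terminal 70.
query = 'SELECT AVG(`Duration`) FROM trips WHERE `Start Terminal` = 70'
avg_duration = conn.execute(query).fetchone()[0]
print(avg_duration)  # 450.0
conn.close()
```

In the notebook, you would load the trip CSV into a DataFrame, register it as a temporary table named `trips`, and pass the same query string to `sqlContext.sql(query)`.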

In [10]:
sqlContext

<pyspark.sql.context.SQLContext at 0x1096ff710>

In [45]:
tripsSql = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
                                                                      inferschema='true').load('../../../../../../DSI-SF-2/datasets/iowa_liquor/Iowa_Liquor_sales_sample_10pct.csv')

In [46]:
tripsSql.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Store Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- County Number: double (nullable = true)
 |-- County: string (nullable = true)
 |-- Category: double (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Vendor Number: integer (nullable = true)
 |-- Item Number: integer (nullable = true)
 |-- Item Description: string (nullable = true)
 |-- Bottle Volume (ml): integer (nullable = true)
 |-- State Bottle Cost: string (nullable = true)
 |-- State Bottle Retail: string (nullable = true)
 |-- Bottles Sold: integer (nullable = true)
 |-- Sale (Dollars): string (nullable = true)
 |-- Volume Sold (Liters): double (nullable = true)
 |-- Volume Sold (Gallons): double (nullable = true)



In [47]:
# Register this DataFrame as a table.
tripsSql.registerTempTable("tripsSql")

In [60]:
data = sqlContext.sql("""
SELECT `Bottle Volume (ml)`, `Bottles Sold` FROM tripsSql
""")

print(data.rdd.map(lambda p: p))

PythonRDD[95] at RDD at PythonRDD.scala:48


In [62]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

def parsePoint(line):
    values = [float(x) for x in line]
    return LabeledPoint(values[0], values[1:])

parsedData = data.rdd.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)


In [63]:
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
# vp is a (label, prediction) pair; indexing works in both Python 2 and 3
MSE = valuesAndPreds \
    .map(lambda vp: (vp[0] - vp[1]) ** 2) \
    .reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 1098443.76554


In [7]:
# Count how many times each distinct line occurs in the file
lines = sc.textFile("../../../1.1-intro_to_big_data/code/5000.txt.utf-8.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

In [9]:
counts.collect()

[(u'', 6954),
 (u'vision) which dilates with distance and embraces with true', 1),
 (u"painters Filippino Lippi and Lorenzo di Credi. L. di Credi's", 1),
 (u'The canal which may be 16 braccia wide at the bottom and 20 at the', 1),
 (u'the chief of the others. Of this then only we will speak, and the', 1),
 (u'(_lumen cinereum_). His observations however, having hitherto', 1),
 (u'1154.', 1),
 (u'able, that your Highness will write a letter to the said Ser', 1),
 (u'from the other. By a pyramid of lines I mean those which start from', 1),
 (u'792.', 1),
 (u'space, because as white has no colour of its own, it is tinged and', 1),
 (u'the quarter where the sun is, and if you have your back turned to', 1),
 (u'ocean.', 1),
 (u'in the sciences of Anatomy and Physiology, will never be appreciated', 1),
 (u'images are transmitted to the eye and will all be comprised within', 1),
 (u'which is heavier.', 1),
 (u'IX.', 2),
 (u'all allusion to it in the MSS. Therefore, when I presently add a few'