<img src="http://imgur.com/1ZcRyrc.png" style="float: left; margin: 20px; height: 55px">

# Practice Spark Lab


**In this lab, we will use Spark to dig into the Bay Area Bike Share data.**

Our goal is to calculate the average number of trips per hour, using the Caltrain Station as starting point.

In [92]:
import pyspark as ps    # for the pyspark suite
import warnings         # for displaying warning
from pyspark.sql import SQLContext

In [93]:
try:
    # we try to create a SparkContext to work locally on all cpus available
    sc = ps.SparkContext('local[4]')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    # give a warning if SparkContext already exists (for use inside pyspark)
    warnings.warn("SparkContext already exists in this scope")

  


In [94]:
trips = sc.textFile('./data/201508_trip_data.csv')

In [95]:
trips.take(2)

['Trip ID,Duration,Start Date,Start Station,Start Terminal,End Date,End Station,End Terminal,Bike #,Subscriber Type,Zip Code',
 '913460,765,8/31/2015 23:26,Harry Bridges Plaza (Ferry Building),50,8/31/2015 23:39,San Francisco Caltrain (Townsend at 4th),70,288,Subscriber,2139']

### 2. What kind of object is the data loaded as?

In [96]:
# It is an RDD

trips

./data/201508_trip_data.csv MapPartitionsRDD[319] at textFile at NativeMethodAccessorImpl.java:0

### 3. Split csv lines

In spark, we can build complex pipelines that only get executed when we ask to collect them.

In a python pipeline the calculation is immediately executed, but with spark the pipeline definition and execution are separate steps.

In other words, we can define the pipeline with all its steps, and only when we call `collect` will the data flow through it. In order to get familiar with this new workflow, we will start with small steps to build our pipeline.

**Apply a map to trips that splits each line at commas and save that to a an RDD.**

> **Hint:** if you want to check that you're doing things right, you can collect the result and display the first few lines or use the .take() method and give the number of lines as an argument.

In [97]:
trips = trips.map(lambda line: line.split(","))

In [98]:
trips.take(5)

[['Trip ID',
  'Duration',
  'Start Date',
  'Start Station',
  'Start Terminal',
  'End Date',
  'End Station',
  'End Terminal',
  'Bike #',
  'Subscriber Type',
  'Zip Code'],
 ['913460',
  '765',
  '8/31/2015 23:26',
  'Harry Bridges Plaza (Ferry Building)',
  '50',
  '8/31/2015 23:39',
  'San Francisco Caltrain (Townsend at 4th)',
  '70',
  '288',
  'Subscriber',
  '2139'],
 ['913459',
  '1036',
  '8/31/2015 23:11',
  'San Antonio Shopping Center',
  '31',
  '8/31/2015 23:28',
  'Mountain View City Hall',
  '27',
  '35',
  'Subscriber',
  '95032'],
 ['913455',
  '307',
  '8/31/2015 23:13',
  'Post at Kearny',
  '47',
  '8/31/2015 23:18',
  '2nd at South Park',
  '64',
  '468',
  'Subscriber',
  '94107'],
 ['913454',
  '409',
  '8/31/2015 23:10',
  'San Jose City Hall',
  '10',
  '8/31/2015 23:17',
  'San Salvador at 1st',
  '8',
  '68',
  'Subscriber',
  '95113']]

### 4. Filter for Caltrain station

In Spark we can also create filters using the `filter` method.

**Select station number 70 by filtering on the 5th column.** 

We will do all the following analysis just on this station, which corresponds to the most popular starting point. Save this to a variable called `station_70`.

In [99]:
station_70 = trips.filter(lambda x: x[4] == '70')

In [100]:
station_70.take(2)

[['913429',
  '902',
  '8/31/2015 21:07',
  'San Francisco Caltrain (Townsend at 4th)',
  '70',
  '8/31/2015 21:22',
  'Broadway St at Battery St',
  '82',
  '501',
  'Subscriber',
  '94133'],
 ['913426',
  '481',
  '8/31/2015 21:06',
  'San Francisco Caltrain (Townsend at 4th)',
  '70',
  '8/31/2015 21:14',
  'Market at 4th',
  '76',
  '542',
  'Subscriber',
  '95054']]

### 5. Trips by day - hour (mapper)

Let's analyse the trips by the hour. We can do this by performing a map reduce job in Spark. First we will need to emit tuples with a count of 1 for each (date, hour) key, and then we will sum the counts by key.

**Emit tuple of ((date, hour), 1), applying a map to `station_70` that extracts the relevant data from each line.**

In [139]:
# Emit tuple of ((date, hour), 1)

trips_by_day_hour = station_70.map(lambda x: ((x[2].split()[0],\ # 1st position in tuple: pos[0] after split x[2]
                                               x[2].split()[1].split(':')[0]), 1)) # 2nd pos in tuple: x[0] after double spltit. And 1 apart to count

In [141]:
trips_by_day_hour.take(5)

[(('8/31/2015', '21'), 1),
 (('8/31/2015', '21'), 1),
 (('8/31/2015', '20'), 1),
 (('8/31/2015', '19'), 1),
 (('8/31/2015', '18'), 1)]

### 6. Trips by day - hour (reducer)

Use the `reduceByKey` method to obtain the number of trips per (day, hour).

In [103]:
trips_by_day_hour_red = trips_by_day_hour.reduceByKey(lambda x, y: x+y).sortByKey()

In [104]:
trips_by_day_hour_red.take(10)

[(('1/1/2015', '11'), 1),
 (('1/1/2015', '12'), 1),
 (('1/1/2015', '14'), 2),
 (('1/1/2015', '16'), 3),
 (('1/10/2015', '10'), 2),
 (('1/10/2015', '11'), 2),
 (('1/10/2015', '12'), 7),
 (('1/10/2015', '13'), 2),
 (('1/10/2015', '18'), 1),
 (('1/10/2015', '22'), 3)]

In [105]:
# We can do the same by simply using the 'add' function from the operator library

from operator import add

test = trips_by_day_hour.reduceByKey(add).sortByKey()

test.take(10)

[(('1/1/2015', '11'), 1),
 (('1/1/2015', '12'), 1),
 (('1/1/2015', '14'), 2),
 (('1/1/2015', '16'), 3),
 (('1/10/2015', '10'), 2),
 (('1/10/2015', '11'), 2),
 (('1/10/2015', '12'), 7),
 (('1/10/2015', '13'), 2),
 (('1/10/2015', '18'), 1),
 (('1/10/2015', '22'), 3)]

### 7. Trips by hour (mapper)

Let's further group the trips by hour. We'll do this with a second Map Reduce job.

First we will discard the day and emit tuples of (hour, count). You can achieve this with a map.

In [106]:
# Emit tuple of (hour, count)

trips_by_hour = trips_by_day_hour_red.map(lambda x: (int(x[0][1]), x[1])) # int in pos x[0][1] and then x[1]

In [107]:
trips_by_hour.take(10)

[(11, 1),
 (12, 1),
 (14, 2),
 (16, 3),
 (10, 2),
 (11, 2),
 (12, 7),
 (13, 2),
 (18, 1),
 (22, 3)]

### 8. Trips by hour (reducer)

Now calculate the average number of trips by hour using the `combineByKey` method.

> You can find a suggestion on how to do it [here](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/).

In [119]:
# First let's find it without using the combineByKey method

trips_by_day_hour_red \
    .map(lambda x: (int(x[0][1]),(x[1],1))) \ # the hour first as int and in second place the occurrences and 1
    .reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1])) \ # the hour is the key and we're adding x and y
    .collect()

[(12, (389, 213)),
 (14, (287, 184)),
 (16, (807, 298)),
 (10, (1151, 306)),
 (18, (2446, 307)),
 (22, (190, 141)),
 (20, (498, 231)),
 (6, (1211, 247)),
 (8, (6551, 286)),
 (0, (77, 67)),
 (2, (3, 3)),
 (4, (2, 2)),
 (11, (590, 283)),
 (13, (299, 181)),
 (9, (3081, 319)),
 (15, (390, 221)),
 (21, (311, 190)),
 (17, (2023, 307)),
 (19, (1366, 279)),
 (7, (4497, 260)),
 (23, (88, 75)),
 (5, (39, 31)),
 (3, (1, 1)),
 (1, (7, 7))]

In [111]:
avg_by_key = trips_by_day_hour_red \
    .map(lambda x: (int(x[0][1]),(x[1],1))) \# the hour first as int and in second place the occurrences and 1
    .reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1])) \# the hour is the key and we're adding x and y
    .mapValues(lambda x: x[0]/x[1]) \# finally we map x as x[0] / x[1]
    .sortByKey()

In [110]:
avg_by_key.collect()

[(0, 1.1492537313432836),
 (1, 1.0),
 (2, 1.0),
 (3, 1.0),
 (4, 1.0),
 (5, 1.2580645161290323),
 (6, 4.902834008097166),
 (7, 17.296153846153846),
 (8, 22.905594405594407),
 (9, 9.658307210031348),
 (10, 3.761437908496732),
 (11, 2.0848056537102475),
 (12, 1.8262910798122065),
 (13, 1.6519337016574585),
 (14, 1.559782608695652),
 (15, 1.7647058823529411),
 (16, 2.708053691275168),
 (17, 6.58957654723127),
 (18, 7.96742671009772),
 (19, 4.896057347670251),
 (20, 2.155844155844156),
 (21, 1.6368421052631579),
 (22, 1.3475177304964538),
 (23, 1.1733333333333333)]

In [122]:
# We can do the same using combineByKey
# commbineByKey is a generic function to combine the elements for each key using a custom
# set of aggregation functions.

sumCount = trips_by_hour.combineByKey(# we'll start from the trips_by_hour
                             lambda value: (value, 1),
                             lambda x, value: (x[0] + value, x[1] + 1), 
                             lambda x, y: (x[0] + y[0], x[1] + y[1]))

**The combineByKey Method**

In order to aggregate an RDD’s elements in parallel, Spark’s combineByKey method requires three functions:

* createCombiner
* mergeValue
* mergeCombiner

**Create a Combiner**

* lambda value: (value, 1)

The first required argument in the combineByKey method is a function to be used as the very first aggregation step for each key. The argument of this function corresponds to the value in a key-value pair. If we want to compute the sum and count using combineByKey, then we can create this “combiner” to be a tuple in the form of (sum, count). The very first step in this aggregation is then (value, 1), where value is the first RDD value that combineByKey comes across and 1 initializes the count.

**Merge a Value**

* lambda x, value: (x[0] + value, x[1] + 1)

The next required function tells combineByKey what to do when a combiner is given a new value. The arguments to this function are a combiner and a new value. The structure of the combiner is defined above as a tuple in the form of (sum, count) so we merge the new value by adding it to the first element of the tuple while incrementing 1 to the second element of the tuple.

**Merge two Combiners**

* lambda x, y: (x[0] + y[0], x[1] + y[1])

The final required function tells combineByKey how to merge two combiners. In this example with tuples as combiners in the form of (sum, count), all we need to do is add the first and last elements together.

In [123]:
sumCount.collect()

[(12, (389, 213)),
 (14, (287, 184)),
 (16, (807, 298)),
 (10, (1151, 306)),
 (18, (2446, 307)),
 (22, (190, 141)),
 (20, (498, 231)),
 (6, (1211, 247)),
 (8, (6551, 286)),
 (0, (77, 67)),
 (2, (3, 3)),
 (4, (2, 2)),
 (11, (590, 283)),
 (13, (299, 181)),
 (9, (3081, 319)),
 (15, (390, 221)),
 (21, (311, 190)),
 (17, (2023, 307)),
 (19, (1366, 279)),
 (7, (4497, 260)),
 (23, (88, 75)),
 (5, (39, 31)),
 (3, (1, 1)),
 (1, (7, 7))]

In [52]:
averageByKey = sumCount.map(lambda x: (x[0], x[1][0] / x[1][1] )).sortByKey()

In [53]:
averageByKey.collect()

[(0, 1.1492537313432836),
 (1, 1.0),
 (2, 1.0),
 (3, 1.0),
 (4, 1.0),
 (5, 1.2580645161290323),
 (6, 4.902834008097166),
 (7, 17.296153846153846),
 (8, 22.905594405594407),
 (9, 9.658307210031348),
 (10, 3.761437908496732),
 (11, 2.0848056537102475),
 (12, 1.8262910798122065),
 (13, 1.6519337016574585),
 (14, 1.559782608695652),
 (15, 1.7647058823529411),
 (16, 2.708053691275168),
 (17, 6.58957654723127),
 (18, 7.96742671009772),
 (19, 4.896057347670251),
 (20, 2.155844155844156),
 (21, 1.6368421052631579),
 (22, 1.3475177304964538),
 (23, 1.1733333333333333)]

### 9. Using the Spark `sqlContext`

Besides the SparkContext, Spark also exposes a sqlContext that allows us to perform SQL queries on an RDD object.**We'll run a query using a sqlContext to obtain the average duration of a trip originating from the Caltrain station.**

In [124]:
sqlContext

<pyspark.sql.context.SQLContext at 0x1184415c0>

In [125]:
tripsSql = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
                                                                      inferschema='true').load('./data/201508_trip_data.csv')

In [126]:
tripsSql.show(n=5)

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

In [127]:
tripsSql.printSchema()

root
 |-- Trip ID: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: integer (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: integer (nullable = true)
 |-- Bike #: integer (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [128]:
tripsSql.columns

['Trip ID',
 'Duration',
 'Start Date',
 'Start Station',
 'Start Terminal',
 'End Date',
 'End Station',
 'End Terminal',
 'Bike #',
 'Subscriber Type',
 'Zip Code']

In [129]:
from pyspark.sql.functions import *

In [130]:
tripsSql.select(col('Trip ID')).first()

Row(Trip ID=913460)

In [131]:
# rename the columns to be able to use SQL
df = tripsSql.select(col("Trip ID").alias('Trip_ID'),
                     col("Duration"),
                     col("Start Date").alias('Start_Date'),
                     col("Start Station").alias('Start_Station'),
                     col("Start Terminal").alias('Start_Terminal'),
                     col("End Date").alias('End_Date'),
                     col("End Station").alias('End_Station'),
                     col("End Terminal").alias('End_Terminal'),
                     col("Bike #").alias('Bike_id'),
                     col("Subscriber Type").alias('Subscriber_Type'),
                     col("Zip Code").alias('Zip_Code'))

In [132]:
# Register this DataFrame as a table.
df.registerTempTable("tripsSql")

In [133]:
sqlContext.sql("""
SELECT t.Start_Date 
FROM tripsSql t
LIMIT 5
""").show()

+---------------+
|     Start_Date|
+---------------+
|8/31/2015 23:26|
|8/31/2015 23:11|
|8/31/2015 23:13|
|8/31/2015 23:10|
|8/31/2015 23:09|
+---------------+



In [134]:
sqlContext.sql("""
SELECT
    CAST(SPLIT(SPLIT(t.Start_Date, ' ')[1], ':')[0] AS INT) AS hour,
    t.Duration AS duration
FROM tripsSql t
WHERE
    t.Start_Terminal = 70
    AND
    t.Duration IS NOT NULL
""").show(n=24)

+----+--------+
|hour|duration|
+----+--------+
|  21|     902|
|  21|     481|
|  20|     842|
|  19|     259|
|  18|     226|
|  18|     769|
|  18|     727|
|  18|     756|
|  18|     515|
|  18|    1280|
|  18|    1139|
|  18|     702|
|  17|     926|
|  17|     696|
|  17|     908|
|  17|     919|
|  17|     718|
|  17|     507|
|  17|     663|
|  16|     818|
|  16|     553|
|  16|     458|
|  15|     656|
|  15|     611|
+----+--------+
only showing top 24 rows



In [135]:
sqlContext.sql("""
SELECT
    hour,
    COUNT(1) AS c,
    ROUND(AVG(duration) / 60) AS avg_duration
FROM (
    SELECT
        CAST(SPLIT(SPLIT(t.Start_Date, ' ')[1], ':')[0] AS INT) AS hour,
        t.Duration AS duration
    FROM tripsSql t
    WHERE
        t.Start_Terminal = 70
        AND
        t.Duration IS NOT NULL
    ) r
GROUP BY hour
ORDER BY hour ASC
""").show(n=24)

+----+----+------------+
|hour|   c|avg_duration|
+----+----+------------+
|   0|  77|        45.0|
|   1|   7|        15.0|
|   2|   3|        13.0|
|   3|   1|         4.0|
|   4|   2|       491.0|
|   5|  39|        38.0|
|   6|1211|        12.0|
|   7|4497|        13.0|
|   8|6551|        12.0|
|   9|3081|        12.0|
|  10|1151|        15.0|
|  11| 590|        24.0|
|  12| 389|        41.0|
|  13| 299|        42.0|
|  14| 287|        24.0|
|  15| 390|        15.0|
|  16| 807|        17.0|
|  17|2023|        12.0|
|  18|2446|        12.0|
|  19|1366|        11.0|
|  20| 498|        14.0|
|  21| 311|        12.0|
|  22| 190|        17.0|
|  23|  88|        11.0|
+----+----+------------+



Average counts per hour and day:

In [136]:
sqlContext.sql("""
    select hour, avg(count) from (
    select hour, day, count(day) as count from 
    (SELECT
        SPLIT(t.Start_Date, ' ')[0] as day,
        CAST(SPLIT(SPLIT(t.Start_Date, ' ')[1], ':')[0] AS INT) AS hour
    FROM tripsSql t
    WHERE
        t.Start_Terminal = 70
        AND
        t.Duration IS NOT NULL) as s
    group by hour, day
    order by hour, day) as r
    group by hour
    order by hour
""").show(24)

+----+------------------+
|hour|        avg(count)|
+----+------------------+
|   0|1.1492537313432836|
|   1|               1.0|
|   2|               1.0|
|   3|               1.0|
|   4|               1.0|
|   5|1.2580645161290323|
|   6| 4.902834008097166|
|   7|17.296153846153846|
|   8|22.905594405594407|
|   9| 9.658307210031348|
|  10| 3.761437908496732|
|  11|2.0848056537102475|
|  12|1.8262910798122065|
|  13|1.6519337016574585|
|  14| 1.559782608695652|
|  15|1.7647058823529411|
|  16| 2.708053691275168|
|  17|  6.58957654723127|
|  18|  7.96742671009772|
|  19| 4.896057347670251|
|  20| 2.155844155844156|
|  21|1.6368421052631579|
|  22|1.3475177304964538|
|  23|1.1733333333333333|
+----+------------------+

