<img src="http://imgur.com/1ZcRyrc.png" style="float: left; margin: 20px; height: 55px">

# Practice Spark Lab

*Authors: Christoph Rahmede (LDN)*

**In this lab, we will use Spark to dig into the Bay Area Bike Share data.**

Our goal is to calculate the average number of trips per hour, using the Caltrain station as the starting point.

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start a local Spark session using 4 worker threads.
# SparkSession is the unified entry point and replaces the older SQLContext.
spark = SparkSession.builder.master('local[4]').getOrCreate()

## Load the data

In [3]:
trips = spark.read.csv(
    path="./data/201508_trip_data.csv",
    header=True,
    # Poorly formed rows in CSV are dropped rather than erroring entire operation
    mode="DROPMALFORMED",
    # Not always perfect but works well in most cases as of 2.1+
    inferSchema=True
)

trips.createOrReplaceTempView("tripsSql_1")

In [4]:
trips.show(10)

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

## Timestamps

The following functions convert the `Start Date` strings into timestamps and extract parts of them (hour, date, month and day of the week).

In [5]:
from pyspark.sql.functions import date_format, to_date, to_timestamp, year, month, dayofweek, hour

In [6]:
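# Spark 3+ uses a new datetime parser; with its default policy it raises an error
# for strings such as '8/31/2015 23:26' that only the legacy (pre-3.0) parser
# accepts with the pattern used below, so switch back to the legacy parser.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")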

In [7]:
df = trips.withColumn('time', to_timestamp('Start Date', format='MM/dd/yyyy HH:mm'))
df = df.withColumn('hour', hour('time'))
df = df.withColumn('day', to_date('time'))
df = df.withColumn('month', month('time'))
df = df.withColumn('weekday', dayofweek('time'))

In [8]:
df.select('Start Date', 'time', 'hour', 'day', 'month', 'weekday').show(10)

+---------------+-------------------+----+----------+-----+-------+
|     Start Date|               time|hour|       day|month|weekday|
+---------------+-------------------+----+----------+-----+-------+
|8/31/2015 23:26|2015-08-31 23:26:00|  23|2015-08-31|    8|      2|
|8/31/2015 23:11|2015-08-31 23:11:00|  23|2015-08-31|    8|      2|
|8/31/2015 23:13|2015-08-31 23:13:00|  23|2015-08-31|    8|      2|
|8/31/2015 23:10|2015-08-31 23:10:00|  23|2015-08-31|    8|      2|
|8/31/2015 23:09|2015-08-31 23:09:00|  23|2015-08-31|    8|      2|
|8/31/2015 23:07|2015-08-31 23:07:00|  23|2015-08-31|    8|      2|
|8/31/2015 23:07|2015-08-31 23:07:00|  23|2015-08-31|    8|      2|
|8/31/2015 22:16|2015-08-31 22:16:00|  22|2015-08-31|    8|      2|
|8/31/2015 22:12|2015-08-31 22:12:00|  22|2015-08-31|    8|      2|
|8/31/2015 21:57|2015-08-31 21:57:00|  21|2015-08-31|    8|      2|
+---------------+-------------------+----+----------+-----+-------+
only showing top 10 rows
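To make the extracted columns concrete, here is a plain-Python sketch (standard library only, not Spark) of the same parsing for a single `Start Date` value. It also shows why `weekday` is 2 for Monday 31 August 2015: Spark's `dayofweek` counts 1 = Sunday through 7 = Saturday, whereas Python's `isoweekday()` counts 1 = Monday through 7 = Sunday. The helper `parse_start_date` is hypothetical. As an untested aside, the pattern `'M/d/yyyy H:mm'` (single-letter fields) may parse these strings without the legacy setting.

```python
from datetime import datetime


def parse_start_date(s: str) -> dict:
    """Parse a 'Start Date' string such as '8/31/2015 23:26' and derive the
    same parts the notebook extracts with Spark SQL functions."""
    # %m, %d and %H also accept single-digit values such as '8' or '1'
    ts = datetime.strptime(s, "%m/%d/%Y %H:%M")
    return {
        "time": ts,
        "hour": ts.hour,
        "day": ts.date(),
        "month": ts.month,
        # Spark's dayofweek: 1 = Sunday ... 7 = Saturday.
        # Python's isoweekday: 1 = Monday ... 7 = Sunday, so shift by one.
        "weekday": ts.isoweekday() % 7 + 1,
    }


row = parse_start_date("8/31/2015 23:26")
print(row["hour"], row["day"], row["month"], row["weekday"])  # 23 2015-08-31 8 2
```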



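Our datetime parsing is not perfect: a few rows end up with a missing (null) timestamp. Let's look at them. Ideally we would fix the parsing for these rows; here we simply drop the rows with missing values. Note that `df.na.drop()` without arguments removes every row with a null in *any* column, not only the rows whose timestamp failed to parse; `df.na.drop(subset=['time'])` restricts the check to the parsed column.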

In [9]:
from pyspark.sql.functions import col

df.where(col('day').isNull()).toPandas()

,Trip ID,Duration,Start Date,Start Station,Start Terminal,End Date,End Station,End Terminal,Bike #,Subscriber Type,Zip Code,time,hour,day,month,weekday
0,702587,699,3/29/2015 1:43,San Jose Diridon Caltrain Station,2,3/29/2015 1:55,Japantown,9,79,Subscriber,95112,NaT,,,,
1,702585,226,3/29/2015 1:30,Grant Avenue at Columbus Avenue,73,3/29/2015 1:34,Broadway St at Battery St,82,563,Subscriber,94114,NaT,,,,
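Both failing rows start between 01:00 and 02:00 on 29 March 2015. That is the night UK clocks jumped from 01:00 GMT straight to 02:00 BST, so if the Spark session time zone was `Europe/London` (an assumption, though plausible for a London-based author), these wall-clock times never existed and the parser returns null instead of a timestamp. US clocks changed on 8 March 2015, so the same times are valid in the Bay Area. The plain-Python sketch below checks whether a local time exists in a given zone; the helper `wall_time_exists` is hypothetical and needs Python 3.9+ (`zoneinfo`) with time-zone data installed. If this diagnosis is right, running `spark.conf.set("spark.sql.session.timeZone", "UTC")` before parsing should keep these rows, since UTC has no daylight-saving gaps.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def wall_time_exists(naive: datetime, zone: str) -> bool:
    """Return True if the local wall-clock time `naive` occurs in `zone`.

    A time inside a daylight-saving 'spring forward' gap does not survive a
    round trip through UTC: it comes back shifted by the size of the gap.
    """
    tz = ZoneInfo(zone)
    roundtrip = naive.replace(tzinfo=tz).astimezone(timezone.utc).astimezone(tz)
    return roundtrip.replace(tzinfo=None) == naive


failed = datetime(2015, 3, 29, 1, 43)  # Start Date of trip 702587
print(wall_time_exists(failed, "Europe/London"))        # False: 01:00 -> 02:00 gap
print(wall_time_exists(failed, "America/Los_Angeles"))  # True: US DST began 8 March
print(wall_time_exists(failed, "UTC"))                  # True: UTC has no DST
```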


In [10]:
df.where(col('day').isNull()).count()

2

In [11]:
df = df.na.drop()

In [12]:
df.where(col('day').isNull()).count()

0

## Print the schema

In [13]:
df.printSchema()

root
 |-- Trip ID: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: integer (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: integer (nullable = true)
 |-- Bike #: integer (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- weekday: integer (nullable = true)



## Create a temporary SQL table from the dataframe

In [14]:
df.createOrReplaceTempView("trips")

## In the following exercises, use both DataFrame methods and SQL queries where possible

Hint: in Spark SQL, as in Hive, wrap column names that contain spaces in backticks:

```SQL
SELECT `column name` FROM table
```

## Determine the number of observations

In [15]:
df.count()

353872

In [16]:
spark.sql('SELECT COUNT(*) FROM trips').show()

+--------+
|count(1)|
+--------+
|  353872|
+--------+



## Calculate mean, standard deviation, minimum and maximum of the duration column

In [17]:
df.select('Duration').describe().show()

+-------+------------------+
|summary|          Duration|
+-------+------------------+
|  count|            353872|
|   mean| 1043.044581091468|
| stddev|30027.518214567655|
|    min|                60|
|    max|          17270400|
+-------+------------------+



In [18]:
spark.sql('''
SELECT COUNT(*) AS count, 
AVG(Duration) AS mean, 
STDDEV(Duration) AS std, 
MIN(Duration) AS min, 
MAX(Duration) AS max 
FROM trips
''').show()

+------+-----------------+------------------+---+--------+
| count|             mean|               std|min|     max|
+------+-----------------+------------------+---+--------+
|353872|1043.044581091468|30027.518214567655| 60|17270400|
+------+-----------------+------------------+---+--------+
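The standard deviation (about 30,000 s) is nearly 30 times the mean, and the maximum of 17,270,400 s is roughly 200 days, so a handful of extreme durations dominate the mean. A median is far less sensitive to them. The toy durations below are hypothetical, chosen only to show the effect with Python's `statistics` module; on the Spark DataFrame, `df.approxQuantile('Duration', [0.5], 0.01)` should return an approximate median.

```python
import statistics

# Hypothetical toy durations in seconds: five ordinary trips plus one extreme
# value equal to the maximum Duration reported above.
durations = [300, 420, 600, 780, 900, 17_270_400]

mean = statistics.mean(durations)
median = statistics.median(durations)
print(f"mean = {mean:.0f} s, median = {median:.0f} s")  # mean = 2878900 s, median = 690 s

# The largest Duration, expressed in days
max_days = round(max(durations) / 86_400, 1)
print(max_days)  # 199.9
```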



## On how many different days were trips observed?

In [19]:
df.select('day').distinct().count()

365

In [20]:
spark.sql('SELECT COUNT(*) AS count FROM trips').show()  # counts rows (trips), not distinct days

+------+
| count|
+------+
|353872|
+------+



In [21]:
spark.sql("""SELECT count(DISTINCT(day)) FROM trips""").show()

+-------------------+
|count(DISTINCT day)|
+-------------------+
|                365|
+-------------------+



In [22]:
df.select('Duration').rdd.min()  # minimum duration via the RDD API (same as describe() above)

Row(Duration=60)

## What are the first and last observed days?

In [23]:
day = df.select('day')

In [24]:
day.sort('day', ascending=False).show(1)

+----------+
|       day|
+----------+
|2015-08-31|
+----------+
only showing top 1 row



In [25]:
day.sort('day', ascending=True).show(1)

+----------+
|       day|
+----------+
|2014-09-01|
+----------+
only showing top 1 row



In [26]:
spark.sql('''
SELECT MIN(day) AS `first day`, MAX(day) AS `last day` FROM trips
''').show()

+----------+----------+
| first day|  last day|
+----------+----------+
|2014-09-01|2015-08-31|
+----------+----------+
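A quick consistency check: counting the calendar days from the first to the last observed day, inclusive, gives exactly the 365 distinct days found earlier, which suggests there is at least one trip on every day of the period. A standard-library sketch:

```python
from datetime import date

# First and last observed days, taken from the query output above
first_day, last_day = date(2014, 9, 1), date(2015, 8, 31)

# Inclusive number of calendar days in the observation window
span = (last_day - first_day).days + 1
print(span)  # 365
```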



## Obtain the counts of rides per hour

In [27]:
df.groupby('hour').count().sort('hour').show(30)

+----+-----+
|hour|count|
+----+-----+
|   0| 1012|
|   1|  506|
|   2|  281|
|   3|  156|
|   4|  640|
|   5| 1848|
|   6| 8012|
|   7|24742|
|   8|49414|
|   9|34929|
|  10|15306|
|  11|14041|
|  12|15769|
|  13|14652|
|  14|12788|
|  15|16466|
|  16|31813|
|  17|45798|
|  18|30956|
|  19|14899|
|  20| 8245|
|  21| 5738|
|  22| 3658|
|  23| 2203|
+----+-----+
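As a sanity check, the 24 hourly counts (copied from the output above) should add up to the total number of rows, and the two largest counts mark the morning and evening commute peaks:

```python
# Hourly trip counts for hours 0..23, copied from the output above
hourly_counts = [
    1012, 506, 281, 156, 640, 1848, 8012, 24742, 49414, 34929, 15306, 14041,
    15769, 14652, 12788, 16466, 31813, 45798, 30956, 14899, 8245, 5738, 3658, 2203,
]

total = sum(hourly_counts)
print(total)  # 353872, the same as df.count()

# Hours with the two largest counts
peak_hours = sorted(range(24), key=lambda h: hourly_counts[h], reverse=True)[:2]
print(peak_hours)  # [8, 17]
```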



In [28]:
spark.sql('''
SELECT hour, COUNT(*) FROM trips 
GROUP BY hour 
ORDER BY hour
''').show()

+----+--------+
|hour|count(1)|
+----+--------+
|   0|    1012|
|   1|     506|
|   2|     281|
|   3|     156|
|   4|     640|
|   5|    1848|
|   6|    8012|
|   7|   24742|
|   8|   49414|
|   9|   34929|
|  10|   15306|
|  11|   14041|
|  12|   15769|
|  13|   14652|
|  14|   12788|
|  15|   16466|
|  16|   31813|
|  17|   45798|
|  18|   30956|
|  19|   14899|
+----+--------+
only showing top 20 rows



## Obtain the counts per hour averaged over all observed dates

In [29]:
(df.groupby('hour', 'day').count()          # number of trips for each (hour, day) pair
   .groupby('hour').agg({'count': 'mean'})  # mean over the days on which that hour had trips
   .sort('hour')
   .show(30))

+----+------------------+
|hour|        avg(count)|
+----+------------------+
|   0|3.1138461538461537|
|   1| 2.219298245614035|
|   2|1.7239263803680982|
|   3|1.3333333333333333|
|   4|2.4334600760456273|
|   5| 6.019543973941368|
|   6| 22.95702005730659|
|   7| 67.97252747252747|
|   8|135.75274725274724|
|   9| 95.69589041095891|
|  10| 42.04945054945055|
|  11| 38.46849315068493|
|  12|43.202739726027396|
|  13| 40.14246575342466|
|  14|35.035616438356165|
|  15| 45.11232876712329|
|  16| 87.15890410958905|
|  17|125.47397260273972|
|  18| 84.81095890410958|
|  19| 40.81917808219178|
|  20|22.589041095890412|
|  21|15.807162534435262|
|  22|10.104972375690608|
|  23| 6.188202247191011|
+----+------------------+



In [30]:
spark.sql('''
SELECT hour, AVG(counts) 
FROM (
    SELECT hour, day, COUNT(*) AS counts 
    FROM trips GROUP BY hour, day
    ) s 
GROUP BY hour 
ORDER BY hour
''').show(30)

+----+------------------+
|hour|       avg(counts)|
+----+------------------+
|   0|3.1138461538461537|
|   1| 2.219298245614035|
|   2|1.7239263803680982|
|   3|1.3333333333333333|
|   4|2.4334600760456273|
|   5| 6.019543973941368|
|   6| 22.95702005730659|
|   7| 67.97252747252747|
|   8|135.75274725274724|
|   9| 95.69589041095891|
|  10| 42.04945054945055|
|  11| 38.46849315068493|
|  12|43.202739726027396|
|  13| 40.14246575342466|
|  14|35.035616438356165|
|  15| 45.11232876712329|
|  16| 87.15890410958905|
|  17|125.47397260273972|
|  18| 84.81095890410958|
|  19| 40.81917808219178|
|  20|22.589041095890412|
|  21|15.807162534435262|
|  22|10.104972375690608|
|  23| 6.188202247191011|
+----+------------------+
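Both queries average only over the days on which a given hour had at least one trip, because an (hour, day) pair with no trips never appears in the grouped table. For hour 3, 156 trips at 1.333 trips per day means the average is taken over 117 days, not all 365. To average over every observed day instead, divide each hour's total count by the number of distinct days. The plain-Python model below (hypothetical helper `mean_trips_per_hour`, hypothetical toy data) contrasts the two definitions:

```python
from collections import Counter


def mean_trips_per_hour(trip_keys, all_days, include_empty_days):
    """Average number of trips per hour of day.

    trip_keys: iterable of (hour, day) pairs, one per trip.
    include_empty_days=False mimics groupby('hour', 'day').count() followed by
    a mean per hour: days on which that hour had no trips are never counted.
    include_empty_days=True divides by every observed day in all_days.
    """
    per_hour_day = Counter(trip_keys)  # (hour, day) -> trips
    totals = Counter()                 # hour -> total trips
    active_days = Counter()            # hour -> days with at least one trip
    for (hour, _day), n in per_hour_day.items():
        totals[hour] += n
        active_days[hour] += 1
    if include_empty_days:
        return {h: totals[h] / len(all_days) for h in totals}
    return {h: totals[h] / active_days[h] for h in totals}


# Hypothetical toy data: 3 observed days, hour 3 has trips on only one of them
days = ["d1", "d2", "d3"]
keys = [(3, "d1"), (3, "d1"), (8, "d1"), (8, "d2"), (8, "d3")]
print(mean_trips_per_hour(keys, days, include_empty_days=False))  # {3: 2.0, 8: 1.0}
print(mean_trips_per_hour(keys, days, include_empty_days=True))

# Real numbers for hour 3 from the outputs above: 156 trips, 1.333 per day
print(round(156 / 1.3333333333333333))  # 117 days with at least one 3 am trip
```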



## Obtain the average duration of trips per hour departing from terminal 70 (San Francisco Caltrain), sorted by hour

In [31]:
df.filter(df['Start Terminal']==70).groupby('hour').agg({'Duration': 'mean'}).sort('hour').show()

+----+------------------+
|hour|     avg(Duration)|
+----+------------------+
|   0|2681.2597402597403|
|   1| 886.7142857142857|
|   2| 809.6666666666666|
|   3|             242.0|
|   4|           29449.5|
|   5|2305.2820512820513|
|   6| 726.3856317093312|
|   7| 750.8696909050478|
|   8| 727.0969317661426|
|   9| 706.4761441090556|
|  10|  872.337684943429|
|  11|1450.3474576271187|
|  12| 2403.685567010309|
|  13| 2514.535117056856|
|  14| 1452.651567944251|
|  15| 906.8820512820513|
|  16| 1006.927950310559|
|  17| 725.6462147451757|
|  18| 728.8278822567457|
|  19|  658.924486803519|
+----+------------------+
only showing top 20 rows



In [32]:
spark.sql('''
SELECT hour, AVG(duration) FROM trips 
WHERE `Start Terminal`=70 
GROUP BY hour 
ORDER BY hour
''').show()

+----+------------------+
|hour|     avg(duration)|
+----+------------------+
|   0|2681.2597402597403|
|   1| 886.7142857142857|
|   2| 809.6666666666666|
|   3|             242.0|
|   4|           29449.5|
|   5|2305.2820512820513|
|   6| 726.3856317093312|
|   7| 750.8696909050478|
|   8| 727.0969317661426|
|   9| 706.4761441090556|
|  10|  872.337684943429|
|  11|1450.3474576271187|
|  12| 2403.685567010309|
|  13| 2514.535117056856|
|  14| 1452.651567944251|
|  15| 906.8820512820513|
|  16| 1006.927950310559|
|  17| 725.6462147451757|
|  18| 728.8278822567457|
|  19|  658.924486803519|
+----+------------------+
only showing top 20 rows
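The lab's stated goal is the average number of trips per hour starting at the Caltrain station (terminal 70), while the cells above average trip durations. The plain-Python model below, on hypothetical toy rows, sketches the count-based version: keep trips from one start terminal, count them per hour of day and divide by the number of observed days. The helper `avg_trips_per_hour` is hypothetical; in Spark the same idea would presumably be a filter on `Start Terminal`, `groupby('hour').count()` and a division by the number of distinct days, but treat that translation as untested.

```python
from collections import Counter
from datetime import date

# Hypothetical toy rows: (start_terminal, hour, day)
toy_trips = [
    (70, 8, date(2015, 8, 31)),
    (70, 8, date(2015, 8, 31)),
    (70, 17, date(2015, 8, 31)),
    (70, 8, date(2015, 8, 30)),
    (50, 8, date(2015, 8, 30)),  # different start terminal, filtered out
]


def avg_trips_per_hour(rows, terminal, n_days):
    """Average trips per hour of day leaving `terminal`, over `n_days` observed days."""
    per_hour = Counter(hour for term, hour, _day in rows if term == terminal)
    # Hours without any trip get 0.0 because a Counter returns 0 for missing keys
    return {hour: per_hour[hour] / n_days for hour in range(24)}


observed_days = {day for _term, _hour, day in toy_trips}
avg = avg_trips_per_hour(toy_trips, terminal=70, n_days=len(observed_days))
print(avg[8], avg[17], avg[3])  # 1.5 0.5 0.0
```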

