In [1]:
# Environment setup for the Spark JVM (optionally loading external packages
# programmatically). Presumably this has to run before the SparkSession is
# created in the next cell -- confirm if the cells are ever reordered.
import os
# Disabled: Maven coordinates consumed by the PYSPARK_SUBMIT_ARGS block below.
# packages = "com.databricks:spark-xml_2.11:0.5.0"

# NOTE(review): absolute, machine-specific JDK path; it unconditionally
# overwrites any JAVA_HOME already present in the environment.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Bare expression: echoes the value as the cell output to confirm it was set.
os.environ["JAVA_HOME"]

# Disabled: would pass the `packages` coordinates to spark-submit via --packages.
# os.environ["PYSPARK_SUBMIT_ARGS"] = (
#     "--packages {0} pyspark-shell".format(packages)
# )

'/usr/lib/jvm/java-8-openjdk-amd64'

In [2]:
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create (or reuse) a local SparkSession for the notebook; "local[*]" runs
# Spark in-process using every available core.
spark = (
    SparkSession.builder
    .appName("Bike rental")
    .master("local[*]")
    .config("spark.driver.cores", 1)
    .getOrCreate()
)
# Bare expression: renders the session summary as the cell output.
spark

# Bike Dataset

In this example we are going to explore a dataset generated by a bike rental system deployed in the San Francisco area. The dataset consists of two data sources:
- information about the renting stations
- information about trips done using this service

Next, we will explore both of them and compute some new information

## Stations

### Initial exploration

The first dataset contains information about the renting stations. We can use Spark's CSV reader to take a look at the data. Because the dataset contains an initial row with the field names, we must provide the reader with the corresponding option to take the header into account.

In [3]:
# First look at the station file: the header row supplies the column names and
# no schema is given.
stations = spark.read.csv(
    "../data/bike-data/201508_station_data.csv",
    header=True,
)

In [4]:
stations.show(2)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 2 rows



In [4]:
stations.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)



As we can see in the output of the previous statements, every column has been loaded as a string: the CSV reader does not infer column types unless the `inferSchema` option is enabled. Moreover, the installation date uses a non-standard format.

To get the most out of our process we will provide a custom schema to coerce the data types to the proper ones. In addition, we also pass the `dateFormat` option to the Spark DataFrameReader to parse the installation date values.

In [5]:
# Explicit schema for the station CSV. Columns are declared in the same order
# as the header row shown above, each with nullable=False.
stationSchema = (
    StructType()
    .add("station_id", ByteType(), False)
    .add("name", StringType(), False)
    .add("lat", DoubleType(), False)
    .add("long", DoubleType(), False)
    .add("dockcount", IntegerType(), False)
    .add("landmark", StringType(), False)
    .add("installation", DateType(), False)
)

In [7]:
# Reload the station file with the explicit schema.
# The installation dates in the file are not zero-padded (e.g. "8/6/2013"), so
# the pattern uses single-letter month/day fields: "M/d/yyyy" accepts both
# "8/6/2013" and "08/06/2013". The padded pattern "MM/dd/yyyy" is only accepted
# by the lenient parser of older Spark releases and is rejected for unpadded
# values by the stricter datetime parser introduced in Spark 3.
stations = (
    spark.read
    .option("header", "true")
    .option("dateFormat", "M/d/yyyy")
    .csv("../data/bike-data/201508_station_data.csv", schema=stationSchema)
)

In [8]:
stations.printSchema()

root
 |-- station_id: byte (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: integer (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: date (nullable = true)



After providing the proper schema we are able to load the dataset without formatting issues.

In [9]:
stations.show(truncate=False)

+----------+---------------------------------+---------+-----------+---------+------------+------------+
|station_id|name                             |lat      |long       |dockcount|landmark    |installation|
+----------+---------------------------------+---------+-----------+---------+------------+------------+
|2         |San Jose Diridon Caltrain Station|37.329732|-121.901782|27       |San Jose    |2013-08-06  |
|3         |San Jose Civic Center            |37.330698|-121.888979|15       |San Jose    |2013-08-05  |
|4         |Santa Clara at Almaden           |37.333988|-121.894902|11       |San Jose    |2013-08-06  |
|5         |Adobe on Almaden                 |37.331415|-121.8932  |19       |San Jose    |2013-08-05  |
|6         |San Pedro Square                 |37.336721|-121.894074|15       |San Jose    |2013-08-07  |
|7         |Paseo de San Antonio             |37.333798|-121.886943|15       |San Jose    |2013-08-07  |
|8         |San Salvador at 1st              |37.330165

In [10]:
stations.count()

70

### Exercise
The station dataset contains information about the location and characteristics of the stations installed for the rental service.

Let's do a little summary to compute for each landmark the date when the first station was deployed, the date of the last update and the total number of docks available for the area so far.

In [11]:
# Per-landmark summary: date of the first installation, date of the most recent
# installation, and total number of docks.
# The aggregates are taken from the explicit `F` namespace so this cell does not
# depend on the wildcard import having replaced Python's built-in
# min / max / sum with the Spark column functions of the same name.
landmarks = (
    stations
    .groupBy("landmark")
    .agg(
        F.min("installation").alias("service_start"),
        F.max("installation").alias("last_update"),
        F.sum("dockcount").alias("total_docks"),
    )
)

In [11]:
# Show the landmark summary, earliest service start first.
landmarks.orderBy("service_start").show()

+-------------+-------------+-----------+-----------+
|     landmark|service_start|last_update|total_docks|
+-------------+-------------+-----------+-----------+
|     San Jose|   2013-08-05| 2014-04-09|        264|
| Redwood City|   2013-08-12| 2014-02-20|        115|
|    Palo Alto|   2013-08-14| 2013-08-15|         75|
|Mountain View|   2013-08-15| 2013-12-31|        117|
|San Francisco|   2013-08-19| 2014-01-22|        665|
+-------------+-------------+-----------+-----------+



## Trips

### Initial exploration

The second dataset contains information about registered trips using the rental service.

Again, we make use of the CSV reader to carry out the initial exploration.


In [12]:
# First look at the trip file: the header row supplies the column names and no
# schema is given.
trips = spark.read.csv(
    "../data/bike-data/201508_trip_data.csv",
    header=True,
)

In [13]:
trips.show(2)

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
only showing top 2 rows



In [14]:
trips.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: string (nullable = true)
 |-- Bike #: string (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)



As we can see from the previous execution, field types are not inferred and the format of the timestamps is not a standard one. To parse it properly we will define the schema manually and also provide the `timestampFormat` option to the DataFrameReader.

In [15]:
# Explicit schema for the trip CSV. Columns are declared in file order and
# renamed to snake_case (the header uses names such as "Trip ID" and "Bike #").
tripSchema = StructType([
    StructField("trip_id", IntegerType(), False),
    StructField("duration", IntegerType(), False),
    StructField("start_date", TimestampType(), False),
    StructField("start_station", StringType(), False),
    StructField("start_terminal", ByteType(), False),
    StructField("end_date", TimestampType(), False),
    StructField("end_station", StringType(), False),
    StructField("end_terminal", ByteType(), False),
    StructField("bike", IntegerType(), False),
    StructField("subscriber_type", StringType(), False),
    # ZIP codes are identifiers, not quantities, so they are kept as text.
    # NOTE(review): with an integer type, any non-numeric value in this column
    # would make the row malformed (older Spark releases null out the whole
    # row in the default PERMISSIVE mode) -- confirm against the raw file.
    StructField("zip_code", StringType(), False),
])

In [16]:
# Reload the trip file with the explicit schema.
# Timestamps in the file are not zero-padded (e.g. "8/31/2015 23:26"), so the
# pattern uses single-letter month/day/hour fields, which accept one or two
# digits. The padded pattern "MM/dd/yyyy HH:mm" is only accepted by the lenient
# parser of older Spark releases and is rejected for unpadded values by the
# stricter datetime parser introduced in Spark 3.
trips = (
    spark.read
    .option("header", "true")
    .option("timestampFormat", "M/d/yyyy H:mm")
    .csv("../data/bike-data/201508_trip_data.csv", schema=tripSchema)
)

In [17]:
trips.show(truncate = True)

+-------+--------+-------------------+--------------------+--------------+-------------------+--------------------+------------+----+---------------+--------+
|trip_id|duration|         start_date|       start_station|start_terminal|           end_date|         end_station|end_terminal|bike|subscriber_type|zip_code|
+-------+--------+-------------------+--------------------+--------------+-------------------+--------------------+------------+----+---------------+--------+
| 913460|     765|2015-08-31 23:26:00|Harry Bridges Pla...|            50|2015-08-31 23:39:00|San Francisco Cal...|          70| 288|     Subscriber|    2139|
| 913459|    1036|2015-08-31 23:11:00|San Antonio Shopp...|            31|2015-08-31 23:28:00|Mountain View Cit...|          27|  35|     Subscriber|   95032|
| 913455|     307|2015-08-31 23:13:00|      Post at Kearny|            47|2015-08-31 23:18:00|   2nd at South Park|          64| 468|     Subscriber|   94107|
| 913454|     409|2015-08-31 23:10:00|  San Jo

In [18]:
trips.count()

354152

### Exercise

Compute the total number of trips, the total trip duration and the average trip duration for each bike, and display a ranking for the top 5 most used with the corresponding stats 

In [19]:
# Per-bike usage statistics: number of trips, total time in hours and mean trip
# time in minutes (the raw `duration` is divided by 3600 and 60 respectively),
# both rounded to two decimals.
# The functions are taken from the explicit `F` namespace so this cell does not
# depend on the wildcard import having replaced Python's built-in round / sum
# with the Spark column functions of the same name.
bike_info = (
    trips
    .groupBy("bike")
    .agg(
        F.count("*").alias("total"),
        F.round(F.sum("duration") / 3600, 2).alias("total_duration(hours)"),
        F.round(F.avg("duration") / 60, 2).alias("avg_duration(mins)"),
    )
)

In [20]:
# Ranking: the five bikes with the highest trip count.
bike_info.sort(desc("total")).show(5)

+----+-----+---------------------+------------------+
|bike|total|total_duration(hours)|avg_duration(mins)|
+----+-----+---------------------+------------------+
| 878| 1121|               279.67|             14.97|
| 392| 1102|               284.41|             15.49|
| 489| 1101|               238.35|             12.99|
| 463| 1085|               279.98|             15.48|
| 532| 1074|               237.33|             13.26|
+----+-----+---------------------+------------------+
only showing top 5 rows



Display a summary of the aggregated dataset containing information about how the bikes are used.

In [21]:
bike_info.drop("bike").describe().show()

+-------+-----------------+---------------------+------------------+
|summary|            total|total_duration(hours)|avg_duration(mins)|
+-------+-----------------+---------------------+------------------+
|  count|              668|                  668|               668|
|   mean|530.1676646706587|   154.04802395209586| 22.65806886227546|
| stddev|398.3555876917163|   210.80905043525698|32.218895096086726|
|    min|                4|                 0.54|              4.63|
|    max|             1121|               4920.8|            646.06|
+-------+-----------------+---------------------+------------------+



If we want to know what individual trips look like, we can describe the initial dataset before it is aggregated.

In [22]:
# Summary statistics of the individual trip durations, expressed in minutes
# (raw duration divided by 60).
(
    trips
    .select((trips['duration'] / 60).alias('duration(mins)'))
    .describe()
    .show()
)

+-------+------------------+
|summary|    duration(mins)|
+-------+------------------+
|  count|            354152|
|   mean|17.433877685287616|
| stddev| 500.2822692821596|
|    min|               1.0|
|    max|          287840.0|
+-------+------------------+



### Exercise

Compute the minimum distance traveled for a bike trip. We will consider the minimum trip distance to be the distance between the starting and ending stations.

We will use the haversine distance function provided below to compute the distance between two geographical points given by their (long, lat) coordinates.

In [23]:
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2, radius=6371):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees).

    Parameters
    ----------
    lon1, lat1 : float or None
        Longitude and latitude of the first point, in decimal degrees.
    lon2, lat2 : float or None
        Longitude and latitude of the second point, in decimal degrees.
    radius : float, optional
        Sphere radius; the result is expressed in the same unit.
        Defaults to 6371 (Earth radius in kilometers). Use 3956 for miles.

    Returns
    -------
    float or None
        The distance along the great circle, or None when any coordinate
        is None (so a null coordinate yields a null distance instead of an
        exception when the function is wrapped as a Spark UDF).
    """
    # Local import on purpose: this notebook wildcard-imports
    # pyspark.sql.functions, which exports its own sin / cos / asin / sqrt /
    # radians. Qualifying through `math` keeps the function correct whatever
    # those global names currently point to. Built-ins such as min() are
    # avoided below for the same reason.
    import math

    if lon1 is None or lat1 is None or lon2 is None or lat2 is None:
        return None

    # convert decimal degrees to radians
    lon1 = math.radians(lon1)
    lat1 = math.radians(lat1)
    lon2 = math.radians(lon2)
    lat2 = math.radians(lat2)

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2

    # Floating-point rounding can push sqrt(a) marginally above 1 for
    # (near-)antipodal points, which would make asin raise ValueError.
    root = math.sqrt(a)
    if root > 1.0:
        root = 1.0

    c = 2 * math.asin(root)
    return c * radius

In [24]:
# Station coordinates renamed for the start-terminal side of the join.
start_coords = stations.select(
    col("station_id").alias("start_terminal"),
    col("lat").alias("start_lat"),
    col("long").alias("start_long"),
)

In [25]:
# Station coordinates renamed for the end-terminal side of the join.
end_coords = stations.select(
    col("station_id").alias("end_terminal"),
    col("lat").alias("end_lat"),
    col("long").alias("end_long"),
)

In [26]:
# Attach the coordinates of the start and end stations to every trip.
# Both joins are inner joins, so trips whose terminal id has no matching
# station are dropped.
trip_coords = (
    trips
    .select('trip_id', 'bike', 'start_terminal', 'end_terminal', 'duration')
    .join(start_coords, on='start_terminal', how='inner')
    .join(end_coords, on='end_terminal', how='inner')
)

In [27]:
trip_coords.show(10)

+------------+--------------+-------+----+--------+---------+-----------+---------+-----------+
|end_terminal|start_terminal|trip_id|bike|duration|start_lat| start_long|  end_lat|   end_long|
+------------+--------------+-------+----+--------+---------+-----------+---------+-----------+
|          70|            50| 913460| 288|     765|37.795392|-122.394203|37.776617| -122.39526|
|          27|            31| 913459|  35|    1036|37.400443|-122.108338|37.389218|-122.081896|
|          64|            47| 913455| 468|     307|37.788975|-122.403452|37.782259|-122.392738|
|           8|            10| 913454|  68|     409|37.337391|-121.886995|37.330165|-121.885831|
|          60|            51| 913453| 487|     789|37.791464|-122.391034| 37.80477|-122.403234|
|          70|            68| 913452| 538|     293|37.784878|-122.401014|37.776617| -122.39526|
|          60|            51| 913451| 363|     896|37.791464|-122.391034| 37.80477|-122.403234|
|          74|            60| 913450| 47

In [28]:
# Wrap the plain-Python haversine function as a Spark UDF returning a double.
haversine_udf = udf(f=haversine, returnType=DoubleType())

In [29]:
# Straight-line (haversine) distance between the start and end station of each
# trip. The result is cached because it is reused by the cells that follow.
bike_trips = (
    trip_coords
    .withColumn(
        'distance',
        haversine_udf('start_long', 'start_lat', 'end_long', 'end_lat'),
    )
    .select('trip_id', 'bike', 'duration', 'distance')
    .cache()
)

In [30]:
bike_trips.show()

+-------+----+--------+------------------+
|trip_id|bike|duration|          distance|
+-------+----+--------+------------------+
| 913460| 288|     765| 2.089750124830001|
| 913459|  35|    1036| 2.648470367539097|
| 913455| 468|     307|1.2017337002978108|
| 913454|  68|     409|0.8100583165374903|
| 913453| 487|     789|1.8270577565764574|
| 913452| 538|     293| 1.048574821138317|
| 913451| 363|     896|1.8270577565764574|
| 913450| 470|     255| 1.412517635789928|
| 913449| 439|     126|0.3507874570701799|
| 913448| 472|     932| 2.082182676793992|
| 913443| 434|     691|1.6974508788205545|
| 913442| 531|     633|1.9445448794935296|
| 913441| 383|     387|1.3731493115377875|
| 913440| 621|     281|0.9914240938119777|
| 913435| 602|     424|1.4646847050411294|
| 913434| 521|     283|0.8862659380289881|
| 913433|  75|     145|0.5530861275207816|
| 913432| 426|     703|1.5816856581295398|
| 913431| 572|     605|1.4752133424244953|
| 913429| 501|     902| 2.487045024418867|
+-------+--

In [31]:
from pyspark.sql import Window

# Running totals per bike, accumulated in trip_id order.
# With an ordering and no explicit frame, the window covers everything from the
# start of the partition up to the current row, so each aggregate below is a
# cumulative sum.
window = Window.partitionBy('bike').orderBy('trip_id')

# `F.sum` is referenced through the explicit namespace so this cell does not
# depend on the wildcard import having replaced Python's built-in sum with the
# Spark column function of the same name.
cum_trips = (
    bike_trips
    .select(
        'trip_id',
        'bike',
        'distance',
        F.sum('distance').over(window).alias('cum_distance'),
        'duration',
        F.sum('duration').over(window).alias('cum_duration'),
    )
)

In [32]:
cum_trips.show()

+-------+----+-------------------+------------------+--------+------------+
|trip_id|bike|           distance|      cum_distance|duration|cum_duration|
+-------+----+-------------------+------------------+--------+------------+
| 440922| 148| 1.6384844866675312|1.6384844866675312|     454|         454|
| 447259| 148| 1.5811076468914576|3.2195921335589888|     576|        1030|
| 447608| 148| 1.0336243296938776| 4.253216463252866|     367|        1397|
| 448415| 148| 1.3378747983775716| 5.591091261630438|     588|        1985|
| 452165| 148| 1.4348651358554794| 7.025956397485917|     405|        2390|
| 460867| 148|  0.764619930007617| 7.790576327493534|     394|        2784|
| 462950| 148|                0.0| 7.790576327493534|    5545|        8329|
| 472254| 148|                0.0| 7.790576327493534|    5055|       13384|
| 476469| 148| 0.8071258491882103| 8.597702176681745|     304|       13688|
| 476640| 148| 0.6814078572748856|  9.27911003395663|     206|       13894|
| 477261| 14

### Optional exercises

#### Exercise

Compute a distance look-up table (LUT) between stations and then use it to compute the trip distances.

> Note: optionally you can use functions.broadcast over the LUT dataset to hint spark about using a broadcast join

#### Exercise

Plot the displacements done by the bike that has traveled the longest distance.