This notebook reads two tables (`stop_times.txt` and `stops.txt`) from the MBTA's GTFS feed (available [here](http://transitfeeds.com/p/mbta/64)), joins them on the stop ID, and writes the result out as a Parquet table.

The resulting Parquet table is used in [this tutorial](https://medium.com/@adamj9431/upload-files-to-ibm-data-science-experience-using-the-command-line-bb482fae1d73#.1z6ph8kag).

In [98]:
import os

from pyspark.sql.functions import *
from pyspark.sql.types import *
# Directory that holds the unpacked GTFS feed (stop_times.txt, stops.txt) and
# that receives the Parquet output. Set the MBTA_GTFS_DIR environment variable
# to run the notebook on another machine; otherwise the original location is used.
# os.path.join(..., "") guarantees a trailing separator, because the cells below
# build file paths by plain string concatenation.
projectPath = os.path.join(
    os.environ.get("MBTA_GTFS_DIR") or "/Users/adam/Projects/mbta-data/gtfs/",
    "",
)

Import the `stop_times.txt` CSV file using the Databricks spark-csv reader

In [99]:
# Load the GTFS stop_times table. The file has a header row, and column types
# are inferred from the data rather than declared up front.
csvPath = projectPath + "stop_times.txt"
stopTimes = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load(csvPath)
)

In [100]:
# Sanity check: print the inferred column types, then display the first row.
stopTimes.printSchema()
stopTimes.head()

root
 |-- trip_id: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- stop_sequence: integer (nullable = true)
 |-- stop_headsign: string (nullable = true)
 |-- pickup_type: integer (nullable = true)
 |-- drop_off_type: integer (nullable = true)



Row(trip_id=u'29063613', arrival_time=u'14:32:00', departure_time=u'14:32:00', stop_id=u'12295', stop_sequence=1, stop_headsign=u'', pickup_type=0, drop_off_type=1)

Modify the schema: cast `trip_id` and `stop_id` from string to integer

In [101]:
# Re-select all eight stop_times columns in their original order, casting the
# two id columns (inferred as strings) to integers; the rest pass through as-is.
# NOTE(review): both ids were inferred as strings, so some values are presumably
# non-numeric; with Spark's default cast those become null -- confirm that
# losing them is acceptable.
stopTimes = stopTimes.select(*(
    col(name).cast(IntegerType()) if name in ('trip_id', 'stop_id') else col(name)
    for name in ('trip_id', 'arrival_time', 'departure_time', 'stop_id',
                 'stop_sequence', 'stop_headsign', 'pickup_type', 'drop_off_type')
))

In [102]:
# Load the GTFS stops table the same way: header row present, types inferred.
csvPath = projectPath + "stops.txt"
stops = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load(csvPath)
)

In [103]:
# Replace the string stop_id with an integer copy named stop_id_int (appended as
# the last column), so the joined table does not carry two columns called stop_id.
# Note: this cell is not re-runnable on its own, because stop_id no longer
# exists on `stops` once it has executed.
stops = (
    stops
    .withColumn('stop_id_int', col('stop_id').cast(IntegerType()))
    .drop('stop_id')
)

In [104]:
# Attach stop details to every stop-time row by matching on the integer stop id,
# then discard the helper key so only stopTimes.stop_id remains.
# This is an inner join, so rows whose id is null on either side are not kept.
stopsJoined = (
    stopTimes
    .join(stops, stopTimes.stop_id == stops.stop_id_int, 'inner')
    .drop('stop_id_int')
)

In [105]:
# Verify the joined schema: the stop_times columns followed by the stops
# columns, with the stop_id_int helper key removed.
stopsJoined.printSchema()

root
 |-- trip_id: integer (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stop_id: integer (nullable = true)
 |-- stop_sequence: integer (nullable = true)
 |-- stop_headsign: string (nullable = true)
 |-- pickup_type: integer (nullable = true)
 |-- drop_off_type: integer (nullable = true)
 |-- stop_code: integer (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_desc: string (nullable = true)
 |-- stop_lat: double (nullable = true)
 |-- stop_lon: double (nullable = true)
 |-- zone_id: string (nullable = true)
 |-- stop_url: string (nullable = true)
 |-- location_type: integer (nullable = true)
 |-- parent_station: string (nullable = true)
 |-- wheelchair_boarding: integer (nullable = true)



In [106]:
# Persist the joined table as Parquet after redistributing it into 30 partitions.
# mode('overwrite') replaces the output of a previous run: the default save mode
# raises an error when the target path already exists, which made this cell fail
# on every re-run of the notebook.
stopsJoined.repartition(30).write.mode('overwrite').parquet(projectPath + "mbtaStopFreq.parquet")