In [1]:
sc

<pyspark.context.SparkContext at 0x10b041e90>

In [2]:
import csv
import pandas as pd
from datetime import datetime
from pyspark.sql import functions

In [3]:
def parseCSV(idx, records):
    for row in csv.reader(records):
        direction = 0
        bus = row[7].split('_')[2]
        tripid = row[7].split('_')[1].split('-')[2]
        start = int(row[0].split('T')[1].split(':')[0])
        minute = int(row[0].split('T')[1].split(':')[1])
        t = datetime.strptime(row[0], '%Y-%m-%dT%H:%M:%SZ')  
        date = t.strftime('%Y-%m-%d')
        tm = t.strftime('%Y-%m-%d %H:%M:%S')
        unique_key = str(date) + str(bus) + str(tripid)
        
        # Create bus direction
        # Achilles changes - changed the values for the bearings and switched directions for Q48
        
        if bus == 'BX1':
            if float(row[4]) < 200:
                direction = 1
            else:
                direction = 2
        elif bus == 'BX6':
            if float(row[4]) < 110:
                direction = 1
            else:
                direction = 2
        elif bus == 'BX13':
            if 70 <= float(row[4]) < 150:
                direction = 1
            else:
                direction = 2
        elif bus == 'Q48':
            if 100 <= float(row[4]) < 150:
                direction = 2
            else:
                direction = 1 
                
        if minute < 15:
            interval = str(start) + str(':00-') + str(start) + str(':15')
        elif 15 <= minute < 30:
            interval = str(start) + str(':15-') + str(start) + str(':30')
        elif 30 <= minute < 45:
            interval = str(start) + str(':30-') + str(start) + str(':45')
        elif 45 <= minute < 60:
            interval = str(start) + str(':45-') + str(start+1) + str(':00')

        yield unique_key, tm, bus, tripid, direction, interval

## Source data file. 
# path = '/Users/JordanVani/Documents/NYU/BDM/nyc-bus-delay-event/Data/BDM_BusData.csv'
path = '/Users/ianstuart/Google Drive/CUSP Courses/Big Data/Project/nyc-bus-delay-event/Data/output.csv'

## Parse datafile to RDD.
data = sc.textFile(path).mapPartitionsWithIndex(parseCSV)

## For each unique bus line, calculate route start time.
min_by_group = (data
                .map(lambda x: (x[0], x[0:6]))
                .reduceByKey(lambda x1, x2: min(x1, x2, key=lambda x: x[1]))
                .values()
                .map(lambda x: (x[0], (x[1:6]))))

## For each unique bus line, calculate route end time.
max_by_group = (data
                .map(lambda x: (x[0], x[0:2]))
                .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
                .values())

## Join start and stop times.
rdd = min_by_group.join(max_by_group)
rdd = rdd.flatMap(lambda x: [[x[0], x[1][0][0], x[1][1], x[1][0][1],
                              x[1][0][2], x[1][0][4]]])

# Calculate duration of bus.
time_diff = rdd.toDF(['id', 'start', 'stop', 'bus', 'tripid', 'interval'])
time_diff = time_diff.select('id', time_diff['start'].cast('timestamp'),
                             time_diff['stop'].cast('timestamp'), 'bus', 'tripid', 'interval')
timeDiff = (functions.unix_timestamp('stop', format="yyyy-MM-dd HH:mm:ss")
            - functions.unix_timestamp('start', format="yyyy-MM-dd HH:mm:ss"))
time_diff = time_diff.withColumn('duration', timeDiff)

# Calculate mean direction
trip_dir = data.toDF(['id_', 'time', 'bus', 'tripid', 'direction', 'interval'])
trip_dir = trip_dir.groupby("id_").agg({'direction': 'avg'})

# Join direction back to data.
master = time_diff.join(trip_dir, time_diff.id == trip_dir.id_, how='left_outer')
master = master.select('id', 'start', 'bus', 'tripid', 'interval', 'duration', 
                       functions.col('avg(direction)').cast('int').alias('direction'))

In [4]:
master.show(5)

+--------------------+--------------------+----+------+-----------+--------+---------+
|                  id|               start| bus|tripid|   interval|duration|direction|
+--------------------+--------------------+----+------+-----------+--------+---------+
| 2014-08-12BX6040500|2014-08-12 10:50:...| BX6|040500|10:45-11:00|    3516|        1|
| 2014-08-12BX6082600|2014-08-12 17:48:...| BX6|082600|17:45-18:00|    3716|        1|
|2014-08-12BX13099600|2014-08-12 20:37:...|BX13|099600|20:30-20:45|    3298|        2|
|2014-08-12BX13073800|2014-08-12 16:25:...|BX13|073800|16:15-16:30|    2329|        1|
|2014-08-12BX13076800|2014-08-12 16:37:...|BX13|076800|16:30-16:45|    3225|        2|
+--------------------+--------------------+----+------+-----------+--------+---------+
only showing top 5 rows



In [9]:
master.count()

815

In [15]:
rdd_times = (master
             .groupby("bus", functions.date_format('start', 'yyyy-MM-dd').alias('date'), 
                      "direction", "interval")
             .agg({"duration": "avg", "id": "count"}))

In [None]:
(rdd_times
 .sort(functions.col('bus'), functions.col('date'), functions.col('interval'))
 .show(100))

In [16]:
# read in schedule data
schedules = sc.textFile('game_schedules/combined_schedules.csv').map(lambda line: line.split(","))

In [17]:
# convert schedule data to dataframe
sched_df = schedules.toDF(['index','Home team','starttime','endtime','startwindow_start', \
                               'startwindow_end','endwindow_start','endwindow_end'])
sched_df.head(3)

In [8]:
sched_df.printSchema()

root
 |-- index: string (nullable = true)
 |-- Home team: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- startwindow_start: string (nullable = true)
 |-- startwindow_end: string (nullable = true)
 |-- endwindow_start: string (nullable = true)
 |-- endwindow_end: string (nullable = true)



In [9]:
master.printSchema()

root
 |-- id: string (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- bus: string (nullable = true)
 |-- tripid: string (nullable = true)
 |-- interval: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- direction: integer (nullable = true)



In [18]:
# convert all time columns to type 'timestamp'
for col in sched_df.columns[2:]:
    sched_df = sched_df.withColumn(col, sched_df[col].cast('timestamp'))
sched_df.printSchema()

In [19]:
# add a 'date' column to use for join with rdd_times
sched_df = sched_df.withColumn('date', functions.date_format('starttime', 'yyyy-MM-dd'))

In [20]:
# join schedule data to bus interval data
joined_df = rdd_times.join(sched_df, 'date', 'left')
joined_df.take(3)

In [62]:
joined_df.count()

307

In [21]:
joined_df = joined_df.drop('index', 'Home team', 'starttime', 'endtime')
joined_df.printSchema()

In [22]:
from pyspark.sql.functions import lit, concat, col

joined_df = joined_df.withColumn('time', functions.split(joined_df.interval, '-')[0])
joined_df = joined_df.withColumn('start', functions.concat(col('date'), lit(' '), \
                                                           col('time')).cast('timestamp'))
joined_df.take(3)

In [23]:
joined_df = joined_df.drop('time')
joined_df.printSchema()

In [24]:
from pyspark.sql.functions import udf

def checkWindows(start, start_start, start_end, end_start, end_end):
    if (start >= start_start) & (start <= start_end) | \
            (start >= end_start) & (start <= end_end):
        return 1
    else:
        return 0

labelFunc = udf(checkWindows)

In [25]:
labeled_df = joined_df.withColumn('is_game_bus', labelFunc(joined_df.start, \
                                                    joined_df.startwindow_start, \
                                                    joined_df.startwindow_end, \
                                                    joined_df.endwindow_start, \
                                                    joined_df.endwindow_end))

In [26]:
labeled_df.take(5)

[Row(date=u'2014-08-12', bus=u'BX1', direction=2, interval=u'11:00-11:15', avg(duration)=1352.0, count(id)=1, startwindow_start=datetime.datetime(2014, 8, 12, 17, 40), startwindow_end=datetime.datetime(2014, 8, 12, 19, 40), endwindow_start=datetime.datetime(2014, 8, 12, 21, 36), endwindow_end=datetime.datetime(2014, 8, 12, 23, 36), start=datetime.datetime(2014, 8, 12, 11, 0), is_game_bus=u'0'),
 Row(date=u'2014-08-12', bus=u'BX1', direction=1, interval=u'9:45-10:00', avg(duration)=3688.0, count(id)=1, startwindow_start=datetime.datetime(2014, 8, 12, 17, 40), startwindow_end=datetime.datetime(2014, 8, 12, 19, 40), endwindow_start=datetime.datetime(2014, 8, 12, 21, 36), endwindow_end=datetime.datetime(2014, 8, 12, 23, 36), start=datetime.datetime(2014, 8, 12, 9, 45), is_game_bus=u'0'),
 Row(date=u'2014-08-12', bus=u'BX1', direction=1, interval=u'10:45-11:00', avg(duration)=3403.0, count(id)=1, startwindow_start=datetime.datetime(2014, 8, 12, 17, 40), startwindow_end=datetime.datetime(201