In [60]:
trip_data_file = 'data/trip_data_1.csv'

In [61]:
trip_data = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true') \
    .load(trip_data_file)

In [62]:
trip_data.registerTempTable('trip')

# Average trip time

In [63]:
sqlContext.sql('SELECT AVG(trip_time_in_secs) AS trip_time_avg from trip') \
    .show()

+-----------------+
|    trip_time_avg|
+-----------------+
|683.4235930894863|
+-----------------+



# Driver statistics

In [64]:
import itertools
import math

from collections import namedtuple
from datetime import datetime


Interval = namedtuple('Interval', ['pickup', 'dropoff'])

time_fmt = '%Y-%m-%d %H:%M:%S'


def extract_interval(row):
    # Order matters.
    interval_str = row['pickup_datetime'], row['dropoff_datetime']
    return Interval(*map(lambda s: datetime.strptime(s, time_fmt), 
                         interval_str))

intervals = trip_data \
    .map(lambda row: (row['medallion'], extract_interval(row))) \
    .groupByKey()
    
    
def calculate_trip_per_hour(intervals):
    # Sort and group by days. Use the duration between the last dropoff
    # and first pickup as the working hours in a day, instead of 24 hours.
    day_intervals = [list(g) 
                     for _, g 
                     in itertools.groupby(sorted(intervals), 
                                          key=lambda ip: ip[0].date())]
    
    trip_num, intervals_in_hour = 0, 0
    for trips in day_intervals:
        day_interval = (trips[-1].dropoff - trips[0].pickup).total_seconds()
        # Sometimes it could be zero, so ignore such case.
        if day_interval == 0:
            continue
        trip_num += len(trips)
        intervals_in_hour += math.ceil(day_interval / 3600)
    
    if intervals_in_hour == 0: 
        # Anomaly.
        return 0
    else:
        return float(trip_num) / intervals_in_hour

driver_num = intervals.count()    
trips_per_hour_sum = intervals \
    .mapValues(calculate_trip_per_hour) \
    .values() \
    .sum()

print('Trips per hour in average: %f' % (trips_per_hour_sum / driver_num))

# Calculation Duration: 4.6 min (groupByKey) + 17 s (mapValues).

Trips per hour in average: 1.900029


In [65]:
def calculate_utilization(intervals):
    # Sort and group by days. Use the duration between the last dropoff
    # and first pickup as the working hours in a day, instead of 24 hours.
    day_intervals = [list(g) 
                     for _, g 
                     in itertools.groupby(sorted(intervals), 
                                          key=lambda ip: ip[0].date())]
    
    trip_duration, total_interval = 0.0, 0.0
    for trips in day_intervals:
        day_interval = (trips[-1].dropoff - trips[0].pickup).total_seconds()
        # Sometimes it could be zero, so ignore such case.
        if day_interval == 0:
            continue
        total_interval += day_interval
        for pickup, dropoff in trips:
            trip_duration += (dropoff - pickup).total_seconds()
    
    if total_interval == 0:
        # Anomaly.
        return 0
    else:
        return trip_duration / total_interval
    
utilization_sum = intervals \
    .mapValues(calculate_utilization) \
    .values() \
    .sum()
    
print('Driver utilization in average: %f' %(utilization_sum / driver_num))

# Calculation Duration: 15 s.

Driver utilization in average: 0.378233


# Others

Other interesting metrics include:

[TODO]