In [246]:
import pandas as pd
import numpy as np
import glob
import os
import bz2
import codecs
import json
import matplotlib.pylab as plt
import uuid
import random
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, from_unixtime
%matplotlib inline

In [247]:
conf = SparkConf()

In [248]:
conf.setMaster('local[8]')

<pyspark.conf.SparkConf at 0x7f39a4b6fb38>

In [249]:
sc = SparkContext.getOrCreate(conf=conf)
sql_sc = SQLContext(sc)

In [250]:
schema = StructType([
        StructField('VendorID',IntegerType(),True),
        StructField('tpep_pickup_datetime',TimestampType(),True),
        StructField('tpep_dropoff_datetime',TimestampType(),True),
        StructField('passenger_count',IntegerType(),True),
        StructField('trip_distance',DoubleType(),True),
        StructField('pickup_longitude',DoubleType(),True),
        StructField('pickup_latitude',DoubleType(),True),
        StructField('RateCodeID',IntegerType(),True),
        StructField('store_and_fwd_flag',StringType(),True),
        StructField('dropoff_longitude',DoubleType(),True),
        StructField('dropoff_latitude',DoubleType(),True),
        StructField('payment_type',IntegerType(),True),
        StructField('fare_amount',DoubleType(),True),
        StructField('extra',DoubleType(),True),
        StructField('mta_tax',DoubleType(),True),
        StructField('tip_amount',DoubleType(),True),
        StructField('tolls_amount',DoubleType(),True),
        StructField('improvement_surcharge',DoubleType(),True),
        StructField('total_amount',DoubleType(),True),
    ])

In [251]:
infilename = 'yellow_tripdata_2015-03.csv'
#infilename = 'sample.csv'
raw_sdf = (sql_sc
    .read
    .csv(infilename, header=True, schema=schema, timestampFormat='yyyy-MM-dd HH:mm:ss'))

In [252]:
clean_sdf = raw_sdf.withColumn('tpep_pickup_timestamp_ms',  unix_timestamp(raw_sdf['tpep_pickup_datetime' ])*1000 + 5*60*60*1000)
clean_sdf = clean_sdf.withColumn('tpep_dropoff_timestamp_ms', unix_timestamp(raw_sdf['tpep_dropoff_datetime'])*1000 + 5*60*60*1000)

In [253]:
clean_sdf.limit(1).toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 21 columns):
VendorID                     1 non-null int64
tpep_pickup_datetime         1 non-null datetime64[ns]
tpep_dropoff_datetime        1 non-null datetime64[ns]
passenger_count              1 non-null int64
trip_distance                1 non-null float64
pickup_longitude             1 non-null float64
pickup_latitude              1 non-null float64
RateCodeID                   1 non-null int64
store_and_fwd_flag           1 non-null object
dropoff_longitude            1 non-null float64
dropoff_latitude             1 non-null float64
payment_type                 1 non-null int64
fare_amount                  1 non-null float64
extra                        1 non-null float64
mta_tax                      1 non-null float64
tip_amount                   1 non-null float64
tolls_amount                 1 non-null float64
improvement_surcharge        1 non-null float64
total_amount                 1

In [254]:
clean_sdf.limit(5).toPandas()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,...,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,tpep_pickup_timestamp_ms,tpep_dropoff_timestamp_ms
0,1,2015-03-06 08:02:31,2015-03-06 08:09:55,1,1.2,-73.990211,40.750969,1,N,-73.987892,...,2,7.0,0.0,0.5,0.0,0.0,0.3,7.8,1425646951000,1425647395000
1,1,2015-03-06 08:02:31,2015-03-06 08:15:23,1,3.2,-73.935188,40.80072,1,N,-73.952553,...,2,11.5,0.0,0.5,0.0,0.0,0.3,12.3,1425646951000,1425647723000
2,1,2015-03-06 08:02:31,2015-03-06 08:12:27,1,1.1,-73.963753,40.767937,1,N,-73.956947,...,2,8.0,0.0,0.5,0.0,0.0,0.3,8.8,1425646951000,1425647547000
3,1,2015-03-06 08:02:31,2015-03-06 08:09:09,1,0.8,-73.997177,40.742168,1,N,-74.008064,...,1,6.0,0.0,0.5,1.0,0.0,0.3,7.8,1425646951000,1425647349000
4,1,2015-03-06 08:02:32,2015-03-06 08:19:37,1,2.7,-74.006844,40.730267,1,N,-73.97686,...,1,13.0,0.0,0.5,2.75,0.0,0.3,16.55,1425646952000,1425647977000


In [257]:
end_timestamp = pd.Timestamp('2015-03-03 00:00:00').tz_localize('Etc/GMT+5')
filtered_sdf = clean_sdf.filter('tpep_dropoff_timestamp_ms <= %d' % int(end_timestamp.value / 1e6) )

In [258]:
filtered_sdf.count()

771187

In [259]:
filtered_sdf.limit(5).toPandas()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,...,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,tpep_pickup_timestamp_ms,tpep_dropoff_timestamp_ms
0,2,2015-03-02 11:16:24,2015-03-02 11:37:03,2,3.23,-73.98748,40.738121,1,N,-73.953362,...,1,15.5,0.0,0.5,3.26,0.0,0.3,19.56,1425312984000,1425314223000
1,1,2015-03-01 16:42:33,2015-03-01 16:59:29,1,2.3,-73.979179,40.777756,1,N,-73.953178,...,1,11.5,0.0,0.5,2.45,0.0,0.3,14.75,1425246153000,1425247169000
2,1,2015-03-01 16:42:33,2015-03-01 17:09:10,1,9.2,-73.865578,40.770863,1,N,-73.984856,...,1,26.5,0.0,0.5,6.5,5.33,0.3,39.13,1425246153000,1425247750000
3,1,2015-03-01 16:42:34,2015-03-01 16:44:52,1,0.4,-73.952545,40.772148,1,N,-73.948593,...,2,3.5,0.0,0.5,0.0,0.0,0.3,4.3,1425246154000,1425246292000
4,1,2015-03-01 16:42:34,2015-03-01 16:47:09,1,0.9,-73.954231,40.778606,1,N,-73.947395,...,1,5.0,0.0,0.5,1.15,0.0,0.3,6.95,1425246154000,1425246429000


In [278]:
def create_events(trip_record):
    events = []
    trip_id = str(uuid.uuid4())
    pickup_datetime = trip_record.tpep_pickup_timestamp_ms
    dropoff_datetime = trip_record.tpep_dropoff_timestamp_ms
    events.append({
        'event_type': 'pickup',
        'trip_id': trip_id,
        'trip_fraction': 0.0,
        'trip_duration_minutes': 0.0,
        'VendorID': trip_record.VendorID,
        'timestamp': pickup_datetime,
        'passenger_count': trip_record.passenger_count,
        'trip_distance': 0.0,
        'lat': trip_record.pickup_latitude,
        'lon': trip_record.pickup_longitude,
        'RateCodeID': trip_record.RateCodeID,
        'store_and_fwd_flag': trip_record.store_and_fwd_flag,
        'payment_type': trip_record.payment_type,
        'fare_amount': 0.0,
        'extra': 0.0,
        'mta_tax': 0.0,
        'tip_amount': 0.0,
        'tolls_amount': 0.0,
        'improvement_surcharge': 0.0,
        'total_amount': 0.0,
    })
    
    # Create route events every 1 minute.
    # Assume that route is a straight line from pickup to dropoff.
    # Assume that dollar amounts are spread uniformly throughout the trip.
    trip_duration = dropoff_datetime - pickup_datetime
    report_period_mean = 1*60*1000
    report_timestamp = pickup_datetime
    while True:
        # The next report period is a random number. This allows us to get timestamps with non-zero milliseconds.
        report_period = random.randint(0, 2*report_period_mean)
        report_timestamp += report_period
        if report_timestamp >= dropoff_datetime:
            break
        trip_fraction = (report_timestamp - pickup_datetime) / trip_duration
        events.append({
            'event_type': 'route',
            'trip_id': trip_id,
            'trip_fraction': trip_fraction,
            'trip_duration_minutes': trip_duration * trip_fraction / (60*1000),
            'VendorID': trip_record.VendorID,
            'timestamp': report_timestamp,
            'passenger_count': trip_record.passenger_count,
            'trip_distance': trip_record.trip_distance * trip_fraction,            
            'lat': trip_record.pickup_latitude + (trip_record.dropoff_latitude - trip_record.pickup_latitude) * trip_fraction,
            'lon': trip_record.pickup_longitude + (trip_record.dropoff_longitude - trip_record.pickup_longitude) * trip_fraction,
            'RateCodeID': trip_record.RateCodeID,
            'store_and_fwd_flag': trip_record.store_and_fwd_flag,
            'payment_type': trip_record.payment_type,            
            'fare_amount': trip_record.fare_amount * trip_fraction,
            'extra': trip_record.extra * trip_fraction,
            'mta_tax': trip_record.mta_tax * trip_fraction,
            'tip_amount': trip_record.tip_amount * trip_fraction,
            'tolls_amount': trip_record.tolls_amount * trip_fraction,
            'improvement_surcharge': trip_record.improvement_surcharge * trip_fraction,
            'total_amount': trip_record.total_amount * trip_fraction,
        })

    events.append({
        'event_type': 'dropoff',
        'trip_id': trip_id,
        'trip_fraction': 1.0,
        'trip_duration_minutes': trip_duration / (60*1000),
        'VendorID': trip_record.VendorID,
        'timestamp': dropoff_datetime,
        'passenger_count': trip_record.passenger_count,
        'trip_distance': trip_record.trip_distance,
        'lat': trip_record.dropoff_latitude,
        'lon': trip_record.dropoff_longitude,
        'RateCodeID': trip_record.RateCodeID,
        'store_and_fwd_flag': trip_record.store_and_fwd_flag,
        'payment_type': trip_record.payment_type,
        'fare_amount': trip_record.fare_amount,
        'extra': trip_record.extra,
        'mta_tax': trip_record.mta_tax,
        'tip_amount': trip_record.tip_amount,
        'tolls_amount': trip_record.tolls_amount,
        'improvement_surcharge': trip_record.improvement_surcharge,
        'total_amount': trip_record.total_amount,
    })
    rows = [Row(**event) for event in events]
    return rows

In [279]:
%%time
all_events_rdd = filtered_sdf.rdd.flatMap(create_events)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 183 µs


In [280]:
all_events_rdd.take(2)

[Row(RateCodeID=1, VendorID=2, event_type='pickup', extra=0.0, fare_amount=0.0, improvement_surcharge=0.0, lat=40.738121032714844, lon=-73.98748016357422, mta_tax=0.0, passenger_count=2, payment_type=1, store_and_fwd_flag='N', timestamp=1425312984000, tip_amount=0.0, tolls_amount=0.0, total_amount=0.0, trip_distance=0.0, trip_duration_minutes=0.0, trip_fraction=0.0, trip_id='e68f5d34-a1af-4a08-9f0f-3171549bff07'),
 Row(RateCodeID=1, VendorID=2, event_type='route', extra=0.0, fare_amount=0.7325407586763519, improvement_surcharge=0.01417820823244552, lat=40.7394738931602, lon=-73.98586769238241, mta_tax=0.023630347054075868, passenger_count=2, payment_type=1, store_and_fwd_flag='N', timestamp=1425313042556, tip_amount=0.15406986279257465, tolls_amount=0.0, total_amount=0.9244191767554479, trip_distance=0.1526520419693301, trip_duration_minutes=0.9759333333333333, trip_fraction=0.047260694108151736, trip_id='e68f5d34-a1af-4a08-9f0f-3171549bff07')]

In [281]:
%%time
all_events_sdf = sql_sc.createDataFrame(all_events_rdd)

CPU times: user 16 ms, sys: 8 ms, total: 24 ms
Wall time: 165 ms


In [282]:
all_events_sdf.show()

+----------+--------+----------+-----+------------------+---------------------+------------------+------------------+--------------------+---------------+------------+------------------+-------------+-------------------+------------+------------------+-------------------+---------------------+--------------------+--------------------+
|RateCodeID|VendorID|event_type|extra|       fare_amount|improvement_surcharge|               lat|               lon|             mta_tax|passenger_count|payment_type|store_and_fwd_flag|    timestamp|         tip_amount|tolls_amount|      total_amount|      trip_distance|trip_duration_minutes|       trip_fraction|             trip_id|
+----------+--------+----------+-----+------------------+---------------------+------------------+------------------+--------------------+---------------+------------+------------------+-------------+-------------------+------------+------------------+-------------------+---------------------+--------------------+-----------

In [283]:
all_events2_sdf = all_events_sdf.withColumn('timestamp_str',  from_unixtime(all_events_sdf['timestamp' ] / 1000))

In [284]:
all_events2_sdf.show()

+----------+--------+----------+-----+------------------+---------------------+------------------+------------------+--------------------+---------------+------------+------------------+-------------+-------------------+------------+------------------+-------------------+---------------------+--------------------+--------------------+-------------------+
|RateCodeID|VendorID|event_type|extra|       fare_amount|improvement_surcharge|               lat|               lon|             mta_tax|passenger_count|payment_type|store_and_fwd_flag|    timestamp|         tip_amount|tolls_amount|      total_amount|      trip_distance|trip_duration_minutes|       trip_fraction|             trip_id|      timestamp_str|
+----------+--------+----------+-----+------------------+---------------------+------------------+------------------+--------------------+---------------+------------+------------------+-------------+-------------------+------------+------------------+-------------------+--------------

In [285]:
%%time
sorted_sdf = all_events2_sdf.orderBy('timestamp')

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 12.4 ms


In [286]:
%%time
sorted_sdf.write.mode('overwrite').format('json').save('data.json')

CPU times: user 84 ms, sys: 40 ms, total: 124 ms
Wall time: 6min 30s


# Playground