In [1]:
sc

<pyspark.context.SparkContext at 0x7f025f0b7c50>

In [2]:
taxi = sc.textFile('yellow.csv.gz')
bike = sc.textFile('citibike.csv')

In [3]:
list(enumerate(bike.take(1)[0].split(',')))

[(0, u'cartodb_id'),
 (1, u'the_geom'),
 (2, u'tripduration'),
 (3, u'starttime'),
 (4, u'stoptime'),
 (5, u'start_station_id'),
 (6, u'start_station_name'),
 (7, u'start_station_latitude'),
 (8, u'start_station_longitude'),
 (9, u'end_station_id'),
 (10, u'end_station_name'),
 (11, u'end_station_latitude'),
 (12, u'end_station_longitude'),
 (13, u'bikeid'),
 (14, u'usertype'),
 (15, u'birth_year'),
 (16, u'gender')]

In [4]:
bike.take(2)

[u'cartodb_id,the_geom,tripduration,starttime,stoptime,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bikeid,usertype,birth_year,gender',
 u'1,,801,2015-02-01 00:00:00+00,2015-02-01 00:14:00+00,521,8 Ave & W 31 St,40.75044999,-73.99481051,423,W 54 St & 9 Ave,40.76584941,-73.98690506,17131,Subscriber,1978,2']

In [5]:
def filterBike(pId, lines):
    import csv
    for row in csv.reader(lines):
        if (row[6] == 'Greenwich Ave & 8 Ave' and row[3].startswith('2015-02-01')):
            yield (row[3][:19])
    
gbike = bike.mapPartitionsWithIndex(filterBike).cache()
gbike.take(5)

['2015-02-01 00:05:00',
 '2015-02-01 00:05:00',
 '2015-02-01 00:50:00',
 '2015-02-01 01:30:00',
 '2015-02-01 03:28:00']

In [6]:
bike.filter(lambda x: 'Greenwich Ave & 8 Ave' in x).first()

u'10,,585,2015-02-01 00:05:00+00,2015-02-01 00:15:00+00,284,Greenwich Ave & 8 Ave,40.73901691,-74.00263761,444,Broadway & W 24 St,40.7423543,-73.98915076,14843,Subscriber,1982,1'

In [7]:
gLoc = (40.73901691,-74.00263761)

In [8]:
def filterTaxi(pId, lines):
    if pId == 0:
        next(lines)
    
    import csv
    import pyproj
    proj = pyproj.Proj(init="epsg:2263", preserve_units=True)  
    gLoc = proj(-74.00263761, 40.73901691)
    sqm = 1320 ** 2 
    for row in csv.reader(lines):
        try:
            dropoff = proj(float(row[5]), float(row[4]))
        except:
            continue
        distance = (dropoff[0]-gLoc[0]) ** 2 + (dropoff[1]-gLoc[1]) ** 2
        if distance < sqm:
            yield row[1][:19]
    
gTaxi = taxi.mapPartitionsWithIndex(filterTaxi).cache()
gTaxi.take(5)

['2015-02-01 00:11:03',
 '2015-02-01 00:10:23',
 '2015-02-01 00:16:36',
 '2015-02-01 00:10:14',
 '2015-02-01 00:10:12']

In [9]:
lBikes = gbike.collect()
lTaxis = gTaxi.collect()
print(lBikes[0])
print(lTaxis[0])

2015-02-01 00:05:00
2015-02-01 00:11:03


In [10]:
import datetime
count = 0
for b in lBikes:
    # Convert the datetime string to a datetime object
    bt = datetime.datetime.strptime(b, '%Y-%m-%d %H:%M:%S')
    for t in lTaxis:
        # Convert the datetime string to a datetime object
        tt = datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S')
        
        diff = (bt-tt).total_seconds()
        
        if diff > 0 and diff < 600:
            count += 1
            break

print(count)

65


In [11]:
# Using RDD

gAll = (gTaxi.map(lambda x: (x, 0)) + gbike.map(lambda x: (x, 1)))
gAll.sortByKey().take(5)

[('2015-02-01 00:03:12', 0),
 ('2015-02-01 00:04:39', 0),
 ('2015-02-01 00:05:00', 1),
 ('2015-02-01 00:05:00', 1),
 ('2015-02-01 00:05:38', 0)]

In [17]:
def findTrips(_, records):
    import datetime
    lastTaxiTime = None
    for dt, event in records:
        t = datetime.datetime.strptime(dt, '%Y-%m-%d %H:%M:%S')
        if event == 1:
            if lastTaxiTime != None:
                if (t - lastTaxiTime).total_seconds() < 600:
                    yield(dt,event)
        else:
            lastTaxiTime = t
        
gAll.sortByKey().mapPartitionsWithIndex(findTrips).count()

65

# Using SQL

In [13]:
df = sqlContext.createDataFrame(gAll,('time','event'))

In [15]:
df1 = df.select(df['time'].cast('timestamp').cast('long').alias('epoch'),'event')

In [16]:
df1.registerTempTable('gAll')

In [23]:
query = '''
SELECT sum(has_taxi)
FROM(
    SELECT event, 1-min(event) over (ORDER BY epoch
                   RANGE BETWEEN 600 PRECEDING
                   AND CURRENT ROW)
        As has_taxi
FROM gALL
) newGAll
WHERE event = 1
'''
sqlContext.sql(query).show()

+-------------+
|sum(has_taxi)|
+-------------+
|           65|
+-------------+



# Using DataFrame

In [24]:
import pyspark.sql.functions as sf
import pyspark.sql.window as sw

In [32]:
window = sw.Window.orderBy('epoch').rangeBetween(-10,-1)
df2 = df1.select('event', (1-sf.min(df1['event']).over(window))\
                           .alias('has_taxi')) \
                           .filter(df1['event']==1) \
                           .select(sf.sum(sf.col('has_taxi')))
df2.show()

+-------------+
|sum(has_taxi)|
+-------------+
|           42|
+-------------+

