In [1]:
import os
# Interpreter used for the PySpark workers; set before the SparkContext is
# created so that the context picks it up.
# NOTE(review): this is a machine-specific absolute path -- adjust it (or
# export PYSPARK_PYTHON in the shell instead) when running elsewhere.
os.environ['PYSPARK_PYTHON'] = '/nfshome/lj1230/.conda/envs/myEnv/bin/python3.5'

from pyspark import SparkConf, SparkContext
# getOrCreate() reuses a context that is already running, so re-executing this
# cell no longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setMaster('local').setAppName('pyspark'))

from pyspark.sql.session import SparkSession
spark = SparkSession(sc)

# Loading data

In [2]:
# Yellow-taxi trips as an RDD of raw CSV lines (the first line is the header).
yellow = sc.textFile("yellow.csv.gz")
yellow.take(5)

['tpep_pickup_datetime,tpep_dropoff_datetime,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude',
 '2015-02-01 00:00:00.0,2015-02-01 00:10:10.0,40.775485992431641,-73.95855712890625,40.720134735107422,-73.9749755859375',
 '2015-02-01 00:00:00.0,2015-02-01 00:03:36.0,40.728248596191406,-73.984840393066406,40.731391906738281,-73.975341796875',
 '2015-02-01 00:00:00.0,2015-02-01 00:10:32.0,40.731014251708984,-74.001373291015625,40.749237060546875,-74.003067016601562',
 '2015-02-01 00:00:00.0,2015-02-01 00:19:42.0,40.721115112304688,-73.987129211425781,40.794017791748047,-73.96990966796875']

In [3]:
# Pair every column name of the taxi header with its positional index.
list(enumerate(yellow.first().split(",")))

[(0, 'tpep_pickup_datetime'),
 (1, 'tpep_dropoff_datetime'),
 (2, 'pickup_latitude'),
 (3, 'pickup_longitude'),
 (4, 'dropoff_latitude'),
 (5, 'dropoff_longitude')]

In [4]:
# Citi Bike trips as an RDD of raw CSV lines (the first line is the header).
bike = sc.textFile("citibike.csv")
bike.take(5)

['cartodb_id,the_geom,tripduration,starttime,stoptime,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bikeid,usertype,birth_year,gender',
 '1,,801,2015-02-01 00:00:00+00,2015-02-01 00:14:00+00,521,8 Ave & W 31 St,40.75044999,-73.99481051,423,W 54 St & 9 Ave,40.76584941,-73.98690506,17131,Subscriber,1978,2',
 '2,,379,2015-02-01 00:00:00+00,2015-02-01 00:07:00+00,497,E 17 St & Broadway,40.73704984,-73.99009296,504,1 Ave & E 15 St,40.73221853,-73.98165557,21289,Subscriber,1993,1',
 '3,,2474,2015-02-01 00:01:00+00,2015-02-01 00:42:00+00,281,Grand Army Plaza & Central Park S,40.7643971,-73.97371465,127,Barrow St & Hudson St,40.73172428,-74.00674436,18903,Subscriber,1969,2',
 '4,,818,2015-02-01 00:01:00+00,2015-02-01 00:15:00+00,2004,6 Ave & Broome St,40.724399,-74.004704,505,6 Ave & W 33 St,40.74901271,-73.98848395,21044,Subscriber,1985,2']

In [5]:
# Pair every column name of the bike header with its positional index.
list(enumerate(bike.first().split(",")))

[(0, 'cartodb_id'),
 (1, 'the_geom'),
 (2, 'tripduration'),
 (3, 'starttime'),
 (4, 'stoptime'),
 (5, 'start_station_id'),
 (6, 'start_station_name'),
 (7, 'start_station_latitude'),
 (8, 'start_station_longitude'),
 (9, 'end_station_id'),
 (10, 'end_station_name'),
 (11, 'end_station_latitude'),
 (12, 'end_station_longitude'),
 (13, 'bikeid'),
 (14, 'usertype'),
 (15, 'birth_year'),
 (16, 'gender')]

# Data wrangling

In [6]:
def filterBike(records):
    """Yield ``(start_time, 1)`` for the bike trips of interest in one partition.

    A trip is kept when it started at the "Greenwich Ave & 8 Ave" station
    (column 6, ``start_station_name``) and its ``starttime`` (column 3)
    begins with ``2015-02-01``.

    Parameters
    ----------
    records : iterable of str
        Raw CSV lines of the Citi Bike file.

    Yields
    ------
    tuple
        ``(timestamp, 1)`` where ``timestamp`` is the first 19 characters of
        ``starttime`` (``YYYY-MM-DD HH:MM:SS``, suffix dropped) and ``1``
        tags the record as a bike trip.
    """
    # Function-scope import, like the other partition functions in this notebook.
    import csv

    # csv.reader honours quoting, so a station name that is quoted or that
    # contains a comma no longer shifts the column positions.
    for fields in csv.reader(records):
        # Blank or truncated lines cannot match; skip them instead of
        # failing the whole job with an IndexError.
        if len(fields) < 7:
            continue
        if fields[6] == "Greenwich Ave & 8 Ave" and fields[3].startswith("2015-02-01"):
            yield (fields[3][:19], 1)
            
# RDD of (start_time, 1) bike events produced partition by partition.
matchBike = bike.mapPartitions(filterBike)

In [7]:
# Peek at the first two matched bike events.
matchBike.take(2)

[('2015-02-01 00:05:00', 1), ('2015-02-01 00:05:00', 1)]

In [8]:
def filterBike(records):
    """Yield ``([latitude, longitude], 1)`` of the start station for matching trips.

    Uses the same filter as the earlier ``filterBike`` cell (start station
    "Greenwich Ave & 8 Ave", ``starttime`` beginning with ``2015-02-01``) but
    emits columns 7-8 (``start_station_latitude``, ``start_station_longitude``)
    so the station coordinates can be read off.

    NOTE(review): this reuses the name ``filterBike`` and therefore rebinds
    the earlier definition in the notebook namespace; a distinct name would
    make the notebook easier to follow.

    Parameters
    ----------
    records : iterable of str
        Raw CSV lines of the Citi Bike file.

    Yields
    ------
    tuple
        ``(coords, 1)`` where ``coords`` is the list slice ``fields[7:9]``
        of coordinate strings.
    """
    # Function-scope import, like the other partition functions in this notebook.
    import csv

    # csv.reader honours quoting, so a quoted station name (or one containing
    # a comma) no longer shifts the column positions.
    for fields in csv.reader(records):
        # Blank or truncated lines cannot match; skip them instead of
        # failing the whole job with an IndexError.
        if len(fields) < 7:
            continue
        if fields[6] == "Greenwich Ave & 8 Ave" and fields[3].startswith("2015-02-01"):
            yield (fields[7:9], 1)
            
# Show the start-station coordinates of the first five matching trips.
bike.mapPartitions(filterBike).take(5)

[(['40.73901691', '-74.00263761'], 1),
 (['40.73901691', '-74.00263761'], 1),
 (['40.73901691', '-74.00263761'], 1),
 (['40.73901691', '-74.00263761'], 1),
 (['40.73901691', '-74.00263761'], 1)]

In [9]:
# Bike station location as (longitude, latitude), i.e. the coordinates printed
# by the previous cell in swapped order.
bikeStation = (-74.00263761, 40.73901691)

In [10]:
def filterTaxi(pid, lines):
    """Yield ``(dropoff_time, 0)`` for taxi drop-offs near the bike station.

    Written for ``RDD.mapPartitionsWithIndex``. A trip is kept when its
    drop-off datetime (column 1) begins with ``2015-02-01`` and its drop-off
    point (column 5 = longitude, column 4 = latitude), projected with
    EPSG:2263, lies within a distance of 1320 from the projected
    ``bikeStation`` defined at notebook level.

    Parameters
    ----------
    pid : int
        Partition index; the first line of partition 0 is discarded as the
        CSV header.
    lines : iterator of str
        Raw CSV lines of the taxi file.

    Yields
    ------
    tuple
        ``(timestamp, 0)`` where ``timestamp`` is the first 19 characters of
        the drop-off datetime and ``0`` tags the record as a taxi trip.
    """
    if pid == 0:
        # Discard the header. The default value keeps an empty partition from
        # raising StopIteration inside this generator, which Python 3.7+
        # turns into a RuntimeError (PEP 479).
        next(lines, None)

    import pyproj
    proj = pyproj.Proj(init="epsg:2263", preserve_units=True)
    station = proj(bikeStation[0], bikeStation[1])
    # Compare squared distances to avoid a square root per record.
    # 1320 is in the projection's native units -- presumably feet, i.e. a
    # quarter mile; TODO confirm against the EPSG:2263 definition.
    sq_radius = 1320 ** 2

    for line in lines:
        fields = line.split(",")
        try:
            dropoff = proj(fields[5], fields[4])
        except Exception:
            # Best effort: skip rows with missing or unparsable coordinates,
            # while still letting KeyboardInterrupt/SystemExit propagate.
            continue
        sq_dist = (dropoff[0] - station[0]) ** 2 + (dropoff[1] - station[1]) ** 2
        if fields[1].startswith("2015-02-01") and sq_dist <= sq_radius:
            yield (fields[1][:19], 0)

# Apply the filter per partition (the partition index lets partition 0 skip
# the header) and count the matching drop-offs.
matchedTaxi = yellow.mapPartitionsWithIndex(filterTaxi)
matchedTaxi.count()

7278

In [11]:
# Peek at the first five taxi events: (dropoff_time, 0).
matchedTaxi.take(5)

[('2015-02-01 00:11:03', 0),
 ('2015-02-01 00:10:23', 0),
 ('2015-02-01 00:16:36', 0),
 ('2015-02-01 00:10:14', 0),
 ('2015-02-01 00:10:12', 0)]

In [12]:
# Peek at the first five bike events: (start_time, 1).
matchBike.take(5)

[('2015-02-01 00:05:00', 1),
 ('2015-02-01 00:05:00', 1),
 ('2015-02-01 00:50:00', 1),
 ('2015-02-01 01:30:00', 1),
 ('2015-02-01 03:28:00', 1)]

In [13]:
# Merge bike (mode 1) and taxi (mode 0) events into one RDD ordered by the
# timestamp string, and cache it because several later cells reuse it.
alltrip = (matchBike + matchedTaxi).sortByKey().cache()

In [14]:
# Peek at the first five events of the merged, time-ordered stream.
alltrip.take(5)

[('2015-02-01 00:03:12', 0),
 ('2015-02-01 00:04:39', 0),
 ('2015-02-01 00:05:00', 1),
 ('2015-02-01 00:05:00', 1),
 ('2015-02-01 00:05:38', 0)]

In [15]:
def connextTrips(_, records):
    """Count bike trips that start within 600 seconds after a taxi drop-off.

    Written for ``RDD.mapPartitionsWithIndex`` over ``(timestamp, mode)``
    records, where ``mode`` is 1 for a bike trip and any other value (0 in
    this notebook) marks a taxi drop-off.

    Parameters
    ----------
    _ : int
        Partition index (unused).
    records : iterable of (str, int)
        ``("YYYY-MM-DD HH:MM:SS", mode)`` pairs, expected in ascending time
        order.

    Yields
    ------
    int
        Exactly one value: the number of bike records whose timestamp is
        between 0 and 600 seconds (inclusive) after the most recent taxi
        record seen so far.

    Notes
    -----
    The "most recent taxi" state starts empty on every call, so a taxi at the
    end of one partition is never paired with a bike at the start of the next.
    """
    import datetime

    # Pointer to the most recent taxi drop-off time seen so far.
    lastTaxiTime = None
    count = 0
    for dt, mode in records:
        t = datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S")
        if mode != 1:
            # Taxi record: just advance the pointer.
            lastTaxiTime = t
            continue
        if lastTaxiTime is None:
            # Bike trip before any taxi has been seen: nothing to pair with.
            continue
        if 0 <= (t - lastTaxiTime).total_seconds() <= 600:
            count += 1
    yield count
    
# Each partition yields one partial count; add them up for the total number
# of bike trips that follow a taxi drop-off.
count = alltrip.mapPartitionsWithIndex(connextTrips).reduce(lambda x, y: x + y)
count

65

# Data wrangling with SQL

In [35]:
import pyspark
# SQLContext built on the existing SparkContext; the cells below use it for
# createDataFrame() and sql().
sqlContext = pyspark.sql.SQLContext(sc)

In [38]:
# Turn the (timestamp, mode) RDD into a DataFrame, then rename the
# auto-generated columns _1 and _2 to time and mode.
dfAll = sqlContext.createDataFrame(alltrip)
dfAll = dfAll.select(dfAll['_1'].alias('time'), dfAll['_2'].alias('mode'))
dfAll.show()

+-------------------+----+
|               time|mode|
+-------------------+----+
|2015-02-01 00:03:12|   0|
|2015-02-01 00:04:39|   0|
|2015-02-01 00:05:00|   1|
|2015-02-01 00:05:00|   1|
|2015-02-01 00:05:38|   0|
|2015-02-01 00:06:15|   0|
|2015-02-01 00:07:07|   0|
|2015-02-01 00:07:29|   0|
|2015-02-01 00:07:57|   0|
|2015-02-01 00:08:56|   0|
|2015-02-01 00:08:57|   0|
|2015-02-01 00:09:17|   0|
|2015-02-01 00:09:52|   0|
|2015-02-01 00:10:12|   0|
|2015-02-01 00:10:14|   0|
|2015-02-01 00:10:23|   0|
|2015-02-01 00:10:34|   0|
|2015-02-01 00:10:56|   0|
|2015-02-01 00:11:01|   0|
|2015-02-01 00:11:02|   0|
+-------------------+----+
only showing top 20 rows



In [58]:
# Convert the timestamp strings to integer epoch seconds (string -> timestamp
# -> long) so the window queries below can use a numeric 600-second range.
dfTrips = dfAll.select(dfAll["time"].cast("timestamp").cast("long").alias("epoch"), "mode")
dfTrips.take(2)

[Row(epoch=1422766992, mode=0), Row(epoch=1422767079, mode=0)]

In [59]:
# Expose dfTrips to Spark SQL under the name "trips".
# createOrReplaceTempView supersedes registerTempTable, which is deprecated
# since Spark 2.0; re-running the cell simply replaces the view.
dfTrips.createOrReplaceTempView("trips")
dfTrips.show()

+----------+----+
|     epoch|mode|
+----------+----+
|1422766992|   0|
|1422767079|   0|
|1422767100|   1|
|1422767100|   1|
|1422767138|   0|
|1422767175|   0|
|1422767227|   0|
|1422767249|   0|
|1422767277|   0|
|1422767336|   0|
|1422767337|   0|
|1422767357|   0|
|1422767392|   0|
|1422767412|   0|
|1422767414|   0|
|1422767423|   0|
|1422767434|   0|
|1422767456|   0|
|1422767461|   0|
|1422767462|   0|
+----------+----+
only showing top 20 rows



In [64]:
# Count the bike trips (mode = 1) that have at least one taxi drop-off
# (mode = 0) in the 600 seconds up to and including their own epoch.
# MIN(mode) over that window is 0 when a taxi row is present, so
# 1 - MIN(mode) is 1 for such rows and 0 otherwise; summing it over the
# bike rows gives the number of matched trips.
statement = """
SELECT sum(has_taxi)
FROM (SELECT mode, 1 - MIN(mode) 
OVER (ORDER BY epoch RANGE BETWEEN 600 PRECEDING AND CURRENT ROW) 
AS has_taxi FROM trips) newTrips
WHERE mode = 1
"""

sqlContext.sql(statement).show()

+-------------+
|sum(has_taxi)|
+-------------+
|           65|
+-------------+



In [65]:
import pyspark.sql.functions as sf
import pyspark.sql.window as sw

# DataFrame-API counterpart of the SQL query: for every row, look back over
# the rows whose epoch lies within the preceding 600 seconds (inclusive).
window = sw.Window.orderBy("epoch").rangeBetween(-600, 0)

# has_taxi is 1 when the window contains a taxi row (mode 0), else 0; keep
# only the bike rows and add the flags up.
results = (
    dfTrips
    .select(
        "mode",
        (1 - sf.min(dfTrips["mode"]).over(window)).alias("has_taxi"),
    )
    .filter(dfTrips["mode"] == 1)
    .select(sf.sum(sf.col("has_taxi")))
)
results.show()

+-------------+
|sum(has_taxi)|
+-------------+
|           65|
+-------------+

