In [1]:
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
from azureml.contrib.opendatasets import NycTlcGreen

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame


start = datetime.strptime("1/1/2016", "%m/%d/%Y")
end = datetime.strptime("5/31/2016", "%m/%d/%Y")

dfs = []
for sample_month in range(5):
    temp_df_green = NycTlcGreen(
        start + relativedelta(months=sample_month),
        end + relativedelta(months=sample_month)).to_spark_dataframe()
    dfs.append(temp_df_green.sample(False, 0.001, 3))

green_taxi_df = reduce(DataFrame.unionAll, dfs)

Save a copy of the raw_columns name list for clean up at the last step.

In [3]:
raw_columns = list(green_taxi_df.columns)

Drop the rows that both lat/long are NaN, especially all columns in the first row are NaN.

In [5]:
green_taxi_df = green_taxi_df.dropna(how='all', subset=['lpepPickupDatetime', 'pickupLatitude', 'pickupLongitude'])

NYC Latitude & Longitude: (40.7128, 74.0060)

Add to taxi dataframe

In [7]:
from pyspark.sql.functions import lit

nyc_lat = 40.7128
nyc_long = 74.0060
green_taxi_df = green_taxi_df.withColumn('lat', lit(nyc_lat)).withColumn('long', lit(nyc_long))
display(green_taxi_df.limit(5))

vendorID,lpepPickupDatetime,lpepDropoffDatetime,passengerCount,tripDistance,puLocationId,doLocationId,pickupLongitude,pickupLatitude,dropoffLongitude,dropoffLatitude,rateCodeID,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,ehailFee,totalAmount,tripType,puYear,puMonth,lat,long
2,2016-05-17T20:33:59.000+0000,2016-05-17T20:48:18.000+0000,1,2.69,,,-73.98011016845703,40.668556213378906,-73.99358367919922,40.694671630859375,1,N,1,12.5,0.5,0.5,0.3,2.76,0.0,,16.56,1,2016,5,40.7128,74.006
2,2016-05-17T07:54:48.000+0000,2016-05-17T08:00:47.000+0000,1,0.76,,,-73.95852661132812,40.81023025512695,-73.96691131591797,40.804317474365234,1,N,2,5.0,0.0,0.5,0.3,0.0,0.0,,5.8,1,2016,5,40.7128,74.006
2,2016-05-17T21:04:06.000+0000,2016-05-17T21:17:34.000+0000,1,2.21,,,-73.95545196533203,40.68108367919922,-73.93508911132812,40.66455841064453,1,N,1,10.5,0.5,0.5,0.3,0.0,0.0,,11.8,1,2016,5,40.7128,74.006
2,2016-05-17T22:02:31.000+0000,2016-05-17T22:06:55.000+0000,1,1.03,,,-73.92160034179688,40.76676940917969,-73.93394470214844,40.770973205566406,1,N,1,5.5,0.5,0.5,0.3,1.7,0.0,,8.5,1,2016,5,40.7128,74.006
2,2016-05-17T22:09:14.000+0000,2016-05-17T22:15:45.000+0000,2,1.43,,,-73.85225677490234,40.729251861572266,-73.86149597167969,40.73771286010742,1,N,2,7.0,0.5,0.5,0.3,0.0,0.0,,8.3,1,2016,5,40.7128,74.006


In [8]:
# This is a contrib package in preview. The package name is subject to change.

from azureml.contrib.opendatasets.accessories.location_data import LatLongColumn
from azureml.contrib.opendatasets.accessories.location_time_customer_data \
    import LocationTimeCustomerData
from azureml.contrib.opendatasets import NoaaIsdWeather


green_taxi = LocationTimeCustomerData(
    green_taxi_df,
    LatLongColumn('lat', 'long'),
    'lpepPickupDatetime')

Initialize NoaaIsdWeather class, get enricher from it, and enrich the taxi data without aggregation

In [10]:
spark.conf.set('spark.sql.crossJoin.enabled', 'true')

In [11]:
weather = NoaaIsdWeather(
    cols=["temperature", "precipTime", "precipDepth", "snowDepth"],
    start_date=datetime(2016, 1, 1, 0, 0),
    end_date=datetime(2016, 5, 31, 23, 59))
weather_enricher = weather.get_enricher()
new_green_taxi, processed_weather = weather_enricher.enrich_customer_data_no_agg(
    customer_data_object=green_taxi,
    location_match_granularity=5,
    time_round_granularity='day')

Preview the pandas dataframe new_green_taxi.data

In [13]:
display(new_green_taxi.data.limit(3))

lat,long,vendorID,lpepPickupDatetime,lpepDropoffDatetime,passengerCount,tripDistance,puLocationId,doLocationId,pickupLongitude,pickupLatitude,dropoffLongitude,dropoffLatitude,rateCodeID,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,ehailFee,totalAmount,tripType,puYear,puMonth,row_id,customer_rankgroupuq61c,customer_join_timenb8lu
40.7128,74.006,2,2016-05-19T02:22:45.000+0000,2016-05-19T02:43:55.000+0000,1,8.65,,,-73.87798309326172,40.75446319580078,-73.94257354736328,40.6864128112793,1,N,1,26.0,0.5,0.5,0.3,5.46,0.0,,32.76,1,2016,5,51539607633,1,2016-05-19T00:00:00.000+0000
40.7128,74.006,2,2016-05-15T01:13:38.000+0000,2016-05-15T01:24:02.000+0000,1,3.18,,,-73.94502258300781,40.82412338256836,-73.92411804199219,40.8641357421875,1,N,2,12.0,0.5,0.5,0.3,0.0,0.0,,13.3,1,2016,5,51539608021,1,2016-05-15T00:00:00.000+0000
40.7128,74.006,2,2016-05-09T19:10:36.000+0000,2016-05-09T19:16:00.000+0000,1,0.84,,,-73.99449920654297,40.69467544555664,-73.9945297241211,40.68497467041016,1,N,2,5.5,1.0,0.5,0.3,0.0,0.0,,7.3,1,2016,5,51539608330,1,2016-05-09T00:00:00.000+0000


Define a dict `aggregations` to define how to aggregate each field at a hour level. For `snowDepth` and `temperature` we'll take the mean and for `precipTime` and `precipDepth` we'll take the hourly maximum. Use the groupby() function along with the aggregations to group data.

In [15]:
aggregations = {
    "snowDepth": "mean",
    "precipTime": "max",
    "temperature": "mean",
    "precipDepth": "max"}

The keys (`public_rankgroup`, `public_join_time`, `customer_rankgroup`, `customer_join_time`) used by groupby() and later merge() must be hacked here due to the current design.

In [17]:
public_rankgroup = processed_weather.id

public_join_time = [
    s for s in list(processed_weather.data.columns)
    if s.startswith('ds_join_time')][0]

customer_rankgroup = weather_enricher.location_selector.customer_rankgroup

customer_join_time = [
    s for s in list(new_green_taxi.data.columns)
    if type(s) is str and s.startswith('customer_join_time')][0]

weather_df_grouped = processed_weather.data.groupby(public_rankgroup, public_join_time).agg(aggregations)
display(weather_df_grouped.limit(3))

public_rankgroupa7c41,ds_join_time0r7qp,avg(snowDepth),avg(temperature),max(precipDepth),max(precipTime)
1,2016-03-14T00:00:00.000+0000,,7.804285714285711,9999.0,0.0
1,2016-01-13T00:00:00.000+0000,9.0,1.6463917525773193,,
1,2016-05-15T00:00:00.000+0000,,19.795774647887324,20.0,0.0


Join the final dataframe, and preview the joined result.

In [19]:
taxi_df = new_green_taxi.data
joined_dataset = taxi_df.join(
    weather_df_grouped,
    [taxi_df[customer_rankgroup] == weather_df_grouped[public_rankgroup],
     taxi_df[customer_join_time] == weather_df_grouped[public_join_time]],
    how='left')

final_df = joined_dataset.select(raw_columns + [
    "avg(temperature)", "max(precipTime)", "max(precipDepth)", "avg(snowDepth)"])
display(final_df.limit(5))

vendorID,lpepPickupDatetime,lpepDropoffDatetime,passengerCount,tripDistance,puLocationId,doLocationId,pickupLongitude,pickupLatitude,dropoffLongitude,dropoffLatitude,rateCodeID,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,ehailFee,totalAmount,tripType,puYear,puMonth,avg(temperature),max(precipTime),max(precipDepth),avg(snowDepth)
2,2016-03-14T11:01:20.000+0000,2016-03-14T11:11:57.000+0000,1,1.56,,,-73.93900299072266,40.799312591552734,-73.94047546386719,40.81391906738281,1,N,2,7.5,0.0,0.5,0.3,0.0,0.0,,8.3,1,2016,3,7.804285714285711,0.0,9999.0,
1,2016-03-14T01:07:20.000+0000,2016-03-14T01:11:06.000+0000,1,0.7,,,-73.86914825439453,40.74928283691406,-73.88216400146484,40.74787902832031,1,N,2,4.5,0.5,0.5,0.3,0.0,0.0,,5.8,1,2016,3,7.804285714285711,0.0,9999.0,
2,2016-03-14T14:47:36.000+0000,2016-03-14T15:09:26.000+0000,1,4.01,,,-73.83930969238281,40.88121795654297,-73.85816955566406,40.83949661254883,1,N,2,17.0,0.0,0.5,0.3,0.0,0.0,,17.8,1,2016,3,7.804285714285711,0.0,9999.0,
2,2016-03-14T00:06:39.000+0000,2016-03-14T00:22:51.000+0000,1,1.76,,,-73.95321655273438,40.73318099975586,-73.95167541503906,40.71426010131836,1,N,2,12.0,0.5,0.5,0.3,0.0,0.0,,13.3,1,2016,3,7.804285714285711,0.0,9999.0,
1,2016-03-14T08:51:27.000+0000,2016-03-14T09:26:33.000+0000,1,3.9,,,-73.95777893066406,40.732051849365234,-73.986572265625,40.73390579223633,1,N,1,22.5,0.0,0.5,0.3,4.0,5.54,,32.84,1,2016,3,7.804285714285711,0.0,9999.0,


Check the join success rate.

In [21]:
final_df.toPandas().info()

In [22]:
final_df.createOrReplaceTempView('joined_df')

In [23]:
%sql
select * from joined_df
where lpepPickupDatetime >= '2016-01-26' and lpepPickupDatetime < '2016-01-27'
order by lpepPickupDatetime limit 5

vendorID,lpepPickupDatetime,lpepDropoffDatetime,passengerCount,tripDistance,puLocationId,doLocationId,pickupLongitude,pickupLatitude,dropoffLongitude,dropoffLatitude,rateCodeID,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,ehailFee,totalAmount,tripType,puYear,puMonth,avg(temperature),max(precipTime),max(precipDepth),avg(snowDepth)
2,2016-01-26T01:21:46.000+0000,2016-01-26T01:24:14.000+0000,1,0.52,,,-73.95204162597656,40.7901725769043,-73.9486312866211,40.789005279541016,1,N,1,4.0,0.5,0.5,0.3,1.06,0.0,,6.36,1,2016,1,3.9578313253012047,0.0,0.0,
1,2016-01-26T01:44:03.000+0000,2016-01-26T01:53:30.000+0000,1,3.4,,,-73.95426177978516,40.58743667602539,-73.99391174316406,40.57358551025391,1,N,1,11.5,0.5,0.5,0.3,2.0,0.0,,14.8,1,2016,1,3.9578313253012047,0.0,0.0,
2,2016-01-26T02:16:18.000+0000,2016-01-26T02:25:15.000+0000,5,1.85,,,-73.94403839111328,40.711669921875,-73.9235610961914,40.7059326171875,1,N,2,8.5,0.5,0.5,0.3,0.0,0.0,,9.8,1,2016,1,3.9578313253012047,0.0,0.0,
2,2016-01-26T03:46:45.000+0000,2016-01-26T03:54:24.000+0000,1,1.74,,,-73.98497772216797,40.66352081298828,-73.96715545654297,40.67627334594727,1,N,1,7.5,0.5,0.5,0.3,5.0,0.0,,13.8,1,2016,1,3.9578313253012047,0.0,0.0,
1,2016-01-26T04:26:24.000+0000,2016-01-26T15:03:38.000+0000,3,360.5,,,-73.94805908203125,40.65092086791992,-73.95218658447266,40.64495468139648,1,N,3,989.0,0.5,0.5,0.3,0.0,10.5,,1000.8,1,2016,1,3.9578313253012047,0.0,0.0,
