The pipeline is trained on normalized counts.

Three models:
1. All features treated as continuous; 06m39s for 2014-tiny
2. Weekday and hour treated as categorical; 07m34s for 2014-tiny
3. Weekday, hour, and grid treated as categorical; 07m56s for 2014-tiny
4. Grid treated as categorical (not implemented).

TODO:
- train on 2014, dev and test on 2015.


In [13]:
import pyspark
from utils import sparkutils

from pyspark.sql.functions import min, max, col, stddev, abs
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
spark = pyspark.sql.SparkSession.builder\
.appName("Rides Preprocessor")\
.master("local")\
.config("spark.local.dir", "/home/atkm/nycTaxi/tmp")\
.getOrCreate()

In [3]:
def get_ride_data(year, month, size='tiny'):
    return f'data/yellow_tripdata_{year}-{month:02}_{size}.csv'

def get_metar_data(year, month):
    return f'data/metar_data/lga_{year}-{month:02}.csv'

def read_csv(path):
    return spark.read.format("csv")\
      .option("header", "true")\
      .option("inferSchema", "true")\
      .load(path)

def load_metar(metarPath):
    metar = read_csv(metarPath)
    metar = metar.select("valid","tmpf", " p01i") # note whitespace in p01i
    return metar.withColumnRenamed('valid', 'datetime')\
        .withColumnRenamed('tmpf', 'fahrenheit')\
        .withColumnRenamed(' p01i', 'precip_in')

def load_rides(ridesPath):
    rides = read_csv(ridesPath)
    # 2014
    colNames = map(lambda name: name.strip(), rides.columns)
    rides = rides.toDF(*colNames)
    return rides.select("pickup_datetime","pickup_latitude", "pickup_longitude")


In [18]:
# df = rides and metar joined.
# model_type = '1', '2', or '3'.
def rf_pipeline(df, model_type, grid_dict, numFolds=5):
    
    rf = RandomForestRegressor(
        featuresCol='features',
        labelCol='count_scaled'
    )
    
    # Build pipeline.
    if model_type == '1':
        assembler = VectorAssembler(
            inputCols = ['grid_x', 'grid_y', 'weekday', 'hour', 'fahrenheit', 'precip_in'],
            outputCol = 'features'
        )
        pipeline = Pipeline(
            stages=[assembler, rf]
        )
    elif model_type == '2':
        encoder = OneHotEncoderEstimator(
            inputCols=['weekday', 'hour'],
            outputCols=['weekday_vec', 'hour_vec']
        )
        assembler = VectorAssembler(
            inputCols = ['grid_x', 'grid_y', 'weekday_vec', 'hour_vec', 'fahrenheit', 'precip_in'],
            outputCol = 'features'
        )
        pipeline = Pipeline(
            stages=[encoder, assembler, rf]
        )
    elif model_type == '3':
        encoder = OneHotEncoderEstimator(
            inputCols=['weekday','hour','grid_x','grid_y'],
            outputCols=['weekday_vec','hour_vec','grid_x_vec','grid_y_vec']
        )
        assembler = VectorAssembler(
            inputCols = ['grid_x_vec', 'grid_y_vec', 'weekday_vec', 'hour_vec', 'fahrenheit', 'precip_in'],
            outputCol = 'features'
        )
        pipeline = Pipeline(
            stages=[encoder, assembler, rf]
        )
    else:
        raise ValueError("Model type must be either 1, 2, or 3.")
        
    # TODO: is there a random search module?
    # start with numTrees: [10, 100, 1000]
    # maxDepth: [10, 100, 1000]
    # minInstancesPerNode: [1, 10, 100, 1000]
    paramGrid = ParamGridBuilder() \
        .addGrid(rf.numTrees, grid_dict['numTrees']) \
        .addGrid(rf.maxDepth, grid_dict['maxDepth']) \
        .addGrid(rf.minInstancesPerNode, grid_dict['minInstancesPerNode']) \
        .build()
    
    
    #train, dev, test = joined.randomSplit([1/3] * 3)
    train, dev, test = joined.randomSplit([0.8, 0.1, 0.1])
    
    evaluator = RegressionEvaluator(
        labelCol='count_scaled', predictionCol='prediction', metricName='rmse'
    )

    crossval = CrossValidator(estimator=pipeline,
                         estimatorParamMaps=paramGrid,
                         evaluator=evaluator,
                         numFolds=numFolds)
    
    model = crossval.fit(train)
    pred_on_test = model.transform(test)
    rmse_on_test = evaluator.evaluate(pred_on_test)
    # TODO: scale back to make these values interpretable
    mean_error = pred.agg(pyspark.sql.functions.abs(col('count') - col('prediction')))
    error_stddev = pred.agg(stddev(col('count') - col('prediction')))
    
    # get metric values with model.avgMetrics,
    # and parameters with model.getEstimatorParamMaps
    return model, pred_on_test, rmse_on_test#, mean_error, error_stddev

def get_model_stats(model):
    
    def _parse_params(params):
        for p, val in params.items():
            yield (p.name, val)
            
    pmaps = model.getEstimatorParamMaps()
    pmaps = [list(_parse_params(params)) for params in pmaps]
    return list(zip(pmaps, model.avgMetrics))

In [5]:
rides = sparkutils.count_rides(
    load_rides(get_ride_data(2014,1))
)
metar = sparkutils.clean_metar(
    load_metar(get_metar_data(2014,1))
)
for m in range(1): # TODO: replace range(1) with range(12)
    rides = rides.unionAll(
        sparkutils.count_rides(load_rides(get_ride_data(2014,m+2)))
    )
    metar = metar.unionAll(
        sparkutils.clean_metar(load_metar(get_metar_data(2014,m+2)))
    )
    
joined = sparkutils.join_rides_metar(rides,metar)

In [6]:
rides.show(2)
metar.show(2)
joined.show(5)
print(joined.count())

+-------------------+------+------+-----+-------------------+
|    pickup_datetime|grid_x|grid_y|count|       count_scaled|
+-------------------+------+------+-----+-------------------+
|2014-01-24 09:00:00|     5|    28|    1|0.16666666666666666|
|2014-01-29 22:00:00|     6|    25|    2| 0.3333333333333333|
+-------------------+------+------+-----+-------------------+
only showing top 2 rows

+-------------------+----------+---------+
|           datetime|fahrenheit|precip_in|
+-------------------+----------+---------+
|2014-01-27 03:00:00|     30.02|      0.0|
|2014-01-10 21:00:00|     33.98|      0.0|
+-------------------+----------+---------+
only showing top 2 rows

+------+------+-----+-------------------+----------+---------+-------+----+
|grid_x|grid_y|count|       count_scaled|fahrenheit|precip_in|weekday|hour|
+------+------+-----+-------------------+----------+---------+-------+----+
|     5|    30|    1|0.16666666666666666|     30.02|      0.0|      1|   3|
|     4|    26| 

## Trained Random Forest Model - PySpark

## Model 1: all features treated as continuous

In [7]:
%%time
grid_dict = {'numTrees': [5],
             'maxDepth': [10],
             'minInstancesPerNode': [1,5]}
# cv=5, 2 parameter sets to search, 2 months => 29m32s
# cv=2, 2 parameter sets to search, 2 months => 11m40s
model, pred, rmse, mean_error, error_stddev = rf_pipeline(joined, '1', grid_dict, 2)

CPU times: user 4.26 s, sys: 1.45 s, total: 5.71 s
Wall time: 15min 18s


In [8]:
get_model_stats(model), rmse, mean_error, error_stddev

([([('numTrees', 5), ('maxDepth', 10), ('minInstancesPerNode', 1)],
   0.09575448006144924),
  ([('numTrees', 5), ('maxDepth', 10), ('minInstancesPerNode', 5)],
   0.09537282127403517)],
 0.0957761876207309)

In [16]:
pred.agg(stddev(col('count') - col('prediction'))).show()

+---------------------------------+
|stddev_samp((count - prediction))|
+---------------------------------+
|               0.5935841436292253|
+---------------------------------+



In [9]:
for i, s in enumerate(model.stages):
    print(f'{i}: {s}')

AttributeError: 'CrossValidatorModel' object has no attribute 'stages'

## Model 2: weekday and hour as categorical

In [10]:
%%time
grid_dict = {'numTrees': [5],
             'maxDepth': [10],
             'minInstancesPerNode': [1,5]}
# cv=2, 2 parameter sets to search, 2 months => 26m40s
model, pred, rmse = rf_pipeline(joined, '2', grid_dict, 2)

CPU times: user 11.1 s, sys: 3.76 s, total: 14.8 s
Wall time: 26min 7s


In [11]:
get_model_statsdel_stats(model)

[([('numTrees', 5), ('maxDepth', 10), ('minInstancesPerNode', 1)],
  0.5630926358762378),
 ([('numTrees', 5), ('maxDepth', 10), ('minInstancesPerNode', 5)],
  0.5645315490127796)]

In [11]:
encoder = OneHotEncoderEstimator(
    inputCols=['weekday', 'hour'],
    outputCols=['weekday_vec', 'hour_vec']
)
feature_assembler = VectorAssembler(
    inputCols = ['grid_x', 'grid_y', 'weekday_vec', 'hour_vec', 'fahrenheit', 'precip_in'],
    outputCol = 'features'
)
# target_scaler = MinMaxScaler(
#     inputCol="count",
#     outputCol="count_scaled"
# )
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol='count_scaled',
    numTrees=3, #default = 20
    maxDepth=5, #default = 5
)

pipeline = Pipeline(
    stages=[encoder, feature_assembler, rf]
)

In [12]:
%%time
train, test = joined.randomSplit([0.5,0.5])
model = pipeline.fit(train)
pred = model.transform(test)
print(pred.columns)
evaluator = RegressionEvaluator(
    labelCol='count_scaled', predictionCol='prediction', metricName='rmse'
)
rmse = evaluator.evaluate(pred)
print(rmse)

['grid_x', 'grid_y', 'count', 'count_scaled', 'fahrenheit', 'precip_in', 'weekday', 'hour', 'weekday_vec', 'hour_vec', 'features', 'prediction']
0.11078096256449929
CPU times: user 371 ms, sys: 93.2 ms, total: 464 ms
Wall time: 26.2 s


## Model 3: weekday, hour, and grid as categorical

In [54]:
encoder = OneHotEncoderEstimator(
    inputCols=['weekday','hour','grid_x','grid_y'],
    outputCols=['weekday_vec','hour_vec','grid_x_vec','grid_y_vec']
)
assembler = VectorAssembler(
    inputCols = ['grid_x_vec', 'grid_y_vec', 'weekday_vec', 'hour_vec', 'fahrenheit', 'precip_in'],
    outputCol = 'features'
)
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol='count',
    numTrees=3, #default = 20
    maxDepth=5, #default = 5
)

pipeline = Pipeline(
    stages=[encoder, assembler, rf]
)

In [55]:
%%time
train, test = joined.randomSplit([0.5,0.5])
model = pipeline.fit(train)
pred = model.transform(test)
print(pred.columns)
evaluator = RegressionEvaluator(
    labelCol='count', predictionCol='prediction', metricName='rmse'
)
rmse = evaluator.evaluate(pred)
print(rmse)

['grid_x', 'grid_y', 'count', 'fahrenheit', 'precip_in', 'weekday', 'hour', 'weekday_vec', 'hour_vec', 'grid_x_vec', 'grid_y_vec', 'features', 'prediction']
0.5673652391255603
CPU times: user 3.51 s, sys: 1.35 s, total: 4.86 s
Wall time: 7min 56s
