# High tip classification - Random Forest

## Spark

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/1280px-Apache_Spark_logo.svg.png" width="400">

**Hardware**: 10 nodes - r5.8xlarge (32 CPU, 256GB RAM)

In [None]:
import os

# Default location of the NYC taxi data read via ml_utils.taxi_path.
# Only set it when the environment does not already provide TAXI_S3, so a
# value exported before launching the kernel is respected instead of being
# silently overwritten by this hard-coded bucket.
if 'TAXI_S3' not in os.environ:
    os.environ['TAXI_S3'] = 's3://saturn-titan/nyc-taxi'

In [2]:
from ml_utils import MLUtils

# Shared helper for this run; its task/tool/model identify the run in the
# predictions path and in the metrics row written at the end.
ml_utils = MLUtils(
    tool='spark',
    model='random_forest',
    ml_task='high_tip',
)

# Load data and feature engineering

In [3]:
# NOTE(review): np and datetime are not referenced by any code cell in the
# visible notebook; candidates for removal.
import numpy as np
import datetime
# findspark.init() is deliberately called before the pyspark imports below —
# presumably so the local Spark installation's pyspark is importable. Keep
# this ordering.
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T


# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [4]:
%%time
tip_train = spark.read.parquet(f'{ml_utils.taxi_path}/data/ml/tip_train')
tip_train.count()

CPU times: user 2.82 ms, sys: 0 ns, total: 2.82 ms
Wall time: 9.72 s


219889897

In [5]:
tip_train.first()  # peek at a single training row

Row(id='e54881cf13434e2482468589053b046c', pickup_datetime=datetime.datetime(2018, 5, 25, 8, 49, 20), dropoff_datetime=datetime.datetime(2018, 5, 25, 9, 12, 59), pickup_taxizone_id=33.0, dropoff_taxizone_id=158.0, pickup_weekday=4, pickup_weekofyear=21, pickup_hour=8, pickup_minute=49, pickup_week_hour=104, passenger_count=1.0, tip_fraction=0.20842105263157895)

<br>

With Spark we can train on the full dataset, so there is no need to sample here.

# Random forest classifier

We'll train a classifier that can predict "high-tip" rides — those where the tip percentage exceeds 25%.

In [6]:
# Feature list, target column and high-tip threshold all come from ml_utils.
features = ml_utils.tip_vars.features
y_col = ml_utils.tip_vars.y_col
high_tip = ml_utils.tip_vars.high_tip

# Integer 0/1 target: 1 when y_col exceeds the threshold (null stays null).
tip_train = tip_train.withColumn('label', (F.col(y_col) > high_tip).cast('int'))

In [7]:
# Class balance of the training target.
tip_train.groupBy('label').count().show()

+-----+---------+
|label|    count|
+-----+---------+
|    1| 58310016|
|    0|161579881|
+-----+---------+



In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Spark ML estimators consume one vector column, so pack the feature
# columns into a single column named 'features'.
assembler = VectorAssembler(outputCol='features', inputCols=features)
pipeline = Pipeline(stages=[assembler])

Assemble (and cache) the feature vectors first, so that the timing of the random forest fit below excludes feature preparation.

In [9]:
%%time
assembler_fitted = pipeline.fit(tip_train)
X = assembler_fitted.transform(tip_train)

X.cache()
X.count()

CPU times: user 6.43 ms, sys: 426 µs, total: 6.85 ms
Wall time: 23.1 s


219889897

In [10]:
from pyspark.ml.classification import RandomForestClassifier

# 100 trees, max depth 10, fixed seed. Relies on the default input column
# names ('features', 'label'), which match the columns created above.
rfc = RandomForestClassifier(
    seed=42,
    numTrees=100,
    maxDepth=10,
)

In [11]:
%%time
with ml_utils.time_fit():
    fitted = rfc.fit(X)

CPU times: user 141 ms, sys: 14.7 ms, total: 155 ms
Wall time: 18min 37s


## Predict on the test set

In [12]:
# Load the held-out split and label it with the same rule used for training.
tip_test = (
    spark.read.parquet(f'{ml_utils.taxi_path}/data/ml/tip_test')
    .withColumn('label', (F.col(y_col) > high_tip).cast('int'))
)

# Apply the already-fitted assembler, then score with the forest.
preds = fitted.transform(assembler_fitted.transform(tip_test))

In [13]:
preds.first()  # one scored test row, including probability and prediction

Row(id='2e8f402e4dc44f2fae8b9328a237c4d2', pickup_datetime=datetime.datetime(2019, 9, 9, 10, 19, 44), dropoff_datetime=datetime.datetime(2019, 9, 9, 10, 31, 26), pickup_taxizone_id=162.0, dropoff_taxizone_id=170.0, pickup_weekday=0, pickup_weekofyear=37, pickup_hour=10, pickup_minute=19, pickup_week_hour=10, passenger_count=1.0, tip_fraction=0.11764705882352941, label=0, features=DenseVector([0.0, 37.0, 10.0, 10.0, 19.0, 1.0, 162.0, 170.0]), rawPrediction=DenseVector([78.7087, 21.2913]), probability=DenseVector([0.7871, 0.2129]), prediction=0.0)

In [14]:
# Spark ML's `probability` column holds a DenseVector, which cannot be
# indexed directly in a select. Convert it to an array so element 1 (the
# positive "high tip" class probability) can be pulled out.
# The array is typed as double: the probabilities are double precision, and
# declaring FloatType would round them to float32 both here and in the
# parquet predictions written later with this same UDF.
# (On Spark >= 3.0, pyspark.ml.functions.vector_to_array does this without a
# Python UDF.)
to_array = F.udf(lambda v: v.toArray().tolist(), T.ArrayType(T.DoubleType()))

(preds
 .select(preds.id, preds[y_col].alias('actual'), to_array(preds.probability)[1].alias('predicted'))
 .show(5))

+--------------------+-------------------+----------+
|                  id|             actual| predicted|
+--------------------+-------------------+----------+
|2e8f402e4dc44f2fa...|0.11764705882352941|0.21291299|
|5f067a4121244f42b...| 0.2168421052631579|0.20769915|
|60e8442d3d434df49...|               0.15|0.21291299|
|2d1537ce2ed347778...|            0.10625| 0.2546924|
|13bb8a9ecbd04b559...|                0.0|0.22621535|
+--------------------+-------------------+----------+
only showing top 5 rows



In [15]:
# Output location: <taxi_path>/ml_results/predictions/<task>__<tool>__<model>
path = (
    f'{ml_utils.taxi_path}/ml_results/predictions/'
    f'{ml_utils.ml_task}__{ml_utils.tool}__{ml_utils.model}'
)
path

's3://saturn-titan/nyc-taxi/ml_results/predictions/high_tip__spark__random_forest'

In [16]:
%%time
(preds
 .select(preds.id, preds[y_col].alias('actual'), to_array(preds.probability)[1].alias('predicted'))
 .write.parquet(path, mode='overwrite')
)

CPU times: user 12.3 ms, sys: 5.12 ms, total: 17.4 ms
Wall time: 26.6 s


In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC AUC of the forest's rawPrediction scores against the 0/1 label
# (both column names are the evaluator's defaults, spelled out here).
evaluator = BinaryClassificationEvaluator(
    labelCol='label',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
auc = evaluator.evaluate(preds)
ml_utils.write_metric_df('auc', auc)

Unnamed: 0,ml_task,tool,model,metric,value,fit_seconds
0,high_tip,spark,random_forest,auc,0.536425,1117.825969
