# Build Predictive Model(s)

In this workbook, you will read the merged dataset you created previously and create transformers, estimators, and pipelines to build a binary classification model that predicts whether a trip has a tip.

## Instructions:

1. Read in your merged dataset
2. Use transformers and encoders to perform feature engineering
3. Split into training and testing
4. Build `LogisticRegression` model(s) and train them using pipelines
5. Evaluate the performance of the model(s) using `BinaryClassificationMetrics`

You are welcome to add as many cells as you need below up until the next section. **You must include comments in your code.**

## Start Spark

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("assign-4-model").getOrCreate()

from pyspark.sql.types import DoubleType
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model

from pyspark.sql.functions import to_timestamp, year, month, dayofweek

## Load Data

In [2]:
data = spark.read.parquet('s3://weiteng-li/merged_data/')

In [3]:
data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)



## Feature Engineering

In [4]:
# Convert 'tip_amount' into a binary label: 1.0 if the trip had a tip (> 0), else 0.0
data = data.withColumn('tip_amount', data['tip_amount'].cast('double'))  # cast float to double before binarizing
binarizer = Binarizer(threshold=0.0, inputCol="tip_amount", outputCol="labels")
data = binarizer.transform(data)
# Label indexer ("target" is not used by the pipeline below, which trains on "labels")
stringIndexer_label = StringIndexer(inputCol="labels", outputCol="target")
label_fit = stringIndexer_label.fit(data)
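
To make the labeling rule concrete, here is a minimal pure-Python sketch of what `Binarizer` computes with `threshold=0.0`: values strictly greater than the threshold become 1.0, and values equal to or below it become 0.0. The `binarize` helper and the sample tip values are illustrative only and are not part of the notebook.

```python
def binarize(values, threshold=0.0):
    """Return 1.0 for values strictly above the threshold, else 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]


sample_tips = [0.0, 1.5, 0.0, 2.0]  # hypothetical tip amounts
sample_labels = binarize(sample_tips)
print(sample_labels)
```

A trip with a tip of exactly 0.0 therefore lands in the "no tip" class.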

In [5]:
# build up datetime features
data = data.withColumn('dropoff_year', year('dropoff_datetime'))
data = data.withColumn('dropoff_month', month('dropoff_datetime'))
data = data.withColumn('dropoff_dayofweek', dayofweek('dropoff_datetime'))

In [15]:
# Index categorical columns; handleInvalid="keep" puts unseen or null values in an extra index
payment_type = StringIndexer(inputCol="payment_type", outputCol="payment_type_IDX").setHandleInvalid("keep")
store_and_fwd_flag = StringIndexer(inputCol="store_and_fwd_flag", outputCol="store_and_fwd_flag_IDX").setHandleInvalid("keep")
dropoff_year = StringIndexer(inputCol="dropoff_year", outputCol="dropoff_year_IDX").setHandleInvalid("keep")
dropoff_month = StringIndexer(inputCol="dropoff_month", outputCol="dropoff_month_IDX").setHandleInvalid("keep")
dropoff_dayofweek = StringIndexer(inputCol="dropoff_dayofweek", outputCol="dropoff_dayofweek_IDX").setHandleInvalid("keep")
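
As a rough illustration of what these indexers do, the pure-Python sketch below mimics `StringIndexer`'s default frequency-descending ordering and the `"keep"` behavior, where values not seen during fitting receive one extra index equal to the number of known labels. The helper names and the sample payment types are hypothetical, and the alphabetical tie-break is a simplification.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


def transform_keep(value, mapping):
    """Look up a value; unseen values get the extra index len(mapping)."""
    return mapping.get(value, float(len(mapping)))


train_values = ["CRD", "CSH", "CRD", "CRD", "CSH", "NOC"]  # hypothetical payment types
mapping = fit_string_index(train_values)
print(mapping)
print(transform_keep("DIS", mapping))  # value not seen during fitting
```

Note that the resulting indices are passed to the model as plain numbers, so the model treats them as ordered quantities unless they are one-hot encoded first.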

In [16]:
# Feature columns: indexed categoricals, numeric trip and fare fields, and indexed date parts
input_cols = ["store_and_fwd_flag_IDX",
              "payment_type_IDX",

              "trip_time_in_secs",
              "trip_distance",
              "fare_amount",
              "surcharge",
              "tolls_amount",
              "total_amount",
              "passenger_count",

              "dropoff_year_IDX",
              "dropoff_month_IDX",
              "dropoff_dayofweek_IDX"]

In [17]:
# Assemble the feature columns into a single vector column
vectorAssembler_features = VectorAssembler(inputCols=input_cols, outputCol="features")
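
One thing worth checking in a feature list like this is whether any input reveals the label. Assuming `total_amount` is the sum of the fare components including `tip_amount` (an assumption about this dataset that the schema alone does not confirm), the tip could be recovered from the other amounts by subtraction. The sketch below uses made-up rows and a hypothetical `implied_tip` helper to show the idea.

```python
def implied_tip(row):
    """Recover the tip as total minus the non-tip components (assumed relation)."""
    non_tip = row["fare_amount"] + row["surcharge"] + row["mta_tax"] + row["tolls_amount"]
    return row["total_amount"] - non_tip


# Made-up rows for illustration; not taken from the real dataset
rows = [
    {"fare_amount": 10.0, "surcharge": 0.5, "mta_tax": 0.5, "tolls_amount": 0.0, "total_amount": 13.0},
    {"fare_amount": 7.5, "surcharge": 0.0, "mta_tax": 0.5, "tolls_amount": 0.0, "total_amount": 8.0},
]
implied = [implied_tip(r) for r in rows]
print(implied)
```

If that relation holds in the real data, a very high AUC may partly reflect this leak; retraining without `total_amount` would be one way to test it.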

## Split Data into Training and Test Sets

In [18]:
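# Randomly split into 85% training and 15% testing; the fixed seed keeps the split reproducible
train_data, test_data = data.randomSplit(weights=[0.85, 0.15], seed=2333)
# Count the rows in each split
print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))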

Number of training records: 147202464
Number of testing records : 25974649
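
The training share here is about 85.0% of the 173,177,113 total rows rather than exactly 85%, because `randomSplit` samples rows at random instead of cutting at a fixed position. The pure-Python sketch below illustrates that idea with a hypothetical `random_split` helper; it is a simplification and not Spark's actual implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one of two splits by sampling against normalized weights."""
    rng = random.Random(seed)
    cutoff = weights[0] / sum(weights)  # probability of landing in the first split
    first, second = [], []
    for row in rows:
        (first if rng.random() < cutoff else second).append(row)
    return first, second


rows = list(range(10_000))  # stand-in for DataFrame rows
train_rows, test_rows = random_split(rows, [0.85, 0.15], seed=2333)
print(len(train_rows), len(test_rows))
```

Every row ends up in exactly one split, and rerunning with the same seed on the same input gives the same assignment.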


## Compile Pipeline

In [19]:
# Logistic regression on the assembled feature vector, using the binarized tip as the label
logit = LogisticRegression(labelCol="labels", featuresCol="features")

In [20]:
# Chain the indexers, the vector assembler, and the classifier into one pipeline
pipeline = Pipeline(stages=[payment_type,
                            store_and_fwd_flag, 
                            dropoff_year,
                            dropoff_month,
                            dropoff_dayofweek,
                            vectorAssembler_features,
                            logit])

## Training & Validation

In [21]:
# Fit all pipeline stages on the training set
model = pipeline.fit(train_data)

In [22]:
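# Score the held-out test set with the fitted pipeline
predictions = model.transform(test_data)
# Keep (prediction, label) pairs as an RDD for BinaryClassificationMetrics
predictions_target_table = predictions.select('prediction','labels').rdd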

In [23]:
# Compute the area under the ROC curve on the test set
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(predictions_target_table)
metrics.areaUnderROC

0.9980430071208847
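
`BinaryClassificationMetrics` takes an RDD of (score, label) pairs. The `prediction` column used above holds hard 0/1 class labels, which yields a ROC curve with a single interior point; a continuous score such as the positive-class probability generally gives a more informative AUC. The pure-Python sketch below uses toy data and a hypothetical `auc` helper to show how the two can differ. It does not reproduce the notebook's numbers.

```python
def auc(scores, labels):
    """Pairwise AUC: chance that a positive outranks a negative, ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [1, 1, 1, 0, 0, 0]             # toy ground truth
probs = [0.9, 0.8, 0.4, 0.6, 0.2, 0.1]  # toy positive-class probabilities
hard = [1.0 if p >= 0.5 else 0.0 for p in probs]  # thresholded predictions

auc_probs = auc(probs, labels)
auc_hard = auc(hard, labels)
print(auc_probs, auc_hard)
```

In the pipeline above, one option would be to score with the second element of the `probability` vector that the logistic regression stage adds to `predictions`, rather than with `prediction`.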

## In the following cells, please provide the requested code and output. Do not change the order and/or structure of the cells.

In the following cell, print the Area Under the Curve (AUC) for your binary classifier.

In [24]:
metrics.areaUnderROC

0.9980430071208847

In the following cell, provide the code that saves your model to your S3 bucket.

In [25]:
# Save the fitted pipeline model (`logit` is only the unfitted estimator)
model.save('s3://weiteng-li/logit/')