# Build Predictive Model(s)

In this workbook, you will read the merged dataset you created previously and create transformers, estimators, and pipelines to build a binary classification model that predicts whether a trip has a tip or not.

## Instructions:

1. Read in your merged dataset
2. Use transformers and encoders to perform feature engineering
3. Split into training and testing
4. Build `LogisticRegression` model(s) and train them using pipelines
5. Evaluate the performance of the model(s) using `BinaryClassificationMetrics`

You are welcome to add as many cells as you need below up until the next section. **You must include comments in your code.**

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("A04").getOrCreate()

In [2]:
spark

In [3]:
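# Load the merged data. load() without a format uses the default data source
# (Parquet unless spark.sql.sources.default is changed); the 'header' and
# 'inferSchema' options only take effect for CSV.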
df = spark.read\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('s3://bigdata-a04-qianying/merged_data')

In [4]:
# Print the schema
df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)



* Prepare data

In [5]:
# Create the label column from tip_amount: 'no' if the tip is 0, otherwise 'yes'
from pyspark.sql.functions import when

# Cache the result because it is reused for the split and for fitting the indexers
new_df = df.withColumn("whether_tip", when(df.tip_amount == 0, 'no').otherwise('yes')).cache()
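The rule maps a zero tip to 'no' and everything else to 'yes'. One subtlety: the schema shows tip_amount as nullable, and in Spark SQL a comparison with NULL yields NULL, which `when` treats as not true, so a row with a null tip would fall through to `otherwise('yes')`. A minimal pure-Python sketch of that behavior, assuming standard SQL NULL semantics (`whether_tip` is a hypothetical helper, not part of the notebook):

```python
from typing import Optional


def whether_tip(tip_amount: Optional[float]) -> str:
    """Mimic when(tip_amount == 0, 'no').otherwise('yes') under SQL NULL semantics."""
    if tip_amount is None:
        # In Spark, NULL == 0 evaluates to NULL, which is not true,
        # so the row falls through to the otherwise('yes') branch.
        return "yes"
    return "no" if tip_amount == 0 else "yes"


for tip in [0.0, 2.5, None]:
    print(tip, "->", whether_tip(tip))
```

If the data does contain null tips, dropping them before labeling (for example with `df.na.drop(subset=['tip_amount'])`) keeps them from being counted as tipped trips.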

In [6]:
# Print the schema
new_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- whether_tip: string (nullable = false)



In [7]:
# Split data into training (60%) and testing (40%) sets; seed 24 makes the split reproducible
splitted_data = new_df.randomSplit([0.6, 0.4], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))

Number of training records: 103909153
Number of testing records : 69275938
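`randomSplit` does not cut the data at exactly 60/40: each row is assigned to a split independently, so the counts above are approximate (103,909,153 of 173,185,091 rows, about 59.999%, went to training). A simplified pure-Python sketch of seeded, weighted per-row assignment; `random_split` is a hypothetical helper, and Spark's actual implementation differs in detail (it samples within each partition):

```python
import random
from typing import List, Sequence


def random_split(rows: Sequence, weights: Sequence[float], seed: int) -> List[list]:
    """Assign every row to exactly one split, with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)            # e.g. [0.6, 0.4] -> [0.6, 1.0]
    rng = random.Random(seed)         # fixed seed -> same split on every run
    splits: List[list] = [[] for _ in weights]
    for row in rows:
        u = rng.random()              # one uniform draw in [0, 1) per row
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)    # guard against rounding leaving bounds[-1] < 1.0
    return splits


train, test = random_split(range(100_000), [0.6, 0.4], seed=24)
print(len(train), len(test))  # close to, but generally not exactly, 60000 / 40000
```

Because the split depends only on the seed and the row order, rerunning with the same seed reproduces it, which is why caching `new_df` before splitting helps keep the two sets consistent across actions.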


* Create pipeline and train a model

In [8]:
# Import all the packages we need

from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml import Pipeline, Model

In [9]:
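# Index the categorical columns (the string columns and rate_code) as numeric values.
# Each indexer is fitted here on new_df, so it enters the pipeline as an already-fitted
# transformer. handleInvalid='skip' drops rows with null or unseen values.
# The timestamp columns are not used as features.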
stringIndexer_label = StringIndexer(inputCol="whether_tip", outputCol="label").setHandleInvalid("skip").fit(new_df)
stringIndexer_medallion = StringIndexer(inputCol="medallion", outputCol="medallion_IX").setHandleInvalid("skip").fit(new_df)
stringIndexer_hack_license = StringIndexer(inputCol="hack_license", outputCol="hack_license_IX").setHandleInvalid("skip").fit(new_df)

stringIndexer_vendor_id = StringIndexer(inputCol="vendor_id", outputCol="vendor_id_IX").setHandleInvalid("skip").fit(new_df)
#stringIndexer_pickup_datetime = StringIndexer(inputCol="pickup_datetime", outputCol="pickup_datetime_IX").fit(new_df)
stringIndexer_rate_code = StringIndexer(inputCol="rate_code", outputCol="rate_code_IX").setHandleInvalid("skip").fit(new_df)

stringIndexer_store_and_fwd_flag = StringIndexer(inputCol="store_and_fwd_flag", outputCol="store_and_fwd_flag_IX").setHandleInvalid("skip").fit(new_df)
#stringIndexer_dropoff_datetime = StringIndexer(inputCol="dropoff_datetime", outputCol="dropoff_datetime_IX").fit(new_df)
stringIndexer_payment_type = StringIndexer(inputCol="payment_type", outputCol="payment_type_IX").setHandleInvalid("skip").fit(new_df)

In [11]:
# Show the label values; a label's position in this list is its index (0.0, 1.0)
stringIndexer_label.labels

['yes', 'no']
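StringIndexer's default ordering (`stringOrderType='frequencyDesc'`) gives index 0.0 to the most frequent value, so this output means 'yes' (tip) is label 0.0 and 'no' is label 1.0. The logistic regression therefore treats "no tip" as the positive class 1.0, which matters when reading the `probability` column later. A simplified pure-Python sketch of fitting and applying an indexer with `handleInvalid='skip'`; the helper names are hypothetical, and breaking frequency ties alphabetically is an assumption of this sketch:

```python
from collections import Counter
from typing import Dict, Iterable, List, Optional


def fit_string_indexer(values: Iterable[Optional[str]]) -> List[str]:
    """Return labels by descending frequency (index 0 = most frequent); nulls are ignored."""
    counts = Counter(v for v in values if v is not None)
    # Sort by count descending, then alphabetically for ties (assumed tie-break)
    return [label for label, _ in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))]


def transform_skip(values: Iterable[Optional[str]], labels: List[str]) -> List[float]:
    """Map values to indices, dropping nulls and unseen values (like handleInvalid='skip')."""
    index: Dict[str, float] = {label: float(i) for i, label in enumerate(labels)}
    return [index[v] for v in values if v in index]


labels = fit_string_indexer(["yes", "no", "yes", "yes", None])
print(labels)                                                # ['yes', 'no']
print(transform_skip(["no", "yes", "maybe", None], labels))  # [1.0, 0.0]
```

In Spark, 'skip' removes the whole row rather than a single value, so any row with a null in one of the indexed columns disappears from both training and evaluation.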

In [12]:
# Exclude tip_amount: the label is derived directly from it, so it would leak the target.
# Exclude total_amount as well: it is the sum of fare_amount, surcharge, mta_tax, tip_amount
# and tolls_amount, so it is redundant with those columns and also encodes the tip.
# Combine the remaining features into a single feature vector.
vectorAssembler_features = VectorAssembler(
    inputCols=["medallion_IX", 
               "hack_license_IX", 
               "vendor_id_IX", 
               "rate_code_IX",
               "store_and_fwd_flag_IX",
               "payment_type_IX",
               "passenger_count",
               "trip_time_in_secs",
               "trip_distance",
               "pickup_longitude",
               "pickup_latitude",
               "dropoff_longitude",
               "dropoff_latitude",
               "fare_amount",
               "surcharge",
               "mta_tax",
               "tolls_amount"], 
    outputCol="features")

In [13]:
vectorAssembler_features

VectorAssembler_780ef6a891d2
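VectorAssembler concatenates the listed input columns, in order, into one `features` vector. With its default `handleInvalid='error'` it fails on a NULL or NaN input, and every numeric column in the schema above is nullable, so setting `handleInvalid='skip'` on the assembler is one option if the fit fails on missing values. A simplified pure-Python sketch for scalar numeric columns only; `assemble` is a hypothetical helper and the row values are made up:

```python
import math
from typing import Dict, List, Optional, Sequence


def assemble(row: Dict[str, Optional[float]], input_cols: Sequence[str],
             handle_invalid: str = "error") -> Optional[List[float]]:
    """Concatenate the input columns, in order, into one flat feature list.

    'error' raises on a null/NaN value; 'skip' returns None so the row can be dropped.
    """
    features: List[float] = []
    for c in input_cols:
        v = row[c]
        if v is None or math.isnan(float(v)):
            if handle_invalid == "skip":
                return None
            raise ValueError(f"invalid (null/NaN) value in column {c!r}")
        features.append(float(v))
    return features


# Hypothetical trip values, using three of the notebook's feature columns
cols = ["passenger_count", "trip_distance", "fare_amount"]
print(assemble({"passenger_count": 1, "trip_distance": 2.5, "fare_amount": 10.0}, cols))
print(assemble({"passenger_count": None, "trip_distance": 2.5, "fare_amount": 10.0}, cols, "skip"))
```

Note that the indexed columns such as medallion_IX enter the vector as plain numbers, so the model treats their index order as if it were meaningful.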

In [14]:
# Define the estimator for classification. It is only configured here;
# it is trained later as a stage of the pipeline.
log_model = LogisticRegression(labelCol="label", featuresCol="features")
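For a binary problem, the fitted model computes a margin w·x + b and derives three output columns from it: `rawPrediction` = [-margin, margin], `probability` = [1 - σ(margin), σ(margin)], and `prediction` = 1.0 when σ(margin) exceeds the threshold (0.5 by default). Since 'yes' was indexed as 0.0 and 'no' as 1.0, `probability` here holds [P(tip), P(no tip)]. A simplified pure-Python sketch of that computation; the coefficients and inputs are hypothetical:

```python
import math
from typing import List, Sequence, Tuple


def sigmoid(z: float) -> float:
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict_binary_lr(x: Sequence[float], coefficients: Sequence[float],
                      intercept: float, threshold: float = 0.5
                      ) -> Tuple[List[float], List[float], float]:
    """Return (rawPrediction, probability, prediction) for one feature vector."""
    margin = sum(w * xi for w, xi in zip(coefficients, x)) + intercept
    raw = [-margin, margin]
    p1 = sigmoid(margin)                      # P(label = 1.0), here P('no')
    prob = [1.0 - p1, p1]                     # [P(label = 0.0), P(label = 1.0)]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw, prob, prediction


print(predict_binary_lr([2.0, 1.0], [1.0, 0.0], 0.0))
```

Because `probability[0]` is the probability of label 0.0 ('yes'), it is the natural score when "tip" is treated as the positive class.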

In [15]:
# Convert the indexed predictions (0.0/1.0) back to the original 'yes'/'no' labels.
labelConverter = IndexToString(inputCol="prediction", 
                               outputCol="predictedLabel",
                               labels=stringIndexer_label.labels)

In [16]:
# Build a pipeline
pipeline_lg_model = Pipeline(stages=[stringIndexer_label, 
                               stringIndexer_medallion, 
                               stringIndexer_hack_license, 
                               stringIndexer_vendor_id, 
                               stringIndexer_rate_code,
                               stringIndexer_store_and_fwd_flag,
                               stringIndexer_payment_type,
                               vectorAssembler_features, 
                               log_model, labelConverter])
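`Pipeline.fit` walks the stages in order: an estimator stage is fitted on the data produced by the stages before it and replaced by its fitted model, while a transformer stage is kept as-is, and data is only transformed up to the last estimator. Here the StringIndexers were already fitted with `.fit(new_df)`, so they enter the pipeline as transformers, and `log_model` is the only stage that `pipeline_lg_model.fit` actually trains. A toy pure-Python sketch of those semantics; `fit_pipeline`, `transform_pipeline`, `AddOne` and `MeanCenter` are hypothetical names:

```python
from typing import Any, List


def fit_pipeline(stages: List[Any], data: Any) -> List[Any]:
    """Fit estimators in order; transform data only up to the last estimator."""
    est_idx = [i for i, s in enumerate(stages) if hasattr(s, "fit")]
    last_est = est_idx[-1] if est_idx else -1
    fitted = []
    for i, stage in enumerate(stages):
        # Estimators are fitted on the data seen so far; transformers are kept as-is
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        fitted.append(model)
        if i < last_est:
            data = model.transform(data)
    return fitted


def transform_pipeline(fitted: List[Any], data: Any) -> Any:
    """Apply every fitted stage in order, like PipelineModel.transform."""
    for model in fitted:
        data = model.transform(data)
    return data


class AddOne:                      # a transformer: no fit()
    def transform(self, data):
        return [x + 1 for x in data]


class MeanCenterModel:             # the fitted model produced by MeanCenter
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class MeanCenter:                  # an estimator: fit() returns a model
    def fit(self, data):
        return MeanCenterModel(sum(data) / len(data))


fitted = fit_pipeline([AddOne(), MeanCenter(), AddOne()], [1, 2, 3])
print(fitted[1].mean, transform_pipeline(fitted, [1, 2, 3]))
```

The mean is learned from the output of the first stage ([2, 3, 4]), just as `log_model` learns from the assembled `features` rather than from the raw columns.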

In [17]:
# Fit the pipeline on a small sample (100 rows) of the training data first, to check
# that all stages run before fitting on the full training set.

sample = train_data.limit(100) # Create a tiny DataFrame
model_lg = pipeline_lg_model.fit(sample)

In [18]:
# The pipeline works, so fit it on the full training set
model_lg = pipeline_lg_model.fit(train_data)

In [None]:
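# Evaluate the model on the held-out test set
predictions = model_lg.transform(test_data)

# BinaryClassificationMetrics expects an RDD of (score, true label) pairs.
# stringIndexer_label.labels is ['yes', 'no'], so label 0.0 means 'yes' (tip).
# Treat 'yes' as the positive class: the score is P(label = 0.0) and the label is
# 1 - label. Use the true label, not the model's prediction, and keep the data
# distributed instead of collecting ~69M rows to the driver.
scoreAndLabels = predictions.select('probability', 'label').rdd \
    .map(lambda row: (float(row['probability'][0]), 1.0 - float(row['label'])))

scoreAndLabels.take(10)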

In [155]:
# Compute binary classification metrics from the (score, label) pairs
evaluatorLG = BinaryClassificationMetrics(scoreAndLabels)

In [175]:
type(scoreAndLabels)

pyspark.rdd.PipelinedRDD
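`areaUnderROC` estimates the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, so the label in each (score, label) pair must be the true class. Pairing the score with the model's own thresholded prediction always yields 1.0, because every row predicted positive has a higher score than every row predicted negative. Flipping both the score and the label (P(no tip) with label, versus P(tip) with 1 - label) gives the same AUC. A small pure-Python sketch with made-up scores; `area_under_roc` is a hypothetical helper using the pairwise definition, with ties counted as one half:

```python
from typing import List, Sequence, Tuple


def area_under_roc(score_and_labels: Sequence[Tuple[float, float]]) -> float:
    """Fraction of (positive, negative) pairs where the positive scores higher."""
    pos = [s for s, y in score_and_labels if y == 1.0]
    neg = [s for s, y in score_and_labels if y == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical rows: (P(tip) = probability[0], true label index where 0.0 = 'yes')
rows: List[Tuple[float, float]] = [(0.9, 0.0), (0.8, 0.0), (0.7, 1.0), (0.4, 0.0), (0.2, 1.0)]

print(area_under_roc([(p, 1.0 - y) for p, y in rows]))   # tip as positive class
print(area_under_roc([(1.0 - p, y) for p, y in rows]))   # no tip as positive: same AUC

# Using the prediction instead of the true label inflates the AUC to 1.0
preds = [1.0 if (1.0 - p) > 0.5 else 0.0 for p, _ in rows]
print(area_under_roc([(p, 1.0 - pr) for (p, _), pr in zip(rows, preds)]))
```

The pairwise loop is quadratic and only meant for illustration; on the full test set, `BinaryClassificationMetrics` computes the curve in a distributed way.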

In [93]:
predictions.show()

(Output truncated in the export. The header row lists the input columns, then whether_tip, label, the *_IX index columns, features, rawPrediction, and further columns that were cut off.)

## In the following cells, please provide the requested code and output. Do not change the order and/or structure of the cells.

In the following cell, print the Area Under the Curve (AUC) for your binary classifier.

In [None]:
print("Area under ROC = %s" % evaluatorLG.areaUnderROC)

In the following cell, provide the code that saves your model to your S3 bucket.