# NYC Taxi Dataset - Big Data and Cloud Computing
## Machine Learning & Pipelines

Profs. Thanuci & Michel

### Documentation: [Spark ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html)


* **vendor_id**: A code indicating the TPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc
* **pickup_datetime**: The date and time when the meter was engaged.
* **dropoff_datetime**: The date and time when the meter was disengaged
* **passenger_count**: The number of passengers in the vehicle. This is a driver-entered value.
* **trip_distance**: The elapsed trip distance in miles reported by the taximeter.
* **rate_code**: The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride
* **store_and_fwd_flag**: This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip
* **payment_type**: A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip
* **fare_amount**: The time-and-distance fare calculated by the meter.
* **extra**: Miscellaneous extras and surcharges. Currently, this only includes the \$0.50 and \$1 rush hour and overnight charges.
* **mta_tax**: \$0.50 MTA tax that is automatically triggered based on the metered rate in use
* **tip_amount**: Tip amount – This field is automatically populated for credit card tips. Cash tips are not included
* **tolls_amount**: Total amount of all tolls paid in trip.
* **imp_surcharge**: \$0.30 improvement surcharge assessed on trips at the flag drop. The improvement surcharge began being levied in 2015.
* **total_amount**: The total amount charged to passengers. Does not include cash tips
* **pickup_location_id**: TLC Taxi Zone in which the taximeter was engaged
* **dropoff_location_id**: TLC Taxi Zone in which the taximeter was disengaged
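
The coded fields above are easier to inspect once decoded. The following is a minimal plain-Python sketch that uses only the code tables listed in this data dictionary; the names `PAYMENT_TYPES`, `RATE_CODES`, and `describe_trip` are illustrative and are not part of the dataset or of Spark.

```python
# Lookup tables copied from the data dictionary above.
PAYMENT_TYPES = {
    "1": "Credit card", "2": "Cash", "3": "No charge",
    "4": "Dispute", "5": "Unknown", "6": "Voided trip",
}
RATE_CODES = {
    "1": "Standard rate", "2": "JFK", "3": "Newark",
    "4": "Nassau or Westchester", "5": "Negotiated fare", "6": "Group ride",
}

def describe_trip(rate_code, payment_type):
    """Return readable labels for a (rate_code, payment_type) pair.

    Codes that are not in the dictionary are reported as 'Unrecognized code'.
    """
    rate = RATE_CODES.get(str(rate_code), "Unrecognized code")
    payment = PAYMENT_TYPES.get(str(payment_type), "Unrecognized code")
    return rate, payment

print(describe_trip("1", "2"))
```

Codes are compared as strings because the notebook's schema reads `RatecodeID` and `payment_type` as `StringType`.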

## Library Imports

In [1]:
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt

import pyspark.sql.functions as f
from pyspark.sql.types import StringType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

## Creating the Spark Session

In [52]:
# Create the Spark session
from pyspark.sql import SparkSession
spark = SparkSession \
            .builder \
            .master("local[4]") \
            .appName("nyc_<mudar-nome>") \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.4,com.microsoft.azure:azure-storage:8.6.6") \
            .getOrCreate()

## Reading the Data

In [53]:
from pyspark.sql.types import *

labels = (('VendorID', StringType()),
          ('passenger_count', FloatType()),
          ('trip_distance', FloatType()),
          ('RatecodeID', StringType()),
          ('store_and_fwd_flag', StringType()),
          ('payment_type', StringType()),
          ('fare_amount', FloatType()),
          ('extra', FloatType()),
          ('mta_tax', FloatType()),
          ('tip_amount', FloatType()),
          ('tolls_amount', FloatType()),
          ('improvement_surcharge', FloatType()),
          ('total_amount', FloatType()),
          ('pickup_datetime', TimestampType()),
          ('dropoff_datetime', TimestampType()))

schema = StructType([StructField(x[0], x[1], True) for x in labels])

In [None]:
import os

STORAGE_ACCOUNT = 'dlspadseastusprod'
CONTAINER = 'big-data-comp-nuvem'
FOLDER = 'nyc-taxi'
# Storage account key, read from the environment so it is not hard-coded
TOKEN = os.getenv('AZURE_STORAGE_TOKEN')

spark.conf.set("fs.azure.account.key." + STORAGE_ACCOUNT + ".blob.core.windows.net", TOKEN)

# Read every CSV file in the folder using the schema defined above
path = "wasbs://{}@{}.blob.core.windows.net/{}/*.csv".format(CONTAINER, STORAGE_ACCOUNT, FOLDER)
df = spark.read.csv(path, header=True, schema=schema)

In [55]:
df.schema

StructType([StructField('VendorID', StringType(), True), StructField('passenger_count', FloatType(), True), StructField('trip_distance', FloatType(), True), StructField('RatecodeID', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('payment_type', StringType(), True), StructField('fare_amount', FloatType(), True), StructField('extra', FloatType(), True), StructField('mta_tax', FloatType(), True), StructField('tip_amount', FloatType(), True), StructField('tolls_amount', FloatType(), True), StructField('improvement_surcharge', FloatType(), True), StructField('total_amount', FloatType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True)])

In [56]:
# Keep only plausible 2018 trips: positive totals, bounded distance, fewer than 5 passengers
df = df.filter((df.total_amount < 10000) &
               (df.total_amount > 0) &
               (df.trip_distance < 1000) &
               (df.passenger_count < 5) &
               (df.pickup_datetime >= '2018-01-01') &
               (df.pickup_datetime < '2019-01-01'))

In [57]:
df.cache()

DataFrame[VendorID: string, passenger_count: float, trip_distance: float, RatecodeID: string, store_and_fwd_flag: string, payment_type: string, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, improvement_surcharge: float, total_amount: float, pickup_datetime: timestamp, dropoff_datetime: timestamp]

In [58]:
df.count()

10405847

# Feature Engineering

In [59]:
df = df.withColumn('hour', f.hour(df.pickup_datetime))
df = df.withColumn('day_of_week', f.dayofweek(df.pickup_datetime))
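
Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), which differs from Python's `datetime.weekday()` (0 = Monday). The sketch below reproduces the two derived columns for a single timestamp in plain Python, as a way to sanity-check values; `spark_style_features` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime

def spark_style_features(pickup):
    """Mimic f.hour and f.dayofweek for one Python datetime.

    Spark's dayofweek: 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
    Python's isoweekday: 1 = Monday, ..., 7 = Sunday.
    """
    day_of_week = pickup.isoweekday() % 7 + 1
    return {"hour": pickup.hour, "day_of_week": day_of_week}

# 7 January 2018 was a Sunday
print(spark_style_features(datetime(2018, 1, 7, 10, 30)))
```

Both columns are treated as categorical later in the notebook, so the numbering matters only for interpreting the encoded values.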

In [60]:
df = df.drop('VendorID','pickup_datetime','dropoff_datetime','store_and_fwd_flag','total_amount')

In [61]:
df.limit(10).toPandas()

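| | passenger_count | trip_distance | RatecodeID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | hour | day_of_week |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 5.2 | 1 | 2 | 16.5 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 10 | 1 |
| 1 | 2.0 | 0.7 | 1 | 1 | 5.5 | 0.0 | 0.5 | 1.25 | 0.0 | 0.3 | 10 | 1 |
| 2 | 1.0 | 0.29 | 1 | 2 | 4.0 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 10 | 1 |
| 3 | 1.0 | 2.07 | 1 | 1 | 13.0 | 0.0 | 0.5 | 3.45 | 0.0 | 0.3 | 10 | 1 |
| 4 | 1.0 | 1.8 | 1 | 1 | 12.0 | 0.0 | 0.5 | 2.55 | 0.0 | 0.3 | 10 | 1 |
| 5 | 1.0 | 1.4 | 1 | 1 | 7.0 | 0.0 | 0.5 | 1.55 | 0.0 | 0.3 | 10 | 1 |
| 6 | 1.0 | 2.03 | 1 | 1 | 9.5 | 0.0 | 0.5 | 2.58 | 0.0 | 0.3 | 10 | 1 |
| 7 | 1.0 | 5.23 | 1 | 2 | 23.5 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 10 | 1 |
| 8 | 1.0 | 1.3 | 1 | 2 | 8.5 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 10 | 1 |
| 9 | 3.0 | 0.56 | 1 | 2 | 4.0 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 10 | 1 |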


In [62]:
df.schema

StructType([StructField('passenger_count', FloatType(), True), StructField('trip_distance', FloatType(), True), StructField('RatecodeID', StringType(), True), StructField('payment_type', StringType(), True), StructField('fare_amount', FloatType(), True), StructField('extra', FloatType(), True), StructField('mta_tax', FloatType(), True), StructField('tip_amount', FloatType(), True), StructField('tolls_amount', FloatType(), True), StructField('improvement_surcharge', FloatType(), True), StructField('hour', IntegerType(), True), StructField('day_of_week', IntegerType(), True)])

# Train/Test Split

In [63]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
toy_df = test_df.sample(False,0.01,seed=42)
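
A seeded random split assigns each row to one side independently, so the resulting sizes are close to 80/20 but not exact. The plain-Python sketch below illustrates that idea on a list of integers; it is not Spark's implementation, and `random_split` is a hypothetical helper name.

```python
import random

def random_split(rows, train_weight=0.8, seed=42):
    """Assign each row to train or test with one seeded uniform draw per row."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # A draw below the training weight sends the row to the training set
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(10_000))
train_rows, test_rows = random_split(rows)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split repeatable, which is why the notebook passes `seed=42` to both `randomSplit` and `sample`.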

In [64]:
#print(train_df.count())
#print(test_df.count())
#print(toy_df.count())

# Feature Engineering: One-Hot Encoding

In [65]:
# Prepare the model's categorical variables
cat_features = ['RatecodeID','payment_type','hour','day_of_week']

indexOutputCols = [x + "Index" for x in cat_features]
oheOutputCols = [x + "OHE" for x in cat_features]

stringIndex = StringIndexer(inputCols=cat_features, outputCols=indexOutputCols, handleInvalid='skip', stringOrderType='frequencyDesc')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)
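
To make the two stages above concrete, here is a plain-Python sketch of what they compute on a small column: indices assigned by descending frequency, then a one-hot vector that drops the last category (the default `dropLast=True` behaviour of `OneHotEncoder`). The helpers `fit_string_index` and `one_hot` are illustrative names, and dense lists stand in for Spark's sparse vectors.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: i for i, category in enumerate(ordered)}

def one_hot(index, num_categories):
    """Encode an index as a vector of length num_categories - 1.

    The last category maps to the all-zeros vector, mirroring dropLast=True.
    """
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector

payment_types = ["1", "1", "2", "1", "3", "2"]
index = fit_string_index(payment_types)
encoded = [one_hot(index[v], len(index)) for v in payment_types]
print(index)
print(encoded[0], encoded[4])
```

Dropping one category avoids a redundant column, since the all-zeros vector already identifies the remaining category.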

# Feature Engineering: Feature Normalization

In [66]:
# Numerical features to be normalized ('fare_amount' is left out because it is the label)
num_features = ['passenger_count', 'trip_distance', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge']

# Create a vector assembler for numerical features
assembler = VectorAssembler(inputCols=num_features, outputCol="num_features_vec")

# Create a standard scaler to normalize the numerical features
scaler = StandardScaler(inputCol="num_features_vec", outputCol="scaled_num_features", withStd=True, withMean=True)
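
With `withMean=True` and `withStd=True`, each numerical feature is centered and divided by its standard deviation. The sketch below shows that transformation for one column in plain Python, assuming the sample (n - 1) standard deviation; `standard_scale` is a hypothetical helper, and mapping a constant column to zeros is a choice made for this sketch.

```python
from statistics import mean, stdev

def standard_scale(column):
    """Center a numeric column and divide by its sample standard deviation."""
    mu = mean(column)
    sigma = stdev(column)  # sample (n - 1) standard deviation
    if sigma == 0:
        # A constant column carries no information; map it to zeros
        return [0.0 for _ in column]
    return [(x - mu) / sigma for x in column]

trip_distance = [1.0, 2.0, 3.0]
print(standard_scale(trip_distance))
```

In the pipeline the mean and standard deviation are learned from the training data only and then reused when transforming the test data.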

# Assembling the Vectors

In [67]:
# Combine the OHE features and scaled numerical features into a single vector
featureCols = oheOutputCols + ["scaled_num_features"]
assembler_final = VectorAssembler(inputCols=featureCols, outputCol="features")

# Creating the Pipeline

In [68]:
# lr = LinearRegression(featuresCol='features', labelCol='fare_amount')

pipeline = Pipeline(stages=[stringIndex, oheEncoder, assembler, scaler, assembler_final])
fitted_pipeline = pipeline.fit(train_df)

Transformed_train_df = fitted_pipeline.transform(train_df)


In [70]:
Transformed_train_df.limit(5).toPandas()

| | passenger_count | trip_distance | RatecodeID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | ... | payment_typeIndex | hourIndex | day_of_weekIndex | RatecodeIDOHE | payment_typeOHE | hourOHE | day_of_weekOHE | num_features_vec | scaled_num_features | features |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.0 | 0.0 | 1 | 1 | 2.5 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | ... | 0.0 | 8.0 | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (1.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.5, 0.0, 0.0, 0.3000000119209... | [-1.946030945061688, -0.7705067163917121, -0.7... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ... |
| 1 | 0.0 | 0.0 | 1 | 1 | 2.5 | 0.0 | 0.5 | 0.49 | 0.0 | 0.3 | ... | 0.0 | 12.0 | 4.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (1.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 1.0, 0.0) | (0.0, 0.0, 0.0, 0.5, 0.49000000953674316, 0.0,... | [-1.946030945061688, -0.7705067163917121, -0.7... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ... |
| 2 | 0.0 | 0.0 | 1 | 1 | 2.5 | 0.0 | 0.5 | 5.0 | 0.0 | 0.3 | ... | 0.0 | 8.0 | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (1.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.5, 5.0, 0.0, 0.3000000119209... | [-1.946030945061688, -0.7705067163917121, -0.7... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ... |
| 3 | 0.0 | 0.0 | 1 | 1 | 2.5 | 0.0 | 0.5 | 20.0 | 0.0 | 0.3 | ... | 0.0 | 14.0 | 1.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (1.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.5, 20.0, 0.0, 0.300000011920... | [-1.946030945061688, -0.7705067163917121, -0.7... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ... |
| 4 | 0.0 | 0.0 | 1 | 1 | 2.5 | 0.0 | 0.5 | 30.0 | 0.0 | 0.3 | ... | 0.0 | 8.0 | 5.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (1.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 1.0) | (0.0, 0.0, 0.0, 0.5, 30.0, 0.0, 0.300000011920... | [-1.946030945061688, -0.7705067163917121, -0.7... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ... |


In [71]:
model = LinearRegression(maxIter=50,
                         solver='normal',
                         labelCol='fare_amount',
                         featuresCol='features',
                         elasticNetParam=0.2,
                         regParam=0.02)
pipeline_model = Pipeline(stages=[stringIndex, oheEncoder, assembler, scaler, assembler_final, model])
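
The two regularization settings combine into a single elastic-net penalty added to the squared-error loss: `regParam` sets the overall strength, and `elasticNetParam` mixes the L1 and L2 terms (0 is pure L2, 1 is pure L1). The sketch below computes that penalty for a given weight vector in plain Python; `elastic_net_penalty` is a hypothetical helper and the weights are made-up values for illustration.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Compute reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2).

    alpha is elastic_net_param; the intercept is not part of `weights`.
    """
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)

# Same settings as the model above, applied to illustrative weights
penalty = elastic_net_penalty([1.0, -2.0], reg_param=0.02, elastic_net_param=0.2)
print(penalty)
```

With `elasticNetParam=0.2` the penalty is mostly L2, so coefficients are shrunk more than they are pushed to exactly zero.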

# Model Training

In [73]:
fitted_pipe = pipeline_model.fit(train_df)
preds = fitted_pipe.transform(test_df)

# Model performance evaluation

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Create a RegressionEvaluator instance
evaluator = RegressionEvaluator(labelCol="fare_amount",
                                predictionCol="prediction",
                                metricName="rmse")

# Calculate RMSE
rmse = evaluator.evaluate(preds)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Calculate R-squared
r2 = evaluator.evaluate(preds, {evaluator.metricName: "r2"})
print("R-squared (R2) on test data = %g" % r2)

# Calculate MAE
mae = evaluator.evaluate(preds, {evaluator.metricName: "mae"})
print("Mean Absolute Error (MAE) on test data = %g" % mae)

RMSE of Prediction on test set: 4.9049361246251255
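
For reference, the three metrics requested from `RegressionEvaluator` can be written out directly. The sketch below computes them in plain Python on a tiny made-up example; `regression_metrics` is a hypothetical helper and the numbers are not taken from the taxi data.

```python
from math import sqrt

def regression_metrics(labels, predictions):
    """Return RMSE, MAE and R-squared for paired labels and predictions."""
    n = len(labels)
    errors = [p - y for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)            # residual sum of squares
    label_mean = sum(labels) / n
    ss_tot = sum((y - label_mean) ** 2 for y in labels)  # total sum of squares
    return {
        "rmse": sqrt(ss_res / n),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }

metrics = regression_metrics([1.0, 2.0, 3.0], [1.0, 2.0, 4.0])
print(metrics)
```

RMSE and MAE are in the same units as `fare_amount` (dollars), while R-squared is unitless.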


# Hyperparameter Tuning

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(model.regParam, [0.2, 0.3]) \
    .addGrid(model.elasticNetParam, [0.02, 0.03]) \
    .build()

crossval = CrossValidator(estimator=pipeline_model,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse"),
                          numFolds=5)  # Adjust the number of folds as needed

cvModel = crossval.fit(toy_df)


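# Note: toy_df is the same sample the cross-validator was fitted on, so this is not a held-out estimate
predictions = cvModel.transform(toy_df)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on the toy sample with tuned hyperparameters = %g" % rmse)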
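
The cost of the search grows with the product of the grid sizes and the number of folds. The sketch below enumerates a grid with the same values in plain Python and counts the fits, assuming one fit per combination per fold plus a final refit of the best combination on all the data; `build_grid` is a hypothetical helper and the ordering shown is that of `itertools.product`.

```python
from itertools import product

def build_grid(param_values):
    """Return every combination of the given parameter values as a list of dicts."""
    names = list(param_values)
    return [dict(zip(names, combo)) for combo in product(*param_values.values())]

grid = build_grid({"regParam": [0.2, 0.3], "elasticNetParam": [0.02, 0.03]})
num_folds = 5

cv_fits = len(grid) * num_folds   # one fit per combination per fold
total_fits = cv_fits + 1          # plus the final refit of the best combination

for combo in grid:
    print(combo)
print(total_fits)
```

Because every extra value multiplies the number of fits, the notebook runs the search on the small `toy_df` sample rather than on the full training set.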

In [None]:
# Read the parameter values from the grid itself so each RMSE is paired with the right combination
param_maps = cvModel.getEstimatorParamMaps()

rmse_df = pd.DataFrame({'rmse': cvModel.avgMetrics,
                        'elastic_net': [pm[model.elasticNetParam] for pm in param_maps],
                        'regularization': [pm[model.regParam] for pm in param_maps]})
rmse_df.sort_values(by='rmse')

## Practice

1. Split the `dataframe` in two: a training set (80%) and a test set (20%)
2. Apply `OneHotEncoding` to the categorical variables
3. Normalize the numerical variables with `StandardScaler`
4. Create a pipeline with the stages built in steps 2 and 3
5. Train a linear regressor to predict the `fare_amount` column
6. Evaluate on the test set, reporting the `RMSE`

Documentation:

* [pyspark.sql.DataFrame.randomSplit](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.randomSplit.html)
* [pyspark.sql.DataFrame.sample](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.sample.html)
* [pyspark.ml.feature.StringIndexer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html)
* [pyspark.ml.feature.OneHotEncoder](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html)
* [pyspark.ml.feature.VectorAssembler](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html)
* [pyspark.ml.feature.StandardScaler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StandardScaler.html)
* [pyspark.ml.regression.LinearRegression](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html)
* [pyspark.ml.Pipeline](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Pipeline.html)
* [pyspark.ml.tuning.ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html)
* [pyspark.ml.tuning.CrossValidator](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html)