# Initializing spark connection

In [2]:
from pyspark.sql import SparkSession

team = "team30"
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [31]:
rides = spark.read.format("avro").table("team30_projectdb.rides_part_buck")
rides.printSchema()

root
 |-- vendorid: integer (nullable = true)
 |-- pickup_time: timestamp (nullable = true)
 |-- dropoff_time: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- ratecodeid: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: short (nullable = true)



In [32]:
rides = rides.limit(10000)

In [33]:
rides.show()
print(rides.count())

+--------+-------------------+-------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+-----------+-----+-------+---------------------+----------+------------+------------+------------+
|vendorid|        pickup_time|       dropoff_time|passenger_count|trip_distance|pickup_longitude|pickup_latitude|ratecodeid|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|fare_amount|extra|mta_tax|improvement_surcharge|tip_amount|tolls_amount|total_amount|payment_type|
+--------+-------------------+-------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+-----------+-----+-------+---------------------+----------+------------+------------+------------+
|       1|2015-01-02 08:23:27|2015-01-02 08:47:40|              1|         11.9|          -73.78|          40.64|         1|                 N|           -73.81|        

# Preprocessing

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorIndexer
import os
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
import math

def run(command):
    return os.popen(command).read()

In [35]:
all_features = ["vendorid", "passenger_count", "trip_distance", "ratecodeid", "payment_type",
               "pickup_time", "pickup_longitude", "pickup_latitude", 
               "dropoff_longitude", "dropoff_latitude"]
label = "fare_amount"

rides = rides.select(all_features + [label]).na.drop()
rides = rides.withColumnRenamed(label, "label")

rides.show()

+--------+---------------+-------------+----------+------------+-------------------+----------------+---------------+-----------------+----------------+-----+
|vendorid|passenger_count|trip_distance|ratecodeid|payment_type|        pickup_time|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|label|
+--------+---------------+-------------+----------+------------+-------------------+----------------+---------------+-----------------+----------------+-----+
|       1|              1|          8.3|         1|           1|2015-01-15 05:01:09|          -73.97|           40.8|            -73.9|            40.9| 26.5|
|       1|              1|         11.9|         1|           1|2015-01-27 15:11:57|          -73.98|          40.75|           -73.91|           40.88| 34.5|
|       1|              1|          0.6|         1|           1|2015-01-26 11:25:28|          -73.98|          40.77|           -73.98|           40.77|  2.5|
|       1|              1|          8.0|      

In [36]:
class TimeTransformer(Transformer):
    def __init__(self, inputCol, outputCols=["week_day_sin", "week_day_cos", "hour_sin", "hour_cos", "minute_sin", "minute_cos"]):
        super(TimeTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCols = outputCols

    def _transform(self, dataset):
        weekday = F.dayofweek(F.col(self.inputCol))
        weekday_sin = F.round(F.sin(2*math.pi*weekday/7), 14)
        weekday_cos = F.round(F.cos(2*math.pi*weekday/7), 14)

        hour = F.hour(F.col(self.inputCol))
        hour_sin = F.round(F.sin(2*math.pi*hour/7), 14)
        hour_cos = F.round(F.cos(2*math.pi*hour/7), 14)

        minute = F.minute(F.col(self.inputCol))
        minute_sin = F.round(F.sin(2*math.pi*minute/7), 14)
        minute_cos = F.round(F.cos(2*math.pi*minute/7), 14)

        return dataset.withColumn(self.outputCols[0], weekday_sin) \
                      .withColumn(self.outputCols[1], weekday_cos) \
                      .withColumn(self.outputCols[2], hour_sin) \
                      .withColumn(self.outputCols[3], hour_cos) \
                      .withColumn(self.outputCols[4], minute_sin) \
                      .withColumn(self.outputCols[5], minute_cos)


class CoordinatesTransformer(Transformer):
    def __init__(self, inputCol, outputCols):  # z is missing because it's always 0
        super(CoordinatesTransformer, self).__init__()
        self.lat = inputCol[0]
        self.lon = inputCol[1]
        self.outputCols = outputCols

    def _transform(self, dataset):
        a = 6378137
        e2 = 6.6943799901377997e-3
        n = a/F.sqrt(1 - e2*F.sin(self.lat)**2)

        x = n * F.cos(self.lat) * F.cos(self.lon)
        y = n * F.cos(self.lat) * F.sin(self.lon)

        return dataset.withColumn(self.outputCols[0], x) \
                      .withColumn(self.outputCols[1], y)

In [41]:
# cat_cols = ["vendorid", "ratecodeid" ,"payment_type"]
cat_cols = ["ratecodeid"]
num_cols = ["passenger_count", "trip_distance"]
time_cols = ["week_day_sin", "week_day_cos", "hour_sin", "hour_cos", "minute_sin", "minute_cos", "pickup_x", "pickup_y", "dropoff_x", "dropoff_y"]

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip")
    for c in cat_cols
]

encoders = [
    OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
]

time_transformer = TimeTransformer(inputCol="pickup_time")
coordinates_transformer_1 = CoordinatesTransformer(inputCol=["pickup_latitude", "pickup_longitude"], outputCols=["pickup_x", "pickup_y"])
coordinates_transformer_2 = CoordinatesTransformer(inputCol=["dropoff_latitude", "dropoff_longitude"], outputCols=["dropoff_x", "dropoff_y"])


assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + num_cols + time_cols, outputCol="features", handleInvalid="keep")

pipeline = Pipeline(stages=indexers + encoders + [time_transformer] + [coordinates_transformer_1] + [coordinates_transformer_2] + [assembler])
# pipeline = Pipeline(stages=indexers + encoders + [time_transformer] + [coordinates_transformer_1] + [coordinates_transformer_2])

model = pipeline.fit(rides)
data = model.transform(rides)
data = data.select(["features", "label"])
data.show()
print(data.count())

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,0.0,0.0,0.0,...| 34.0|
|[1.0,0.0,0.0,0.0,...| 39.0|
|[1.0,0.0,0.0,0.0,...| 26.5|
|[1.0,0.0,0.0,0.0,...| 32.0|
|[0.0,0.0,1.0,0.0,...|  0.0|
|[1.0,0.0,0.0,0.0,...|  2.5|
|[1.0,0.0,0.0,0.0,...| 21.5|
|[1.0,0.0,0.0,0.0,...| 35.5|
|[1.0,0.0,0.0,0.0,...| 52.0|
|[1.0,0.0,0.0,0.0,...| 31.5|
|[1.0,0.0,0.0,0.0,...| 19.5|
|[1.0,0.0,0.0,0.0,...| 17.5|
|[1.0,0.0,0.0,0.0,...| 45.0|
|[0.0,0.0,1.0,0.0,...| 0.01|
|[0.0,0.0,1.0,0.0,...| 68.0|
|[0.0,0.0,1.0,0.0,...| 0.01|
|[0.0,1.0,0.0,0.0,...| 52.0|
|[1.0,0.0,0.0,0.0,...| 21.5|
|[0.0,0.0,1.0,0.0,...| 0.01|
|[1.0,0.0,0.0,0.0,...|  6.0|
+--------------------+-----+
only showing top 20 rows

10000


In [42]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=6).fit(data)
transformed = featureIndexer.transform(data)
transformed.show()

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|[1.0,0.0,0.0,0.0,...| 49.5|[1.0,0.0,0.0,0.0,...|
|[0.0,0.0,1.0,0.0,...| 35.0|[0.0,0.0,1.0,0.0,...|
|[0.0,0.0,0.0,1.0,...| 61.0|[0.0,0.0,0.0,1.0,...|
|[1.0,0.0,0.0,0.0,...| 51.0|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...|  2.5|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...| 36.5|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...|  6.0|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...| 34.5|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...|  2.5|[1.0,0.0,0.0,0.0,...|
|[0.0,0.0,0.0,1.0,...| 71.5|[0.0,0.0,0.0,1.0,...|
|[0.0,0.0,1.0,0.0,...| 60.0|[0.0,0.0,1.0,0.0,...|
|[1.0,0.0,0.0,0.0,...| 28.0|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...|  6.5|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...| 33.5|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...| 55.5|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...| 30.0|[1.0,0.0,0.0,0.0,...|
|[1.0,0.0,0.0,0.0,...| 38.5|[1.0,0.0,0.0,0.0,...|


In [10]:
# train_data, test_data = transformed.randomSplit([0.7, 0.3])

# run("hdfs dfs -rm -R project/data")

# train_data.select("indexedFeatures", "label")\
#     .coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("json")\
#     .save("project/data/train")

# # Run it from root directory of the repository
# run("hdfs dfs -cat project/data/train/*.json > data/train.json")

# test_data.select("indexedFeatures", "label")\
#     .coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("json")\
#     .save("project/data/test")

# # Run it from root directory of the repository
# run("hdfs dfs -cat project/data/test/*.json > data/test.json")

''

In [43]:
train_data, test_data = transformed.randomSplit([0.7, 0.3])

In [47]:
test_data.count()

3030

# Modeling

In [54]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Create evaluators
rmse_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

In [55]:
from pyspark.ml.regression import LinearRegression
# Create Linear Regression Model
lr = LinearRegression(featuresCol="indexedFeatures")

# Fit the data to the pipeline stages
model_lr = lr.fit(train_data)
predictions = model_lr.transform(test_data)
predictions.show(10)
S
rmse1 = rmse_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse1)
mae1 = r2_evaluator.evaluate(predictions)
print("R-squared (R2) on test data = %g" % mae1)

+--------------------+-----+--------------------+------------------+
|            features|label|     indexedFeatures|        prediction|
+--------------------+-----+--------------------+------------------+
|(18,[0,6,7,8,9,10...| 12.5|(18,[0,6,7,8,9,10...|  9.33452696000295|
|(18,[0,6,7,8,9,10...| 13.0|(18,[0,6,7,8,9,10...|  10.6590169095474|
|(18,[0,6,7,8,9,10...| 34.0|(18,[0,6,7,8,9,10...| 18.00533922970627|
|(18,[0,6,7,8,9,11...| 40.5|(18,[0,6,7,8,9,11...| 14.32991615977065|
|(18,[0,6,7,8,9,11...| 68.0|(18,[0,6,7,8,9,11...|21.761929461627634|
|(18,[0,6,7,8,9,11...| 24.5|(18,[0,6,7,8,9,11...| 16.08282879884877|
|(18,[0,6,7,9,10,1...| 33.0|(18,[0,6,7,9,10,1...| 14.31467784199113|
|(18,[0,6,7,9,10,1...| 27.5|(18,[0,6,7,9,10,1...|15.084052116677046|
|(18,[0,6,7,9,11,1...| 35.0|(18,[0,6,7,9,11,1...| 16.28414112308896|
|(18,[0,6,7,9,11,1...| 25.5|(18,[0,6,7,9,11,1...|  20.5633486006784|
|(18,[0,6,7,9,11,1...| 34.0|(18,[0,6,7,9,11,1...| 16.20634139561591|
|(18,[0,6,7,9,11,1...| 13.0|(18,[0

In [52]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pprint import pprint
import numpy as np

lr_grid = ParamGridBuilder()
lr_grid = lr_grid.addGrid(model_lr.aggregationDepth, [2, 3])\
                 .addGrid(model_lr.regParam, [0.001, 0])\
                 .build()

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=lr_grid,
                    evaluator=rmse_evaluator,
                    parallelism=5,
                    numFolds=3)

print("DEBUG: Running grid search...")
cvModel = cv.fit(train_data)
model1 = cvModel.bestModel
pprint(model1.extractParamMap())

print("DEBUG: Saving the model")
model1.write().overwrite().save("project/models/model1")
run("hdfs dfs -get project/models/model1 models/model1")

print("DEBUG: Making and saving predictions")
predictions = model1.transform(test_data)
predictions.show(10)

predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/model1_predictions.csv")

run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

print("DEBUG: Evaluating performance")
rmse1 = rmse_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse1)
mae1 = r2_evaluator.evaluate(predictions)
print("R-squared (R2) on test data = %g" % mae1)

DEBUG: Running grid search...
{Param(parent='LinearRegression_c867a86ffbe3', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredError',
 Param(parent='LinearRegression_c867a86ffbe3', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_c867a86ffbe3', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 1e-06,
 Param(parent='LinearRegression_c867a86ffbe3', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_c867a86ffbe3', name='maxBlockSizeInMB', doc='maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0.'): 0.0,
 Param(parent='LinearRegres

In [60]:
from pyspark.ml.regression import RandomForestRegressor
# Create Logistic Regression Model
rfr = RandomForestRegressor(featuresCol="indexedFeatures")

# Fit the data to the pipeline stages
model_rfr = rfr.fit(train_data)
predictions = model_rfr.transform(test_data)
predictions.show(10)

rmse2 = rmse_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse2)
mae2 = r2_evaluator.evaluate(predictions)
print("R-squared (R2) on test data = %g" % mae2)

+--------------------+-----+--------------------+------------------+
|            features|label|     indexedFeatures|        prediction|
+--------------------+-----+--------------------+------------------+
|(18,[0,6,7,8,9,10...| 37.0|(18,[0,6,7,8,9,10...|29.002504830128323|
|(18,[0,6,7,8,9,10...| 29.0|(18,[0,6,7,8,9,10...|29.173746840711907|
|(18,[0,6,7,8,9,10...| 32.0|(18,[0,6,7,8,9,10...| 31.73400265603728|
|(18,[0,6,7,9,10,1...| 32.0|(18,[0,6,7,9,10,1...|32.201308504551186|
|(18,[0,6,7,9,11,1...| 27.0|(18,[0,6,7,9,11,1...| 28.40500482500517|
|(18,[0,6,7,9,11,1...| 27.5|(18,[0,6,7,9,11,1...|29.354665731307183|
|(18,[0,6,7,9,11,1...| 87.5|(18,[0,6,7,9,11,1...|41.467430292986975|
|(18,[0,6,7,9,11,1...| 31.0|(18,[0,6,7,9,11,1...| 31.87056868825747|
|(18,[1,6,7,9,10,1...| 52.0|(18,[1,6,7,9,10,1...| 51.20303102811455|
|(18,[1,6,7,9,11,1...| 52.0|(18,[1,6,7,9,11,1...| 47.61292200486994|
+--------------------+-----+--------------------+------------------+
only showing top 10 rows

Root Mea

In [61]:
rfr_grid = ParamGridBuilder()
rfr_grid = rfr_grid.addGrid(model_rfr.maxDepth, [5, 6])\
                   .addGrid(model_rfr.maxBins, [32, 64])\
                   .build()

cv = CrossValidator(estimator=rfr,
                    estimatorParamMaps=rfr_grid,
                    evaluator=rmse_evaluator,
                    parallelism=5,
                    numFolds=3)

print("DEBUG: Running grid search...")
cvModel = cv.fit(train_data)
model2 = cvModel.bestModel
pprint(model2.extractParamMap())

print("DEBUG: Saving the model")
model2.write().overwrite().save("project/models/model2")
run("hdfs dfs -get project/models/model2 models/model2")

print("DEBUG: Making and saving predictions")
predictions = model2.transform(test_data)
predictions.show(10)

predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/model2_predictions.csv")

run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

print("DEBUG: Evaluating performance")
rmse2 = rmse_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse2)
mae2 = r2_evaluator.evaluate(predictions)
print("R-squared (R2) on test data = %g" % mae2)

DEBUG: Running grid search...
{Param(parent='RandomForestRegressor_c5357e9ca7ea', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='RandomForestRegressor_c5357e9ca7ea', name='labelCol', doc='label column name.'): 'label',
 Param(parent='RandomForestRegressor_c5357e9ca7ea', name='seed', doc='random seed.'): 21597229747921251,
 Param(parent='RandomForestRegressor_c5357e9ca7ea', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0,
 Param(parent='RandomForestRegressor_c5357e9ca7ea', name='numTrees', doc='Number of trees to train (>= 1).'): 20,
 Param(parent='RandomForestRegressor_c5357e9ca7ea', name='maxMemoryInMB', doc='Maximum memory in MB allocated to histogram aggregation. If too small, then 1 node will be split per iteration, and its aggregates may exceed this size.'): 256,
 Param(parent='RandomForestRegressor_c5357e9ca7ea', name='minInstancesPerNode', doc='Minimum number of instances each child must ha

# Compare the best models

In [62]:
# Create data frame to report performance of the models
models = [[str(model1), rmse1, mae1], [str(model2), rmse2, mae2]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

# Save it to HDFS
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

+------------------------------------------------------------------------------------------------+------------------+-------------------+
|model                                                                                           |RMSE              |R2                 |
+------------------------------------------------------------------------------------------------+------------------+-------------------+
|LinearRegressionModel: uid=LinearRegression_c867a86ffbe3, numFeatures=18                        |10.683474479485891|-1.7180160520183727|
|RandomForestRegressionModel: uid=RandomForestRegressor_c5357e9ca7ea, numTrees=20, numFeatures=18|7.37551565203668  |0.7689579121447947 |
+------------------------------------------------------------------------------------------------+------------------+-------------------+



''