In [29]:
# show the first 10 entries
filePath = "gs://dataproc-staging-us-east1-484410736875-hru3bcdb/2019-01-h1.csv"
df = spark.read.option("delimiter", ",").option("header", True).csv(filePath)
df.select("passenger_count", "pulocationid", "dolocationid", "total_amount").show(10)

# Convert to float
df = df.withColumn("passenger_count",df.passenger_count.cast('double'))
df = df.withColumn("pulocationid",df.pulocationid.cast('double'))
df = df.withColumn("dolocationid",df.dolocationid.cast('double'))
df = df.withColumn("total_amount",df.total_amount.cast('double'))

df.printSchema()

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows

root
 |-- vendorid: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: doub

In [30]:
# Create trainDF and testDF
trainDF, testDF = df.randomSplit([.8, .2], seed=42)
# print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

In [31]:
# Create a decision tree regressor to predict total_amount 
    # from the other three features.
# Preparing features with transformer
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["passenger_count", "pulocationid", "dolocationid"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("features", "total_amount").show(10)

[Stage 51:>                                                         (0 + 1) / 1]

+-----------------+------------+
|         features|total_amount|
+-----------------+------------+
| [1.0,80.0,112.0]|         6.3|
| [1.0,114.0,79.0]|       32.75|
| [1.0,50.0,226.0]|        25.3|
|  [1.0,249.0,4.0]|         9.8|
|[1.0,158.0,158.0]|         5.8|
| [1.0,246.0,68.0]|         7.8|
|[1.0,164.0,224.0]|        10.8|
|[1.0,226.0,129.0]|        55.3|
|[1.0,142.0,260.0]|        17.3|
|[1.0,141.0,133.0]|        40.0|
+-----------------+------------+
only showing top 10 rows



                                                                                

In [32]:
# Using estimators to build model
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="total_amount")
lrModel = lr.fit(vecTrainDF)

25/04/17 00:11:48 WARN Instrumentation: [83a068f0] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [33]:
# Apply parameters in transformer to generate predictions
m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f"""The formula for the linear regression is 
price = {m}*features + {b}""")

The formula for the linear regression is 
price = -0.13*features + 20.25


In [34]:
# Creating a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

predDF = pipelineModel.transform(testDF)
predDF.select("passenger_count", "pulocationid", "dolocationid", "prediction").show(10)

25/04/17 00:13:36 WARN Instrumentation: [a590ddb6] regParam is zero, which might cause numerical instability and overfitting.
[Stage 56:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|        prediction|
+---------------+------------+------------+------------------+
|            1.0|       223.0|       223.0|14.295507101430278|
|            1.0|       234.0|       186.0|14.819666221870735|
|            1.0|       158.0|       249.0|14.474684931784473|
|            1.0|       140.0|       237.0|14.850869212545367|
|            1.0|       148.0|        79.0|17.461753356595686|
|            1.0|       233.0|       198.0|14.625122630225736|
|            1.0|       158.0|       164.0| 15.92041896284379|
|            4.0|       161.0|       229.0|14.466387913762233|
|            1.0|       143.0|       262.0| 14.39697321224956|
|            3.0|        37.0|        36.0|19.041095634239447|
+---------------+------------+------------+------------------+
only showing top 10 rows



                                                                                

In [35]:
# Evaluating model
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="total_amount",
    metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(rmse)



72.71737130085947


                                                                                

In [36]:
# Saving model
pipelinePath = "/tmp/lr-pipeline-model"
pipelineModel.write().overwrite().save(pipelinePath)

                                                                                