In [2]:
# Final one
from pyspark.sql import SparkSession # Build a SparkSession
from pyspark.sql import DataFrameReader # PySpark to read DataFrame

# Obtain the SparkSession for this notebook (reuses an already-running one if present).
spark = SparkSession.builder.appName("Taxi_Observation").getOrCreate()

# Location of the CSV dataset in Google Cloud Storage.
dataset_path = "gs://for-spark-0709/2019-04.csv"

# Stand-alone DataFrameReader bound to the session.
# NOTE(review): nothing in the visible cells reads `reader` afterwards; the load
# below goes through `spark.read` instead — confirm before removing it.
reader = DataFrameReader(spark)

# Load the CSV file into a DataFrame.
# header=True      -> the first line supplies the column names
# inferSchema=True -> Spark infers each column's data type from the data
df = spark.read.csv(dataset_path, header=True, inferSchema=True)

# Expose the DataFrame to Spark SQL as the temporary view "taxi_passenger".
df.createOrReplaceTempView("taxi_passenger")

# Keep only the three feature columns plus the label column (total_amount).
query = """
SELECT passenger_count, pulocationid, dolocationid, total_amount
FROM taxi_passenger
"""

# Run the projection through Spark SQL and preview the first 10 rows.
pass_cnt_df = spark.sql(query)
pass_cnt_df.show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|        68.0|        68.0|         7.3|
|            1.0|        50.0|        42.0|       23.15|
|            1.0|        95.0|       196.0|         9.8|
|            1.0|       211.0|       211.0|         6.8|
|            1.0|       237.0|       162.0|         7.8|
|            1.0|       148.0|        37.0|        20.3|
|            1.0|       265.0|       265.0|        0.31|
|            1.0|       265.0|       265.0|      240.35|
|            1.0|       237.0|       142.0|        10.3|
|            1.0|       249.0|        69.0|        36.3|
+---------------+------------+------------+------------+
only showing top 10 rows



In [4]:
# 2. Create trainDF and testDF.
# Mark the source rows for caching before splitting: each count() below and the
# later model fit is a separate Spark action, and without a cache every one of
# them re-reads and re-parses the CSV behind pass_cnt_df. cache() is lazy; the
# first action materializes it and the following ones reuse it.
pass_cnt_df.cache()

# 80/20 random split; the fixed seed keeps the split repeatable across re-runs.
trainDF, testDF = pass_cnt_df.randomSplit([0.8, 0.2], seed=42)

# Run each count exactly once and keep the numbers for the sanity check below.
total_rows = pass_cnt_df.count()
train_rows = trainDF.count()
test_rows = testDF.count()

print(f"Numbers in total dataset: {total_rows}")
print(f"Numbers in training set:  {train_rows}")
print(f"Numbers in testing set:   {test_rows}")

# The two splits must partition the source: no row lost, none duplicated.
assert train_rows + test_rows == total_rows, (
    f"split sizes {train_rows} + {test_rows} do not add up to {total_rows}"
)



Numbers in total dataset: 7433136


                                                                                

Numbers in training set:  5946606




Numbers in testing set:   1486530


                                                                                

In [5]:
# 3. Create a decision tree regressor to predict total_amount from the other three features.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

# Input columns for the model; VectorAssembler packs them into a single vector
# column named "features".
# NOTE(review): pulocationid / dolocationid look like zone identifiers but are
# passed to the tree as plain numeric features — confirm this is intended.
columns = ["passenger_count", "pulocationid", "dolocationid"]
vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")

# Decision tree regressor: reads the assembled "features" vector and learns to
# predict the "total_amount" label.
decisionTreeR = DecisionTreeRegressor(labelCol="total_amount", featuresCol="features")

# 4. Create a pipeline.
# Two stages, run in order: assemble the feature vector, then fit the tree on it.
pipeline = Pipeline(stages=[vecAssembler, decisionTreeR])

# 5. Train the model.
# Fit the whole pipeline on the training split; the result is the fitted model.
pipeline_model = pipeline.fit(trainDF)

# Apply the fitted pipeline to the test split to obtain a "prediction" column.
predictions_df = pipeline_model.transform(testDF)

# Register the predictions as the temporary SQL view "results".
predictions_df.createOrReplaceTempView("results")

# 6. Show the predicted results along with the three features in the notebook. (Show the first 10 entries.)
# Read the "results" view back and display the feature columns next to the prediction.
spark.table("results").select(*columns, "prediction").show(10)

[Stage 35:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|        prediction|
+---------------+------------+------------+------------------+
|            0.0|         1.0|         1.0|18.718776971163287|
|            0.0|         4.0|         4.0|18.718776971163287|
|            0.0|         4.0|        33.0|18.718776971163287|
|            0.0|         4.0|        79.0|15.615861838189069|
|            0.0|         4.0|       107.0|15.615861838189069|
|            0.0|         4.0|       144.0| 17.81124095206958|
|            0.0|         4.0|       234.0| 17.81124095206958|
|            0.0|         7.0|       121.0| 17.81124095206958|
|            0.0|         7.0|       223.0| 17.81124095206958|
|            0.0|         7.0|       223.0| 17.81124095206958|
+---------------+------------+------------+------------------+
only showing top 10 rows



                                                                                

In [7]:
# 7. Evaluate the model with RMSE
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator that compares the "prediction" column against the true
# "total_amount" label using root mean squared error.
regress_evaluator = RegressionEvaluator(
    labelCol="total_amount",
    predictionCol="prediction",
    metricName="rmse",
)

# Score the test-set predictions.
rmse = regress_evaluator.evaluate(predictions_df)

# Round to four decimal places for display (round() rounds to nearest, not up).
rmse_f = round(rmse, 4)
print(f"RMSE (Root Mean Squared Error): {rmse_f}")




RMSE (Root Mean Squared Error): 11.8675


                                                                                