In [5]:
import os
import shutil
import subprocess

import py4j
import pyspark

print(pyspark.__version__)
print(py4j.__version__)


def _java_version_banner() -> str:
    """Return the text printed by ``java -version`` (the JVM writes it to stderr).

    ``java`` is resolved with ``shutil.which`` against the current
    ``os.environ["PATH"]``, so PATH edits made later in this cell take effect.
    Returns a short notice instead of raising when no ``java`` executable is found.
    """
    java_exe = shutil.which("java")
    if java_exe is None:
        return "java executable not found on PATH"
    completed = subprocess.run([java_exe, "-version"], capture_output=True, text=True)
    return completed.stderr


# Java version visible before any local override.
print(_java_version_banner())

# Local-only override: prefer a JDK 17 install when it exists on this machine.
# Guarded so the notebook still runs on machines where this Windows path is absent
# (previously JAVA_HOME was pointed at it unconditionally).
# NOTE(review): presumably the Spark launcher reads JAVA_HOME when the session is
# created in a later cell, so this must run before that — confirm.
LOCAL_JAVA_HOME = r"C:\Program Files\Java\jdk-17"
if os.path.isdir(LOCAL_JAVA_HOME):
    os.environ["JAVA_HOME"] = LOCAL_JAVA_HOME
    # os.pathsep is ";" on Windows and ":" elsewhere; .get avoids KeyError if PATH is unset.
    os.environ["PATH"] = (
        os.path.join(LOCAL_JAVA_HOME, "bin") + os.pathsep + os.environ.get("PATH", "")
    )

# Java version visible after the (optional) override.
print(_java_version_banner())

3.5.5
0.10.9.7
openjdk version "1.8.0_432-432"
OpenJDK Runtime Environment (build 1.8.0_432-432-b06)
OpenJDK 64-Bit Server VM (build 25.432-b06, mixed mode)

java version "17.0.12" 2024-07-16 LTS
Java(TM) SE Runtime Environment (build 17.0.12+8-LTS-286)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.12+8-LTS-286, mixed mode, sharing)



In [10]:
import os

from pyspark.sql import SparkSession

# Local Spark session using all cores; the driver host/bind address are pinned to
# the loopback interface so the driver listens on localhost only.
spark = (
    SparkSession.builder
    .appName("SparkSQLApp")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

# Path to the taxi-trip CSV. Set the TAXI_CSV environment variable to point at the
# file on another machine instead of editing this user-specific default.
csv_file = os.environ.get(
    "TAXI_CSV", "C:/Users/hovak/OneDrive/Desktop/a4-spark/2019-01-h1.csv"
)

# header=true: the first row supplies column names.
# inferSchema=true: Spark makes an extra pass over the file to infer column types
# (the later output shows numeric columns read as doubles, e.g. passenger_count 1.0).
df = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(csv_file)
)

In [11]:
# Candidate features plus the regression label; every other column is dropped.
my_cols = [
    "passenger_count",
    "pulocationid",
    "dolocationid",
    "total_amount",
]
taxiDF = df.select(my_cols)

print("First 10 rows of the working DataFrame:")
taxiDF.show(n=10, truncate=False)

First 10 rows of the working DataFrame:
+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|1.0            |151.0       |239.0       |9.95        |
|1.0            |239.0       |246.0       |16.3        |
|3.0            |236.0       |236.0       |5.8         |
|5.0            |193.0       |193.0       |7.55        |
|5.0            |193.0       |193.0       |55.55       |
|5.0            |193.0       |193.0       |13.31       |
|5.0            |193.0       |193.0       |55.55       |
|1.0            |163.0       |229.0       |9.05        |
|1.0            |229.0       |7.0         |18.5        |
|2.0            |141.0       |234.0       |13.0        |
+---------------+------------+------------+------------+
only showing top 10 rows



In [12]:
# 80/20 train/test split; a fixed seed makes the split repeatable for the same input data.
trainDF, testDF = taxiDF.randomSplit(weights=[0.8, 0.2], seed=42)

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

# Pack the three numeric input columns into the single vector column Spark ML expects.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
)

# Single regression tree predicting total_amount (depth capped at 10, up to 700 split bins).
# NOTE(review): pulocationid/dolocationid appear to be location identifiers, but they are
# passed as plain numbers, so the tree splits them as ordered/continuous values. If they
# are meant to be categorical they need categorical metadata (e.g. VectorIndexer) —
# confirm intent before changing.
dtr = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
    maxDepth=10,
    maxBins=700,
)

# Assembly followed by the tree, fitted together as one estimator.
pipeline = Pipeline(stages=[assembler, dtr])

In [14]:
# Fit assembler + tree on the training split only; the test split stays held out.
model = pipeline.fit(dataset=trainDF)

In [15]:
# Score the held-out split; the fitted pipeline adds a "prediction" column.
predictions = model.transform(testDF)

# Inputs shown next to the predicted and actual total_amount for a quick eyeball check.
sample_cols = [
    "passenger_count",
    "pulocationid",
    "dolocationid",
    "prediction",
    "total_amount",
]
print("Features and prediction sample:")
predictions.select(sample_cols).show(n=10, truncate=False)

Features and prediction sample:
+---------------+------------+------------+------------------+------------+
|passenger_count|pulocationid|dolocationid|prediction        |total_amount|
+---------------+------------+------------+------------------+------------+
|0.0            |4.0         |4.0         |10.370287081339715|5.75        |
|0.0            |4.0         |90.0        |14.11333610418399 |12.3        |
|0.0            |4.0         |144.0       |14.11333610418399 |9.45        |
|0.0            |7.0         |7.0         |7.537342592592594 |5.8         |
|0.0            |7.0         |112.0       |14.11333610418399 |16.8        |
|0.0            |7.0         |164.0       |14.11333610418399 |21.8        |
|0.0            |12.0        |45.0        |17.034383527033853|10.8        |
|0.0            |13.0        |13.0        |13.741151802656542|9.95        |
|0.0            |13.0        |164.0       |19.998085075066232|18.35       |
|0.0            |13.0        |164.0       |19.9980850750

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-squared error of the tree's predictions on the held-out split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)

# NOTE(review): the recorded RMSE (77.21) is much larger than the total_amount values
# in the printed samples (roughly 5-55), which suggests extreme label values/outliers
# in the data — TODO inspect the total_amount distribution.
print(f"Root-Mean-Squared-Error (RMSE): {rmse:.2f}")

Root-Mean-Squared-Error (RMSE): 77.21
