In [21]:
!pip install pyspark



In [22]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

In [23]:
# Create the notebook's Spark session, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("DecisionTreeExample")
    .getOrCreate()
)

Read data

In [24]:
# Load the CSV, treating the first line as column names and letting Spark
# infer each column's type from the values.
data = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("realestate.csv")
)

In [25]:
# Preview the first two rows to check the parsed column names and values.
data.show(2)

+---+---------------+--------+-------------+-----------------------+--------+---------+---------------+
| No|TransactionDate|HouseAge|DistanceToMRT|NumberConvenienceStores|Latitude|Longitude|PriceOfUnitArea|
+---+---------------+--------+-------------+-----------------------+--------+---------+---------------+
|  1|       2012.917|    32.0|     84.87882|                     10|24.98298|121.54024|           37.9|
|  2|       2012.917|    19.5|     306.5947|                      9|24.98034|121.53951|           42.2|
+---+---------------+--------+-------------+-----------------------+--------+---------+---------------+
only showing top 2 rows



The step below creates a new column called features: a vector that contains the values of the specified columns ("HouseAge", "DistanceToMRT", "NumberConvenienceStores", "Latitude", "Longitude"). This feature vector can then be used as the input for machine learning models in PySpark.

In [26]:
# Combine the predictor columns into a single vector column named "features".
assembler = VectorAssembler(
    inputCols=[
        "HouseAge",
        "DistanceToMRT",
        "NumberConvenienceStores",
        "Latitude",
        "Longitude",
    ],
    outputCol="features",
)

In [27]:
# Add the assembled feature vector, then keep only the label column and the
# features column for modelling.
df = (
    assembler.transform(data)
    .select("PriceOfUnitArea", "features")
)

In [28]:
# Bare expression: displays the DataFrame's repr (column names and types).
df

DataFrame[PriceOfUnitArea: double, features: vector]

In [29]:
# Split into roughly 80% training / 20% test data. The fixed seed makes the
# split repeatable across runs on the same input, so the fitted model and the
# predictions shown below can be reproduced.
trainTest = df.randomSplit([0.8, 0.2], seed=42)
trainDF, testDF = trainTest

Initialize DecisionTreeRegressor:

From https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.DecisionTreeRegressor.html:

class pyspark.ml.regression.DecisionTreeRegressor(*, featuresCol: str = 'features', labelCol: str = 'label', predictionCol: str = 'prediction', maxDepth: int = 5, maxBins: int = 32, minInstancesPerNode: int = 1, minInfoGain: float = 0.0, maxMemoryInMB: int = 256, cacheNodeIds: bool = False, checkpointInterval: int = 10, impurity: str = 'variance', seed: Optional[int] = None, varianceCol: Optional[str] = None, weightCol: Optional[str] = None, leafCol: str = '', minWeightFractionPerNode: float = 0.0)[source]¶

In [30]:
# Configure the regressor: read inputs from "features" and learn to predict
# "PriceOfUnitArea"; every other hyperparameter keeps its default.
spark_DecisionTree = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="PriceOfUnitArea",
)

In [31]:
# Fit the decision-tree regressor on the training split.
model = spark_DecisionTree.fit(trainDF)

Let's run the model on the test set and cache the result

By caching the DataFrame, you store the results of the transformation (in this case, the predictions) in memory. This avoids recomputing the same DataFrame multiple times, saving time and resources, especially when the transformations are complex or when the data is large.

In [32]:
# Score the test split (adds a "prediction" column) and cache the result,
# because several later cells read it.
predictions = model.transform(testDF).cache()

In [33]:
# Peek at the first five scored rows: label, feature vector, and prediction.
predictions.show(5)

+---------------+--------------------+------------------+
|PriceOfUnitArea|            features|        prediction|
+---------------+--------------------+------------------+
|           11.6|[16.0,4066.587,0....|15.809999999999999|
|           15.5|[26.9,4449.27,0.0...|18.883333333333333|
|           15.6|[25.6,4519.69,0.0...|18.883333333333333|
|           16.1|[31.9,1146.329,0....|13.433333333333335|
|           17.4|[27.1,4412.765,1....|18.883333333333333|
+---------------+--------------------+------------------+
only showing top 5 rows



We convert to RDDs because they are easier to work with than a DataFrame when extracting values.

In [34]:
# Each selected DataFrame has one column, so row[0] unwraps the Row into
# the bare value it holds.
predicted_values = (
    predictions.select("prediction")
    .rdd
    .map(lambda row: row[0])
)
label_values = (
    predictions.select("PriceOfUnitArea")
    .rdd
    .map(lambda row: row[0])
)

In [35]:
# Collect (prediction, label) pairs to the driver. Both columns are read from
# the same rows in one pass, so the pairing never depends on two separately
# derived RDDs having identical partitioning, which RDD.zip requires.
predicted_values_and_labels = [
    (row["prediction"], row["PriceOfUnitArea"])
    for row in predictions.select("prediction", "PriceOfUnitArea").collect()
]

In [36]:
# Print every (prediction, actual label) pair, one tuple per line.
for predicted, actual in predicted_values_and_labels:
    print((predicted, actual))

(15.809999999999999, 11.6)
(18.883333333333333, 15.5)
(18.883333333333333, 15.6)
(13.433333333333335, 16.1)
(18.883333333333333, 17.4)
(15.809999999999999, 20.5)
(26.26271186440678, 20.7)
(26.26271186440678, 20.9)
(21.2, 22.3)
(26.26271186440678, 22.9)
(38.071052631578944, 23.0)
(26.26271186440678, 23.1)
(21.2, 23.1)
(26.26271186440678, 23.5)
(26.26271186440678, 24.7)
(40.55178571428572, 25.0)
(26.26271186440678, 25.9)
(26.26271186440678, 26.5)
(26.26271186440678, 27.0)
(26.26271186440678, 27.7)
(26.26271186440678, 28.6)
(38.071052631578944, 28.8)
(35.160000000000004, 29.3)
(38.071052631578944, 29.7)
(26.26271186440678, 30.7)
(38.071052631578944, 30.9)
(26.26271186440678, 31.1)
(26.26271186440678, 31.3)
(31.814285714285713, 34.0)
(38.071052631578944, 34.1)
(40.55178571428572, 34.2)
(28.9375, 34.3)
(31.814285714285713, 34.6)
(40.55178571428572, 35.3)
(40.55178571428572, 35.5)
(40.55178571428572, 37.4)
(40.55178571428572, 37.5)
(38.071052631578944, 40.1)
(40.55178571428572, 40.3)
(38.071

In [16]:
# Stop the Spark session now that the analysis is finished.
spark.stop()