In [1]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

In [12]:
 # Sanity check: show what is in the working directory (e.g. that realestate.csv is there).
 import os
 arr = os.listdir(".")
 print(arr)

['.ipynb_checkpoints', 'realestate.csv', 'Realestae.ipynb', 'structured_network_wordcount.ipynb']


In [14]:
# Create (or reuse, via getOrCreate) a SparkSession for this notebook.
spark = SparkSession.builder.appName("DecisionTree").getOrCreate()

# Load the real-estate data as a DataFrame. `header` takes column names from the
# first row; `inferSchema` asks Spark to infer numeric column types instead of
# reading every column as a string.
DATA_PATH = "realestate.csv"
data = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(DATA_PATH)
)

# Input columns combined into the single "features" vector column the regressor reads.
FEATURE_COLS = ["HouseAge", "DistanceToMRT", "NumberConvenienceStores"]
assembler = VectorAssembler().setInputCols(FEATURE_COLS).setOutputCol("features")

In [23]:
# Inspect the inferred column types, then peek at the first five rows.
data.printSchema()
data.show(n=5)

root
 |-- No: integer (nullable = true)
 |-- TransactionDate: double (nullable = true)
 |-- HouseAge: double (nullable = true)
 |-- DistanceToMRT: double (nullable = true)
 |-- NumberConvenienceStores: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- PriceOfUnitArea: double (nullable = true)

+---+---------------+--------+-------------+-----------------------+--------+---------+---------------+
| No|TransactionDate|HouseAge|DistanceToMRT|NumberConvenienceStores|Latitude|Longitude|PriceOfUnitArea|
+---+---------------+--------+-------------+-----------------------+--------+---------+---------------+
|  1|       2012.917|    32.0|     84.87882|                     10|24.98298|121.54024|           37.9|
|  2|       2012.917|    19.5|     306.5947|                      9|24.98034|121.53951|           42.2|
|  3|       2013.583|    13.3|     561.9845|                      5|24.98746|121.54391|           47.3|
|  4|         2013.

In [18]:
# Append the assembled "features" vector, then keep only the label column and
# that vector -- the two columns the regressor trains and is evaluated on.
assembled = assembler.transform(data)
df = assembled.select("PriceOfUnitArea", "features")

df.printSchema()
df.show(n=5)

root
 |-- PriceOfUnitArea: double (nullable = true)
 |-- features: vector (nullable = true)

+---------------+--------------------+
|PriceOfUnitArea|            features|
+---------------+--------------------+
|           37.9|[32.0,84.87882,10.0]|
|           42.2| [19.5,306.5947,9.0]|
|           47.3| [13.3,561.9845,5.0]|
|           54.8| [13.3,561.9845,5.0]|
|           43.1|  [5.0,390.5684,5.0]|
+---------------+--------------------+
only showing top 5 rows



In [59]:
# Split the data 50/50 into training and test sets. Passing a fixed seed makes
# the split reproducible on re-run; without one, Spark chooses a random seed and
# the row counts (and every downstream prediction) change each time.
SPLIT_SEED = 42
trainTest = df.randomSplit([0.5, 0.5], seed=SPLIT_SEED)
trainingDF, testDF = trainTest

print(f"Lengths --> DF(All):{df.count()}  ::  Training:{trainingDF.count()}  ::  TestDF:{testDF.count()}")

Lengths --> DF(All):414  ::  Training:190  ::  TestDF:224


In [46]:
# Regression tree that predicts PriceOfUnitArea from the assembled "features" vector.
dtr = DecisionTreeRegressor(featuresCol="features", labelCol="PriceOfUnitArea")

In [47]:
# Fit the decision tree on the training split only; the test split stays unseen.
model = dtr.fit(dataset=trainingDF)

In [48]:
# Score every row of the held-out test set with the trained tree; transform()
# adds a "prediction" column alongside the existing ones.
scored = model.transform(testDF)
# Mark the scored rows for caching so the several reads that follow reuse them
# instead of re-running transform each time.
fullPredictions = scored.cache()

In [49]:
# Pull the model's predictions and the known true prices out as two RDDs of
# plain values (each selected row has exactly one field).
def _first_field(row):
    """Return the first (and only) field of a single-column Row."""
    return row[0]

predictions = fullPredictions.select("prediction").rdd.map(_first_field)
labels = fullPredictions.select("PriceOfUnitArea").rdd.map(_first_field)

In [50]:
# Pair each prediction with its true label as (prediction, label) tuples.
# Both columns are read from the same rows of one DataFrame, so every pair is
# aligned by construction. RDD.zip() of two separately derived RDDs only lines
# up when both have identical partitioning and per-partition element counts.
predictionAndLabel = [
    (row["prediction"], row["PriceOfUnitArea"])
    for row in fullPredictions.select("prediction", "PriceOfUnitArea").collect()
]

In [58]:
# Spot-check the model: print the first few (predicted, actual) pairs.
# This shows only NUM_TO_SHOW pairs, not all of them; the complete list is
# in predictionAndLabel.
NUM_TO_SHOW = 5
for pair in predictionAndLabel[:NUM_TO_SHOW]:
    print(pair)


(34.5125, 7.6)
(16.085714285714285, 11.2)
(12.899999999999999, 11.6)
(23.05384615384616, 13.7)
(23.05384615384616, 13.8)


In [None]:
# Stop the SparkSession created at the top of the notebook. Run this last:
# presumably any cell that uses `spark` (or DataFrames built from it) after this
# point would need the session re-created -- confirm before reordering cells.
spark.stop()