In [1]:
# Install Hadoop
!pip install pyspark



Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=780c317f38dfc3a12435ecbbb083eed334992c417b70d8a29584e9b3d900e61d
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [4]:
# Create a Spark Session and configure Spark context
spark=SparkSession.builder\
    .master("local[*]")\
    .appName("WordCount")\
    .getOrCreate()
sc=spark.sparkContext


In [5]:
#Mount (connect to) Google drive to be able to read from it (copy data files into HDFS)
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [6]:
#reading the data
df = spark.read.csv('/content/drive/My Drive/BOE/statistical_indicators.csv', header = True, inferSchema = True)

In [13]:
df.show()

+--------------------+------------+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+--------------------------+-----------------------------+--------------------------+-------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|           Data_Zone|House_Prices|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel

Pre-Processing the Data

In [7]:
#dropping the columns that are not needed
columns_to_drop = ["Data_Zone", "Urban_Rural_Classification"]
df = df.drop(*columns_to_drop)

df.show()

+------------+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+-----------------------------+--------------------------+-------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|House_Prices|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel_times_(minutes)_to_retail_centre_by_public_transport|Travel_times_(m

In [21]:
df

DataFrame[House_Prices: int, Travel_times_(minutes)_to_GP_surgeries_by_public_transport: double, Travel_times_(minutes)_to_post_office_by_public_transport: double, Travel_times_(minutes)_to_retail_centre_by_public_transport: double, Travel_times_(minutes)_to_GP_surgeries_by_car: double, Travel_times_(minutes)_to_petrol_station_by_car: double, Travel_times_(minutes)_to_post_office_by_car: double, Travel_times_(minutes)_to_primary_school_by_car: double, Travel_times_(minutes)_to_secondary_school_by_car: double, Travel_times_(minutes)_to_retail_centre_by_car: double, Land_area_(in_hectares): double, Dwellings_per_hectare_(ratio): double, Detached_dwellings_(ratio): double, Flats_(ratio): double, Semi-detached_dwellings_(ratio): double, Terraced_dwellings_(ratio): double, Dwellings_of_unknown_type_(ratio): double, Households_with_single_adult_discounts_(ratio): int, Crime_indicators_(ratio): int, Employment_deprivation_(ratio): int, Comparative_Illness_Factor: int]

In [8]:
#checking for missing values
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Create a SparkSession (the entry point to PySpark)
spark = SparkSession.builder.appName("CountMissingValues").getOrCreate()
missing_value_counts = df.agg(*[count(col(c)).alias(c) for c in df.columns])

# Show the missing value counts
missing_value_counts.show()

# Sum the counts to get the total count of missing values in the DataFrame
total_missing_values = sum(missing_value_counts.collect()[0])

print("Total missing values in the DataFrame:", total_missing_values)

+------------+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+-----------------------------+--------------------------+-------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|House_Prices|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel_times_(minutes)_to_retail_centre_by_public_transport|Travel_times_(m

In [9]:
#removing blank rows from the dataset for all the columns based on the blank values in house prices
spark = SparkSession.builder.appName("FilterNullValues").getOrCreate()


# Filter out rows containing null values in the 'House_Prices' column
df_noNA = df.dropna(subset=["House_Prices"])

# Show the DataFrame after filtering
df_noNA.show()

df_noNA.count()



+------------+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+-----------------------------+--------------------------+-------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|House_Prices|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel_times_(minutes)_to_retail_centre_by_public_transport|Travel_times_(m

6014

In [10]:
missing_value_counts.show()

# Sum the counts to get the total count of missing values in the DataFrame
total_missing_values = sum(missing_value_counts.collect()[0])

print("Total missing values in the DataFrame:", total_missing_values)

+------------+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+-----------------------------+--------------------------+-------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|House_Prices|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel_times_(minutes)_to_retail_centre_by_public_transport|Travel_times_(m

In [47]:
print(df_noNA)

DataFrame[House_Prices: int, Travel_times_(minutes)_to_GP_surgeries_by_public_transport: double, Travel_times_(minutes)_to_post_office_by_public_transport: double, Travel_times_(minutes)_to_retail_centre_by_public_transport: double, Travel_times_(minutes)_to_GP_surgeries_by_car: double, Travel_times_(minutes)_to_petrol_station_by_car: double, Travel_times_(minutes)_to_post_office_by_car: double, Travel_times_(minutes)_to_primary_school_by_car: double, Travel_times_(minutes)_to_secondary_school_by_car: double, Travel_times_(minutes)_to_retail_centre_by_car: double, Land_area_(in_hectares): double, Dwellings_per_hectare_(ratio): double, Detached_dwellings_(ratio): double, Flats_(ratio): double, Semi-detached_dwellings_(ratio): double, Terraced_dwellings_(ratio): double, Dwellings_of_unknown_type_(ratio): double, Households_with_single_adult_discounts_(ratio): int, Crime_indicators_(ratio): int, Employment_deprivation_(ratio): int, Comparative_Illness_Factor: int]


In [11]:
from pyspark.sql.functions import col, when, median

# Create a SparkSession (the entry point to PySpark)
spark = SparkSession.builder.appName("ReplaceNullWithMedian").getOrCreate()


# Specify the column for which you want to replace null values with the median
column_to_impute_1 = "Households_with_single_adult_discounts_(ratio)"

# Calculate the median value for the specified column
median_value = df_noNA.select(median(col(column_to_impute_1))).first()[0]

# Replace null values in the specified column with the median value
df_noNA = df_noNA.withColumn(column_to_impute_1, when(col(column_to_impute_1).isNull(), median_value).otherwise(col(column_to_impute_1)))


# Specify the column for which you want to replace null values with the median
column_to_impute_2 = "Crime_indicators_(ratio)"

# Calculate the median value for the specified column
median_value = df_noNA.select(median(col(column_to_impute_2))).first()[0]

# Replace null values in the specified column with the median value
df_noNA = df_noNA.withColumn(column_to_impute_2, when(col(column_to_impute_2).isNull(), median_value).otherwise(col(column_to_impute_2)))

# Specify the column for which you want to replace null values with the median
column_to_impute_3 = "Employment_deprivation_(ratio)"

# Calculate the median value for the specified column
median_value = df_noNA.select(median(col(column_to_impute_3))).first()[0]

# Replace null values in the specified column with the median value
df_noNA = df_noNA.withColumn(column_to_impute_3, when(col(column_to_impute_3).isNull(), median_value).otherwise(col(column_to_impute_3)))


# Show the DataFrame after replacing null values with the median
df_noNA.show()



+------------+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+-----------------------------+--------------------------+-------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|House_Prices|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel_times_(minutes)_to_retail_centre_by_public_transport|Travel_times_(m

In [12]:
#checking for missing values after median imputation
spark = SparkSession.builder.appName("CountMissingValues").getOrCreate()
missing_value_counts_na = df_noNA.agg(*[count(col(c)).alias(c) for c in df_noNA.columns])

# Show the missing value counts
missing_value_counts_na.show()

# Sum the counts to get the total count of missing values in the DataFrame
total_missing_values_na = sum(missing_value_counts_na.collect()[0])

print("Total missing values in the DataFrame:", total_missing_values_na)


+------------+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+-----------------------------+--------------------------+-------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|House_Prices|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel_times_(minutes)_to_retail_centre_by_public_transport|Travel_times_(m

EDA using unsupervised machine learning

In [14]:
df_clean=df_noNA

In [15]:
df_clean.printSchema()

root
 |-- House_Prices: integer (nullable = true)
 |-- Travel_times_(minutes)_to_GP_surgeries_by_public_transport: double (nullable = true)
 |-- Travel_times_(minutes)_to_post_office_by_public_transport: double (nullable = true)
 |-- Travel_times_(minutes)_to_retail_centre_by_public_transport: double (nullable = true)
 |-- Travel_times_(minutes)_to_GP_surgeries_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_petrol_station_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_post_office_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_primary_school_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_secondary_school_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_retail_centre_by_car: double (nullable = true)
 |-- Land_area_(in_hectares): double (nullable = true)
 |-- Dwellings_per_hectare_(ratio): double (nullable = true)
 |-- Detached_dwellings_(ratio): double (nullable = true)
 |-- Flats_(ratio): double (nullable

In [16]:
#correlation for features against target variable
from pyspark.sql.functions import col, corr

# Create a SparkSession (the entry point to PySpark)
spark = SparkSession.builder.appName("Correlation").getOrCreate()

# Select the target variable (column) for which you want to calculate the correlation
target_variable = "House_Prices"

# Calculate the correlation between the target variable and all other features
correlation_df = df_clean.select([corr(target_variable, feature).alias(feature) for feature in df_clean.columns if feature != target_variable])

# Show the correlation DataFrame
correlation_df.show()

+----------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------+---------------------------------------------+-----------------------------------------------+--------------------------------------------+-----------------------------------------------+-------------------------------------------------+----------------------------------------------+-----------------------+-----------------------------+--------------------------+-------------------+-------------------------------+--------------------------+---------------------------------+----------------------------------------------+------------------------+------------------------------+--------------------------+
|Travel_times_(minutes)_to_GP_surgeries_by_public_transport|Travel_times_(minutes)_to_post_office_by_public_transport|Travel_times_(minutes)_to_retail_centre_by_public_transport|Travel_times_(minutes)_to_GP_surger

In [64]:
df_clean.printSchema()

root
 |-- House_Prices: integer (nullable = true)
 |-- Travel_times_(minutes)_to_GP_surgeries_by_public_transport: double (nullable = true)
 |-- Travel_times_(minutes)_to_post_office_by_public_transport: double (nullable = true)
 |-- Travel_times_(minutes)_to_retail_centre_by_public_transport: double (nullable = true)
 |-- Travel_times_(minutes)_to_GP_surgeries_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_petrol_station_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_post_office_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_primary_school_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_secondary_school_by_car: double (nullable = true)
 |-- Travel_times_(minutes)_to_retail_centre_by_car: double (nullable = true)
 |-- Land_area_(in_hectares): double (nullable = true)
 |-- Dwellings_per_hectare_(ratio): double (nullable = true)
 |-- Detached_dwellings_(ratio): double (nullable = true)
 |-- Flats_(ratio): double (nullable

In [17]:
#calculating multicollinearity through VIF to recuce redundancy
from pyspark.sql.functions import col, lit, sum as sum_func
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import VectorAssembler

# Create a SparkSession (the entry point to PySpark)
spark = SparkSession.builder.appName("VIF").getOrCreate()


# List of predictor columns for the linear regression model
predictor_columns = ['Detached_dwellings_(ratio)', 'Travel_times_(minutes)_to_post_office_by_public_transport',
                     'Travel_times_(minutes)_to_post_office_by_car', 'Travel_times_(minutes)_to_GP_surgeries_by_public_transport',
                     'Crime_indicators_(ratio)', 'Flats_(ratio)', 'Terraced_dwellings_(ratio)',
                     'Households_with_single_adult_discounts_(ratio)', 'Employment_deprivation_(ratio)',
                     'Comparative_Illness_Factor','Dwellings_per_hectare_(ratio)']

# Assemble features into a single vector column 'features'
assembler = VectorAssembler(inputCols=predictor_columns, outputCol='features')
data = assembler.transform(df_clean).select(col('House_Prices').alias('target'), 'features')

# Fit a linear regression model
lr = LinearRegression(featuresCol='features', labelCol='target')
model = lr.fit(data)

# Function to calculate R-squared for a given feature (predictor)
def calculate_r_squared(feature_name):
    # Remove the corresponding feature from the features column
    data_without_feature = data.select(col('target').alias('target_tmp'), *(col for col in data.columns if col != 'features' or col != feature_name))

    # Fit a linear regression model without the selected feature
    lr_without_feature = LinearRegression(featuresCol='features', labelCol='target_tmp')
    model_without_feature = lr_without_feature.fit(data_without_feature)

    # Calculate R-squared for the model without the selected feature
    predictions_without_feature = model_without_feature.transform(data_without_feature)
    sum_sq_residuals_without_feature = predictions_without_feature.withColumn('residual', col('target_tmp') - col('prediction')) \
                                                                 .select(sum_func(col('residual')**2).alias('sum_sq_residuals_without_feature')) \
                                                                 .collect()[0]['sum_sq_residuals_without_feature']
    sum_sq_total_without_feature = predictions_without_feature.withColumn('residual', col('target_tmp') - col('prediction')) \
                                                               .select(sum_func(col('target_tmp')**2).alias('sum_sq_total_without_feature')) \
                                                               .collect()[0]['sum_sq_total_without_feature']
    r_squared_without_feature = 1.0 - sum_sq_residuals_without_feature / sum_sq_total_without_feature
    return r_squared_without_feature

# Calculate VIF for each predictor (feature)
vif_values = {}
for feature in predictor_columns:
    r_squared_without_feature = calculate_r_squared(feature)
    vif_values[feature] = 1.0 / (1.0 - r_squared_without_feature)

# Print VIF values
print("Variance Inflation Factor (VIF) values:")
for feature, vif_value in vif_values.items():
    print(f"{feature}: {vif_value}")

Variance Inflation Factor (VIF) values:
Detached_dwellings_(ratio): 7.120578598060252
Travel_times_(minutes)_to_post_office_by_public_transport: 7.120578598060252
Travel_times_(minutes)_to_post_office_by_car: 7.120578598060252
Travel_times_(minutes)_to_GP_surgeries_by_public_transport: 7.120578598060252
Crime_indicators_(ratio): 7.120578598060252
Flats_(ratio): 7.120578598060252
Terraced_dwellings_(ratio): 7.120578598060252
Households_with_single_adult_discounts_(ratio): 7.120578598060252
Employment_deprivation_(ratio): 7.120578598060252
Comparative_Illness_Factor: 7.120578598060252
Dwellings_per_hectare_(ratio): 7.120578598060252


In [19]:
final_dataset = df.select("House_Prices", "Detached_dwellings_(ratio)", "Travel_times_(minutes)_to_post_office_by_car","Travel_times_(minutes)_to_GP_surgeries_by_public_transport","Crime_indicators_(ratio)","Dwellings_per_hectare_(ratio)","Flats_(ratio)","Households_with_single_adult_discounts_(ratio)","Employment_deprivation_(ratio)")

Supervised Learning

In [None]:
#reading the data
df = spark.read.csv('/content/drive/My\ Drive/DDA/model_mix_data.csv', header = True, inferSchema = True)

In [None]:
print(df.dtypes)

[('Price', 'int'), ('Detach_House', 'double'), ('Car_Time', 'double'), ('Public_Trans_Time', 'double'), ('Crime', 'int'), ('Density', 'double'), ('Flats', 'double'), ('Discount', 'int'), ('Emp_Deprivation', 'int'), ('Urban_Rural_Classification', 'int')]


In [None]:
df.printSchema()

root
 |-- Price: integer (nullable = true)
 |-- Detach_House: double (nullable = true)
 |-- Car_Time: double (nullable = true)
 |-- Public_Trans_Time: double (nullable = true)
 |-- Crime: integer (nullable = true)
 |-- Density: double (nullable = true)
 |-- Flats: double (nullable = true)
 |-- Discount: integer (nullable = true)
 |-- Emp_Deprivation: integer (nullable = true)
 |-- Urban_Rural_Classification: integer (nullable = true)



In [None]:
import pandas as pd
pd.DataFrame(df.take(5), columns = df.columns)

Unnamed: 0,Price,Detach_House,Car_Time,Public_Trans_Time,Crime,Density,Flats,Discount,Emp_Deprivation,Urban_Rural_Classification
0,215003,26.2,1.5,8.4,89,1.05,58.4,172,7,3
1,284539,23.0,2.7,8.3,48,16.51,19.5,120,5,3
2,185767,13.5,2.1,7.9,58,12.88,72.1,132,3,3
3,178700,9.2,2.0,7.4,204,29.3,30.9,128,8,3
4,195236,12.2,1.7,5.1,178,17.77,43.1,115,7,3


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# instantiate a StringIndexer to convert the "number" column to categorical values
indexer = StringIndexer(inputCol="Urban_Rural_Classification", outputCol="number_cat")

# fit the transformer to the data
indexed = indexer.fit(df).transform(df)

# instantiate a OneHotEncoder to convert the "number_cat" column to binary vectors
encoder = OneHotEncoder(inputCol="number_cat", outputCol="Urban_Rural_Classification_vec")

# fit the transformer to the data
df = encoder.fit(indexed).transform(indexed)

# show the resulting DataFrame
df.show()

In [None]:
df = df.drop("number_cat", "Urban_Rural_Classification")
df.show()

In [None]:
#creating feature columns that have the independent variables

feature_cols = [col for col in df.columns if col != "Price"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

In [None]:
#looking at the idependent variables
print(feature_cols)

['Detach_House', 'Car_Time', 'Public_Trans_Time', 'Crime', 'Density', 'Flats', 'Discount', 'Emp_Deprivation', 'Urban_Rural_Classification_vec']


In [None]:
#looking at the dataset with the independent variables converted to features
df.show()

In [None]:
print(feature_cols)
print(assembler)

['Detach_House', 'Car_Time', 'Public_Trans_Time', 'Crime', 'Density', 'Flats', 'Discount', 'Emp_Deprivation', 'Urban_Rural_Classification_vec']
VectorAssembler_6d163abae960


In [None]:
#independent variables  indexing
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(df)

In [None]:
#creating a train and test dataset
(trainingData, testData) = df.randomSplit([0.7, 0.3])

In [None]:
#Training the decision tree model
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

In [None]:
pipeline = Pipeline(stages=[featureIndexer, dt])

In [None]:
dt.setLabelCol("Price")

DecisionTreeRegressor_e13b5613b90a

In [None]:
model = pipeline.fit(trainingData)

In [None]:
# Make predictions.
predictions_dt = model.transform(testData)

In [None]:
# Select example rows to display.
predictions_dt.select("prediction", "Price", "features").show(5)

+-----------------+-----+--------------------+
|       prediction|Price|            features|
+-----------------+-----+--------------------+
|77988.89003436426|20604|[0.3,3.2,11.7,880...|
|88255.04526748971|29536|[0.7,2.7,9.8,607....|
|88255.04526748971|30730|[0.4,1.0,4.1,613....|
|88255.04526748971|34918|[0.0,1.3,4.0,847....|
|77988.89003436426|35500|[1.4,0.9,5.1,1617...|
+-----------------+-----+--------------------+
only showing top 5 rows



In [None]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_dt)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 73044.4


In [None]:
evaluator_r2 = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="r2")
r2_dt = evaluator_r2.evaluate(predictions_dt)
print("R-squared (coefficient of determination) decision tree = %f" % r2_dt)

R-squared (coefficient of determination) decision tree = 0.429026


In [None]:
evaluator_mae = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="mae")
mae_dt = evaluator_mae.evaluate(predictions_dt)
print("MAE Decision Tree = %f" % mae_dt)

MAE Decision Tree = 44030.929875


In [None]:
treeModel = model.stages[1]

In [None]:
# summary only
print(treeModel)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_e13b5613b90a, depth=5, numNodes=61, numFeatures=13


In [None]:
---------------------------------

In [None]:
#RANDOM FOREST

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Load and parse the data file, converting it to a DataFrame.

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer_rf =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)

# Split the data into training and test sets (30% held out for testing)
(trainingData_rf, testData_rf) = df.randomSplit([0.7, 0.3])

In [None]:
# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

In [None]:
# Chain indexer and forest in a Pipeline
pipeline_rf = Pipeline(stages=[featureIndexer, rf])

In [None]:
rf.setLabelCol("Price")

RandomForestRegressor_2d0035ce4da2

In [None]:
# Train model.  This also runs the indexer.
model_rf = pipeline_rf.fit(trainingData_rf)

In [None]:
# Make predictions.
predictions_rf = model_rf.transform(testData_rf)

In [None]:
print(predictions_rf)

In [None]:
# Select example rows to display.
predictions_rf.select("prediction", "Price", "features").show(5)

+------------------+-----+--------------------+
|        prediction|Price|            features|
+------------------+-----+--------------------+
| 91144.13480819222|24111|[5.6,2.0,10.7,149...|
|103697.94627334944|30730|[0.4,1.0,4.1,613....|
| 88331.01771039361|31092|[0.2,1.9,14.8,489...|
| 82641.40991521737|31850|[0.8,2.1,10.8,111...|
| 94865.85974449184|34120|[0.3,2.4,20.5,270...|
+------------------+-----+--------------------+
only showing top 5 rows



In [None]:
# Select (prediction, true label) and compute test error
evaluator_rf = RegressionEvaluator(
    labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse_rf = evaluator_rf.evaluate(predictions_rf)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_rf)

Root Mean Squared Error (RMSE) on test data = 65778.8


In [None]:
evaluator_r2 = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="r2")
r2_rf = evaluator_r2.evaluate(predictions_rf)
print("R-squared (coefficient of determination) Random Forest = %f" % r2_rf)

R-squared (coefficient of determination) Random Forest = 0.488604


In [None]:
evaluator_mae = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="mae")
mae_rf = evaluator_mae.evaluate(predictions_rf)
print("MAE Random Forest = %f" % mae_rf)


MAE Random Forest = 44149.467546


In [28]:
# Make predictions on the testing data
predictions = model.transform(testData_rf)

# Show sample predictions
predictions.select('Price', 'prediction').show()

# Evaluate the model (for regression problems)
evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

NameError: ignored