In [171]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F

In [150]:
# Initialize Spark session
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

In [151]:
# Read the Parquet file into a Spark DataFrame
sdf = spark.read.parquet("../data/curated/prop_data_cleaned2.parquet")
sdf.printSchema()

root
 |-- name: string (nullable = true)
 |-- property_url: string (nullable = true)
 |-- beds: integer (nullable = true)
 |-- baths: integer (nullable = true)
 |-- parkings: integer (nullable = true)
 |-- cost: integer (nullable = true)
 |-- postal_code: integer (nullable = true)
 |-- built_in_wardrobes: integer (nullable = true)
 |-- dishwasher: integer (nullable = true)
 |-- air_conditioning: integer (nullable = true)
 |-- floorboards: integer (nullable = true)
 |-- secure_parking: integer (nullable = true)
 |-- intercom: integer (nullable = true)
 |-- heating: integer (nullable = true)
 |-- ensuite: integer (nullable = true)
 |-- balcony_deck: integer (nullable = true)
 |-- gym: integer (nullable = true)
 |-- furnished: integer (nullable = true)
 |-- close_to_shops: integer (nullable = true)
 |-- close_to_transport: integer (nullable = true)
 |-- fully_fenced: integer (nullable = true)
 |-- close_to_schools: integer (nullable = true)
 |-- ducted_heating: integer (nullable = true)
 

In [152]:
def shape(sdf: DataFrame) -> None:
    """
    Returns the shape of a Spark DataFrame as a tuple (number of rows, number of columns).
    """
    num_rows = sdf.count()
    num_columns = len(sdf.columns)
    print(f"Shape of the DataFrame: {num_rows} rows, {num_columns} columns.")

shape(sdf)

Shape of the DataFrame: 11131 rows, 40 columns.


In [153]:
sdf.columns

['name',
 'property_url',
 'beds',
 'baths',
 'parkings',
 'cost',
 'postal_code',
 'built_in_wardrobes',
 'dishwasher',
 'air_conditioning',
 'floorboards',
 'secure_parking',
 'intercom',
 'heating',
 'ensuite',
 'balcony_deck',
 'gym',
 'furnished',
 'close_to_shops',
 'close_to_transport',
 'fully_fenced',
 'close_to_schools',
 'ducted_heating',
 'split_system_heating',
 'swimming_pool',
 'remote_garage',
 'balcony',
 'study',
 'garden_courtyard',
 'pets_allowed',
 'internal_laundry',
 'alarm_system',
 'prop_type_index',
 'parks',
 'schools',
 'supermarkets',
 'hospitals',
 'shopping_districts',
 'CBD',
 'train_stations']

In [154]:
# Select relevant features for the model
feature_columns = [
    'beds', 'baths', 'parkings', 'postal_code', 'built_in_wardrobes', 'dishwasher', 
    'air_conditioning', 'floorboards', 'secure_parking', 'intercom', 
    'heating', 'ensuite', 'balcony_deck', 'gym', 'furnished', 
    'close_to_shops', 'close_to_transport', 'fully_fenced', 
    'close_to_schools', 'ducted_heating', 'split_system_heating', 
    'swimming_pool', 'remote_garage', 'balcony', 'study', 
    'garden_courtyard', 'pets_allowed', 'internal_laundry', 
    'alarm_system', 'prop_type_index', 'parks', 'schools', 
    'supermarkets', 'hospitals', 'shopping_districts', 'CBD', 
    'train_stations'
]

In [155]:
# feature_columns = [
#     'beds', 'baths', 'parkings', 'prop_type_index',
#     #'parks', 'schools', 'supermarkets', 'hospitals', 'shopping_districts', 'CBD', 'train_stations'
# ]

In [156]:
# Create a VectorAssembler to combine feature columns into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Transform the features into a single vector column
data = sdf.select('cost', *feature_columns)
data = assembler.transform(data)

In [157]:
# Create a StandardScaler instance
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)

# Fit the scaler to the data
scaler_model = scaler.fit(data)

# Transform the data using the fitted scaler
data = scaler_model.transform(data)

In [158]:
# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=1003)

In [159]:
# Initialize the Linear Regression model
lr = LinearRegression(featuresCol='scaled_features', labelCol='cost')

# Fit the model to the training data
lr_model = lr.fit(train_data)

24/10/01 22:04:15 WARN Instrumentation: [25d8309d] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [160]:
# Print model summary
print("Intercept: " + str(lr_model.intercept))
print("RMSE: " + str(lr_model.summary.rootMeanSquaredError))
print("r2: " + str(lr_model.summary.r2))

# Get the coefficients and feature names
coefficients = lr_model.coefficients.toArray()
feature_names = feature_columns

# Create a DataFrame from the coefficients and feature names
coefficients_df = pd.DataFrame({
    'feature': feature_names,
    'coefficient': coefficients
})

# Sort the DataFrame by the 'coefficient' column in decreasing order
coefficients_df = coefficients_df.sort_values(by='coefficient', ascending=False)

# Evaluate the model on the test data
test_results = lr_model.evaluate(test_data)

# Print evaluation metrics
print("Test RMSE: ", test_results.rootMeanSquaredError)
print("Test r2: ", test_results.r2)

coefficients_df

Intercept: 630.7332393275424
RMSE: 198.05758093661768
r2: 0.33948605525424447


Test RMSE:  197.65633293471066
Test r2:  0.32895131356082263


Unnamed: 0,feature,coefficient
1,baths,86.062369
0,beds,65.348668
21,swimming_pool,23.576499
14,furnished,22.116835
24,study,18.207476
12,balcony_deck,18.058106
5,dishwasher,16.457324
7,floorboards,14.978747
23,balcony,14.816769
25,garden_courtyard,12.382335


In [161]:
# Initialize the Lasso Regression model
lasso = LinearRegression(featuresCol='features', labelCol='cost', elasticNetParam=0.5, regParam=0.1)

# Fit the model to the data
lasso_model = lasso.fit(train_data)

In [162]:
# Print model summary
print("Intercept: " + str(lasso_model.intercept))
print("RMSE: " + str(lasso_model.summary.rootMeanSquaredError))
print("r2: " + str(lasso_model.summary.r2))

# Get the coefficients and feature names
lasso_coefficients = lasso_model.coefficients.toArray()

# Create a DataFrame from the coefficients and feature names
lasso_coefficients_df = pd.DataFrame({
    'feature': feature_names,
    'coefficient': lasso_coefficients
})

# Sort the DataFrame by the 'coefficient' column in decreasing order
lasso_coefficients_df = lasso_coefficients_df.sort_values(by='coefficient', ascending=False)

# Evaluate the model on the test data
lasso_test_results = lasso_model.evaluate(test_data)

# Print evaluation metrics
print("Lasso Test RMSE: ", lasso_test_results.rootMeanSquaredError)
print("Lasso Test r2: ", lasso_test_results.r2)

lasso_coefficients_df

Intercept: 429.3758445554193
RMSE: 198.05805703439836
r2: 0.33948287971711344
Lasso Test RMSE:  197.64621618773725
Lasso Test r2:  0.3290200050712705


Unnamed: 0,feature,coefficient
1,baths,138.673839
21,swimming_pool,100.346796
14,furnished,85.761789
24,study,82.508354
23,balcony,65.544155
12,balcony_deck,64.000577
0,beds,59.467489
25,garden_courtyard,56.142328
19,ducted_heating,50.51307
7,floorboards,42.083825


In [163]:
def backward_elimination(data: DataFrame, features: list, label: str, stop_threshold=0.1):
    features_to_keep = features.copy()
    
    while len(features_to_keep) > 0:
        print(f"Training model with {len(features_to_keep)} features.")
        
        # Assemble the feature vector for the current set of features
        assembler = VectorAssembler(inputCols=features_to_keep, outputCol="features")
        
        # Drop the existing 'features' column if it exists
        if 'features' in data.columns:
            data = data.drop('features')

        # Transform the data to create the new 'features' column
        data_assembled = assembler.transform(data).select("features", label)
        
        # Create a StandardScaler instance
        scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)
        
        # Fit the scaler to the data and transform it
        scaler_model = scaler.fit(data_assembled)
        data_scaled = scaler_model.transform(data_assembled)
        
        # Train the model on the scaled features
        lr = LinearRegression(featuresCol="scaled_features", labelCol=label)
        lr_model = lr.fit(data_scaled)
        
        # Get the coefficients and associated features
        coefficients = lr_model.coefficients
        coef_feature_pairs = list(zip(coefficients, features_to_keep))
        
        # Find the least significant feature (smallest coefficient magnitude)
        least_significant_feature = min(coef_feature_pairs, key=lambda x: abs(x[0]))[1]
        
        # Check the magnitude of the smallest coefficient using absolute value
        if abs(min(coef_feature_pairs, key=lambda x: abs(x[0]))[0]) < stop_threshold:
            print(f"Removing least significant feature: {least_significant_feature}")
            features_to_keep.remove(least_significant_feature)
        else:
            break
    
    print(f"Final set of features: {features_to_keep}")
    return features_to_keep

In [164]:
# Perform backward elimination
final_features = backward_elimination(data, feature_columns, 'cost', stop_threshold=8.5)

Training model with 37 features.


24/10/01 22:04:19 WARN Instrumentation: [bccb678a] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: hospitals
Training model with 36 features.


24/10/01 22:04:20 WARN Instrumentation: [4e1fcf2c] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: secure_parking
Training model with 35 features.


24/10/01 22:04:21 WARN Instrumentation: [7d7830f7] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: alarm_system
Training model with 34 features.


24/10/01 22:04:22 WARN Instrumentation: [4d8f5ec9] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: supermarkets
Training model with 33 features.


24/10/01 22:04:23 WARN Instrumentation: [21474e9a] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: shopping_districts
Training model with 32 features.


24/10/01 22:04:24 WARN Instrumentation: [ddbda10a] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: schools
Training model with 31 features.


24/10/01 22:04:26 WARN Instrumentation: [00a26e6e] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: CBD
Training model with 30 features.


24/10/01 22:04:27 WARN Instrumentation: [29e391ed] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: parks
Training model with 29 features.


24/10/01 22:04:28 WARN Instrumentation: [e7c4e9f3] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: parkings
Training model with 28 features.


24/10/01 22:04:29 WARN Instrumentation: [dff1b8c8] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: train_stations
Training model with 27 features.


24/10/01 22:04:30 WARN Instrumentation: [23f6d837] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: close_to_schools
Training model with 26 features.


24/10/01 22:04:31 WARN Instrumentation: [efedb051] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: heating
Training model with 25 features.


24/10/01 22:04:32 WARN Instrumentation: [caeef25f] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: gym
Training model with 24 features.


24/10/01 22:04:33 WARN Instrumentation: [561104ad] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: pets_allowed
Training model with 23 features.


24/10/01 22:04:34 WARN Instrumentation: [852e6824] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: split_system_heating
Training model with 22 features.


24/10/01 22:04:35 WARN Instrumentation: [d0a7feef] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: close_to_shops
Training model with 21 features.


24/10/01 22:04:36 WARN Instrumentation: [4dbcaaa3] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: close_to_transport
Training model with 20 features.


24/10/01 22:04:37 WARN Instrumentation: [c5df25ec] regParam is zero, which might cause numerical instability and overfitting.


Removing least significant feature: internal_laundry
Training model with 19 features.


24/10/01 22:04:38 WARN Instrumentation: [84a72a39] regParam is zero, which might cause numerical instability and overfitting.


Final set of features: ['beds', 'baths', 'postal_code', 'built_in_wardrobes', 'dishwasher', 'air_conditioning', 'floorboards', 'intercom', 'ensuite', 'balcony_deck', 'furnished', 'fully_fenced', 'ducted_heating', 'swimming_pool', 'remote_garage', 'balcony', 'study', 'garden_courtyard', 'prop_type_index']


In [165]:
# Create a VectorAssembler to combine feature columns into a single vector
assembler = VectorAssembler(inputCols=final_features, outputCol='features')

# Transform the features into a single vector column
data = sdf.select('cost', *feature_columns)
data = assembler.transform(data)

In [166]:
# Create a StandardScaler instance
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)

# Fit the scaler to the data
scaler_model = scaler.fit(data)

# Transform the data using the fitted scaler
data = scaler_model.transform(data)

In [167]:
# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=1003)

In [168]:
# Initialize the Linear Regression model
lr = LinearRegression(featuresCol='scaled_features', labelCol='cost')

# Fit the model to the training data
lr_model = lr.fit(train_data)

24/10/01 22:04:40 WARN Instrumentation: [c731d260] regParam is zero, which might cause numerical instability and overfitting.


In [169]:
# Print model summary
print("Intercept: " + str(lr_model.intercept))
print("RMSE: " + str(lr_model.summary.rootMeanSquaredError))
print("r2: " + str(lr_model.summary.r2))

# Get the coefficients and feature names
coefficients = lr_model.coefficients.toArray()
feature_names = final_features

# Create a DataFrame from the coefficients and feature names
coefficients_df = pd.DataFrame({
    'feature': feature_names,
    'coefficient': coefficients
})

# Sort the DataFrame by the 'coefficient' column in decreasing order
coefficients_df = coefficients_df.sort_values(by='coefficient', ascending=False)

# Evaluate the model on the test data
test_results = lr_model.evaluate(test_data)

# Print evaluation metrics
print("Test RMSE: ", test_results.rootMeanSquaredError)
print("Test r2: ", test_results.r2)

coefficients_df

Intercept: 630.6703919110146
RMSE: 198.2768635869822
r2: 0.33802264821596684
Test RMSE:  197.58438775770833
Test r2:  0.32943973636307733


Unnamed: 0,feature,coefficient
1,baths,86.141049
0,beds,64.821416
13,swimming_pool,25.593529
10,furnished,22.296458
16,study,18.268164
9,balcony_deck,17.010795
4,dishwasher,15.613591
15,balcony,15.363227
6,floorboards,14.407658
12,ducted_heating,13.298636


## TREE MODELS

In [172]:
# Create a VectorAssembler to combine feature columns into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Select the target variable (cost) and prepare the final DataFrame
data = sdf.select('cost', *feature_columns)

# Transform the features into a single vector column
data = assembler.transform(data)

# Create a MinMaxScaler instance
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

# Fit the scaler to the data
scaler_model = scaler.fit(data)

# Transform the data using the fitted scaler
data = scaler_model.transform(data)

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1003)

In [173]:
# Initialize the Gradient Boosting model
gbt = GBTRegressor(featuresCol='scaled_features', labelCol='cost', maxIter=100)

# Fit the model to the training data
gbt_model = gbt.fit(train_data)

                                                                                

In [None]:
# Print model summary
print("Gradient Boosting Model Summary")
print("Number of Trees: ", gbt_model.getNumTrees)

# Evaluate the model on the test data
gbt_predictions = gbt_model.transform(test_data)

# %%
# Create evaluators for RMSE, MAE, and R-squared
gbt_rmse = rmse_evaluator.evaluate(gbt_predictions)
gbt_mae = mae_evaluator.evaluate(gbt_predictions)
gbt_r2 = r2_evaluator.evaluate(gbt_predictions)

# Print evaluation metrics
print(f"Gradient Boosting Test RMSE: {gbt_rmse}, MAE: {gbt_mae}, R2: {gbt_r2}")

In [None]:
# Get feature importances
gbt_feature_importances = gbt_model.featureImportances

# Create a DataFrame from the feature importances
feature_importances_df = pd.DataFrame({
    'feature': final_features,
    'importance': gbt_feature_importances.toArray()
})

# Sort the DataFrame by the 'importance' column in decreasing order
feature_importances_df = feature_importances_df.sort_values(by='importance', ascending=False)