In [34]:
!pip install pyspark




In [1]:
import pyspark
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("House Price Prediction") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()
spark

24/06/30 17:40:18 WARN Utils: Your hostname, Sujips-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.18.6 instead (on interface en0)
24/06/30 17:40:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/30 17:40:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [24]:
# Replace 'train.csv' and 'test.csv' with the actual paths to your datasets
train_data = spark.read.csv('housing-house-prediction/train_set.csv', header=True, inferSchema=True)
test_data = spark.read.csv('housing-house-prediction/test_set.csv', header=True, inferSchema=True)
train_data=train_data.drop('index','Lattitude','Longtitude')
test_data=test_data.drop('index','Lattitude','Longtitude')

# Show the first few rows of the training data
train_data.show()

+-----------------+-----------------+-----+----+---------+------+-------------+----------+--------+--------+--------+--------+---+--------+------------+---------+-------------+--------------------+-------------+
|           Suburb|          Address|Rooms|Type|    Price|Method|      SellerG|      Date|Distance|Postcode|Bedroom2|Bathroom|Car|Landsize|BuildingArea|YearBuilt|  CouncilArea|          Regionname|Propertycount|
+-----------------+-----------------+-----+----+---------+------+-------------+----------+--------+--------+--------+--------+---+--------+------------+---------+-------------+--------------------+-------------+
|       Aberfeldie|   241 Buckley St|    4|   h|1380000.0|    VB|       Nelson|12/08/2017|     7.5|  3040.0|     4.0|     2.0|2.0|   766.0|        NULL|     NULL|Moonee Valley|Western Metropolitan|       1543.0|
|        Northcote|    67 Charles St|    2|   h|1100000.0|    SP|       Jellis|20/05/2017|     5.5|  3070.0|     2.0|     1.0|1.0|   189.0|        NULL|

In [25]:
from pyspark.sql.functions import col, isnan, when

# Fill missing values with the median for numerical columns
numerical_columns = [col for col, dtype in train_data.dtypes if dtype in ('int', 'double')]
for column in numerical_columns:
    median_value = train_data.approxQuantile(column, [0.5], 0.25)[0]
    train_data = train_data.fillna({column: median_value})

# Fill missing values with the median for numerical columns
numerical_columns = [col for col, dtype in test_data.dtypes if dtype in ('int', 'double')]
for column in numerical_columns:
    median_value = test_data.approxQuantile(column, [0.5], 0.25)[0]
    test_data = test_data.fillna({column: median_value})

# Fill missing values with the mode for categorical columns
categorical_columns = [col for col, dtype in train_data.dtypes if dtype == 'string']
for column in categorical_columns:
    mode_value = train_data.groupBy(column).count().orderBy('count', ascending=False).first()[0]
    if mode_value is not None:
        test_data = test_data.fillna(mode_value, subset=[column])

# Fill missing values with the mode for categorical columns
categorical_columns = [col for col, dtype in test_data.dtypes if dtype == 'string']
for column in categorical_columns:
    mode_value = test_data.groupBy(column).count().orderBy('count', ascending=False).first()[0]
    if mode_value is not None:
        test_data = test_data.fillna(mode_value, subset=[column])



In [9]:
train_data.printSchema()
print(f"Number of records: {train_data.count()}")
test_data.printSchema()
print(f"Number of records: {test_data.count()}")

root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Method: string (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Postcode: double (nullable = true)
 |-- Bedroom2: double (nullable = true)
 |-- Bathroom: double (nullable = true)
 |-- Car: double (nullable = true)
 |-- Landsize: double (nullable = true)
 |-- BuildingArea: double (nullable = true)
 |-- YearBuilt: double (nullable = true)
 |-- CouncilArea: string (nullable = true)
 |-- Lattitude: double (nullable = true)
 |-- Longtitude: double (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Propertycount: double (nullable = true)

Number of records: 5432
root
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Type: strin

### EDA

In [5]:
region_wise = train_data.groupBy('Regionname','Price').count()
region_wise.show()

+--------------------+---------+-----+
|          Regionname|    Price|count|
+--------------------+---------+-----+
|Northern Metropol...| 825000.0|    3|
|South-Eastern Met...| 620000.0|    1|
|Southern Metropol...| 465000.0|    2|
|Western Metropolitan|1336000.0|    1|
|Eastern Metropolitan|1207000.0|    1|
|Northern Metropol...| 416000.0|    1|
|Northern Metropol...| 436000.0|    1|
|South-Eastern Met...|1255000.0|    1|
|    Eastern Victoria| 872000.0|    1|
|Western Metropolitan| 895000.0|    2|
|Eastern Metropolitan|1070000.0|    1|
|Northern Metropol...|1600000.0|    5|
|Northern Metropol...| 432500.0|    1|
|Northern Metropol...|1065000.0|    4|
|Northern Metropol...| 756000.0|    1|
|Southern Metropol...| 706000.0|    2|
|Northern Metropol...|2775000.0|    1|
|Northern Metropol...| 612000.0|    1|
|Southern Metropol...| 420000.0|    5|
|Southern Metropol...| 447000.0|    2|
+--------------------+---------+-----+
only showing top 20 rows



In [6]:
method_wise = train_data.groupBy('Method','Price').count()
method_wise.show()

+------+---------+-----+
|Method|    Price|count|
+------+---------+-----+
|     S| 645000.0|    6|
|     S| 695000.0|    8|
|     S| 875000.0|    9|
|     S|1152500.0|    1|
|     S| 445000.0|    4|
|     S|1260000.0|   12|
|     S|1036000.0|    1|
|     S|1715000.0|    3|
|     S| 672000.0|    1|
|     S| 525500.0|    3|
|    PI| 902000.0|    1|
|    PI| 655000.0|    2|
|    VB|2475000.0|    1|
|    SP| 638000.0|    1|
|     S| 456000.0|    1|
|     S|2635000.0|    1|
|    PI| 470000.0|    2|
|     S| 385000.0|    2|
|     S| 442000.0|    2|
|     S| 260000.0|    1|
+------+---------+-----+
only showing top 20 rows



### Check If any Column has null values

In [10]:
null_counts = {}
for col_name in train_data.columns:
    null_count = train_data.filter(col(col_name).isNull()).count()
    null_counts[col_name] = null_count

# Display the null counts
for col_name, null_count in null_counts.items():
    print(f"Column '{col_name}' has {null_count} null values.")

Column 'Suburb' has 0 null values.
Column 'Address' has 0 null values.
Column 'Rooms' has 0 null values.
Column 'Type' has 0 null values.
Column 'Price' has 0 null values.
Column 'Method' has 0 null values.
Column 'SellerG' has 0 null values.
Column 'Date' has 0 null values.
Column 'Distance' has 0 null values.
Column 'Postcode' has 0 null values.
Column 'Bedroom2' has 0 null values.
Column 'Bathroom' has 0 null values.
Column 'Car' has 25 null values.
Column 'Landsize' has 0 null values.
Column 'BuildingArea' has 2542 null values.
Column 'YearBuilt' has 2130 null values.
Column 'CouncilArea' has 553 null values.
Column 'Lattitude' has 0 null values.
Column 'Longtitude' has 0 null values.
Column 'Regionname' has 0 null values.
Column 'Propertycount' has 0 null values.


### Label Encodings

In [26]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
# Encode categorical variables
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in categorical_columns]
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_encoded") for column in categorical_columns]

# Create a pipeline for indexing and encoding
pipeline = Pipeline(stages=indexers + encoders)
pipeline_model = pipeline.fit(train_data)
# Transform training and test data using the fitted pipeline
train_data_final = pipeline_model.transform(train_data)
test_data_final = pipeline_model.transform(train_data)
train_data_final


DataFrame[Suburb: string, Address: string, Rooms: int, Type: string, Price: double, Method: string, SellerG: string, Date: string, Distance: double, Postcode: double, Bedroom2: double, Bathroom: double, Car: double, Landsize: double, BuildingArea: double, YearBuilt: double, CouncilArea: string, Regionname: string, Propertycount: double, Suburb_index: double, Address_index: double, Type_index: double, Method_index: double, SellerG_index: double, Date_index: double, CouncilArea_index: double, Regionname_index: double, Suburb_encoded: vector, Address_encoded: vector, Type_encoded: vector, Method_encoded: vector, SellerG_encoded: vector, Date_encoded: vector, CouncilArea_encoded: vector, Regionname_encoded: vector]

In [27]:
# Drop original categorical columns
train_data = train_data.drop(*categorical_columns)
test_data = test_data.drop(*categorical_columns)
train_data.show(5)

+-----+---------+--------+--------+--------+--------+---+--------+------------+---------+-------------+
|Rooms|    Price|Distance|Postcode|Bedroom2|Bathroom|Car|Landsize|BuildingArea|YearBuilt|Propertycount|
+-----+---------+--------+--------+--------+--------+---+--------+------------+---------+-------------+
|    4|1380000.0|     7.5|  3040.0|     4.0|     2.0|2.0|   766.0|        92.0|   1941.0|       1543.0|
|    2|1100000.0|     5.5|  3070.0|     2.0|     1.0|1.0|   189.0|        92.0|   1941.0|      11364.0|
|    3|1480000.0|     9.2|  3104.0|     3.0|     1.0|4.0|   605.0|       116.0|   1950.0|       7809.0|
|    3|1055000.0|     5.2|  3056.0|     3.0|     1.0|1.0|   324.0|        92.0|   1930.0|      11918.0|
|    4|1000000.0|    13.8|  3107.0|     4.0|     3.0|2.0|   728.0|       164.0|   1970.0|       5420.0|
+-----+---------+--------+--------+--------+--------+---+--------+------------+---------+-------------+
only showing top 5 rows



### Identifying and remove outliers

In [28]:
from pyspark.sql.functions import expr, percentile_approx

def remove_outliers(df, numeric_columns):
    # Calculate Q1 and Q3
    quantiles = df.approxQuantile(numeric_columns, [0.25, 0.75], 0.05)
    
    # Create a list of conditions to filter out outliers
    conditions = []
    for i, column in enumerate(numeric_columns):
        Q1 = quantiles[i][0]
        Q3 = quantiles[i][1]
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        conditions.append((col(column) >= lower_bound) & (col(column) <= upper_bound))
    
    # Combine all conditions
    combined_condition = conditions[0]
    for condition in conditions[1:]:
        combined_condition = combined_condition & condition
    
    # Filter out outliers
    df_filtered = df.filter(combined_condition)
    return df_filtered

In [29]:
numeric_columns = [col_name for col_name, dtype in train_data.dtypes if dtype in ['int', 'double']]

train_data_filtered = remove_outliers(train_data_final, numeric_columns)
test_data_filtered = remove_outliers(test_data_final, numeric_columns)


In [30]:
train_data_filtered.count()

3452

### Selection Of Features

In [31]:

# Assemble features and scale them
assembler = VectorAssembler(inputCols=[col+"_encoded" for col in categorical_columns] + numerical_columns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Transform the data to have a features column
train_data_assembled = assembler.transform(train_data_filtered)
test_data_assembled = assembler.transform(test_data_filtered)

scaler_model = scaler.fit(train_data_assembled)

train_data_scaled = scaler_model.transform(train_data_assembled)
test_data_scaled = scaler_model.transform(test_data_assembled)


In [44]:

from pyspark.ml.feature import ChiSqSelector


# Select numerical columns only
df_numerical = train_data_filtered.select(numerical_columns + ['Price'])

# Assemble all numerical features into a single vector column
df_assembler = VectorAssembler(inputCols=numerical_columns, outputCol="features")

# Transform the data
assembled_df = df_assembler.transform(df_numerical)
# ChiSqSelector requires features and a label column
selector = ChiSqSelector(featuresCol='features',
                         outputCol="selected_features",
                         labelCol="Price",
                         selectorType="numTopFeatures",
                         numTopFeatures=10)

# Fit the selector
selector_model = selector.fit(assembled_df)

# Transform the data to select top features
selected_features_df = selector_model.transform(assembled_df)

# Print selected feature names
selected_feature_indices = selector_model.selectedFeatures
print(selected_feature_indices)
selected_feature_names = [
    numerical_columns[i] for i in selected_feature_indices
]
print(selected_feature_names)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
['Rooms', 'Distance', 'Postcode', 'Bedroom2', 'Bathroom', 'Car', 'Landsize', 'BuildingArea', 'YearBuilt', 'Propertycount']


In [45]:
from pyspark.ml.stat import ChiSquareTest
import pandas as pd
# Perform Chi-Square Test
chi_sq_results = ChiSquareTest.test(assembled_df, "features", "Price").head()

# Extract pValues and degreesOfFreedom
p_values = chi_sq_results.pValues
degrees_of_freedom = chi_sq_results.degreesOfFreedom
chi_sq_values = chi_sq_results.statistics

# Create a DataFrame with feature names and their chi-squared statistics
feature_importance_df = pd.DataFrame({
    'Feature': numerical_columns,
    'ChiSqStatistic': chi_sq_values,
    'PValue': p_values,
    'DegreesOfFreedom': degrees_of_freedom
})

# Sort the DataFrame by chi-squared statistic in descending order
feature_importance_df = feature_importance_df.sort_values(by='ChiSqStatistic', ascending=False)

# Export the DataFrame to CSV
feature_importance_df.to_csv('selected_features_importance.csv', index=False)


In [51]:
# ChiSqSelector requires features and a label column
selector = ChiSqSelector(featuresCol='features',
                         outputCol="selected_features",
                         labelCol="Price",
                         selectorType="numTopFeatures",
                         numTopFeatures=10)

# Fit the selector
selector_model = selector.fit(assembled_df)

# Transform the data to select top features
selected_features_df = selector_model.transform(assembled_df)

# Print selected feature names
selected_feature_indices = selector_model.selectedFeatures
selected_feature_names = [numerical_columns[i] for i in selected_feature_indices]
print("Selected Feature Names:", selected_feature_names)

# Extract selected features and their importance scores into a pandas DataFrame
selected_features_importance = pd.DataFrame({
    'Feature': selected_feature_names,
    'Importance Score': selector_model.scores
})

# Export the DataFrame to CSV
selected_features_importance.to_csv('selected_features_importance_heatmap.csv', index=False)

print("Exported selected features and their importances to 'selected_features_importance.csv'")

Selected Feature Names: ['Rooms', 'Distance', 'Postcode', 'Bedroom2', 'Bathroom', 'Car', 'Landsize', 'BuildingArea', 'YearBuilt', 'Propertycount']


AttributeError: 'ChiSqSelectorModel' object has no attribute 'scores'

### Now use LR Model

In [48]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
# Define the features and target
features = ['Landsize', 'Propertycount', 'BuildingArea', 'Distance', 'Postcode', 'YearBuilt', 'Rooms', 'Bedroom2', 'Car', 'Bathroom']
target = 'Price'

# Split into train and test sets
train_df, test_df = train_data_filtered.randomSplit([0.7, 0.3], seed=123)

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Standardize features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

# Pipeline for assembling and scaling
pipeline = Pipeline(stages=[assembler, scaler])

# Fit the pipeline on the training data
pipeline_model = pipeline.fit(train_df)

# Transform training and test data
train_transformed = pipeline_model.transform(train_df)
test_transformed = pipeline_model.transform(test_df)

# Select only the scaled features and target for training and test sets
X_train = train_transformed.select("scaled_features")
y_train = train_transformed.select(target)
X_test = test_transformed.select("scaled_features")
y_test = test_transformed.select(target)

# Show the transformed features
X_train.show()
y_train.show()


+--------------------+
|     scaled_features|
+--------------------+
|[-0.5230081471560...|
|[-0.8068421266191...|
|[-1.3134318874330...|
|[-0.6020505211837...|
|[-0.6559430489298...|
|[-0.7529495988729...|
|[-0.6128290267329...|
|[-0.8319919729006...|
|[-0.8679203247314...|
|[0.05543831731950...|
|[1.43867986280439...|
|[1.59317177567673...|
|[-0.3325878824528...|
|[1.11532469632740...|
|[1.10454619077817...|
|[-1.3134318874330...|
|[-0.3613305639175...|
|[-1.3134318874330...|
|[-0.6236075322821...|
|[0.03028847103795...|
+--------------------+
only showing top 20 rows

+---------+
|    Price|
+---------+
|1097000.0|
| 911000.0|
| 900000.0|
|1000000.0|
| 955000.0|
|1035000.0|
|1172500.0|
|1465000.0|
|1100000.0|
|1195000.0|
|1380000.0|
|1705000.0|
|1330000.0|
|1720000.0|
|1680000.0|
| 507000.0|
| 752000.0|
| 805000.0|
| 440000.0|
| 650000.0|
+---------+
only showing top 20 rows



In [49]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

def plot_roc_curve(predictions, model_name):
    # Extract the probability and label columns
    prediction_pdf = predictions.select("probability", "Price").toPandas()
    prediction_pdf['probability'] = prediction_pdf['probability'].apply(lambda x: x[1])

    # Compute ROC curve and ROC area
    fpr, tpr, _ = roc_curve(prediction_pdf['Price'], prediction_pdf['probability'])
    roc_auc = auc(fpr, tpr)

    # Plotting using Seaborn
    sns.set(style="whitegrid")
    plt.figure(figsize=(10, 6))
    sns.lineplot(x=fpr, y=tpr, lw=2, label=f'{model_name} (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")
    plt.show()

def get_best_model(models, model_names, train_data, test_data):
    best_model = None
    best_model_name = None
    best_r2 = 0.0

    # Evaluators for additional metrics
    mae_evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='mae')
    r2_evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='r2')
    mse_evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='mse')
    rmse_evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='rmse')

    # DataFrame to store results
    results = pd.DataFrame(columns=['Model', 'MAE', 'R2', 'MSE', 'RMSE'])
    # DataFrame to store results
    for model, model_name in zip(models, model_names):
        pipeline = Pipeline(stages= [assembler,selector,scaler, model])
        pipeline_model = pipeline.fit(train_data)
        predictions = pipeline_model.transform(test_data)

        mae = mae_evaluator.evaluate(predictions)
        r2 = r2_evaluator.evaluate(predictions)
        mse = mse_evaluator.evaluate(predictions)
        rmse = rmse_evaluator.evaluate(predictions)

        print(f"Metrics for {model_name}:")
        print(f"MAE: {mae}")
        print(f"R2: {r2}")
        print(f"MSE: {mse}")
        print(f"RMSE: {rmse}\n")
        # Add results to the DataFrame
        new_row = pd.DataFrame({'Model': [model_name],
                            'MAE': [mae],
                            'R2': [r2],
                            'MSE': [mse],
                            'RMSE': [rmse]})
        results = pd.concat([results, new_row], ignore_index=True)

        # Plot ROC curve
        # plot_roc_curve(predictions, model_name)

        if r2 > best_r2:
            best_model = pipeline_model
            best_model_name = model_name
            best_r2 = r2

    # print(f"Best model: {best_model_name} with ROC-AUC: {best_roc_auc}")

    # Save the results DataFrame to a CSV file
    results.to_csv('model_metrics.csv', index=False)

    return best_model, best_model_name, results

In [41]:
train_df

DataFrame[Suburb: string, Address: string, Rooms: int, Type: string, Price: double, Method: string, SellerG: string, Date: string, Distance: double, Postcode: double, Bedroom2: double, Bathroom: double, Car: double, Landsize: double, BuildingArea: double, YearBuilt: double, CouncilArea: string, Regionname: string, Propertycount: double, Suburb_index: double, Address_index: double, Type_index: double, Method_index: double, SellerG_index: double, Date_index: double, CouncilArea_index: double, Regionname_index: double, Suburb_encoded: vector, Address_encoded: vector, Type_encoded: vector, Method_encoded: vector, SellerG_encoded: vector, Date_encoded: vector, CouncilArea_encoded: vector, Regionname_encoded: vector]

In [50]:
# List of models to evaluate
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor

models = [
    LinearRegression(featuresCol="scaled_features", labelCol="Price",regParam=0.1),
    DecisionTreeRegressor(featuresCol="scaled_features", labelCol="Price"),
    RandomForestRegressor(featuresCol="scaled_features", labelCol="Price")
]
model_names = ['Linear Regression', 'Decision Tree Regression', 'Random Forest Regression']
# Get the best model
train_df_sampled = train_df.sample(withReplacement=False, fraction=0.34, seed=42)

best_model, best_model_name, results = get_best_model(models, model_names, train_df, test_df)


Metrics for Linear Regression:
MAE: 208663.83116561268
R2: 0.5401535270619431
MSE: 74319747284.15541
RMSE: 272616.4838819462



  results = pd.concat([results, new_row], ignore_index=True)


Metrics for Decision Tree Regression:
MAE: 191572.69439744233
R2: 0.5746793553753022
MSE: 68739730939.5033
RMSE: 262182.62898121856

Metrics for Random Forest Regression:
MAE: 190924.73979370054
R2: 0.5871825662883217
MSE: 66718979384.40486
RMSE: 258300.1730243417

