In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName("HousePricePrediction").getOrCreate()

# Load the dataset
file_path = '/kaggle/input/house-price-prediction-dataset-2000-rows/enhanced_house_price_dataset.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Display a sample of the data
df.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/12 16:21:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+----+--------+---------+-------+-------+---+-------+--------------+---------+----------+--------+------------+----------------+----------------+---------------+-------+
|Area|Bedrooms|Bathrooms|Stories|Parking|Age|   City|    Furnishing|Main Road|Guest Room|Basement|Water Supply|Air Conditioning|Preferred Tenant|Locality Rating|  Price|
+----+--------+---------+-------+-------+---+-------+--------------+---------+----------+--------+------------+----------------+----------------+---------------+-------+
|1260|       4|        3|      2|      1| 24|   Pune|Semi-Furnished|      Yes|       Yes|     Yes|        Both|              No|         Company|              4|1274350|
|5790|       2|        1|      1|      1|  7|Kolkata|   Unfurnished|      Yes|       Yes|     Yes|        Both|             Yes|        Bachelor|              5|1094846|
|5626|       5|        2|      3|      0| 15|Chennai|Semi-Furnished|       No|       Yes|     Yes| Corporation|             Yes|         Company|     

In [3]:
# Identify categorical and numerical columns
categorical_cols = ['Furnishing', 'Main Road', 'Guest Room', 'Basement', 'Water Supply', 'Air Conditioning', 'Preferred Tenant', 'City']
numerical_cols = ['Area', 'Bedrooms', 'Bathrooms', 'Stories', 'Parking', 'Age', 'Locality Rating']

# Create stages for the pipeline
stages = []
for categorical_col in categorical_cols:
    string_indexer = StringIndexer(inputCol=categorical_col, outputCol=categorical_col + "_index")
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[categorical_col + "_vec"])
    stages += [string_indexer, encoder]

In [4]:
# Assemble all features into a single vector
assembler_inputs = [c + "_vec" for c in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages += [assembler]

# Create a pipeline to execute the stages
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
preprocessed_df = pipeline_model.transform(df)

                                                                                

In [5]:
# Print the schema of the DataFrame
preprocessed_df.printSchema()

# Show descriptive statistics for numerical columns
df.describe().show()

root
 |-- Area: integer (nullable = true)
 |-- Bedrooms: integer (nullable = true)
 |-- Bathrooms: integer (nullable = true)
 |-- Stories: integer (nullable = true)
 |-- Parking: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Furnishing: string (nullable = true)
 |-- Main Road: string (nullable = true)
 |-- Guest Room: string (nullable = true)
 |-- Basement: string (nullable = true)
 |-- Water Supply: string (nullable = true)
 |-- Air Conditioning: string (nullable = true)
 |-- Preferred Tenant: string (nullable = true)
 |-- Locality Rating: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Furnishing_index: double (nullable = false)
 |-- Furnishing_vec: vector (nullable = true)
 |-- Main Road_index: double (nullable = false)
 |-- Main Road_vec: vector (nullable = true)
 |-- Guest Room_index: double (nullable = false)
 |-- Guest Room_vec: vector (nullable = true)
 |-- Basement_index: double (nullable = false)
 |

25/11/12 16:22:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 27:>                                                         (0 + 1) / 1]

+-------+------------------+------------------+------------------+------------------+------------------+-----------------+---------+-----------+---------+----------+--------+------------+----------------+----------------+------------------+------------------+
|summary|              Area|          Bedrooms|         Bathrooms|           Stories|           Parking|              Age|     City| Furnishing|Main Road|Guest Room|Basement|Water Supply|Air Conditioning|Preferred Tenant|   Locality Rating|             Price|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+---------+-----------+---------+----------+--------+------------+----------------+----------------+------------------+------------------+
|  count|              2000|              2000|              2000|              2000|              2000|             2000|     2000|       2000|     2000|      2000|    2000|        2000|            2000|            2000

                                                                                

In [6]:
# Select the features and the target variable (Price)
final_df = preprocessed_df.select("features", col("Price").alias("label"))

# Split the data into training and testing sets
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)

In [7]:
# Initialize and train the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

# Make predictions on the test data
lr_predictions = lr_model.transform(test_data)

25/11/12 16:23:42 WARN Instrumentation: [654b1c7a] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [8]:
# Initialize and train the Random Forest Regressor model
rf = RandomForestRegressor(featuresCol="features", labelCol="label")
rf_model = rf.fit(train_data)

# Make predictions on the test data
rf_predictions = rf_model.transform(test_data)



In [9]:
# Create a RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

# Evaluate Linear Regression model
lr_r2 = evaluator.evaluate(lr_predictions, {evaluator.metricName: "r2"})
lr_mae = evaluator.evaluate(lr_predictions, {evaluator.metricName: "mae"})
lr_rmse = evaluator.evaluate(lr_predictions, {evaluator.metricName: "rmse"})

print("--- Linear Regression Performance ---")
print(f"R-squared (R²): {lr_r2}")
print(f"Mean Absolute Error (MAE): {lr_mae}")
print(f"Root Mean Squared Error (RMSE): {lr_rmse}")

# Evaluate Random Forest Regressor model
rf_r2 = evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})
rf_mae = evaluator.evaluate(rf_predictions, {evaluator.metricName: "mae"})
rf_rmse = evaluator.evaluate(rf_predictions, {evaluator.metricName: "rmse"})

print("\n--- Random Forest Regressor Performance ---")
print(f"R-squared (R²): {rf_r2}")
print(f"Mean Absolute Error (MAE): {rf_mae}")
print(f"Root Mean Squared Error (RMSE): {rf_rmse}")

--- Linear Regression Performance ---
R-squared (R²): 0.7117462980143822
Mean Absolute Error (MAE): 125441.84932578642
Root Mean Squared Error (RMSE): 156479.1717209315

--- Random Forest Regressor Performance ---
R-squared (R²): 0.544180278926269
Mean Absolute Error (MAE): 153723.1515679156
Root Mean Squared Error (RMSE): 196773.06363383282


In [10]:
# Calculate and show residuals for the Random Forest model
rf_predictions.withColumn("residual", col("label") - col("prediction")).select("label", "prediction", "residual").show()

+-------+------------------+------------------+
|  label|        prediction|          residual|
+-------+------------------+------------------+
|1604260| 1432755.071253799| 171504.9287462011|
|1026618|1107869.1308457223|-81251.13084572228|
|1001727|  935757.399965192| 65969.60003480804|
|1226714|1077096.2874932545|149617.71250674548|
|1239949| 1287738.770244731| -47789.7702447311|
|1557948|1407835.7344790925|150112.26552090747|
|1297439| 1139493.844337625|157945.15566237504|
|1278489|1084751.7349093694|193737.26509063062|
|1234383|1437836.2190588296|-203453.2190588296|
|1174407|1265819.1273857807|-91412.12738578068|
|1476708|1421275.5694599722|55432.430540027795|
|1500049|1404364.0174827217| 95684.98251727829|
|1262995|1235813.1175261207| 27181.88247387926|
|1677048| 1506245.315289795|170802.68471020507|
|1611259|1420661.5616520955|190597.43834790448|
|1489618|1432648.5523198931| 56969.44768010685|
|1468172|1444539.8669988469|23632.133001153124|
|1143907|1384355.6981100184|-240448.6981

In [11]:
# Add the prediction column to the original DataFrame
predictions_with_original_data = rf_model.transform(preprocessed_df)

# Select the original columns and the predicted price
output_df = predictions_with_original_data.select(
    'Area', 'Bedrooms', 'Bathrooms', 'Stories', 'Parking', 'Age', 'City', 'Furnishing',
    'Main Road', 'Guest Room', 'Basement', 'Water Supply', 'Air Conditioning',
    'Preferred Tenant', 'Locality Rating', 'Price', col("prediction").alias("Predicted_Price")
)

# Save the results to a CSV file
output_df.write.csv("house_price_predictions.csv", header=True, mode="overwrite")