In [None]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor, DecisionTreeRegressor, GeneralizedLinearRegression, IsotonicRegression, AFTSurvivalRegression, FMRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np


In [None]:
import os
import sys

# Point both the Spark driver and the Python workers at the interpreter running this kernel.
# NOTE(review): presumably these must be set before the SparkSession cell below creates the session.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [None]:
# Create (or reuse, via getOrCreate) a single-machine SparkSession for this notebook
spark = (
    SparkSession.builder
    .master('local')
    .appName("HousePricePrediction")
    .getOrCreate()
)

In [None]:
# Load data from CSV file: the first row holds the column names and Spark infers column types
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("housing.csv")
)

In [None]:
# Step 3: Analyze the data
from pyspark.sql import functions as F

data.printSchema()
data.show(5)

# Count once and reuse: every data.count() is a separate Spark job over the whole dataset
total_rows = data.count()
print("Total number of rows:", total_rows)

for col in ["ocean_proximity"]:
    print(f"Unique values in {col}:")
    data.select(col).distinct().show()

# Null counts for every column in a single aggregation pass: when() without otherwise()
# yields null for non-null rows, so count() over it counts only the null rows.
null_counts = data.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in data.columns]
).first().asDict()

for col in data.columns:
    # An empty DataFrame has no meaningful NA rate; avoid dividing by zero
    na_rate = null_counts[col] / total_rows if total_rows else float("nan")
    print(f"NA rate in {col}: {na_rate}")

In [None]:
# Do one-hot encoding for categorical columns:
#   ocean_proximity (string) -> ocean_proximity_index (numeric index) -> ocean_proximity_encoded (one-hot vector)
# Drop outputs left by a previous run first; StringIndexer/OneHotEncoder fail if their
# output column already exists, so without this the cell cannot be re-executed.
data = data.drop("ocean_proximity_index", "ocean_proximity_encoded")

indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
# Keep the fitted model so the same string -> index mapping can be reused on new data
indexer_model = indexer.fit(data)
data = indexer_model.transform(data)

encoder = OneHotEncoder(inputCol="ocean_proximity_index", outputCol="ocean_proximity_encoded")
encModel = encoder.fit(data)
data = encModel.transform(data)

data.show(5)
print("Total number of rows:", data.count())

In [None]:
# Handle missing values in total_bedrooms column by imputing the approximate median
# (probability 0.5, relative error 0.001).
# NOTE(review): the median is computed over all rows, including those that the later
# randomSplit puts in the test set.
total_bedrooms_median = data.approxQuantile("total_bedrooms", [0.5], 0.001)[0]
data = data.fillna(total_bedrooms_median, subset=["total_bedrooms"])

In [None]:
def rfe_feature_selection(data, features, target_col, num_features, estimator):
    """Select features by recursive feature elimination (RFE).

    Repeatedly fits ``estimator`` on the remaining candidate features, drops the
    candidate with the lowest ``featureImportances`` score, and stops once
    ``num_features`` candidates remain.

    Args:
        data: Spark DataFrame containing every column in ``features`` and ``target_col``.
        features: Candidate feature column names. The list is NOT modified. If it
            contains ``target_col``, that entry is ignored (using the label as a
            feature would leak it into the model).
        target_col: Name of the label column.
        num_features: Number of features to keep; must be >= 1. If it is at least
            the number of candidates, nothing is eliminated.
        estimator: Estimator class accepting ``labelCol``/``featuresCol`` keyword
            arguments whose fitted model exposes ``featureImportances``
            (e.g. ``RandomForestRegressor``).

    Returns:
        DataFrame with the kept feature columns (in their original relative order)
        followed by ``target_col``.

    Raises:
        ValueError: if ``num_features`` < 1.
    """
    if num_features < 1:
        raise ValueError(f"num_features must be >= 1, got {num_features}")

    # Work on a copy (never mutate the caller's list) and never treat the label as a feature
    remaining_features = [f for f in features if f != target_col]

    # Private vector column name so it cannot clash with an existing "features" column
    vector_col = "_rfe_features"

    while len(remaining_features) > num_features:
        assembler = VectorAssembler(inputCols=remaining_features, outputCol=vector_col)
        data_assembled = assembler.transform(data)

        model = estimator(labelCol=target_col, featuresCol=vector_col).fit(data_assembled)

        # featureImportances may be a SparseVector; densify so argmin sees every slot
        importances = model.featureImportances.toArray()

        # Assumes every candidate column is a numeric scalar (not a vector), so vector
        # slot i corresponds to remaining_features[i].
        least_important_index = int(np.argmin(importances))

        # Eliminate the least important feature and refit on the rest
        remaining_features.pop(least_important_index)

    return data.select(remaining_features + [target_col])


In [None]:
# Candidate features for RFE: every column except the target and the columns that cannot
# serve as scalar model inputs.
excluded_columns = {
    "median_house_value",       # target column -- must never be used as a feature (label leakage)
    "ocean_proximity",          # raw string category; represented by ocean_proximity_index
    "ocean_proximity_encoded",  # one-hot vector column
    "features",                 # assembled vectors, present if later cells were already run
    "scaledFeatures",
}
features = [col for col in data.columns if col not in excluded_columns]

# Eliminate unnecessary features, keeping num_features of them
num_features = 8  # Smaller values gave very bad results (NOTE(review): re-check if the selection logic changes)
data_filtered = rfe_feature_selection(data, features, "median_house_value", num_features, RandomForestRegressor)

In [None]:
# Show the columns returned by rfe_feature_selection (selected features followed by the label)
selected_columns = data_filtered.columns
print(selected_columns)

In [None]:
# Select the remaining features: every column of data_filtered except the target column
features = [col for col in data_filtered.columns if col != "median_house_value"]
if "ocean_proximity_index" in features:
    # Use the one-hot vector instead of the numeric index so linear models do not
    # treat the category index as an ordered quantity
    features.remove("ocean_proximity_index")
    features.append("ocean_proximity_encoded")

# Drop vector columns left by a previous run of this cell; VectorAssembler and
# StandardScaler refuse to write to an output column that already exists.
data = data.drop("features", "scaledFeatures")

# Create a VectorAssembler to combine features into a single vector
assembler = VectorAssembler(inputCols=features, outputCol="features")
data = assembler.transform(data)

# Split into training and testing sets BEFORE fitting the scaler, so its statistics
# come only from training rows (fitting on all rows leaks test-set information)
train_raw, test_raw = data.randomSplit([0.8, 0.2], seed=42)

# Scale features using training-set statistics
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(train_raw)
train_data = scaler_model.transform(train_raw)
test_data = scaler_model.transform(test_raw)

# Keep a scaled copy of the full dataset (same training statistics) for refits on all rows
data = scaler_model.transform(data)

In [None]:
# Candidate regression models: each reads the scaled feature vector, predicts
# median_house_value, and otherwise uses its default hyper-parameters.
_candidate_regressors = [
    ("Linear Regression", LinearRegression),
    ("Random Forest", RandomForestRegressor),
    ("Gradient-Boosted Tree", GBTRegressor),
    ("Decision Tree", DecisionTreeRegressor),
    ("Generalized Linear Regression", GeneralizedLinearRegression),
    ("Factorization Machines", FMRegressor),
]
models = [
    (name, regressor_cls(labelCol="median_house_value", featuresCol="scaledFeatures"))
    for name, regressor_cls in _candidate_regressors
]


In [None]:
evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction")

# Fit every candidate on the training split and report MSE / RMSE / R^2 on the test split
for name, estimator in models:
    fitted = estimator.fit(train_data)
    scored = fitted.transform(test_data)
    scores = {
        metric: evaluator.evaluate(scored, {evaluator.metricName: metric})
        for metric in ("mse", "rmse", "r2")
    }
    print(f"Model: {name}")
    print(f"    MSE: {scores['mse']}, RMSE: {scores['rmse']}, R-squared: {scores['r2']}")

In [None]:
# Train the best model on the training split.
# NOTE(review): GBT is hard-coded here, presumably picked from the comparison printed
# above -- revisit if those results change.
best_model = GBTRegressor(
    labelCol="median_house_value",
    featuresCol="scaledFeatures",
).fit(train_data)


In [None]:
# Evaluate best model on the test split
evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction")
predictions = best_model.transform(test_data)

# RegressionEvaluator metric names, in the order they are unpacked below
_metric_names = ("mse", "rmse", "mae", "r2", "var")
mse, rmse, mae, r2, explained_variance = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric})
    for metric in _metric_names
)

# Print results
print("Best Model: Gradient-Boosted Tree")
print(f"    MSE: {mse}, RMSE: {rmse}, MAE: {mae}, R-squared: {r2}, Explained Variance: {explained_variance}")

In [None]:
# Train the best model on the entire dataset (training + test rows)
best_model = GBTRegressor(labelCol="median_house_value", featuresCol="scaledFeatures")
best_model = best_model.fit(data)

# Save the model. overwrite() lets this cell be re-run; a plain save() fails when the
# "best_model" directory already exists.
# NOTE(review): only the GBT model is saved. It expects a "scaledFeatures" vector, but the
# StringIndexer / OneHotEncoder / VectorAssembler / StandardScaler steps are not persisted,
# so the saved model cannot score raw rows by itself -- consider saving a PipelineModel.
best_model.write().overwrite().save("best_model")

In [None]:
# Zip the saved "best_model" directory into best_model.zip; the archive path is the cell's output
import shutil

shutil.make_archive(base_name="best_model", format="zip", root_dir="best_model")


In [None]:
# Stop the SparkSession created at the top of the notebook; run this cell last.
spark.stop()