In [None]:
!pip install pyspark
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

# Train and evaluate a linear-regression model that predicts
# `median_house_value` for the rows of housing.csv.
#
# Steps:
#   1. Load the CSV.
#   2. Index and one-hot encode `ocean_proximity`.
#   3. Assemble the numeric columns plus the encoded category into one
#      feature vector.
#   4. Split the data 80/20 (seed 42).
#   5. Fit LinearRegression on the training split.
#   6. Report RMSE on the test split.
#
# The Spark session is stopped in every case. On failure the error is
# printed and then re-raised, so the full traceback is shown and the run is
# not mistaken for a success.

DATA_PATH = "housing.csv"
LABEL_COL = "median_house_value"

# Create (or reuse) a Spark session
spark = (
    SparkSession.builder
    .appName("HousingPricePrediction")
    .getOrCreate()
)

try:
    # Load the housing data from CSV
    df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

    # Categorical feature `ocean_proximity`: string -> index -> one-hot vector
    indexer = StringIndexer(
        inputCol="ocean_proximity", outputCol="ocean_proximity_index"
    )
    encoder = OneHotEncoder(
        inputCol="ocean_proximity_index", outputCol="ocean_proximity_encoded"
    )

    feature_columns = [
        "longitude", "latitude", "housing_median_age", "total_rooms",
        "total_bedrooms", "population", "households", "median_income",
        "ocean_proximity_encoded",
    ]

    # handleInvalid="skip" drops (rather than fails on) any row with a
    # null/NaN in a feature column, so training and evaluation use only
    # complete rows.
    assembler = VectorAssembler(
        inputCols=feature_columns, outputCol="features", handleInvalid="skip"
    )

    # Fit the feature pipeline and build the (features, label) dataset
    pipeline = Pipeline(stages=[indexer, encoder, assembler])
    pipeline_model = pipeline.fit(df)
    df_transformed = pipeline_model.transform(df)
    final_data = df_transformed.select("features", LABEL_COL)

    # Split the data into training and test sets (fixed seed for repeatability)
    train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

    # Train a Linear Regression model
    lr = LinearRegression(featuresCol="features", labelCol=LABEL_COL)
    lr_model = lr.fit(train_data)

    # Make predictions on the held-out test data
    predictions = lr_model.transform(test_data)
    predictions.select("features", LABEL_COL, "prediction").show(5)

    # Evaluate the model
    evaluator = RegressionEvaluator(
        labelCol=LABEL_COL, predictionCol="prediction", metricName="rmse"
    )
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE): {rmse}")

except Exception as e:
    # Report, then re-raise: swallowing the exception would hide the
    # traceback and make a failed run look successful.
    print(f"An error occurred: {e}")
    raise

finally:
    # Stop the Spark session whether or not the run succeeded
    spark.stop()