In [35]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator


In [36]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("HousingPrediction")
    .getOrCreate()
)

# Both CSVs are read with identical options: the first row holds the column
# names and column types are inferred from the data.
csv_options = {"header": True, "inferSchema": True}
train_df = spark.read.csv("train.csv", **csv_options)
test_df = spark.read.csv("test.csv", **csv_options)


In [37]:
# Print the inferred schema of the training frame, then of the test frame.
for frame in (train_df, test_df):
    frame.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

In [38]:
# Drop every row that has a null in any column of the frames as loaded.
train_df = train_df.dropna()
test_df = test_df.dropna()

# Cast these columns to double on both frames.
# NOTE(review): LotFrontage was inferred as a string column; presumably the
# other two were as well (their schema lines are not shown) - confirm.
# NOTE(review): the casts run after dropna(), so any nulls the casts produce
# from unparseable values are still present once this cell has run.
for numeric_name in ("LotFrontage", "MasVnrArea", "GarageYrBlt"):
    train_df = train_df.withColumn(numeric_name, train_df[numeric_name].cast("double"))
    test_df = test_df.withColumn(numeric_name, test_df[numeric_name].cast("double"))


In [39]:
# Columns kept as model inputs (order is preserved by the later select calls).
selected_features = [
    "MSSubClass", "MSZoning", "LotFrontage", "LotArea",
    "Street", "OverallQual", "OverallCond", "YearBuilt",
    "YearRemodAdd", "RoofStyle", "Condition1", "Utilities",
    "SaleType", "SaleCondition", "PoolArea", "PavedDrive",
    "GarageCars", "GarageArea", "BedroomAbvGr", "KitchenAbvGr",
    "FullBath", "1stFlrSF", "2ndFlrSF", "CentralAir",
    "Electrical", "Heating", "Foundation", "HouseStyle",
]

# Name of the regression target column.
label_col = "SalePrice"

In [40]:
# Keep only the chosen feature columns plus the label on the training frame.
train_df = train_df.select(*selected_features, label_col)

In [41]:
# Keep only the chosen feature columns on the test frame (no label is selected).
test_df = test_df.select(*selected_features)

In [52]:
# Columns that are string-indexed and one-hot encoded before assembly.
categorical_cols = [
    "MSSubClass", "MSZoning", "Street", "RoofStyle", "Condition1",
    "Utilities", "SaleType", "SaleCondition", "PavedDrive", "CentralAir",
    "Electrical", "Heating", "Foundation", "HouseStyle",
]

# Columns passed to the feature vector as plain numbers.
numerical_cols = [
    "LotFrontage", "LotArea", "OverallQual", "OverallCond", "YearBuilt",
    "YearRemodAdd", "PoolArea", "GarageCars", "GarageArea", "BedroomAbvGr",
    "KitchenAbvGr", "FullBath", "1stFlrSF", "2ndFlrSF",
]

In [43]:
from pyspark.sql.functions import mean

# Learn one mean per numerical column from the training data only, so that no
# information from the test set leaks into the imputation values.
mean_values = train_df.agg(*(mean(c).alias(c) for c in numerical_cols)).collect()[0]
# A column with no non-null values has a mean of None; leave it out so the
# fill below only receives usable replacement values.
mean_values_dict = {name: value for name, value in mean_values.asDict().items() if value is not None}

# Fill missing numerical values in the training data with the training means.
# `subset` is not passed: it is ignored when the replacement value is a dict.
train_df = train_df.na.fill(mean_values_dict)
# Rows that still contain a null in any remaining column are dropped.
train_df = train_df.dropna()

# Apply the same imputation to the test data. Without this, test rows with a
# missing numerical value are silently removed by the assembler
# (handleInvalid="skip") and never receive a prediction.
# Each numerical test column is first cast to the training column's type so
# that unparseable values become null and can be filled.
train_dtypes = dict(train_df.dtypes)
for c in numerical_cols:
    test_df = test_df.withColumn(c, test_df[c].cast(train_dtypes[c]))
test_df = test_df.na.fill(mean_values_dict)


In [44]:
# One StringIndexer per categorical column, writing to "<column>_index".
# handleInvalid="keep" is set on every indexer.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    for name in categorical_cols
]

# A single encoder turns every "<column>_index" into "<column>_encoded".
encoder = OneHotEncoder(
    inputCols=[stage.getOutputCol() for stage in indexers],
    outputCols=[f"{name}_encoded" for name in categorical_cols],
)


In [45]:
# Feature vector layout: encoded categorical columns first, then the numerical columns.
assembler_inputs = [*encoder.getOutputCols(), *numerical_cols]

# handleInvalid="skip" removes rows with invalid (e.g. null) inputs instead of raising.
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features",
    handleInvalid="skip",
)

In [46]:
# Stage order: every string indexer, then the one-hot encoder, then the assembler.
pipeline = Pipeline(stages=[*indexers, encoder, assembler])

In [47]:
# Fit the preprocessing stages on the training frame, then overwrite train_df
# with the transformed frame (it now carries the assembled `features` column).
# Re-running this cell feeds the already-transformed frame back into the pipeline.
pipeline_model = pipeline.fit(dataset=train_df)
train_df = pipeline_model.transform(dataset=train_df)

In [48]:
from pyspark.sql.functions import col

# Cast both garage columns to integer on the training and the test frame.
# train_df already holds its assembled `features` column at this point, so the
# cast only changes the raw columns there.
for garage_field in ("GarageCars", "GarageArea"):
    as_integer = col(garage_field).cast("integer")
    train_df = train_df.withColumn(garage_field, as_integer)
    test_df = test_df.withColumn(garage_field, as_integer)

In [49]:
# Run the test frame through the preprocessing fitted on the training data.
# The assembler was built with handleInvalid="skip", so test rows with invalid
# (e.g. null) inputs are absent from the result.
test_df = pipeline_model.transform(dataset=test_df)

In [50]:
from pyspark.ml.regression import LinearRegression

# Linear regression on the assembled feature vector, predicting SalePrice.
# No regularisation is configured; Spark warns about that in the output below.
lr = LinearRegression(
    featuresCol="features",
    labelCol="SalePrice",
)

# Fit on the transformed training frame.
lr_model = lr.fit(dataset=train_df)

24/06/16 09:28:08 WARN Instrumentation: [0f3d9239] regParam is zero, which might cause numerical instability and overfitting.
24/06/16 09:28:10 WARN Instrumentation: [0f3d9239] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [51]:
# Score the transformed test frame; the model adds a `prediction` column.
predictions = lr_model.transform(dataset=test_df)
# Preview the first 20 rows (the default row count for show()).
predictions.show(n=20)

+----------+--------+-----------+-------+------+-----------+-----------+---------+------------+---------+----------+---------+--------+-------------+--------+----------+----------+----------+------------+------------+--------+--------+--------+----------+----------+-------+----------+----------+----------------+--------------+------------+---------------+----------------+---------------+--------------+-------------------+----------------+----------------+----------------+-------------+----------------+----------------+------------------+----------------+--------------+-----------------+------------------+-----------------+----------------+---------------------+------------------+------------------+------------------+---------------+------------------+------------------+--------------------+------------------+
|MSSubClass|MSZoning|LotFrontage|LotArea|Street|OverallQual|OverallCond|YearBuilt|YearRemodAdd|RoofStyle|Condition1|Utilities|SaleType|SaleCondition|PoolArea|PavedDrive|GarageCars

In [62]:
from pyspark.sql.functions import col, sqrt
from pyspark.sql import functions as F


In [63]:
# Score the same frame the model was fitted on, so this is an in-sample error.
predictions_train = lr_model.transform(train_df)

# Per-row squared difference between the label and the prediction.
residual = col("SalePrice") - col("prediction")
predictions_train = predictions_train.withColumn("squared_error", residual ** 2)

# RMSE = square root of the mean squared error over all rows.
rmse_row = predictions_train.agg(
    F.sqrt(F.mean(F.col("squared_error"))).alias("rmse")
).first()
rmse = rmse_row["rmse"]

print(f"Root Mean Squared Error (RMSE) on Training Data: {rmse}")


[Stage 113:>                                                        (0 + 1) / 1]

Root Mean Squared Error (RMSE) on Training Data: 34103.125602421336


                                                                                

In [65]:
ls

 2A_House_Prediction.ipynb
'2A_House_Prediction_Linear Regression.ipynb'
 Employee_Compensation.csv
 [0m[01;34mHouse_Price_prediction_gbt_model[0m/
'ML_system_using_PySpark (1).ipynb'
'Part-2 Question Notebook-.ipynb'
 iot_devices.json
 [01;34mlrmodel[0m/
 [01;34mml-vm-notebook[0m/
 [01;34mspark-3.5.1-bin-hadoop3[0m/
 [01;34msubmission.csv[0m/
 test.csv
 train.csv
 week5_8_Pyspark_Assignment.ipynb


In [66]:
model_path = "House_Price_prediction_Linear_model"
# Save the fitted regression model. A plain save() raises when the target
# directory already exists, which breaks any re-run of this notebook; writing
# with overwrite() replaces the previous copy instead.
# NOTE(review): only the regression model is saved here; the fitted
# preprocessing pipeline (pipeline_model) is not persisted by this cell.
lr_model.write().overwrite().save(model_path)
print("Model Saved Successfully")

                                                                                

Model Saved Successfully


In [67]:
ls

 2A_House_Prediction.ipynb
'2A_House_Prediction_Linear Regression.ipynb'
 Employee_Compensation.csv
 [0m[01;34mHouse_Price_prediction_Linear_model[0m/
 [01;34mHouse_Price_prediction_gbt_model[0m/
'ML_system_using_PySpark (1).ipynb'
'Part-2 Question Notebook-.ipynb'
 iot_devices.json
 [01;34mlrmodel[0m/
 [01;34mml-vm-notebook[0m/
 [01;34mspark-3.5.1-bin-hadoop3[0m/
 [01;34msubmission.csv[0m/
 test.csv
 train.csv
 week5_8_Pyspark_Assignment.ipynb
