In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HousePricePrediction") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/16 09:44:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/16 09:44:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load data
train_data = spark.read.csv('train.csv', header=True, inferSchema=True)
test_data = spark.read.csv('test.csv', header=True, inferSchema=True)

# Display schema to understand data types
train_data.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

In [4]:
test_data.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

In [5]:
train_data.select([count(when(col(c).isNull(), c)).alias(c) for c in train_data.columns]).show()

24/06/16 09:44:49 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition

In [6]:
numeric_columns = [
    'LotFrontage', 'MasVnrArea', 'GarageYrBlt', 'LotArea', 'OverallQual', 'OverallCond',
    'YearBuilt', 'YearRemodAdd', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF',
    '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath',
    'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces',
    'GarageCars', 'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch',
    'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold'
]

train_data = train_data.select([col(c).cast("double").alias(c) if c in numeric_columns else col(c) for c in train_data.columns])
test_data = test_data.select([col(c).cast("double").alias(c) if c in numeric_columns else col(c) for c in test_data.columns])


imputer = Imputer(inputCols=['LotFrontage', 'MasVnrArea', 'GarageYrBlt'],
                  outputCols=['LotFrontage_imputed', 'MasVnrArea_imputed', 'GarageYrBlt_imputed'])
train_data = imputer.fit(train_data).transform(train_data)
test_data = imputer.fit(test_data).transform(test_data)

# Drop original columns and rename imputed columns
train_data = train_data.drop('LotFrontage', 'MasVnrArea', 'GarageYrBlt')
test_data = test_data.drop('LotFrontage', 'MasVnrArea', 'GarageYrBlt')
train_data = train_data.withColumnRenamed('LotFrontage_imputed', 'LotFrontage')
train_data = train_data.withColumnRenamed('MasVnrArea_imputed', 'MasVnrArea')
train_data = train_data.withColumnRenamed('GarageYrBlt_imputed', 'GarageYrBlt')
test_data = test_data.withColumnRenamed('LotFrontage_imputed', 'LotFrontage')
test_data = test_data.withColumnRenamed('MasVnrArea_imputed', 'MasVnrArea')
test_data = test_data.withColumnRenamed('GarageYrBlt_imputed', 'GarageYrBlt')

In [7]:
# Get list of columns in train_data
train_columns = train_data.columns
print("Train Data Columns:")
print(train_columns)

# Get list of columns in test_data
test_columns = test_data.columns
print("Test Data Columns:")
print(test_columns)

Train Data Columns:
['Id', 'MSSubClass', 'MSZoning', 'LotArea', 'Street', 'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinSF1', 'BsmtFinType2', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 'Heating', 'HeatingQC', 'CentralAir', 'Electrical', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'KitchenQual', 'TotRmsAbvGrd', 'Functional', 'Fireplaces', 'FireplaceQu', 'GarageType', 'GarageFinish', 'GarageCars', 'GarageArea', 'GarageQual', 'GarageCond', 'PavedDrive', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'PoolQC', 'Fence', 'MiscFeature'

In [8]:
from pyspark.sql.types import StringType

# List of categorical and numerical features
categorical_features = [col for col in train_data.columns if train_data.schema[col].dataType == StringType() and col != 'SalePrice']
numerical_features = [col for col in train_data.columns if col not in categorical_features + ['Id', 'SalePrice']]

# Index and encode categorical variables
indexers = [StringIndexer(inputCol=column, outputCol=f"{column}_index") for column in categorical_features]
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=f"{indexer.getOutputCol()}_encoded") for indexer in indexers]

# Assemble all features into a single vector
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + numerical_features, outputCol="features")

# Create a pipeline for transformations
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit and transform the training data
pipeline_model = pipeline.fit(train_data)
train_data = pipeline_model.transform(train_data)

# Transform the test data
test_data = pipeline_model.transform(test_data)


24/06/16 09:44:54 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [9]:
# Define the model
gbt = GBTRegressor(featuresCol='features', labelCol='SalePrice')

# Train the Gradient Boosted Trees model
gbt_model = gbt.fit(train_data)
gbt_predictions = gbt_model.transform(train_data)

                                                                                

In [10]:
# Evaluate the models
evaluator = RegressionEvaluator(labelCol='SalePrice', predictionCol='prediction', metricName='rmse')
gbt_rmse = evaluator.evaluate(gbt_predictions)
print(f"Root Mean Squared Error (RMSE) on training data for Gradient Boosted Trees: {gbt_rmse}")


24/06/16 09:45:32 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


Root Mean Squared Error (RMSE) on training data for Gradient Boosted Trees: 17627.14806753273


                                                                                

In [12]:
# Make predictions on the test data using the best model (assuming GBT here)
test_predictions = gbt_model.transform(test_data)

# Prepare the submission file
#submission = test_data.select('Id').withColumn('SalePrice', col('prediction'))
#submission.select('Id', 'SalePrice').write.csv('submission.csv', header=True)


In [13]:
test_predictions.select('prediction').show(5)

+------------------+
|        prediction|
+------------------+
|121938.42338254508|
|152222.72998702098|
|196206.33819442635|
|190128.60808950805|
|184158.62942123946|
+------------------+
only showing top 5 rows



In [14]:
submission = test_predictions.select('Id', col('prediction').alias('SalePrice'))
submission.show(5)

+----+------------------+
|  Id|         SalePrice|
+----+------------------+
|1461|121938.42338254508|
|1462|152222.72998702098|
|1463|196206.33819442635|
|1464|190128.60808950805|
|1465|184158.62942123946|
+----+------------------+
only showing top 5 rows



In [23]:
ls

 2A_House_Prediction.ipynb
'2A_House_Prediction_Linear Regression.ipynb'
 Employee_Compensation.csv
'ML_system_using_PySpark (1).ipynb'
'Part-2 Question Notebook-.ipynb'
 iot_devices.json
 [0m[01;34mlrmodel[0m/
 [01;34mml-vm-notebook[0m/
 [01;34mspark-3.5.1-bin-hadoop3[0m/
 [01;34msubmission.csv[0m/
 test.csv
 train.csv
 week5_8_Pyspark_Assignment.ipynb


In [24]:
model_path = "House_Price_prediction_gbt_model"

# Save the model
gbt_model.save(model_path)

                                                                                

In [25]:
ls

 2A_House_Prediction.ipynb
'2A_House_Prediction_Linear Regression.ipynb'
 Employee_Compensation.csv
 [0m[01;34mHouse_Price_prediction_gbt_model[0m/
'ML_system_using_PySpark (1).ipynb'
'Part-2 Question Notebook-.ipynb'
 iot_devices.json
 [01;34mlrmodel[0m/
 [01;34mml-vm-notebook[0m/
 [01;34mspark-3.5.1-bin-hadoop3[0m/
 [01;34msubmission.csv[0m/
 test.csv
 train.csv
 week5_8_Pyspark_Assignment.ipynb
