In [1]:
# Mounting Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Installing findspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
# Installing pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 45.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=982876a45b9d140d235b736227f5eeb34fd4ebd7c386873fd45cee29f4165709
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [4]:
# Installing java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [5]:
# Installing spark
!wget https://downloads.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz

--2022-07-31 09:42:01--  https://downloads.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop2.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f8:10a:201a::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 274079476 (261M) [application/x-gzip]
Saving to: ‘spark-3.3.0-bin-hadoop2.tgz’


2022-07-31 09:42:19 (14.2 MB/s) - ‘spark-3.3.0-bin-hadoop2.tgz’ saved [274079476/274079476]



In [6]:
import findspark
findspark.init()

In [7]:
import pyspark.sql.functions as pyf

In [8]:
# Creating a spark session
from pyspark.sql import DataFrame, SparkSession
spark = SparkSession.builder.appName("House Price Prediction").getOrCreate()
spark

In [9]:
# Specifying the file path
file_location = "/content/drive/MyDrive/ColabNotebooks/train.csv"
file_type = "csv"
# CSV options
infer_schema = True
first_row_is_header = True
delimiter = ","

In [10]:
# The applied options are for CSV files. For other file types, these will be ignored.
DF = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

In [11]:
# Reviewing the dataset
DF.show(10, False)

+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
|Id |MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition

In [12]:
# Printing the info/schema 
DF.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

In [13]:
# Checking for null values
from pyspark.sql.functions import isnull, when, count, col
DF.select([count(when(isnull(c), c)).alias(c) for c in DF.columns]).show()

+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition

In [14]:
DF.distinct().show()

+----+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
|  Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Conditi

In [15]:
# Replacing NA to null values for imputation
def fixing_null_values(DF, col_name):
  if col_name != 'MasVnrType' and col_name != 'Electrical':
    new_DF = DF.withColumn(col_name, when((col(col_name)=='NA'), None).otherwise(col(col_name)).cast("float"))
  else:
    new_DF = DF.withColumn(col_name, when((col(col_name)=='NA'), None).otherwise(col(col_name)))
  return new_DF

DF2 = fixing_null_values(DF, 'LotFrontage')
DF3 = fixing_null_values(DF2, 'MasVnrType')
DF4 = fixing_null_values(DF3, 'MasVnrArea')
DF5 = fixing_null_values(DF4, 'Electrical')
DF6 = fixing_null_values(DF5, 'GarageYrBlt')
DF6.show()

+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition

In [16]:
# Confirming that there are null values in the dataset
DF6.distinct().show()

+----+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
|  Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Conditi

In [17]:
from pyspark.sql.functions import isnull, when, count, col
DF6.select([count(when(isnull(c), c)).alias(c) for c in DF6.columns]).show()

+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition

In [18]:
# Null value imputation
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['LotFrontage', 'MasVnrArea',  'GarageYrBlt'], outputCols=["{}_imputed".format(c) for c in ['LotFrontage', 'MasVnrArea', 'GarageYrBlt']]).setStrategy("median")

In [19]:
imputer.fit(DF6).transform(DF6).show()

+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+-------------------+------------------+-------------------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Util

In [20]:
# Replacing the null values in categorical columns
DF7 = DF6.na.fill("None",["MasVnrType"]).na.fill("SBrkr",["Electrical"])

In [21]:
# Dropping these columns post null value imputation
DF8 = DF7.drop('LotFrontage', 'MasVnrArea',  'GarageYrBlt')

In [22]:
DF8.show()

+---+----------+--------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition2|BldgType|HouseStyle|OverallQual|OverallCond|Y

In [23]:
# Checking for null values one last time - no null values found
from pyspark.sql.functions import isnull, when, count, col
DF8.select([count(when(isnull(c), c)).alias(c) for c in DF8.columns]).show()

+---+----------+--------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition2|BldgType|HouseStyle|OverallQual|OverallCond|Y

In [24]:
# Checking for duplicate values - no duplicate values found
DF8.groupBy(DF8.columns).count().filter("count > 1").show()

+---+----------+--------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+-----+
| Id|MSSubClass|MSZoning|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition2|BldgType|HouseStyle|OverallQual|Overall

In [25]:
DF.columns

['Id',
 'MSSubClass',
 'MSZoning',
 'LotFrontage',
 'LotArea',
 'Street',
 'Alley',
 'LotShape',
 'LandContour',
 'Utilities',
 'LotConfig',
 'LandSlope',
 'Neighborhood',
 'Condition1',
 'Condition2',
 'BldgType',
 'HouseStyle',
 'OverallQual',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'RoofStyle',
 'RoofMatl',
 'Exterior1st',
 'Exterior2nd',
 'MasVnrType',
 'MasVnrArea',
 'ExterQual',
 'ExterCond',
 'Foundation',
 'BsmtQual',
 'BsmtCond',
 'BsmtExposure',
 'BsmtFinType1',
 'BsmtFinSF1',
 'BsmtFinType2',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'TotalBsmtSF',
 'Heating',
 'HeatingQC',
 'CentralAir',
 'Electrical',
 '1stFlrSF',
 '2ndFlrSF',
 'LowQualFinSF',
 'GrLivArea',
 'BsmtFullBath',
 'BsmtHalfBath',
 'FullBath',
 'HalfBath',
 'BedroomAbvGr',
 'KitchenAbvGr',
 'KitchenQual',
 'TotRmsAbvGrd',
 'Functional',
 'Fireplaces',
 'FireplaceQu',
 'GarageType',
 'GarageYrBlt',
 'GarageFinish',
 'GarageCars',
 'GarageArea',
 'GarageQual',
 'GarageCond',
 'PavedDrive',
 'WoodDeckSF',
 'OpenPorchSF'

In [26]:
DF.summary()

DataFrame[summary: string, Id: string, MSSubClass: string, MSZoning: string, LotFrontage: string, LotArea: string, Street: string, Alley: string, LotShape: string, LandContour: string, Utilities: string, LotConfig: string, LandSlope: string, Neighborhood: string, Condition1: string, Condition2: string, BldgType: string, HouseStyle: string, OverallQual: string, OverallCond: string, YearBuilt: string, YearRemodAdd: string, RoofStyle: string, RoofMatl: string, Exterior1st: string, Exterior2nd: string, MasVnrType: string, MasVnrArea: string, ExterQual: string, ExterCond: string, Foundation: string, BsmtQual: string, BsmtCond: string, BsmtExposure: string, BsmtFinType1: string, BsmtFinSF1: string, BsmtFinType2: string, BsmtFinSF2: string, BsmtUnfSF: string, TotalBsmtSF: string, Heating: string, HeatingQC: string, CentralAir: string, Electrical: string, 1stFlrSF: string, 2ndFlrSF: string, LowQualFinSF: string, GrLivArea: string, BsmtFullBath: string, BsmtHalfBath: string, FullBath: string, H

In [27]:
columnList = [item[0] for item in DF8.dtypes if item[1].startswith('string')]

In [28]:
output_column_list = list(map(lambda x: x+"_index", columnList))

In [29]:
# String indexer
from pyspark.ml.feature import StringIndexer, OneHotEncoder
indexers = StringIndexer(inputCols=columnList, 
                         outputCols=output_column_list)
strindexedDF = indexers.fit(DF8).transform(DF8)
DF9 = strindexedDF.select("*")


In [30]:
DF9.show()

+---+----------+--------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+--------------+------------+-----------+--------------+-----------------+---------------+---------------+---------------+------------------+----------------+----------------+----

In [31]:
# Dropping columns post String Indexing
DF10 = DF9.drop(*columnList)

In [32]:
DF10.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- BsmtFinSF1: integer (nullable = true)
 |-- BsmtFinSF2: integer (nullable = true)
 |-- BsmtUnfSF: integer (nullable = true)
 |-- TotalBsmtSF: integer (nullable = true)
 |-- 1stFlrSF: integer (nullable = true)
 |-- 2ndFlrSF: integer (nullable = true)
 |-- LowQualFinSF: integer (nullable = true)
 |-- GrLivArea: integer (nullable = true)
 |-- BsmtFullBath: integer (nullable = true)
 |-- BsmtHalfBath: integer (nullable = true)
 |-- FullBath: integer (nullable = true)
 |-- HalfBath: integer (nullable = true)
 |-- BedroomAbvGr: integer (nullable = true)
 |-- KitchenAbvGr: integer (nullable = true)
 |-- TotRmsAbvGrd: integer (nullable = true)
 |-- Fireplaces: integer (nullable = true)
 |-- Garage

In [33]:
DF10.show()

+---+----------+-------+-----------+-----------+---------+------------+----------+----------+---------+-----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+------------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+---------+--------------+------------+-----------+--------------+-----------------+---------------+---------------+---------------+------------------+----------------+----------------+--------------+----------------+---------------+--------------+-----------------+-----------------+----------------+---------------+---------------+----------------+--------------+--------------+------------------+------------------+------------------+-------------+---------------+----------------+----------------+-----------------+----------------+-----------------+----------------+------------------+----------------+----------------+---------------

In [34]:
DF10.columns

['Id',
 'MSSubClass',
 'LotArea',
 'OverallQual',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'BsmtFinSF1',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'TotalBsmtSF',
 '1stFlrSF',
 '2ndFlrSF',
 'LowQualFinSF',
 'GrLivArea',
 'BsmtFullBath',
 'BsmtHalfBath',
 'FullBath',
 'HalfBath',
 'BedroomAbvGr',
 'KitchenAbvGr',
 'TotRmsAbvGrd',
 'Fireplaces',
 'GarageCars',
 'GarageArea',
 'WoodDeckSF',
 'OpenPorchSF',
 'EnclosedPorch',
 '3SsnPorch',
 'ScreenPorch',
 'PoolArea',
 'MiscVal',
 'MoSold',
 'YrSold',
 'SalePrice',
 'MSZoning_index',
 'Street_index',
 'Alley_index',
 'LotShape_index',
 'LandContour_index',
 'Utilities_index',
 'LotConfig_index',
 'LandSlope_index',
 'Neighborhood_index',
 'Condition1_index',
 'Condition2_index',
 'BldgType_index',
 'HouseStyle_index',
 'RoofStyle_index',
 'RoofMatl_index',
 'Exterior1st_index',
 'Exterior2nd_index',
 'MasVnrType_index',
 'ExterQual_index',
 'ExterCond_index',
 'Foundation_index',
 'BsmtQual_index',
 'BsmtCond_index',
 'BsmtExposure_index',
 'Bsmt

In [35]:
input_cols = ['MSSubClass',
 'LotArea',
 'OverallQual',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'BsmtFinSF1',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'TotalBsmtSF',
 '1stFlrSF',
 '2ndFlrSF',
 'LowQualFinSF',
 'GrLivArea',
 'BsmtFullBath',
 'BsmtHalfBath',
 'FullBath',
 'HalfBath',
 'BedroomAbvGr',
 'KitchenAbvGr',
 'TotRmsAbvGrd',
 'Fireplaces',
 'GarageCars',
 'GarageArea',
 'WoodDeckSF',
 'OpenPorchSF',
 'EnclosedPorch',
 '3SsnPorch',
 'ScreenPorch',
 'PoolArea',
 'MiscVal',
 'MoSold',
 'YrSold']

In [36]:
output_cols = list(map(lambda x: x+"_scaled", input_cols))

In [37]:
# Vectorizing the independent features
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=input_cols,outputCol="Independent_Features")

In [38]:
DF11 = featureassembler.transform(DF10)

In [39]:
DF11.show()

+---+----------+-------+-----------+-----------+---------+------------+----------+----------+---------+-----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+------------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+---------+--------------+------------+-----------+--------------+-----------------+---------------+---------------+---------------+------------------+----------------+----------------+--------------+----------------+---------------+--------------+-----------------+-----------------+----------------+---------------+---------------+----------------+--------------+--------------+------------------+------------------+------------------+-------------+---------------+----------------+----------------+-----------------+----------------+-----------------+----------------+------------------+----------------+----------------+---------------

In [40]:
# Import StandardScaler from pyspark.ml.feature package
from pyspark.ml.feature import StandardScaler

# Create the StandardScaler object. It only take feature column (dense vector)
stdscaler = StandardScaler(inputCol="Independent_Features", outputCol="Scaled_Features")

# Fit the StandardScaler object on the output of the dense vector data and transform
stdscaledDF = stdscaler.fit(DF11).transform(DF11)
stdscaledDF.select("*").show()

+---+----------+-------+-----------+-----------+---------+------------+----------+----------+---------+-----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+------------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+-------+------+------+---------+--------------+------------+-----------+--------------+-----------------+---------------+---------------+---------------+------------------+----------------+----------------+--------------+----------------+---------------+--------------+-----------------+-----------------+----------------+---------------+---------------+----------------+--------------+--------------+------------------+------------------+------------------+-------------+---------------+----------------+----------------+-----------------+----------------+-----------------+----------------+------------------+----------------+----------------+---------------

In [41]:
DF12 = stdscaledDF.drop(*input_cols, "Independent_Features")

In [42]:
DF12.show()

+---+---------+--------------+------------+-----------+--------------+-----------------+---------------+---------------+---------------+------------------+----------------+----------------+--------------+----------------+---------------+--------------+-----------------+-----------------+----------------+---------------+---------------+----------------+--------------+--------------+------------------+------------------+------------------+-------------+---------------+----------------+----------------+-----------------+----------------+-----------------+----------------+------------------+----------------+----------------+----------------+------------+-----------+-----------------+--------------+-------------------+--------------------+
| Id|SalePrice|MSZoning_index|Street_index|Alley_index|LotShape_index|LandContour_index|Utilities_index|LotConfig_index|LandSlope_index|Neighborhood_index|Condition1_index|Condition2_index|BldgType_index|HouseStyle_index|RoofStyle_index|RoofMatl_index|Ext

In [43]:
DF12.columns

['Id',
 'SalePrice',
 'MSZoning_index',
 'Street_index',
 'Alley_index',
 'LotShape_index',
 'LandContour_index',
 'Utilities_index',
 'LotConfig_index',
 'LandSlope_index',
 'Neighborhood_index',
 'Condition1_index',
 'Condition2_index',
 'BldgType_index',
 'HouseStyle_index',
 'RoofStyle_index',
 'RoofMatl_index',
 'Exterior1st_index',
 'Exterior2nd_index',
 'MasVnrType_index',
 'ExterQual_index',
 'ExterCond_index',
 'Foundation_index',
 'BsmtQual_index',
 'BsmtCond_index',
 'BsmtExposure_index',
 'BsmtFinType1_index',
 'BsmtFinType2_index',
 'Heating_index',
 'HeatingQC_index',
 'CentralAir_index',
 'Electrical_index',
 'KitchenQual_index',
 'Functional_index',
 'FireplaceQu_index',
 'GarageType_index',
 'GarageFinish_index',
 'GarageQual_index',
 'GarageCond_index',
 'PavedDrive_index',
 'PoolQC_index',
 'Fence_index',
 'MiscFeature_index',
 'SaleType_index',
 'SaleCondition_index',
 'Scaled_Features']

In [44]:
# Import VectorAssembler from pyspark.ml.feature package
from pyspark.ml.feature import VectorAssembler
# Create a list of all the variables that you want to create feature vectors
# These features are then further used for training model
features_col = ['MSZoning_index',
 'Street_index',
 'Alley_index',
 'LotShape_index',
 'LandContour_index',
 'Utilities_index',
 'LotConfig_index',
 'LandSlope_index',
 'Neighborhood_index',
 'Condition1_index',
 'Condition2_index',
 'BldgType_index',
 'HouseStyle_index',
 'RoofStyle_index',
 'RoofMatl_index',
 'Exterior1st_index',
 'Exterior2nd_index',
 'MasVnrType_index',
 'ExterQual_index',
 'ExterCond_index',
 'Foundation_index',
 'BsmtQual_index',
 'BsmtCond_index',
 'BsmtExposure_index',
 'BsmtFinType1_index',
 'BsmtFinType2_index',
 'Heating_index',
 'HeatingQC_index',
 'CentralAir_index',
 'Electrical_index',
 'KitchenQual_index',
 'Functional_index',
 'FireplaceQu_index',
 'GarageType_index',
 'GarageFinish_index',
 'GarageQual_index',
 'GarageCond_index',
 'PavedDrive_index',
 'PoolQC_index',
 'Fence_index',
 'MiscFeature_index',
 'SaleType_index',
 'SaleCondition_index',
 'Scaled_Features']
# Create the VectorAssembler object
assembler = VectorAssembler(inputCols=features_col, outputCol="independent_features")
DF13 = assembler.transform(DF12)
final_DF = DF13.select('independent_features', 'SalePrice')
final_DF_without_sp = DF13.select('independent_features')

In [45]:
final_DF.show()

+--------------------+---------+
|independent_features|SalePrice|
+--------------------+---------+
|(76,[8,12,17,18,2...|   208500|
|(76,[6,8,9,15,16,...|   181500|
|(76,[3,8,12,17,18...|   223500|
|(76,[3,6,8,12,15,...|   140000|
|(76,[3,6,8,12,17,...|   250000|
|(76,[3,8,12,20,21...|   143000|
|(76,[8,17,18,21,2...|   307000|
|(76,[3,6,8,9,12,1...|   200000|
|(76,[0,8,9,12,15,...|   129900|
|(76,[6,8,9,10,11,...|   118000|
|(76,[8,13,15,16,2...|   129500|
|(76,[3,8,12,13,15...|   345000|
|(76,[3,8,13,15,16...|   144000|
|(76,[3,8,17,18,21...|   279500|
|(76,[3,6,13,15,16...|   157000|
|(76,[0,6,8,12,15,...|   132000|
|(76,[3,6,15,16,17...|   149000|
|(76,[8,11,15,16,2...|    90000|
|(76,[8,9,24,30,33...|   159000|
|(76,[13,15,16,20,...|   139000|
+--------------------+---------+
only showing top 20 rows



In [46]:
# Train - test split
# We spilt the data into 70-30 set
# Training Set - 70% obesevations
# Testing Set - 30% observations
trainDF, testDF =  final_DF.randomSplit([0.7,0.3], seed = 42)

# print the count of observations in each set
print("Observations in training set = ", trainDF.count())
print("Observations in testing set = ", testDF.count())

Observations in training set =  1069
Observations in testing set =  391


In [47]:
# Linear Regression
from pyspark.ml.regression import LinearRegression
regressor=LinearRegression(featuresCol='independent_features', labelCol='SalePrice')
lr_model=regressor.fit(trainDF)

In [48]:
# Coefficients
lr_model.coefficients

DenseVector([270.2467, -43388.1032, 392.7587, -911.039, -2625.3131, -65764.4891, 1242.4279, 9041.409, 714.3812, -3560.0153, -11164.3189, 4607.1104, 1243.1306, 4003.8817, -7773.0248, -523.4145, 242.8984, 1003.1548, 11738.5977, 2561.3484, -2991.9038, 4683.5881, -5495.0564, 2953.3575, 2038.8206, -1016.4128, -1252.9206, -1247.6939, 3124.2432, -359.0977, 5567.7165, -5572.0781, 1319.3323, 1134.1041, 2661.2289, 4863.5737, -178.8684, -2522.6594, 36982.5559, -485.1077, -15367.8055, 3329.9908, -1351.1044, -12650.1528, 4013.1611, 20297.8458, 6596.0765, 6150.0566, -319.3888, 2871.3266, 971.2507, -84.3553, 3190.0386, 7729.8117, 10181.3915, -1075.5595, 13569.0168, 3927.7516, 1080.5741, 2329.5801, -296.8889, -4774.8182, -2639.101, 5376.1262, 1726.8075, 9416.3419, 1974.5591, 3026.1021, -428.5109, 101.7383, 708.6965, 3806.3385, -5058.7232, 987.7361, -513.6281, -1414.1311])

In [49]:
# Intercepts
lr_model.intercept

1700744.539585983

In [50]:
# Prediction
pred_results_lr=lr_model.evaluate(testDF)

In [51]:
pred_results_lr.predictions.show()

+--------------------+---------+------------------+
|independent_features|SalePrice|        prediction|
+--------------------+---------+------------------+
|(76,[0,1,4,8,15,1...|    81000| 78111.17403904535|
|(76,[0,2,3,6,8,12...|   265979|247877.47115219804|
|(76,[0,2,3,8,9,12...|   153900| 152602.2934732379|
|(76,[0,2,3,8,11,1...|   164700|156553.91406760528|
|(76,[0,2,3,8,11,1...|   199900|185608.80256826105|
|(76,[0,2,3,8,12,1...|   239000|251621.75504089007|
|(76,[0,2,4,7,8,11...|    93000| 156241.1076362012|
|(76,[0,2,4,8,12,1...|   135000|136276.19655683707|
|(76,[0,2,4,8,12,1...|   159500|199258.17998644453|
|(76,[0,2,6,8,9,12...|   124000| 123821.7729776036|
|(76,[0,2,6,8,11,1...|   176000|140215.15490507404|
|(76,[0,2,6,8,12,1...|   256000|253779.73397805402|
|(76,[0,2,6,8,12,1...|   155000|194999.41995743616|
|(76,[0,2,6,8,12,1...|    96500| 53863.15823792829|
|(76,[0,2,6,8,15,1...|   152000|161659.04724714858|
|(76,[0,2,8,9,12,1...|    40000| 83433.44355540932|
|(76,[0,2,8,

In [52]:
# Printing metrics
print("Linear Regression - RMSE: ",pred_results_lr.meanSquaredError)
print("Linear Regression - MAE: ",pred_results_lr.meanAbsoluteError)
print("Linear Regression - R-squared: ",pred_results_lr.r2)

Linear Regression - RMSE:  847888323.1715356
Linear Regression - MAE:  21392.145999596385
Linear Regression - R-squared:  0.8650209198883039


In [53]:
# Saving the metrics to a file
with open("Train_data_metrics.csv",'a') as f:
    print("Training Dataset Metrics ", file=f)
    print("Linear Regression - RMSE: ",pred_results_lr.meanSquaredError, file=f)
    print("Linear Regression - MAE: ",pred_results_lr.meanAbsoluteError, file=f)
    print("Linear Regression - R-squared: ",pred_results_lr.r2, file=f)

In [54]:
# Random Forest Regression
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol='independent_features', labelCol='SalePrice',numTrees=20, maxDepth=7)
rf.setSeed(17)
rf_model = rf.fit(trainDF)

In [55]:
pred_results_rf=rf_model.transform(testDF)

In [56]:
pred_results_rf.show()

+--------------------+---------+------------------+
|independent_features|SalePrice|        prediction|
+--------------------+---------+------------------+
|(76,[0,1,4,8,15,1...|    81000|113093.79036637221|
|(76,[0,2,3,6,8,12...|   265979|254011.68384945308|
|(76,[0,2,3,8,9,12...|   153900|143197.31857304118|
|(76,[0,2,3,8,11,1...|   164700| 170827.6406464277|
|(76,[0,2,3,8,11,1...|   199900|188843.20897964516|
|(76,[0,2,3,8,12,1...|   239000|229571.61953414595|
|(76,[0,2,4,7,8,11...|    93000| 97651.46666185487|
|(76,[0,2,4,8,12,1...|   135000| 138732.3791941519|
|(76,[0,2,4,8,12,1...|   159500| 183267.1057597694|
|(76,[0,2,6,8,9,12...|   124000|125660.15605513968|
|(76,[0,2,6,8,11,1...|   176000|177378.05356553514|
|(76,[0,2,6,8,12,1...|   256000|221092.75470534485|
|(76,[0,2,6,8,12,1...|   155000| 188909.6741650182|
|(76,[0,2,6,8,12,1...|    96500| 98379.06987803143|
|(76,[0,2,6,8,15,1...|   152000|138057.07096057403|
|(76,[0,2,8,9,12,1...|    40000|105092.69442645519|
|(76,[0,2,8,

In [57]:
# Gradient Boost Regression
from pyspark.ml.regression import GBTRegressor
gbtr = GBTRegressor(featuresCol='independent_features', labelCol='SalePrice', maxIter=17)
gbr_model = gbtr.fit(trainDF)

In [58]:
pred_results_gb = gbr_model.transform(testDF)

In [59]:
pred_results_gb.show()

+--------------------+---------+------------------+
|independent_features|SalePrice|        prediction|
+--------------------+---------+------------------+
|(76,[0,1,4,8,15,1...|    81000| 90830.54312165102|
|(76,[0,2,3,6,8,12...|   265979|  244907.006134428|
|(76,[0,2,3,8,9,12...|   153900|    122665.5843316|
|(76,[0,2,3,8,11,1...|   164700|179627.46214102293|
|(76,[0,2,3,8,11,1...|   199900| 200318.6620309268|
|(76,[0,2,3,8,12,1...|   239000|231142.75355385136|
|(76,[0,2,4,7,8,11...|    93000| 67419.21244850337|
|(76,[0,2,4,8,12,1...|   135000|140627.40997521905|
|(76,[0,2,4,8,12,1...|   159500|218067.05168094335|
|(76,[0,2,6,8,9,12...|   124000|104964.50728494556|
|(76,[0,2,6,8,11,1...|   176000| 174440.8209731902|
|(76,[0,2,6,8,12,1...|   256000|  142399.427979207|
|(76,[0,2,6,8,12,1...|   155000|162926.50989702842|
|(76,[0,2,6,8,12,1...|    96500|119275.89851125528|
|(76,[0,2,6,8,15,1...|   152000| 129900.5331088651|
|(76,[0,2,8,9,12,1...|    40000| 94949.34302471572|
|(76,[0,2,8,

In [60]:
# Evaluating metrics for RF and GB regression algorithms
from pyspark.ml.evaluation import RegressionEvaluator
def metrics(df, model_name):
  rmse=RegressionEvaluator(labelCol="SalePrice", predictionCol="prediction", metricName="rmse")
  rmse=rmse.evaluate(df) 
 
  mae=RegressionEvaluator(labelCol="SalePrice", predictionCol="prediction", metricName="mae")
  mae=mae.evaluate(df) 
 
  r2=RegressionEvaluator(labelCol="SalePrice", predictionCol="prediction", metricName="r2")
  r2=r2.evaluate(df)

  with open("Train_data_metrics.csv",'a') as f:
    print("{0} - RMSE: {1}".format(model_name,rmse), file=f)
    print("{0} - MAE: {1}".format(model_name,mae), file=f)
    print("{0} - R-squared: {1}".format(model_name,r2), file=f)

metrics(pred_results_rf, "Random Forest Regression")
metrics(pred_results_gb, "Gradient Boost Regression")

In [61]:
# Saving the Models
lr_model.write().overwrite().save("/content/drive/MyDrive/ColabNotebooks/linearmodel")
rf_model.write().overwrite().save("/content/drive/MyDrive/ColabNotebooks/rfmodel")
gbr_model.write().overwrite().save("/content/drive/MyDrive/ColabNotebooks/gbrmodel")
