# **Regression Using Pyspark**

## **HOUSE PRICE PREDICTION**

The dataset contains the following variables.

**Data fields:** here is a brief version of what you'll find in the data description file.

* SalePrice - the property's sale price in dollars. This is the target variable
  that you're trying to predict.
* MSZoning: The general zoning classification
* MSSubClass: The building class
* LotFrontage: Linear feet of street connected to property            
* LotArea: Lot size in square feet
* Street: Type of road access
* Alley: Type of alley access
* LotShape: General shape of property
* LandContour: Flatness of the property
* Utilities: Type of utilities available
* LotConfig: Lot configuration
* LandSlope: Slope of property
* Neighborhood: Physical locations within Ames city limits
* Condition1: Proximity to main road or railroad
* Condition2: Proximity to main road or railroad (if a second is present)
* BldgType: Type of dwelling
* HouseStyle: Style of dwelling
* OverallQual: Overall material and finish quality
* OverallCond: Overall condition rating
* YearBuilt: Original construction date
* YearRemodAdd: Remodel date
* RoofStyle: Type of roof
* RoofMatl: Roof material
* Exterior1st: Exterior covering on house
* Exterior2nd: Exterior covering on house (if more than one material)
* MasVnrType: Masonry veneer type
* MasVnrArea: Masonry veneer area in square feet
* ExterQual: Exterior material quality
* ExterCond: Present condition of the material on the exterior
* Foundation: Type of foundation
* BsmtQual: Height of the basement
* BsmtCond: General condition of the basement
* BsmtExposure: Walkout or garden level basement walls
* BsmtFinType1: Quality of basement finished area
* BsmtFinSF1: Type 1 finished square feet
* BsmtFinType2: Quality of second finished area (if present)
* BsmtFinSF2: Type 2 finished square feet
* BsmtUnfSF: Unfinished square feet of basement area
* TotalBsmtSF: Total square feet of basement area
* Heating: Type of heating
* HeatingQC: Heating quality and condition
* CentralAir: Central air conditioning
* Electrical: Electrical system
* 1stFlrSF: First Floor square feet
* 2ndFlrSF: Second floor square feet
* LowQualFinSF: Low quality finished square feet (all floors)
* GrLivArea: Above grade (ground) living area square feet
* BsmtFullBath: Basement full bathrooms
* BsmtHalfBath: Basement half bathrooms
* FullBath: Full bathrooms above grade
* HalfBath: Half baths above grade
* BedroomAbvGr: Number of bedrooms above basement level
* KitchenAbvGr: Number of kitchens above grade
* KitchenQual: Kitchen quality
* TotRmsAbvGrd: Total rooms above grade (does not include bathrooms)
* Functional: Home functionality rating
* Fireplaces: Number of fireplaces
* FireplaceQu: Fireplace quality
* GarageType: Garage location
* GarageYrBlt: Year garage was built
* GarageFinish: Interior finish of the garage
* GarageCars: Size of garage in car capacity
* GarageArea: Size of garage in square feet
* GarageQual: Garage quality
* GarageCond: Garage condition
* PavedDrive: Paved driveway
* WoodDeckSF: Wood deck area in square feet
* OpenPorchSF: Open porch area in square feet
* EnclosedPorch: Enclosed porch area in square feet
* 3SsnPorch: Three season porch area in square feet
* ScreenPorch: Screen porch area in square feet
* PoolArea: Pool area in square feet
* PoolQC: Pool quality
* Fence: Fence quality
* MiscFeature: Miscellaneous feature not covered in other categories
* MiscVal: Value of miscellaneous feature in dollars
* MoSold: Month Sold
* YrSold: Year Sold
* SaleType: Type of sale
* SaleCondition: Condition of sale




### **PySpark Initialization**

In [None]:
!pip3 -q install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HousePricePrediction').master("local[*]").getOrCreate()
sc = spark.sparkContext



### **Load Libraries**

In [None]:
# DataFrame profiling
from pyspark.sql.types import IntegerType, StringType, DoubleType, ShortType, DecimalType, Row
import pyspark.sql.functions as func
from pyspark.sql.functions import isnull, isnan, when, count, col, mean, udf
# Note: importing round and sum from pyspark.sql.functions shadows Python's built-ins;
# later cells rely on the PySpark versions, e.g. round(mean(i)) and sum(col('count')).
from pyspark.sql.functions import round
import matplotlib.pyplot as plt

# Modeling + Evaluation
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
from pyspark.sql import functions as F
from pyspark.sql.functions import avg, corr, rank, sum
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from sklearn.metrics import log_loss
import pyspark.sql.functions as fn
from pyspark.sql import Window

# Window covering all rows of a DataFrame, used later to compute column totals
# (without partitionBy, Spark moves all rows to a single partition, which is fine for 1,460 rows)
window = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

### **Load Data to Spark DataFrame**

In [None]:
!wget -O data.csv -q https://raw.githubusercontent.com/SuyashMaury/House-Price-Prediction-using-PySpark/main/data.csv


In [None]:
data= spark.read.csv('data.csv', inferSchema=True,header=True)
data.show(5)


[Output truncated: the first 5 rows of all 81 columns.]

### Check Data

In [None]:
type(data)

pyspark.sql.dataframe.DataFrame

In [None]:
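# Print the schema and the number of columns.
# printSchema() prints directly and returns None, which is why the tuple below ends with None.
len(data.columns), data.printSchema()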

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |-- ... (output truncated)

(81, None)

In [None]:
# Rename the target column 'SalePrice' to 'label' (the default label column name in Spark ML)
df_final = data.withColumnRenamed('SalePrice','label')

In [None]:
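# Fix column data types.
# inferSchema reads columns containing the literal string 'NA' (e.g. LotFrontage) as strings;
# casting them to integer turns 'NA' into null (when ANSI mode is off), handled as a missing value below.
# OverallQual and OverallCond are 1-10 ratings, cast to string so they are treated as categorical.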
df_final=df_final.withColumn("LotFrontage", df_final["LotFrontage"].cast(IntegerType()))
df_final=df_final.withColumn("OverallQual", df_final["OverallQual"].cast(StringType()))
df_final=df_final.withColumn("OverallCond", df_final["OverallCond"].cast(StringType()))
df_final=df_final.withColumn("MasVnrArea", df_final["MasVnrArea"].cast(IntegerType()))
df_final=df_final.withColumn("GarageYrBlt", df_final["GarageYrBlt"].cast(IntegerType()))

In [None]:
df_final.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: string (nullable = true)
 |-- OverallCond: string (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |-- ... (output truncated)

### Categorical and Numerical Variables in df_final

In [None]:
# Split columns into categorical and numerical groups
# Categorical: columns with string data type
cat_cols = [item[0] for item in df_final.dtypes if item[1].startswith('string')]
print("cat_cols:", cat_cols)

# Numerical: columns with integer or double data type
num_cols = [item[0] for item in df_final.dtypes if item[1].startswith('int') or item[1].startswith('double')]
print("num_cols:", num_cols)

cat_cols: ['MSZoning', 'Street', 'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'OverallQual', 'OverallCond', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'Heating', 'HeatingQC', 'CentralAir', 'Electrical', 'KitchenQual', 'Functional', 'FireplaceQu', 'GarageType', 'GarageFinish', 'GarageQual', 'GarageCond', 'PavedDrive', 'PoolQC', 'Fence', 'MiscFeature', 'SaleType', 'SaleCondition']
num_cols: ['Id', 'MSSubClass', 'LotFrontage', 'LotArea', 'YearBuilt', 'YearRemodAdd', 'MasVnrArea', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageYrBlt', 'GarageCars', 'GarageArea', 'WoodDeckS

In [None]:
# Keep the Id column separately: it identifies rows and is not a feature
num_id=num_cols.pop(0)
print("num_id:", num_id)
num_id=[num_id]
print(num_id)
# Remove 'label' from the numerical columns: it is the target the model predicts, not a feature
num_cols.remove('label')
print("num_cols:", num_cols)

num_id: Id
['Id']
num_cols: ['MSSubClass', 'LotFrontage', 'LotArea', 'YearBuilt', 'YearRemodAdd', 'MasVnrArea', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageYrBlt', 'GarageCars', 'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold']


In [None]:
# Count the number of observations
df_final.count()

1460

In [None]:
# Check summary statistic of numerical columns
df_final.select(num_cols).describe().show()

[Output truncated: summary statistics (count, mean, stddev, min, max) of the numerical columns.]

In [None]:
df_final.head()


Row(Id=1, MSSubClass=60, MSZoning='RL', LotFrontage=65, LotArea=8450, Street='Pave', Alley='NA', LotShape='Reg', LandContour='Lvl', Utilities='AllPub', LotConfig='Inside', LandSlope='Gtl', Neighborhood='CollgCr', Condition1='Norm', Condition2='Norm', BldgType='1Fam', HouseStyle='2Story', OverallQual='7', OverallCond='5', YearBuilt=2003, YearRemodAdd=2003, RoofStyle='Gable', RoofMatl='CompShg', Exterior1st='VinylSd', Exterior2nd='VinylSd', MasVnrType='BrkFace', MasVnrArea=196, ExterQual='Gd', ExterCond='TA', Foundation='PConc', BsmtQual='Gd', BsmtCond='TA', BsmtExposure='No', BsmtFinType1='GLQ', BsmtFinSF1=706, BsmtFinType2='Unf', BsmtFinSF2=0, BsmtUnfSF=150, TotalBsmtSF=856, Heating='GasA', HeatingQC='Ex', CentralAir='Y', Electrical='SBrkr', 1stFlrSF=856, 2ndFlrSF=854, LowQualFinSF=0, GrLivArea=1710, BsmtFullBath=1, BsmtHalfBath=0, FullBath=2, HalfBath=1, BedroomAbvGr=3, KitchenAbvGr=1, KitchenQual='Gd', TotRmsAbvGrd=8, Functional='Typ', Fireplaces=0, FireplaceQu='NA', GarageType='Attc

## **Check Missing Values in the Data**

In [None]:
# Check missing values in a PySpark DataFrame
def count_nulls(df):
    """Take a PySpark DataFrame and return a list of (column name, null count) for the columns that have missing values."""
    null_counts = []                      # list to hold the results
    for cname, _ in df.dtypes:            # dtypes is a list of (name, type) pairs, e.g. ('Id', 'int')
        nulls = df.where(df[cname].isNull()).count()  # number of nulls in this column (one Spark job per column)
        if nulls != 0:                    # keep only columns that have missing values
            null_counts.append((cname, nulls))
    return null_counts

In [None]:
#call function check missing values
null_counts = count_nulls(df_final)
null_counts

[('LotFrontage', 259), ('MasVnrArea', 8), ('GarageYrBlt', 81)]

#### From null_counts, we keep only the column names and save them in the list "list_cols_miss":

In [None]:
list_cols_miss=[x[0] for x in null_counts]
list_cols_miss

['LotFrontage', 'MasVnrArea', 'GarageYrBlt']

##### From list_cols_miss we create a DataFrame called "df_miss". The aim is to identify which of the columns with missing values are categorical and which are numerical, because missing values are handled differently for the two types.

In [None]:
#Create dataframe which just has list_cols_miss
df_miss= df_final.select(*list_cols_miss)

#view data types in df_miss
df_miss.dtypes

[('LotFrontage', 'int'), ('MasVnrArea', 'int'), ('GarageYrBlt', 'int')]

#### After creating "df_miss", we split its columns into categorical and numerical ones: the categorical columns with missing values go in "catcolums_miss" and the numerical ones in "numcolumns_miss":

In [None]:
# Split the columns with missing values into categorical and numerical ones
# Categorical columns: string data type
catcolums_miss=[item[0] for item in df_miss.dtypes if item[1].startswith('string')]
print("catcolums_miss:", catcolums_miss)

# Numerical columns: integer or double data type
numcolumns_miss = [item[0] for item in df_miss.dtypes if item[1].startswith('int') or item[1].startswith('double')]
print("numcolumns_miss:", numcolumns_miss)

catcolums_miss: []
numcolumns_miss: ['LotFrontage', 'MasVnrArea', 'GarageYrBlt']


## Handle Missing Values

For categorical columns, we fill missing values with the most frequent category. To find it, we drop the missing values (saving the result in a new DataFrame called "df_Nomiss"), count the categories in each column, and sort the counts in descending order; the top category is the fill value. For numerical columns, we fill missing values with the column's average, rounded to a whole number.
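A minimal plain-Python sketch of the two filling rules, using toy lists instead of Spark columns. `round_half_up`, `fill_numeric` and `fill_categorical` are hypothetical helper names, and `None` stands in for a missing value. The mean is rounded half away from zero, which is how we understand Spark's `round` to behave (Python's built-in `round` rounds halves to even).

```python
from collections import Counter
from math import floor
from typing import List, Optional


def round_half_up(x: float) -> float:
    """Round to the nearest whole number, halves away from zero (like Spark's round)."""
    sign = 1 if x >= 0 else -1
    return sign * float(floor(abs(x) + 0.5))


def fill_numeric(values: List[Optional[float]]) -> List[float]:
    """Replace None with the rounded mean of the non-missing values."""
    present = [v for v in values if v is not None]
    fill = round_half_up(sum(present) / len(present))
    return [fill if v is None else v for v in values]


def fill_categorical(values: List[Optional[str]]) -> List[str]:
    """Replace None with the most frequent non-missing category (the mode)."""
    mode, _ = Counter(v for v in values if v is not None).most_common(1)[0]
    return [mode if v is None else v for v in values]


print(fill_numeric([60.0, None, 80.0, 75.0]))      # mean of 60, 80, 75 is 71.67, so None becomes 72.0
print(fill_categorical(["RL", None, "RM", "RL"]))  # the mode is "RL"
```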

### **Note: only numerical columns have missing values**

In [None]:
#fill missing value in numerical variable with average
for i in numcolumns_miss:
    meanvalue = df_final.select(round(mean(i))).collect()[0][0] #calculate average in each numerical column
    print(i, meanvalue) #print name of columns and it's average value
    df_final=df_final.na.fill({i:meanvalue}) #fill missing value in each columns with it's average value

LotFrontage 70.0
MasVnrArea 104.0
GarageYrBlt 1979.0


In [None]:
#Check Missing value after filling
null_counts = count_nulls(df_final)
null_counts

[]

Now null_counts is an empty list, which means no column has missing values anymore.


# **EDA**
## Here I do some EDA on categorical columns using SQL queries, and I also made some visualization charts using a MongoDB server, as mentioned in the PPT

In [None]:
df_final.createOrReplaceTempView("my_table")

# List of categorical columns
categorical_columns = ['Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'BsmtFinType1', 'BsmtFinType2', 'Heating', 'HeatingQC']

# Iterate over each categorical column
for column in categorical_columns:
    # Using SQL query to count categories for the current column
    query = f"""SELECT {column}, COUNT(*) as CategoryCount,COUNT(*) / (SELECT COUNT(*) FROM my_table) * 100 as Percentage FROM my_table GROUP BY {column}"""
    # Execute the query
    result = spark.sql(query)
    # Display the result
    print(f"Counts and Percentage for {column} column:")
    result.show()

Counts and Percentage for Utilities column:
+---------+-------------+------------------+
|Utilities|CategoryCount|        Percentage|
+---------+-------------+------------------+
|   NoSeWa|            1|0.0684931506849315|
|   AllPub|         1459| 99.93150684931507|
+---------+-------------+------------------+

Counts and Percentage for LotConfig column:
+---------+-------------+------------------+
|LotConfig|CategoryCount|        Percentage|
+---------+-------------+------------------+
|   Inside|         1052| 72.05479452054794|
|      FR3|            4| 0.273972602739726|
|   Corner|          263|18.013698630136986|
|  CulDSac|           94| 6.438356164383562|
|      FR2|           47| 3.219178082191781|
+---------+-------------+------------------+

Counts and Percentage for LandSlope column:
+---------+-------------+------------------+
|LandSlope|CategoryCount|        Percentage|
+---------+-------------+------------------+
|      Sev|           13|0.8904109589041096|
|      Gtl|

## Insignificant Categories in Data
From the counts above, many categories contain only a handful of the 1,460 observations. We call categories that make up less than 0.7% of the rows (about 10 houses) insignificant categories and replace them with the most frequent category of their column. Columns with two or fewer distinct values, or whose most frequent category already covers more than 97% of the rows, are left unchanged.
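The rule can be checked by hand on the Condition1 counts printed further below. This plain-Python sketch (the helper name `insignificant_categories` is hypothetical) applies the same two thresholds as the notebook, 97% for the dominant category and 0.7% for insignificant ones, to a dictionary of category counts instead of a Spark DataFrame:

```python
from typing import Dict, List, Optional, Tuple

THRESHOLD = 97     # skip a column if its dominant category exceeds this percentage
THRESHOLD2 = 0.7   # categories below this percentage are "insignificant"


def insignificant_categories(counts: Dict[str, int]) -> Optional[Tuple[str, List[str]]]:
    """Return (dominant category, insignificant categories), or None if the column is skipped."""
    total = sum(counts.values())
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    dominant, dominant_count = ranked[0]
    if len(counts) <= 2 or dominant_count * 100 / total > THRESHOLD:
        return None  # same guard as replace_cat2: too few categories, or one category dominates
    small = [cat for cat, n in ranked if n * 100 / total < THRESHOLD2]
    return dominant, small


# Category counts for Condition1, taken from the df_percent output in this notebook
condition1 = {"Norm": 1260, "Feedr": 81, "Artery": 48, "RRAn": 26, "PosN": 19,
              "RRAe": 11, "PosA": 8, "RRNn": 5, "RRNe": 2}
print(insignificant_categories(condition1))  # ('Norm', ['PosA', 'RRNn', 'RRNe'])
```

RRAe (11 houses, about 0.75%) stays just above the cutoff, which matches the `insign_cat` list the notebook prints for Condition1.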

In [None]:
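# Thresholds for replacing insignificant categories
threshold=97    # skip a column if its dominant category covers more than 97% of rows
threshold2=0.7  # categories below 0.7% of rows are considered insignificant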

In [None]:
def replace_cat2(f,cols):
    """Replace insignificant categories (percentage < threshold2) in column `cols` of DataFrame `f`
    with the column's most frequent category and return the new DataFrame.
    Relies on the globals threshold, threshold2 and window, and appends to the
    global lists cols_names, replacing and category."""
    df_percent=f.groupBy(cols).count()\
                .withColumn('total',sum(col('count')).over(window))\
                .withColumn('Percent',col('count')*100/col('total'))\
                .sort(col("count").desc()) # percentage of rows per category, sorted last so row 0 is the dominant category
    dominant_cat=df_percent.select(df_percent['Percent']).collect()[0][0] # percentage of the most frequent category
    count_dist=f.select([cols]).distinct().count() # number of distinct values in the column
    if count_dist > 2 and dominant_cat <= threshold :
        print('column:', cols)
        cols_names.append(cols)  # record the column name
        replacement=f.groupBy(cols).count().sort(col("count").desc()).collect()[0][0] # most frequent (dominant) category
        print("replacement:",replacement)
        replacing.append(replacement) # record the replacement category
        insign_cat=df_percent.filter(df_percent['Percent']< threshold2).select(df_percent[cols]).collect() # rows of insignificant categories
        insign_cat=[r[cols] for r in insign_cat] # keep just the category values
        category.append(insign_cat) # record the replaced categories
        print("insign_cat:",insign_cat)
        f=f.replace(insign_cat,replacement, cols) # replace insignificant categories with the dominant category
    return f

In [None]:
df_percent=df_final.groupBy('Condition1').count().sort(col("count").desc())\
                    .withColumn('total',sum(col('count')).over(window))\
                    .withColumn('Percent',col('count')*100/col('total')) #calculate the percentage-save in Percent columns from each categories
df_percent.show()
#dominant_cat=df_percent.select(df_percent['Percent']).collect()[0][0]
#dominant_cat

+----------+-----+-----+------------------+
|Condition1|count|total|           Percent|
+----------+-----+-----+------------------+
|      Norm| 1260| 1460|  86.3013698630137|
|     Feedr|   81| 1460|5.5479452054794525|
|    Artery|   48| 1460| 3.287671232876712|
|      RRAn|   26| 1460|1.7808219178082192|
|      PosN|   19| 1460|1.3013698630136987|
|      RRAe|   11| 1460|0.7534246575342466|
|      PosA|    8| 1460| 0.547945205479452|
|      RRNn|    5| 1460|0.3424657534246575|
|      RRNe|    2| 1460| 0.136986301369863|
+----------+-----+-----+------------------+



In [None]:
#call function replacing insignificant categories
replacing=[]
cols_names=[]
category=[]
for cols in cat_cols:
    df_final=replace_cat2(df_final,cols)

column: MSZoning
replacement: RL
insign_cat: ['C (all)']
column: Alley
replacement: NA
insign_cat: []
column: LotShape
replacement: Reg
insign_cat: ['IR3']
column: LandContour
replacement: Lvl
insign_cat: []
column: LotConfig
replacement: Inside
insign_cat: ['FR3']
column: LandSlope
replacement: Gtl
insign_cat: []
column: Neighborhood
replacement: NAmes
insign_cat: ['NPkVill', 'Blueste']
column: Condition1
replacement: Norm
insign_cat: ['PosA', 'RRNn', 'RRNe']
column: BldgType
replacement: 1Fam
insign_cat: []
column: HouseStyle
replacement: 1Story
insign_cat: ['2.5Fin']
column: OverallQual
replacement: 5
insign_cat: ['2', '1']
column: OverallCond
replacement: 5
insign_cat: ['2', '1']
column: RoofStyle
replacement: Gable
insign_cat: ['Mansard', 'Shed']
column: Exterior1st
replacement: VinylSd
insign_cat: ['Stone', 'BrkComm', 'AsphShn', 'ImStucc', 'CBlock']
column: Exterior2nd
replacement: VinylSd
insign_cat: ['ImStucc', 'Brk Cmn', 'Stone', 'AsphShn', 'Other', 'CBlock']
column: MasVnrTyp

In [None]:
type(df_final)

pyspark.sql.dataframe.DataFrame

In [None]:
#check length in list cols_names, category and replacing
len(cols_names), len(category), len(replacing)

(38, 38, 38)

These three lists are combined into a DataFrame called g.

In [None]:
#Create dataframe of replaced categories
g=spark.createDataFrame(list(zip(cols_names, replacing, category)),['cols_names', 'replacing', 'category'])
g.show(9)

+------------+---------+------------------+
|  cols_names|replacing|          category|
+------------+---------+------------------+
|    MSZoning|       RL|         [C (all)]|
|       Alley|       NA|                []|
|    LotShape|      Reg|             [IR3]|
| LandContour|      Lvl|                []|
|   LotConfig|   Inside|             [FR3]|
|   LandSlope|      Gtl|                []|
|Neighborhood|    NAmes|[NPkVill, Blueste]|
|  Condition1|     Norm|[PosA, RRNn, RRNe]|
|    BldgType|     1Fam|                []|
+------------+---------+------------------+
only showing top 9 rows



In [None]:
g.dtypes

[('cols_names', 'string'),
 ('replacing', 'string'),
 ('category', 'array<string>')]

In [None]:
g.printSchema()

root
 |-- cols_names: string (nullable = true)
 |-- replacing: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [None]:
f_percent=df_final.groupBy('MSZoning').count().sort(col("count").desc())\
                       .withColumn('total',sum(col('count')).over(window))\
                       .withColumn('Percent',col('count')*100/col('total'))

In [None]:
f_percent.show()

+--------+-----+-----+------------------+
|MSZoning|count|total|           Percent|
+--------+-----+-----+------------------+
|      RL| 1161| 1460| 79.52054794520548|
|      RM|  218| 1460|14.931506849315069|
|      FV|   65| 1460|4.4520547945205475|
|      RH|   16| 1460| 1.095890410958904|
+--------+-----+-----+------------------+




## Handle Outliers in the Data

Outliers are observations that fall below the lower side or above the upper side defined below.

To handle outliers, we replace values greater than the upper side with the upper-side value, and values lower than the lower side with the lower-side value. Both sides are computed from quantiles: cut points that split the sorted values of a variable into groups of (roughly) equal size. The three quartiles are:

* Q1 = the value below which 25% of the data fall when sorted in ascending order.
* Q2 = the median, below which 50% of the data fall.
* Q3 = the value below which 75% of the data fall when sorted in ascending order.

The IQR (interquartile range) is the range between Q1 and Q3: IQR = Q3 - Q1.

Upper side = Q3 + 1.5 * IQR

Lower side = Q1 - 1.5 * IQR
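As a small worked example of these formulas, the plain-Python sketch below computes both sides from given quartiles and caps a list of values, mirroring what the PySpark `when(...).otherwise(...)` replacement does later in the notebook. `iqr_fences` and `cap` are hypothetical names, and the numbers are toy values:

```python
from typing import List, Tuple


def iqr_fences(q1: float, q3: float, k: float = 1.5) -> Tuple[float, float]:
    """Return (lower side, upper side) = (Q1 - k*IQR, Q3 + k*IQR)."""
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def cap(values: List[float], lower: float, upper: float) -> List[float]:
    """Replace values below `lower` with `lower` and values above `upper` with `upper`."""
    return [min(max(v, lower), upper) for v in values]


lower, upper = iqr_fences(q1=10, q3=20)      # IQR = 10
print(lower, upper)                          # -5.0 35.0
print(cap([-10, 0, 15, 50], lower, upper))   # [-5.0, 0, 15, 35.0]
```

One consequence worth checking: when Q1 equals Q3, as for a column that is 0 for most houses (PoolArea, for example), the IQR is 0, both sides collapse to that value, and every nonzero value is replaced with 0.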

In [None]:
df_final.select(num_cols).describe().show()

[Output truncated: summary statistics (count, mean, stddev, min, max) of the numerical columns.]

### Calculate the Upper and Lower Sides

In [None]:
# Build a DataFrame of quantiles for the numerical columns
def quantile(e):
    """Take a DataFrame and return a new DataFrame with the 25th, 50th and 75th percentiles of each numerical column."""
    percentiles = [0.25, 0.5, 0.75]
    # approxQuantile(cols, probabilities, relativeError) returns one list of quantiles per column.
    # relativeError=0.0 computes exact quantiles, which can be expensive on large data.
    # zip() turns those per-column lists into one row per percentile.
    quant=spark.createDataFrame(list(zip(percentiles, *e.approxQuantile(num_cols, percentiles, 0.0))),
                               ['percentile']+num_cols)
    return quant

In [None]:
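# Call the quantile function. Note: this rebinds the name `quantile` to the result DataFrame,
# so re-running this cell requires re-running the cell that defines the function first.
quantile=quantile(df_final)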

In [None]:
# Function to calculate the upper side
def upper_value(b,c):
    """Take the quantile DataFrame and a numerical column name; return the column's upper side."""
    rows = b.select(c).collect()     # rows ordered as percentiles 0.25, 0.5, 0.75
    q1, q3 = rows[0][0], rows[2][0]  # first and third quartiles
    IQR=q3-q1                        # interquartile range
    upper= q3 + (IQR*1.5)            # upper side
    return upper

In [None]:
# Function to calculate the lower side
def lower_value(b,c):
    """Take the quantile DataFrame and a numerical column name; return the column's lower side."""
    rows = b.select(c).collect()     # rows ordered as percentiles 0.25, 0.5, 0.75
    q1, q3 = rows[0][0], rows[2][0]  # first and third quartiles
    IQR=q3-q1                        # interquartile range
    lower= q1 - (IQR*1.5)            # lower side
    return lower

### Replacing the outlier

In [None]:
# Function for replacing outliers above the upper side
def replce_outlier_up2(d,col, value):
    """Cap column `col` of DataFrame `d` at `value`: values above it are replaced with `value`."""
    d=d.withColumn(col, F.when(d[col] > value , value).otherwise(d[col]))
    return d

In [None]:
# Function for replacing outliers below the lower side
def replce_outlier_low2(d,col, value):
    """Floor column `col` of DataFrame `d` at `value`: values below it are replaced with `value`."""
    d=d.withColumn(col, F.when(d[col] < value , value).otherwise(d[col]))
    return d

In [None]:
# For each numerical column, compute the lower side and replace values below it with the lower side
for i in num_cols:
    lower=lower_value(quantile,i)
    df_final=replce_outlier_low2(df_final, i, lower)

In [None]:
# For each numerical column, compute the upper side and replace values above it with the upper side
for x in num_cols:
    upper=upper_value(quantile,x)
    df_final=replce_outlier_up2(df_final, x, upper)

In [None]:
type(df_final)

pyspark.sql.dataframe.DataFrame

Check the distribution after replacing outliers:

In [None]:
df_final.select(num_cols).describe().show()

[Output truncated: summary statistics (count, mean, stddev, min, max) of the numerical columns after capping.]

## **String Indexer and One Hot Encoder for categorical variables**

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

categorical_cols = ['MSZoning', 'Street', 'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'OverallQual', 'OverallCond', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'Heating', 'HeatingQC', 'CentralAir', 'Electrical', 'KitchenQual', 'Functional', 'FireplaceQu', 'GarageType', 'GarageFinish', 'GarageQual', 'GarageCond', 'PavedDrive', 'PoolQC', 'Fence', 'MiscFeature', 'SaleType', 'SaleCondition']
numerical_cols = ['Id', 'MSSubClass', 'LotFrontage', 'LotArea', 'YearBuilt', 'YearRemodAdd', 'MasVnrArea', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageYrBlt', 'GarageCars', 'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold']

# Step 1: Index each categorical column; handleInvalid="keep" sends unseen labels to an extra index
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep") for c in categorical_cols]

# Step 2: One-hot encode the indexed categorical columns
encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_encoded") for c in categorical_cols]

# Step 3: Assemble the numeric and encoded columns into a single feature vector
input_cols = numerical_cols + [c + "_encoded" for c in categorical_cols]
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
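The indexing and encoding stages can be mimicked for a single column in plain Python. This sketch mirrors three documented defaults:

* StringIndexer orders labels by frequency (`stringOrderType="frequencyDesc"`), and recent Spark versions break ties alphabetically.
* `handleInvalid="keep"` gives unseen labels an extra index.
* OneHotEncoder defaults to `dropLast=True`, which encodes the last category as all zeros.

The function names are hypothetical, and the values are a toy sample of `MSZoning` codes.

```python
from collections import Counter

def fit_string_indexer(values):
    """Assign indices by descending frequency, breaking ties alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(ordered)}

def index_value(mapping, value):
    """Unseen labels get the extra index len(mapping), like handleInvalid="keep"."""
    return mapping.get(value, len(mapping))

def one_hot(index, num_categories, drop_last=True):
    """One-hot vector; with drop_last the final category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

ms_zoning = ["RL", "RM", "RL", "FV", "RL", "RM"]  # toy sample
mapping = fit_string_indexer(ms_zoning)
print(mapping)                               # {'RL': 0, 'RM': 1, 'FV': 2}
print(index_value(mapping, "unseen"))        # 3
print(one_hot(mapping["RM"], len(mapping)))  # [0.0, 1.0]
print(one_hot(mapping["FV"], len(mapping)))  # [0.0, 0.0]
```

Spark stores the encoded result as a sparse vector. The dense lists here only make the layout visible.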

## **Applying Machine Learning Algorithms**

**Algorithms**
* Decision Tree: recursively splits the data on the feature and threshold that most reduce the prediction error (variance, for regression), then predicts the mean label of each leaf.
* Random Forest: builds many decision trees, each on a bootstrap sample of the training data (bagging) with a random subset of features considered at each split, and averages their predictions.
* Gradient Boosting: uses a boosting ensemble technique. Trees are built sequentially, and each new tree learns from the errors (residuals) of the trees before it.
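A single regression-tree split can be sketched in plain Python on one toy feature. Spark's `DecisionTreeRegressor` scores splits by variance reduction, which picks the same split as minimising the children's total sum of squared errors (SSE). This sketch tries every midpoint between distinct values. Spark instead evaluates a limited set of candidate thresholds per continuous feature, bounded by `maxBins`. The function names and data are hypothetical.

```python
def sse(values):
    """Sum of squared deviations from the mean (count times the variance)."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values)

def best_split(xs, ys):
    """Return (threshold, cost) minimising the SSE of the two children; None if no split helps."""
    pairs = sorted(zip(xs, ys))
    best_t, best_cost = None, sse(ys)  # cost of not splitting at all
    for (x_left, _), (x_right, _) in zip(pairs, pairs[1:]):
        if x_left == x_right:
            continue  # no threshold separates identical x values
        t = (x_left + x_right) / 2
        left = [y for x, y in pairs if x <= t]
        right = [y for x, y in pairs if x > t]
        cost = sse(left) + sse(right)
        if cost < best_cost:
            best_t, best_cost = t, cost
    return best_t, best_cost

feature = [1, 2, 3, 10, 11, 12]           # toy feature values
prices = [100, 110, 105, 300, 310, 305]   # toy labels (in thousands)
print(best_split(feature, prices))        # (6.5, 100.0)
```

A full tree repeats this search recursively in each child until a stopping rule such as `maxDepth` is reached.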

**Evaluation**
* To evaluate the models we use the following metrics:
  * R-squared (R2)
  * Mean Absolute Error (MAE)
  * Mean Squared Error (MSE)
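The three metrics have simple definitions: R2 = 1 - SS_res / SS_tot, MAE = mean(|y - ŷ|) and MSE = mean((y - ŷ)²). Here SS_res is the sum of squared errors and SS_tot is the sum of squared deviations of the labels from their mean. A plain-Python sketch on toy numbers (the function name is hypothetical) makes the arithmetic concrete. Spark's `RegressionEvaluator` computes the same quantities through its `metricName` values `"r2"`, `"mae"` and `"mse"`.

```python
def regression_metrics(labels, predictions):
    """Compute R2, MAE and MSE for paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mae = sum(abs(e) for e in errors) / n
    ss_res = sum(e * e for e in errors)
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return {"r2": 1 - ss_res / ss_tot, "mae": mae, "mse": ss_res / n}

metrics = regression_metrics([3, 5, 7], [2, 5, 8])
print(metrics)  # r2 = 0.75, mae = mse = 2/3
```

MAE is in the same unit as the label (dollars here), while MSE is in squared dollars. This is why the MSE values printed below are so large.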

### **Decision Tree Regressor**

In [None]:
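from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Step 4: Define the model
dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")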

# Step 5: Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, dt])

# Step 6: Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

# Step 7: Fit the pipeline to your training DataFrame
model = pipeline.fit(train_data)

# Step 8: Make predictions on the test data
predictions = model.transform(test_data)

# Display the predictions
predictions.select("label", "prediction").show()

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2_dt = evaluator.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2_dt))

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae_dt = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) on test data: {:.3f}".format(mae_dt))

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
mse_dt = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) on test data: {:.3f}".format(mse_dt))

+------+------------------+
| label|        prediction|
+------+------------------+
|223500|208481.77083333334|
|307000| 268538.9183673469|
|129900|189008.48101265822|
|279500|259204.49152542374|
|139000|134533.68265682657|
|129900|134533.68265682657|
| 68500| 95391.46428571429|
|309000|259204.49152542374|
|319900|230279.61290322582|
|239686|230279.61290322582|
|249700|259204.49152542374|
|127000|134533.68265682657|
|114500|134533.68265682657|
|180500|189008.48101265822|
|202500|184589.74545454545|
|225000|230279.61290322582|
|127000|112080.31496062993|
|205000|189008.48101265822|
|383970|         386263.75|
|139000|134533.68265682657|
+------+------------------+
only showing top 20 rows

R-squared (R2) on test data: 0.712
Mean Absolute Error (MAE) on test data: 28404.196
Mean Squared Error (MSE) on test data: 1747699787.599


### **Random Forest Regressor**

In [None]:
from pyspark.ml.regression import RandomForestRegressor

# Step 4: Define the model
rf = RandomForestRegressor(featuresCol="features", labelCol="label")

# Step 5: Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Step 6: Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

# Step 7: Fit the pipeline to your training DataFrame
model = pipeline.fit(train_data)

# Step 8: Make predictions on the test data
predictions = model.transform(test_data)

# Display the predictions
predictions.select("label", "prediction").show()

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2_rf = evaluator.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2_rf))

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae_rf = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) on test data: {:.3f}".format(mae_rf))

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
mse_rf = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) on test data: {:.3f}".format(mse_rf))

+------+------------------+
| label|        prediction|
+------+------------------+
|223500|209298.13371491866|
|307000|  247831.286782653|
|129900|161563.35907113558|
|279500|261189.27733907322|
|139000| 133847.8740661714|
|129900|144787.29362738685|
| 68500| 94723.74639378983|
|309000| 319506.3183183296|
|319900|251415.92095800414|
|239686|240966.14393219515|
|249700|261589.70483553354|
|127000|128890.65160832915|
|114500|135917.96002201608|
|180500|169315.74289033754|
|202500|  207033.818476745|
|225000| 216467.9688425953|
|127000|121883.75509757343|
|205000|183186.32383235445|
|383970|364988.84494622453|
|139000|137391.41711368735|
+------+------------------+
only showing top 20 rows

R-squared (R2) on test data: 0.816
Mean Absolute Error (MAE) on test data: 21797.316
Mean Squared Error (MSE) on test data: 1115019905.174
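The bagging half of a random forest can be sketched in plain Python. Each depth-1 tree (stump) is trained on a bootstrap sample drawn with replacement, and the forest predicts the average of its trees. Spark's forests also sample a subset of features at each split (`featureSubsetStrategy`), which this one-feature sketch cannot show. The function names, data and `num_trees=25` are hypothetical.

```python
import random

def fit_stump(xs, ys):
    """Depth-1 regression tree: the midpoint split with the lowest squared error.

    Falls back to a constant (the mean) when no split lowers the error,
    e.g. when every x in a bootstrap sample is identical.
    """
    pairs = sorted(zip(xs, ys))
    mean_all = sum(ys) / len(ys)
    best = (sum((y - mean_all) ** 2 for y in ys), None, mean_all, mean_all)
    for (x_left, _), (x_right, _) in zip(pairs, pairs[1:]):
        if x_left == x_right:
            continue
        t = (x_left + x_right) / 2
        left = [y for x, y in pairs if x <= t]
        right = [y for x, y in pairs if x > t]
        lm, rm = sum(left) / len(left), sum(right) / len(right)
        cost = sum((y - lm) ** 2 for y in left) + sum((y - rm) ** 2 for y in right)
        if cost < best[0]:
            best = (cost, t, lm, rm)
    _, t, lm, rm = best
    return lambda x: lm if t is None or x <= t else rm

def fit_bagged_stumps(xs, ys, num_trees=25, seed=42):
    """Bagging: train each stump on a bootstrap sample of the rows."""
    rng = random.Random(seed)
    n = len(xs)
    trees = []
    for _ in range(num_trees):
        idx = [rng.randrange(n) for _ in range(n)]  # sample row indices with replacement
        trees.append(fit_stump([xs[i] for i in idx], [ys[i] for i in idx]))
    return trees

def predict_forest(trees, x):
    """A regression forest averages the predictions of its trees."""
    return sum(tree(x) for tree in trees) / len(trees)

xs = [1, 2, 3, 4, 5, 6, 7, 8]             # toy feature
ys = [10, 11, 12, 10, 30, 31, 32, 30]     # toy labels
forest = fit_bagged_stumps(xs, ys)
print([round(predict_forest(forest, x), 1) for x in xs])
```

Averaging trees fitted to different resamples reduces variance. This is one plausible reason the forest scores above beat the single decision tree on the same split.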


### **GBT Regressor**

In [None]:
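from pyspark.ml.regression import GBTRegressor

# Step 4: Define the model (maxIter sets the number of boosting iterations, i.e. trees)
gbt = GBTRegressor(featuresCol="features", labelCol="label", maxIter=10)  # You can adjust maxIter as needed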

# Step 5: Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, gbt])

# Step 6: Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

# Step 7: Fit the pipeline to your training DataFrame
model = pipeline.fit(train_data)

# Step 8: Make predictions on the test data
predictions = model.transform(test_data)

# Display the predictions
predictions.select("label", "prediction").show()

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2_gbt = evaluator.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2_gbt))

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae_gbt = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) on test data: {:.3f}".format(mae_gbt))

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
mse_gbt = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) on test data: {:.3f}".format(mse_gbt))

+------+------------------+
| label|        prediction|
+------+------------------+
|223500|213887.52737999443|
|307000|283079.21743674704|
|129900|176731.76764702765|
|279500|244247.99809336528|
|139000|128793.28674214259|
|129900|119800.20431777058|
| 68500| 84074.91422691618|
|309000|241224.87282667472|
|319900|245232.27713023953|
|239686|241474.35781267268|
|249700|258436.53232139404|
|127000| 134977.3751494976|
|114500|126309.78860748098|
|180500|182661.94718232905|
|202500|179706.16138912563|
|225000|234735.91274524224|
|127000|105553.89296536303|
|205000|186908.24757062792|
|383970|393880.75447477284|
|139000|127697.67775720375|
+------+------------------+
only showing top 20 rows

R-squared (R2) on test data: 0.759
Mean Absolute Error (MAE) on test data: 25219.257
Mean Squared Error (MSE) on test data: 1459153313.928
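Gradient boosting with squared loss can be sketched in plain Python using stumps on one toy feature. Each round fits a stump to the current residuals and adds it scaled by the step size. In Spark these correspond to `stepSize` (default 0.1) and `maxIter` (the number of trees). The sketch makes these simplifying assumptions:

* It starts from the mean label, whereas implementations differ in how they initialise.
* It uses one feature and stumps instead of deeper trees.
* The function names are hypothetical.

```python
def fit_stump(xs, ys):
    """Depth-1 regression tree; constant (the mean) if no split lowers the squared error."""
    pairs = sorted(zip(xs, ys))
    mean_all = sum(ys) / len(ys)
    best = (sum((y - mean_all) ** 2 for y in ys), None, mean_all, mean_all)
    for (x_left, _), (x_right, _) in zip(pairs, pairs[1:]):
        if x_left == x_right:
            continue
        t = (x_left + x_right) / 2
        left = [y for x, y in pairs if x <= t]
        right = [y for x, y in pairs if x > t]
        lm, rm = sum(left) / len(left), sum(right) / len(right)
        cost = sum((y - lm) ** 2 for y in left) + sum((y - rm) ** 2 for y in right)
        if cost < best[0]:
            best = (cost, t, lm, rm)
    _, t, lm, rm = best
    return lambda x: lm if t is None or x <= t else rm

def fit_gbt(xs, ys, max_iter=10, step_size=0.1):
    """Sequentially add stumps, each fitted to the errors of the ensemble so far."""
    base = sum(ys) / len(ys)  # simplification: start from the mean label
    stumps = []

    def predict(x):
        return base + step_size * sum(s(x) for s in stumps)

    for _ in range(max_iter):
        # For squared loss the negative gradient is proportional to the residual y - F(x)
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        stumps.append(fit_stump(xs, residuals))
    return predict

def mse(model, xs, ys):
    return sum((y - model(x)) ** 2 for x, y in zip(xs, ys)) / len(ys)

xs = [1, 2, 3, 4, 5, 6]         # toy feature
ys = [10, 12, 11, 30, 32, 31]   # toy labels
for rounds in (0, 5, 50):
    print(rounds, round(mse(fit_gbt(xs, ys, max_iter=rounds), xs, ys), 2))
```

Training error keeps falling as rounds are added. For that reason `maxIter` and `stepSize` are usually tuned together and judged on held-out data rather than training error.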


## **Applying Algorithms with Hyperparameter Tuning and Cross-Validation**

### Decision Tree Regressor

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Step 4: Define the model
dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")

# Step 5: Create ParamGrid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .addGrid(dt.maxBins, [32, 64, 128]) \
    .build()

# Step 6: Create CrossValidator with Decision Tree Regressor and ParamGrid
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2"),
                          numFolds=5)  # You can adjust the number of folds as needed

# Step 7: Create the pipeline with Decision Tree Regressor and CrossValidator
pipeline_dt_cv = Pipeline(stages=indexers + encoders + [assembler, crossval])

# Step 8: Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

# Step 9: Fit the pipeline to your training DataFrame
model_dt_cv = pipeline_dt_cv.fit(train_data)

# Step 10: Make predictions on the test data
predictions_dt_cv = model_dt_cv.transform(test_data)

# Step 11: Evaluate the model
evaluator_dt = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2_dt_cv = evaluator_dt.evaluate(predictions_dt_cv)
print("Result Using Hyperparameter and Cross-validation")
print("Decision Tree - R-squared (R2): {:.3f}".format(r2_dt_cv))

Result Using Hyperparameter and Cross-validation
Decision Tree - R-squared (R2): 0.711
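`ParamGridBuilder` builds the Cartesian product of the values passed to each `addGrid` call. `CrossValidator` then trains one model per parameter map per fold and refits the best map on the whole training set. The plain-Python sketch below sizes the three grids used in this section with `numFolds=5`, which shows why the random-forest search is by far the most expensive. The helper names `param_grid` and `cv_fit_count` are hypothetical.

```python
from itertools import product

def param_grid(**grid):
    """Cartesian product of parameter values, like ParamGridBuilder().addGrid(...).build()."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

def cv_fit_count(grid, num_folds):
    """Models trained by k-fold CV: one per (param map, fold), plus one refit of the best map."""
    return len(grid) * num_folds + 1

dt_grid = param_grid(maxDepth=[5, 10, 15], maxBins=[32, 64, 128])
rf_grid = param_grid(numTrees=[10, 20, 30], maxDepth=[5, 10, 15], maxBins=[32, 64, 128])
gbt_grid = param_grid(maxDepth=[5, 10], maxBins=[15, 20], stepSize=[0.1])

for name, grid in [("Decision Tree", dt_grid), ("Random Forest", rf_grid), ("GBT", gbt_grid)]:
    print(f"{name}: {len(grid)} param maps x 5 folds + 1 refit = {cv_fit_count(grid, 5)} fits")
```

If this search is too slow, `CrossValidator` accepts a `parallelism` argument that evaluates several parameter maps at once.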


### Random Forest Regressor

In [None]:
# Step 4: Define the model
rf = RandomForestRegressor(featuresCol="features", labelCol="label")

# Step 5: Create ParamGrid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.maxBins, [32, 64, 128]) \
    .build()

# Step 6: Create CrossValidator with Random Forest Regressor and ParamGrid
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2"),
                          numFolds=5)  # You can adjust the number of folds as needed

# Step 7: Create the pipeline with Random Forest Regressor and CrossValidator
pipeline_rf_cv = Pipeline(stages=indexers + encoders + [assembler, crossval])

# Step 8: Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

# Step 9: Fit the pipeline to your training DataFrame
model_rf_cv = pipeline_rf_cv.fit(train_data)

# Step 10: Make predictions on the test data
predictions_rf_cv = model_rf_cv.transform(test_data)

# Step 11: Evaluate the model
evaluator_rf = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2_rf_cv = evaluator_rf.evaluate(predictions_rf_cv)
print("Result Using Hyperparameter and Cross-validation")
print("Random Forest - R-squared (R2): {:.3f}".format(r2_rf_cv))

Result Using Hyperparameter and Cross-validation
Random Forest - R-squared (R2): 0.884


### GBT Regressor

In [None]:
# Step 4: Define the model
gbt = GBTRegressor(featuresCol="features", labelCol="label")

# Step 5: Create ParamGrid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10]) \
    .addGrid(gbt.maxBins, [15,20]) \
    .addGrid(gbt.stepSize, [0.1]) \
    .build()

# Step 6: Create CrossValidator with Gradient-Boosted Tree Regressor and ParamGrid
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2"),
                          numFolds=5)  # You can adjust the number of folds as needed

# Step 7: Create the pipeline with Gradient-Boosted Tree Regressor and CrossValidator
pipeline_gbt_cv = Pipeline(stages=indexers + encoders + [assembler, crossval])

# Step 8: Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

# Step 9: Fit the pipeline to your training DataFrame
model_gbt_cv = pipeline_gbt_cv.fit(train_data)

# Step 10: Make predictions on the test data
predictions_gbt_cv = model_gbt_cv.transform(test_data)

# Step 11: Evaluate the model
evaluator_gbt = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2_gbt_cv = evaluator_gbt.evaluate(predictions_gbt_cv)
print("Result Using Hyperparameter and Cross-validation")
print("Gradient-Boosted Tree - R-squared (R2): {:.3f}".format(r2_gbt_cv))

Result Using Hyperparameter and Cross-validation
Gradient-Boosted Tree - R-squared (R2): 0.743
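The test-set results of all six runs can be collected into one table. This sketch only reuses the figures printed above and retrains nothing. It adds RMSE (the square root of MSE), which is in dollars like MAE. Only R2 was recorded for the tuned models. All runs share a single 80/20 split, so small gaps such as 0.712 against 0.711 are unlikely to be meaningful.

```python
import math

# Test-set metrics copied from the outputs above (same 80/20 split, seed=42)
untuned = {
    "Decision Tree": {"r2": 0.712, "mae": 28404.196, "mse": 1747699787.599},
    "Random Forest": {"r2": 0.816, "mae": 21797.316, "mse": 1115019905.174},
    "GBT": {"r2": 0.759, "mae": 25219.257, "mse": 1459153313.928},
}
tuned_r2 = {"Decision Tree": 0.711, "Random Forest": 0.884, "GBT": 0.743}

print(f"{'Model':<15}{'R2':>7}{'R2 (CV)':>9}{'MAE':>11}{'RMSE':>11}")
for name, m in untuned.items():
    rmse = math.sqrt(m["mse"])  # RMSE is in dollars, like MAE
    print(f"{name:<15}{m['r2']:>7.3f}{tuned_r2[name]:>9.3f}{m['mae']:>11.0f}{rmse:>11.0f}")

best = max(tuned_r2, key=tuned_r2.get)
print("Best R2 after tuning:", best)
```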
