**This notebook analyzes housing prices using PySpark.**

- Cleaning the data
- Exploratory data analysis
- Creating new features
- Correlation analysis
- Converting categorical data to numerical
- Linear regression for house price prediction
- Hyperparameter tuning

In [None]:
!pip install pyspark

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import seaborn as sns
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# List every file under the read-only Kaggle input directory.
# NOTE: `dirname` (and `filename`) remain defined after this loop and hold the
# values from the last iteration only; do not rely on them to locate a file.
import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
from pyspark.sql import SparkSession


In [None]:
# Create (or reuse an existing) Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)

In [None]:
# Read the dataset. Locate housing.csv explicitly instead of relying on
# `dirname`, which only holds whichever directory os.walk visited last.
HOUSING_CSV = next(
    (
        os.path.join(root, name)
        for root, _, files in os.walk('/kaggle/input')
        for name in files
        if name == 'housing.csv'
    ),
    None,
)
if HOUSING_CSV is None:
    raise FileNotFoundError("housing.csv not found under /kaggle/input")
df_housing = spark.read.option('header', 'true').csv(HOUSING_CSV, inferSchema=True)

In [None]:
# Show the first 3 rows (DataFrame.head(n) returns a list of Row objects).
df_housing.head(3)

In [None]:
# Print the column names together with the types Spark inferred (inferSchema=True).
df_housing.printSchema()

In [None]:
# List the column names; as the cell's last expression the list is displayed as output.
df_housing.columns

In [None]:
# Summary statistics (count / mean / stddev / min / max) for the first five columns.
# The `count` row counts non-null values, so it exposes columns with missing data.
first_half_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
]
df_housing.select(first_half_cols).describe().show()

In [None]:
# Summary statistics for the remaining columns (including the target and the
# categorical ocean_proximity column).
second_half_cols = [
    'population',
    'households',
    'median_income',
    'median_house_value',
    'ocean_proximity',
]
df_housing.select(second_half_cols).describe().show()

In [None]:
# Use the shorter name `price` for the target column from here on.
df_housing = df_housing.withColumnRenamed(existing='median_house_value', new='price')

In [None]:
# Count missing values per column. `na.drop()` only returns a new DataFrame
# (it never modifies df_housing in place), so it is not used here; rows are
# kept and missing `total_bedrooms` values are imputed with the column mean later.
df_housing.select(
    [F.sum(col(c).isNull().cast('int')).alias(c) for c in df_housing.columns]
).show()

In [None]:
# Show the Spark data type of `price`; `type(df_housing['price'])` would only
# report the Python class pyspark.sql.Column, not the column's dtype.
df_housing.schema['price'].dataType

**Preliminary analysis**

In [None]:
# Check whether prices look normally distributed: histogram with a KDE overlay,
# then sample skewness and kurtosis (Spark reports excess kurtosis, so both are
# near 0 for a normal distribution).
# `sns.distplot` is deprecated since seaborn 0.11; `histplot(kde=True)` replaces it.
price_values = df_housing.select('price').toPandas()['price']
sns.histplot(price_values, kde=True, color="skyblue")
df_housing.select(F.skewness('price'), F.kurtosis('price')).show()

In [None]:
# Total number of rows vs. rows whose price exceeds $500,000 (expensive houses).
total_houses = df_housing.count()
expensive_houses = df_housing.filter(col('price') > 500000).count()
print("No of houses: %i" % total_houses)
print("No of houses greater than $500000 are: %i" % expensive_houses)

In [None]:
# Price histogram with 10 bins; sets the global seaborn style to "darkgrid".
sns.set_style("darkgrid")
price_pdf = df_housing.select('price').toPandas()
sns.histplot(price_pdf, bins = 10)

In [None]:
# Average price for each distinct `total_rooms` value, shown as a scatter plot.
# Only `price` is aggregated (a bare `.avg()` would average every numeric column).
import matplotlib.pyplot as plt
df1 = (
    df_housing
    .groupby('total_rooms')
    .agg(F.avg('price').alias('avg(price)'))
    .sort('total_rooms')
)
df_p = df1.toPandas()
plt.figure(figsize = (15, 8))
sns.scatterplot(x = df_p['total_rooms'], y = df_p['avg(price)'])

In [None]:
# Add a rough "per-capita income" column: median_income * 10000 / population.
# NOTE(review): median_income presumably is in units of $10,000 (hence * 10000)
# and is a block-level median, so dividing it by the block's total population
# is not a true per-capita income (median_income * households / population
# would be closer). Confirm before drawing conclusions from this feature.
income_dollars = col('median_income') * 10000
df_housing = df_housing.withColumn('per_capita_income', income_dollars / col('population'))

In [None]:
# Histogram of per_capita_income, zoomed to the 0-500 range on the x axis.
per_capita_pdf = df_housing.select('per_capita_income').toPandas()
g = sns.histplot(per_capita_pdf)
g.set(xlim = (0, 500))


In [None]:
# Scatter of per_capita_income against price. Only the two plotted columns are
# collected to the driver, not the whole DataFrame.
df_p = df_housing.select('per_capita_income', 'price').toPandas()
sns.scatterplot(x = df_p['per_capita_income'], y = df_p['price'])

# Many blocks sit near $100 per-capita income, so the data is skewed towards zero.

In [None]:
# Percentage of blocks whose per_capita_income is below $100.
# (Rows with a null per_capita_income are excluded by the filter but counted in the total.)
n_total = df_housing.count()
n_below = df_housing.filter(col('per_capita_income') < 100).count()
count_blocks = n_below / n_total * 100 if n_total else 0.0
# "%.2f" = two decimal places ("%2f" was a width-2 field with six decimals).
print("Percentage of blocks below $100 per capita: %.2f" % count_blocks)

In [None]:
# Show the distinct categories of ocean_proximity.
df_housing.select('ocean_proximity').dropDuplicates().show()

In [None]:
# Where do wealthy people live? Average median_income per ocean_proximity
# category, multiplied by 10000 (presumably to express it in dollars).
df_i = (
    df_housing
    .groupby('ocean_proximity')
    .agg(F.avg('median_income'))
)
df_p = df_i.toPandas()
avg_income = df_p['avg(median_income)'] * 10000
sns.barplot(x = df_p['ocean_proximity'], y = avg_income)

# Observation: blocks less than 1 hour from the ocean have the highest average
# income, i.e. that is where most wealthy people tend to live.

In [None]:
# Label-encode "ocean_proximity" into a numeric index column (StringIndexer's
# default ordering assigns indices by descending label frequency), then drop the
# original string column.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
df_housing = (
    indexer.fit(df_housing)
    .transform(df_housing)
    .drop('ocean_proximity')
)
df_housing.select('ocean_proximity_index').show(3)

In [None]:
# Impute missing total_bedrooms values with the column mean, so the
# VectorAssembler used for the correlation matrix (which rejects nulls by
# default) works.
mean = df_housing.agg(F.mean('total_bedrooms')).first()[0]
df_housing = df_housing.fillna({'total_bedrooms': mean})

In [None]:
# Sanity check: after imputation this should display an empty table.
df_housing.where(col('total_bedrooms').isNull()).show()

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt

# Correlation.corr works on a single vector column, so pack every (numeric)
# column into one "features" vector first.
corr_columns = df_housing.columns
assembler = VectorAssembler(inputCols=corr_columns, outputCol="features")
df_vector = assembler.transform(df_housing).select("features")

# Pearson correlation matrix (Correlation.corr's default method).
matrix = Correlation.corr(df_vector, 'features')
corrmatrix = matrix.collect()[0][0].toArray().tolist()

# Convert to a labelled pandas DataFrame.
df_corr = pd.DataFrame(corrmatrix, columns = corr_columns, index = corr_columns)

# Plot on a fixed [-1, 1] diverging scale centred at 0, so colours mean the same
# thing regardless of the data, and print each coefficient in its cell.
plt.figure(figsize=(12, 10))
sns.heatmap(df_corr, vmin=-1, vmax=1, center=0, cmap='coolwarm', annot=True, fmt='.2f')


In [None]:
# Re-list the columns now that ocean_proximity has been replaced by its index.
df_housing.columns

**Linear regression to predict prices**

In [None]:
# Keep only the chosen predictors and the target; every other column is dropped.
# NOTE(review): presumably chosen from the correlation heatmap above — confirm.
model_columns = ['housing_median_age', 'total_rooms', 'median_income', 'price']
df_model = df_housing.select(model_columns)
df_model.show(3)

In [None]:
# Check the distribution of the selected feature housing_median_age:
# histogram + KDE, then skewness and (excess) kurtosis.
# `sns.distplot` is deprecated; `histplot(kde=True)` is its replacement.
age_values = df_housing.select('housing_median_age').toPandas()['housing_median_age']
sns.histplot(age_values, kde=True, color="skyblue")
df_housing.select(F.skewness('housing_median_age'), F.kurtosis('housing_median_age')).show()

# Author's reading: housing_median_age is approximately normally distributed.
# NOTE(review): confirm against the skewness/kurtosis printed above.

In [None]:
# Check the distribution of the selected feature total_rooms:
# histogram + KDE, then skewness and (excess) kurtosis.
# `sns.distplot` is deprecated; `histplot(kde=True)` is its replacement.
rooms_values = df_housing.select('total_rooms').toPandas()['total_rooms']
sns.histplot(rooms_values, kde=True, color="skyblue")
df_housing.select(F.skewness('total_rooms'), F.kurtosis('total_rooms')).show()

# total_rooms is not normally distributed (strongly skewed), so it is
# log-transformed in the next cell.

In [None]:
# Apply a base-10 logarithm to total_rooms to reduce its skew.
# NOTE(review): Spark's log10 returns null for values <= 0 — this assumes
# total_rooms is always positive; confirm.
df_model = df_model.withColumn("total_rooms_log", F.log10(col("total_rooms")))

# `sns.distplot` is deprecated; `histplot(kde=True)` is its replacement.
rooms_log_values = df_model.select('total_rooms_log').toPandas()['total_rooms_log']
sns.histplot(rooms_log_values, kde=True, color="skyblue")
df_model.select(F.skewness('total_rooms_log'), F.kurtosis('total_rooms_log')).show()

# After the log transform the distribution should be much closer to normal
# (i.e. total_rooms looks roughly log-normal); check the skewness/kurtosis above.

In [None]:
# Check the distribution of the selected feature median_income:
# histogram + KDE, then skewness and (excess) kurtosis.
# `sns.distplot` is deprecated; `histplot(kde=True)` is its replacement.
income_values = df_housing.select('median_income').toPandas()['median_income']
sns.histplot(income_values, kde=True, color="skyblue")
df_housing.select(F.skewness('median_income'), F.kurtosis('median_income')).show()

# Author's reading: median_income is approximately normally distributed.
# NOTE(review): confirm against the skewness/kurtosis above — a clearly positive
# skewness would mean the feature is right-skewed rather than normal.

In [None]:
# Combine the three predictors into the single vector column Spark ML expects.
predictor_cols = ['housing_median_age', 'total_rooms_log', 'median_income']
feature_assembly = VectorAssembler(inputCols=predictor_cols, outputCol='features')
output = feature_assembly.transform(df_model)
output.show(3)

In [None]:
from pyspark.ml.feature import StandardScaler

# Scale each feature to unit standard deviation (withStd=True) without
# centring it on the mean (withMean=False).
# NOTE(review): the scaler is fitted on the full dataset before the train/test
# split, so test-set statistics leak into training; consider fitting on the
# training split only.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)
scalerModel = scaler.fit(output)
scaledOutput = scalerModel.transform(output)
scaledOutput.show(3)

In [None]:
# Keep only the label (price) and the scaled feature vector for modelling.
df_model_final = scaledOutput.select('price', 'scaledFeatures')
df_model_final.show(3)

In [None]:
from pyspark.ml.regression import LinearRegression

# 75/25 train/test split. A fixed seed makes the split, and therefore every
# metric below, reproducible across Restart & Run All.
SPLIT_SEED = 42
df_train, df_test = df_model_final.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Linear regression of price on the scaled features (default parameters,
# i.e. no regularisation).
regressor = LinearRegression(featuresCol = 'scaledFeatures', labelCol = 'price')
# NOTE: `regressor` is rebound from the estimator to the fitted model here.
regressor = regressor.fit(df_train)

In [None]:
# Training-set performance: MSE, RMSE, MAE and R^2 from the model summary.
pred_results = regressor.evaluate(df_train)
print("Train MSE: %.2f" % pred_results.meanSquaredError)
print("Train RMSE: %.2f" % pred_results.rootMeanSquaredError)
print("Train MAE: %.2f" % pred_results.meanAbsoluteError)
print("Train r2: %.4f" % pred_results.r2)

In [None]:
# Test-set performance (held-out 25% split): MSE, RMSE, MAE and R^2.
pred_results = regressor.evaluate(df_test)
print("Test MSE: %.2f" % pred_results.meanSquaredError)
print("Test RMSE: %.2f" % pred_results.rootMeanSquaredError)
print("Test MAE: %.2f" % pred_results.meanAbsoluteError)
print("Test r2: %.4f" % pred_results.r2)