In [1]:
#Importing libraries
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Regression").getOrCreate()
spark

In [2]:
# importing the dependencies
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer

# importing library for checking multicollinearity
from pyspark.ml.stat import Correlation

# for training and evaluation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Import Dataset

This is a dataset containing housing pricing data for California. Each row of data represents the median statistics for a block (eg. median income, median age of house, etc.). You could this data in a number of ways, but we will use it to predict the median house value.

About this dataset

1. longitude: A measure of how far west a house is; a higher value is farther west

2. latitude: A measure of how far north a house is; a higher value is farther north

3. housingMedianAge: Median age of a house within a block; a lower number is a newer building

4. totalRooms: Total number of rooms within a block

5. totalBedrooms: Total number of bedrooms within a block

6. population: Total number of people residing within a block

7. households: Total number of households, a group of people residing within a home unit, for a block

8. medianIncome: Median income for households within a block of houses (measured in tens of thousands of US Dollars)

9. medianHouseValue: Median house value for households within a block (measured in US Dollars)

10. oceanProximity: Location of the house w.r.t ocean/sea
Source: https://www.kaggle.com/camnugent/california-housing-prices

In [3]:
#Importing Dataset
path = "../Datasets/"

df = spark.read.csv(path+"housing.csv", inferSchema=True, header=True)
df.limit(5).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


In [4]:
# printing the schema
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [5]:
# Checking dataset counts and columns
print("Total number of observations in the datatset: ",df.count())
print("Total number of columns in the dataset: ", len(df.columns))

Total number of observations in the datatset:  20640
Total number of columns in the dataset:  10


### Missing Data Statistics

It is always valuable to know how much missing data you are going to be working with before you take any action such as filling missing values with an average or dropping those rows completely.

In [6]:
# creating a function to check null statistics

from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = []
    # checking total number of rows in the dataset
    numRows = df.count()
    
    #Iterating each columns for null values
    for k in df.columns:
        # checking null count for each column
        print("column ",k)
        nullRows = df.where(col(k).isNull()).count()
        print("number of null rows: ", nullRows)
        # if there are null
        if(nullRows > 0 ):
            # creating tuple having column name, null rows and null rows percent
            temp = k, nullRows, (nullRows/numRows)*100
            # appending the tuple
            null_columns_counts.append(temp)
            print(null_columns_counts)
    return(null_columns_counts)
        

In [7]:
# passing the whole dataframe
null_columns_calc_list = null_value_calc(df)

column  longitude
number of null rows:  0
column  latitude
number of null rows:  0
column  housing_median_age
number of null rows:  0
column  total_rooms
number of null rows:  0
column  total_bedrooms
number of null rows:  207
[('total_bedrooms', 207, 1.002906976744186)]
column  population
number of null rows:  0
column  households
number of null rows:  0
column  median_income
number of null rows:  0
column  median_house_value
number of null rows:  0
column  ocean_proximity
number of null rows:  0


In [8]:
# checking null column statistics
print(null_columns_calc_list)

[('total_bedrooms', 207, 1.002906976744186)]


In [9]:
# Checking null rows from the dataset
df.filter(df.total_bedrooms.isNull()).select(['longitude','latitude','total_rooms','total_bedrooms','households','ocean_proximity']).show(5)

+---------+--------+-----------+--------------+----------+---------------+
|longitude|latitude|total_rooms|total_bedrooms|households|ocean_proximity|
+---------+--------+-----------+--------------+----------+---------------+
|  -122.16|   37.77|     1256.0|          null|     218.0|       NEAR BAY|
|  -122.17|   37.75|      992.0|          null|     259.0|       NEAR BAY|
|  -122.28|   37.78|     5154.0|          null|    1273.0|       NEAR BAY|
|  -122.24|   37.75|      891.0|          null|     146.0|       NEAR BAY|
|   -122.1|   37.69|      746.0|          null|     161.0|       NEAR BAY|
+---------+--------+-----------+--------------+----------+---------------+
only showing top 5 rows



In [10]:
# creating dataframe for null statistics to get good visualization
spark.createDataFrame(null_columns_calc_list,['Column_Name','Null_Values_Count', 'Null_Value_Percent']).show()

+--------------+-----------------+------------------+
|   Column_Name|Null_Values_Count|Null_Value_Percent|
+--------------+-----------------+------------------+
|total_bedrooms|              207| 1.002906976744186|
+--------------+-----------------+------------------+



In [11]:
# filling the missing value with mean
def fill_with_mean(df, include=set()): 
    stats = df.agg(*(avg(c).alias(c) for c in df.columns if c in include))
    return df.na.fill(stats.first().asDict())

In [12]:
df = fill_with_mean(df, ["total_bedrooms"])
df.filter(df.total_bedrooms.isNull()).select(['longitude','latitude','total_rooms','total_bedrooms','households','ocean_proximity']).show(5)

+---------+--------+-----------+--------------+----------+---------------+
|longitude|latitude|total_rooms|total_bedrooms|households|ocean_proximity|
+---------+--------+-----------+--------------+----------+---------------+
+---------+--------+-----------+--------------+----------+---------------+



### Formatting the Data

Mlib requires all input columns of your dataframe to be vectorized. You will see that we rename our dependent var to label as that is what is expected for all MLib applications. If we renmame once here, we won't need to specify it later on!

In [13]:
# segregating input and output columns
input_columns = ['housing_median_age','total_rooms','total_bedrooms','population','households','median_income','ocean_proximity']
dependent_var = ['median_house_value']

In [17]:
# renaming the dependent var to label
renamed = df.withColumnRenamed('median_house_value','label')

In [19]:
# Converting all string data type in the input column list into numeric, else algorithm won't work

# first create empty list set up to divide the input list into numeric and string data types
numeric_inputs = []
string_inputs = []

for column in input_columns:
    # checking the column type and if string, appending the new column name into string_inputs list
    if str(renamed.schema[column].dataType) == 'StringType':
        new_col_name = column + "_num"
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)
        

In [21]:
print("numeric columns: ",numeric_inputs)
print("New numeric columns: ",string_inputs)

numeric columns:  ['housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
New numeric columns:  ['ocean_proximity_num']


In [22]:
# if the dataframe contains string column
if len(string_inputs) !=0:
    # use the string indexeer to convert them to numeric
    for column in input_columns:
        if str(renamed.schema[column].dataType) == "StringType":
            indexer = StringIndexer(inputCol=column, outputCol=column+"_num")
            indexed = indexer.fit(renamed).transform(renamed)
        else:
            indexed= renamed


In [23]:
# Checking if the dataframe is having new column Ocean_proximity_num
indexed.limit(5).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,label,ocean_proximity,ocean_proximity_num
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY,3.0
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY,3.0
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY,3.0
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY,3.0
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY,3.0


In [24]:
# checking new schema
indexed.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- label: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_num: double (nullable = false)



### Treating Outliers

In [32]:
# create empty dictionaries
d ={}

# create a dictionary of percentiles you want to set
# we do top and bottom 1% which is pretty common

for col in numeric_inputs:
    d[col] = indexed.approxQuantile(col, [0.01, 0.99], 0.25)
    print(d)
    print(" ")
    
# now fill the values
for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect()
    skew = skew[0][0]
    
    # this function will floor, cap, and then log+1
    if skew>1:
        indexed = indexed.withColumn(col, \
                                    log(when(indexed[col] < d[col][0], d[col][0]) \
                                       .when(indexed[col] > d[col][1], d[col][1]) \
                                       .otherwise(indexed[col]) + 1).alias(col))
        print(col+" has been treated for positive skewness: ", skew)
    else:
        # do exp in case skewness is negative
        indexed = indexed.withColumn(col, \
                                    exp(when(indexed[col] < d[col][0], d[col][0]) \
                                       .when(indexed[col] > d[col][1], d[col][1]) \
                                       .otherwise(indexed[col])).alias(col))
        print(col+" has been treated for negative skewness: ", skew)

{'housing_median_age': [3.718281828459045, 3.831008000716577e+22]}
 
{'housing_median_age': [3.718281828459045, 3.831008000716577e+22], 'total_rooms': [1.0986122886681098, 10.579514006287654]}
 
{'housing_median_age': [3.718281828459045, 3.831008000716577e+22], 'total_rooms': [1.0986122886681098, 10.579514006287654], 'total_bedrooms': [0.6931471805599453, 8.771215062375383]}
 
{'housing_median_age': [3.718281828459045, 3.831008000716577e+22], 'total_rooms': [1.0986122886681098, 10.579514006287654], 'total_bedrooms': [0.6931471805599453, 8.771215062375383], 'population': [1.3862943611198906, 10.482429663876848]}
 
{'housing_median_age': [3.718281828459045, 3.831008000716577e+22], 'total_rooms': [1.0986122886681098, 10.579514006287654], 'total_bedrooms': [0.6931471805599453, 8.771215062375383], 'population': [1.3862943611198906, 10.482429663876848], 'households': [0.6931471805599453, 8.713253274320705]}
 
{'housing_median_age': [3.718281828459045, 3.831008000716577e+22], 'total_rooms': [

In [35]:
# total feature list
features_list = numeric_inputs + string_inputs
print(features_list)
# converting all features into a single vector list
assembler = VectorAssembler(inputCols=features_list, outputCol='features')
final_data = assembler.transform(indexed).select('features', 'label')
final_data.show(5)

['housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'ocean_proximity_num']
+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[41.0,881.0000000...|452600.0|
|[21.0000000015165...|358500.0|
|[52.0,1467.999999...|352100.0|
|[52.0,1274.999999...|341300.0|
|[52.0,1627.999999...|342200.0|
+--------------------+--------+
only showing top 5 rows



### Check for MultiCollinearity

In [39]:
from pyspark.ml.stat import Correlation
pearsonCorr = Correlation.corr(final_data, 'features', 'pearson').collect()[0][0]

In [46]:
corrmatrix = pearsonCorr.toArray().tolist()
#print(corrmatrix)
corr_df = spark.createDataFrame(corrmatrix,features_list)
corr_df.limit(7).toPandas()

Unnamed: 0,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,ocean_proximity_num
0,1.0,-0.361183,-0.318952,-0.29622,-0.302889,-0.118984,0.145192
1,-0.361183,1.0,0.927253,0.857126,0.918484,0.19805,-0.016309
2,-0.318952,0.927253,1.0,0.87391,0.974725,-0.007682,-0.021358
3,-0.29622,0.857126,0.87391,1.0,0.907222,0.004834,-0.083537
4,-0.302889,0.918484,0.974725,0.907222,1.0,0.013033,-0.027144
5,-0.118984,0.19805,-0.007682,0.004834,0.013033,1.0,-0.039673
6,0.145192,-0.016309,-0.021358,-0.083537,-0.027144,-0.039673,1.0


In [None]:
# total rooms and total bedrooms are highly correlated ie 0.927
# total rooms and population are also highly correlated ie 0.85
# total rooms and households are highly correlated ie. 0.918
# total_bedrooms and population are highly correlated ie. 0.87
# total_bedrooms and households are highly correlated ie. 0.97


#### Split the dataframe into training and testing dataset

In [49]:
train, test = final_data.randomSplit([0.7, 0.3])

In [61]:
#Fitting the Linear Regression algorithm
regressor = LinearRegression()
fitModel = regressor.fit(train)

# Load the Summary
trainingSummary = fitModel.summary

# Print the Errors
print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("Training r2: %f" % trainingSummary.r2)
print("")

Training RMSE: 76101.920939
Training r2: 0.570097



In [51]:
# Setting the regression evaluator as rmse
evaluator = RegressionEvaluator(metricName="rmse")

In [52]:
predictions = fitModel.transform(test)

# check the rmse value for prediction accuracy
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data= %g" % rmse)

Root Mean Squared Error (RMSE) on test data= 76020.5


In [62]:
# Now load the test results
test_results = fitModel.evaluate(test)

# And print them
print("Test RMSE: {}".format(test_results.rootMeanSquaredError))
print("Test r2: {}".format(test_results.r2))
print("")

Test RMSE: 76020.4690490871
Test r2: 0.5535720507259578



In [59]:
# printing coefficients and intercept for Linear regression
print("Intercept: %s" % str(fitModel.intercept))
print(" ")
coeff_array = fitModel.coefficients.toArray()
coeff_scores = []
for x in coeff_array:
    coeff_scores.append(float(x))

# zip the input_columns list and create a df
result = spark.createDataFrame(zip(input_columns, coeff_scores), schema=['feature', 'coeff'])


Intercept: -98568.1951938066
 


In [60]:
result.show()

+------------------+------------------+
|           feature|             coeff|
+------------------+------------------+
|housing_median_age| 1910.098893473276|
|       total_rooms|-19.29946062295536|
|    total_bedrooms| 78.75377618622908|
|        population|-33.94987263975245|
|        households|146.22928385994496|
|     median_income| 47761.21736368154|
|   ocean_proximity|1976.3262414215985|
+------------------+------------------+

