# California Housing Dataset - Linear Regression (pyspark)

The purpose of this of this project is to using pyspark's machine learning library to predict the median_house_value using linear regression.

In [1]:
pip install pyspark



In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master('local').appName('Colab').getOrCreate()

In [4]:
df = spark.read.format('csv').load('housing.csv', header=True, inferSchema=True)

In [5]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [6]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [7]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())
df = df[['id'] + df.columns[:-1]]
df.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [8]:
df.select('total_rooms').agg({'total_rooms' : 'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [9]:
# Finding the mean of each of the columns

from pyspark.sql.functions import mean

df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [10]:
# Grouping the data by ocean proximity and finding the average for each of the columns

df.groupby('ocean_proximity').agg({col : 'avg' for col in df.columns}).show()

+---------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------------+-----------------------+--------------------+
|ocean_proximity|   avg(households)|     avg(latitude)|   avg(population)|avg(total_bedrooms)|     avg(longitude)|           avg(id)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|avg(ocean_proximity)|
+---------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------------+-----------------------+--------------------+
|         ISLAND|             276.6|33.358000000000004|             668.0|              420.4|           -118.354|            8316.0|2.7444200000000003|            1574.6|               380440.0|                   42.4|                NULL|
|     NEAR OCEAN|501.24454477050415|

In [11]:
# Creating a squared function

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
  return value*value

squared_udf = udf(squared, FloatType())

df.withColumn('total_rooms_squared', squared_udf('total_rooms')).show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

In [12]:
# Setting up train/test split

# Note that usually, we would want to do a train/validation and test split. Howeever, for the purpose of this project, we will just stick to train and test.

train, test = df.randomSplit([0.7, 0.3])
train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [13]:
numerical_features = train.columns
numerical_features.remove('median_house_value')
numerical_features.remove('id')
numerical_features.remove('ocean_proximity')

numerical_features

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [14]:
# Removing missing values

from pyspark.ml.feature import Imputer

imputer = Imputer(strategy='mode', inputCols=numerical_features, outputCols=numerical_features)

imputer_model = imputer.fit(train)

train = imputer_model.transform(train)
test = imputer_model.transform(test)

train.show(3)
test.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

In [15]:
# Converting all of the numerical features into a single vector

from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features, outputCol='numerical_features_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(3)
test.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_features_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|     [-122.23,37.88,41...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|     [-122.22,37.86,21...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|     [-122.24,37.85

In [16]:
from pyspark.ml.feature import StandardScaler

In [17]:
# Scaling our numerical features

scaler = StandardScaler(inputCol='numerical_features_vector', outputCol='scaled_numerical_features_vector', withStd=True, withMean=True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)

In [18]:
train.show(3)
test.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_features_vector|scaled_numerical_features_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|     [-122.23,37.88,41...|            [-1.3323116783017...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|     [-122.22,37.86,21...|            [-1.3273158006805

In [19]:
# Converting our catergorical feature into a numerical feature

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_indexed')

indexer = indexer.fit(train)

train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)
test.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_features_vector|scaled_numerical_features_vector|ocean_proximity_indexed|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|     [-122.23,37.88,41...|            [-1.3323116783017...|                    3.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8

In [20]:
set(train.select('ocean_proximity_indexed').collect())

{Row(ocean_proximity_indexed=0.0),
 Row(ocean_proximity_indexed=1.0),
 Row(ocean_proximity_indexed=2.0),
 Row(ocean_proximity_indexed=3.0),
 Row(ocean_proximity_indexed=4.0)}

In [21]:
# We want to use a OneHotencoder on the ocean_proximity_indexed

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol='ocean_proximity_indexed', outputCol='ocean_proximity_encoded')

encoder = encoder.fit(train)

train = encoder.transform(train)
test = encoder.transform(test)

train.show(3)
test.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_features_vector|scaled_numerical_features_vector|ocean_proximity_indexed|ocean_proximity_encoded|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|     [-122.23,37.88,41...|            [-1.3323116783017...|                    3.0|          (4,[3],[1.0])|
|  1

In [22]:
# Combining our encoded_vector for categroical feature with our numerical features vector

assembler = VectorAssembler(inputCols=['scaled_numerical_features_vector', 'ocean_proximity_encoded'], outputCol='total_features_vector')

train = assembler.transform(train)
test = assembler.transform(test)

train.show(3)
test.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+---------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_features_vector|scaled_numerical_features_vector|ocean_proximity_indexed|ocean_proximity_encoded|total_features_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+---------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|     [-122.23,37.88,41...|            [-1.3323

In [23]:
train.select('total_features_vector').take(1)

[Row(total_features_vector=DenseVector([-1.3323, 1.0548, 0.9871, -0.7982, -0.9624, -0.9526, -0.9689, 2.3529, 0.0, 0.0, 0.0, 1.0]))]

In [24]:
# Training a linear regression model

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='total_features_vector', labelCol='median_house_value')

lr = lr.fit(train)

In [25]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction', 'predcition_median_house_value')
pred_train_df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+---------------------+-----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_features_vector|scaled_numerical_features_vector|ocean_proximity_indexed|ocean_proximity_encoded|total_features_vector|predcition_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+---------------------+-----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|     

In [26]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predcition_median_house_value')
pred_test_df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+---------------------+-----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_features_vector|scaled_numerical_features_vector|ocean_proximity_indexed|ocean_proximity_encoded|total_features_vector|predcition_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------------+--------------------------------+-----------------------+-----------------------+---------------------+-----------------------------+
|  8|  -122.26|   37.84|              42.0|     2555.0|         665.0|    1206.0|     595.0|     

In [27]:
preds_and_actuals = pred_test_df[['predcition_median_house_value', 'median_house_value']]

# Converting to RDD

preds_and_actuals_rdd = preds_and_actuals.rdd

preds_and_actuals_rdd.take(3)

[Row(predcition_median_house_value=199137.80573833315, median_house_value=226700.0),
 Row(predcition_median_house_value=266147.6159940249, median_house_value=261100.0),
 Row(predcition_median_house_value=256469.21710566958, median_house_value=241800.0)]

In [27]:
# Converting to a tuple

preds_and_actuals_rdd = preds_and_actuals_rdd.map(tuple)

preds_and_actuals_rdd.take(3)

In [28]:
# Evalauting metrics for test results

from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(preds_and_actuals_rdd)

print('MSE: ', metrics.meanSquaredError)
print('RMSE: ', metrics.rootMeanSquaredError)
print('R2: ', metrics.r2)



MSE:  4768059133.824859
RMSE:  69051.13419651309
R2:  0.6441115880057764


#### Conclusion

The results of this model is clearly not the best and could be improved through a variety of techniques.