In [345]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, mean
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Imputer, StandardScaler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [346]:
spark = SparkSession.builder.appName('Housing').getOrCreate()

In [347]:
df_spark = spark.read.csv('housing.csv', header = True, inferSchema=True)
df_spark.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [348]:
df_spark.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [349]:
df_spark.describe().show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           NULL|
| stddev|  2.0035317

In [350]:
df_spark.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [351]:
df_spark.count()

20640

In [352]:
df_spark.select(avg('total_rooms')).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [353]:
df_spark.select(*[mean(c) for c in df_spark.columns]).show()

+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|
+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+-------

In [354]:
from pyspark.sql.functions import monotonically_increasing_id

df = df_spark.withColumn('id', monotonically_increasing_id())

df_spark = df[['id'] + df.columns[:-1]]


In [355]:
df_spark.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [356]:
train, test = df_spark.randomSplit([0.7,0.3])
train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [357]:
numerical_feature_lst = train.columns
numerical_feature_lst.remove ('median_house_value')
numerical_feature_lst.remove('ocean_proximity')

numerical_feature_lst

['id',
 'longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [358]:
imputer = Imputer(inputCols=numerical_feature_lst, outputCols=numerical_feature_lst)
imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)

In [359]:
train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
|  5| 

In [360]:
numerical_vector = VectorAssembler(inputCols=numerical_feature_lst, outputCol='numerical_vector_asse')
train = numerical_vector.transform(train)
test = numerical_vector.transform(test)
train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY| [0.0,-122.23,37.8...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY| [1.0,-122.22,37.8...|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY| [3.0,-122.25,37.8...|
|  4|  -122.25| 

In [361]:
train.select('numerical_vector_asse').show(3)

+---------------------+
|numerical_vector_asse|
+---------------------+
| [0.0,-122.23,37.8...|
| [1.0,-122.22,37.8...|
| [3.0,-122.25,37.8...|
+---------------------+
only showing top 3 rows



In [362]:
train.select('numerical_vector_asse').take(3)

[Row(numerical_vector_asse=DenseVector([0.0, -122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_vector_asse=DenseVector([1.0, -122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 8.3014])),
 Row(numerical_vector_asse=DenseVector([3.0, -122.25, 37.85, 52.0, 1274.0, 235.0, 558.0, 219.0, 5.6431]))]

In [363]:
scaler = StandardScaler(inputCol='numerical_vector_asse', outputCol='scaler_numerical_vector_asse', withMean=True, withStd=True)
scaler = scaler.fit(train)
train= scaler.transform(train)
test = scaler.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY| [0.0,-122.23,37.8...|        [-1.7299195000489...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY| [1.0,-122.22,37.8...|        [-1.7297517940931...|
|  3|  -122.25|   37.85|           

In [364]:
train.select('scaler_numerical_vector_asse').take(3)

[Row(scaler_numerical_vector_asse=DenseVector([-1.7299, -1.3247, 1.0463, 0.9964, -0.8149, -0.9905, -0.9693, -0.9911, 2.3428])),
 Row(scaler_numerical_vector_asse=DenseVector([-1.7298, -1.3197, 1.0369, -0.5945, 2.0839, 1.3862, 0.8595, 1.705, 2.3303])),
 Row(scaler_numerical_vector_asse=DenseVector([-1.7294, -1.3346, 1.0323, 1.8714, -0.6312, -0.7326, -0.7617, -0.7433, 0.9316]))]

In [365]:
one_hot = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_index')
one_hot = one_hot.fit(train)
train= one_hot.transform(train)
test = one_hot.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY| [0.0,-122.23,37.8...|        [-1.7299195000489...|                  3.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY

In [366]:
set(train.select('ocean_proximity_index').collect())

{Row(ocean_proximity_index=0.0),
 Row(ocean_proximity_index=1.0),
 Row(ocean_proximity_index=2.0),
 Row(ocean_proximity_index=3.0),
 Row(ocean_proximity_index=4.0)}

In [367]:
one_hot = OneHotEncoder(inputCol='ocean_proximity_index', outputCol='ocean_proximity_one_hot')
one_hot = one_hot.fit(train)
train= one_hot.transform(train)
test = one_hot.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|ocean_proximity_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY| [0.0,-122.23,37.8...|        [-1.7299195000489...|                  3.0|          (4,[3],[1.0])|
|  1|  -122.22|   37.86|              21.0| 

In [368]:
train.select('ocean_proximity_one_hot').take(3)

[Row(ocean_proximity_one_hot=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_one_hot=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_one_hot=SparseVector(4, {3: 1.0}))]

In [369]:
assembler = VectorAssembler(inputCols=['scaler_numerical_vector_asse','ocean_proximity_index'], outputCol='final_vector_no_ocean')

train= assembler.transform(train)
test = assembler.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|ocean_proximity_one_hot|final_vector_no_ocean|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY| [0.0,-122.23,37.8...|        [-1.7299195000489...|                  3.0|   

In [370]:
assembler = VectorAssembler(inputCols=['scaler_numerical_vector_asse','ocean_proximity_one_hot'], outputCol='final_vector')

train= assembler.transform(train)
test = assembler.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|ocean_proximity_one_hot|final_vector_no_ocean|        final_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY| [0.0,-122.23

In [371]:
lr_no_ocean = LinearRegression ( featuresCol= 'final_vector_no_ocean', labelCol= 'median_house_value')
lr_no_ocean = lr_no_ocean.fit(train)
predict_train = lr_no_ocean.transform(train).withColumnRenamed('prediction', 'prediction_mediam_house_value_no_ocean')
predict_train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+--------------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|ocean_proximity_one_hot|final_vector_no_ocean|        final_vector|prediction_mediam_house_value_no_ocean|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+--------------------------------------+
|  0|  -122.23|   37.88|             

In [372]:
lr_ocean = LinearRegression ( featuresCol= 'final_vector', labelCol= 'median_house_value')
lr_ocean = lr_ocean.fit(train)
predict_train = lr_ocean.transform(predict_train).withColumnRenamed('prediction', 'prediction_mediam_house_value')
predict_train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+--------------------------------------+-----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|ocean_proximity_one_hot|final_vector_no_ocean|        final_vector|prediction_mediam_house_value_no_ocean|prediction_mediam_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+-----------------

In [373]:
predict_train.select(['median_house_value','prediction_mediam_house_value_no_ocean','prediction_mediam_house_value']).show(5)

+------------------+--------------------------------------+-----------------------------+
|median_house_value|prediction_mediam_house_value_no_ocean|prediction_mediam_house_value|
+------------------+--------------------------------------+-----------------------------+
|          452600.0|                     399494.3814015753|           402127.91587132047|
|          358500.0|                    410014.63349578716|           415660.42629790225|
|          341300.0|                     313063.4006832341|             316031.643973711|
|          342200.0|                    245815.10655730442|            250699.2799159012|
|          269700.0|                     253621.2994648234|           257747.62167579285|
+------------------+--------------------------------------+-----------------------------+
only showing top 5 rows



In [374]:
rf_no_ocean = RandomForestRegressor ( featuresCol= 'final_vector_no_ocean', labelCol= 'median_house_value')
rf_no_ocean = rf_no_ocean.fit(train)
predict_train_rf = rf_no_ocean.transform(train).withColumnRenamed('prediction', 'prediction_mediam_house_value_no_ocean')
predict_train_rf.show(5)


+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+--------------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|ocean_proximity_one_hot|final_vector_no_ocean|        final_vector|prediction_mediam_house_value_no_ocean|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+--------------------------------------+
|  0|  -122.23|   37.88|             

In [375]:
rf = RandomForestRegressor ( featuresCol= 'final_vector', labelCol= 'median_house_value')
rf = rf.fit(train)
predict_train_rf = rf.transform(predict_train_rf).withColumnRenamed('prediction', 'prediction_mediam_house_value')
predict_train_rf.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+--------------------------------------+-----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_vector_asse|scaler_numerical_vector_asse|ocean_proximity_index|ocean_proximity_one_hot|final_vector_no_ocean|        final_vector|prediction_mediam_house_value_no_ocean|prediction_mediam_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+----------------------------+---------------------+-----------------------+---------------------+--------------------+-----------------

In [376]:
predict_train.select(['median_house_value','prediction_mediam_house_value_no_ocean','prediction_mediam_house_value']).show(5)

+------------------+--------------------------------------+-----------------------------+
|median_house_value|prediction_mediam_house_value_no_ocean|prediction_mediam_house_value|
+------------------+--------------------------------------+-----------------------------+
|          452600.0|                     399494.3814015753|           402127.91587132047|
|          358500.0|                    410014.63349578716|           415660.42629790225|
|          341300.0|                     313063.4006832341|             316031.643973711|
|          342200.0|                    245815.10655730442|            250699.2799159012|
|          269700.0|                     253621.2994648234|           257747.62167579285|
+------------------+--------------------------------------+-----------------------------+
only showing top 5 rows

