In [1]:
from pyspark.sql import SparkSession


# Obtain the notebook's Spark session. `getOrCreate()` hands back an already
# running session if one exists, so re-running this cell is harmless.
spark = (
    SparkSession.builder
    .appName("Predicting housing prices")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/04 12:42:57 WARN Utils: Your hostname, Dylans-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 10.20.42.250 instead (on interface en0)
25/08/04 12:42:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/04 12:42:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the raw housing CSV. The first line supplies the column names and
# Spark samples the file to infer each column's type.
data = spark.read.csv(
    path="data/housing.csv",
    header=True,
    inferSchema=True,
)

# Print the inferred column names and types.
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [3]:
# Total number of rows in the raw dataset (baseline before any cleaning).
data.count()

20640

In [4]:
import pyspark.sql.functions as F


# Count missing values in a single column: `when` yields 1 for null rows and
# null otherwise, and `count` only tallies the non-null results.
(
    data
    .select(
        F.count(
            F.when(F.col('ocean_proximity').isNull(), 1)
        ).alias('ocean_proximity')
    )
    .show()
)

+---------------+
|ocean_proximity|
+---------------+
|              0|
+---------------+



In [5]:
# Same null-count trick as for a single column, applied to every column at
# once: one aggregate per column, each aliased back to the column's own name.
(
    data
    .select([
        F.count(F.when(F.col(column_name).isNull(), 1)).alias(column_name)
        for column_name in data.columns
    ])
    .show()
)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [6]:
# Discard the rows whose `total_bedrooms` is null (the only column with
# missing values according to the per-column null counts). The result is a
# new DataFrame; `data` itself is left untouched.
filtered_data = data.dropna(how='any', subset=['total_bedrooms'])

# Row count after the drop, for comparison with the raw row count.
filtered_data.count()

20433

In [7]:
from pyspark.ml.feature import VectorAssembler


# Numeric predictors fed to the model. longitude, latitude and the
# ocean_proximity string column are not part of the feature set, and
# median_house_value is the label.
feature_columns = [
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
]

# Spark ML estimators expect a single vector column, so pack the predictors
# into `features`, in the order listed above.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol='features',
)

# Append the `features` column to the cleaned data and preview the result.
transformed_data = assembler.transform(filtered_data)
transformed_data.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,129.0...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,1106...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,190....|
|  -122.25|   37.85|              52.0|     12

In [8]:
from pyspark.ml.regression import LinearRegression

# Ordinary linear regression from the assembled `features` vector to the
# house value. No regularisation is configured (regParam keeps its default),
# which is what triggers Spark's "regParam is zero" warning during fitting.
lr = LinearRegression(
    featuresCol='features',
    labelCol='median_house_value',
)

# Fit on every row of the cleaned, assembled dataset.
model = lr.fit(transformed_data)

25/08/04 12:54:13 WARN Instrumentation: [f1bb253b] regParam is zero, which might cause numerical instability and overfitting.
25/08/04 12:54:13 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/08/04 12:54:13 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [9]:
# A single hand-written observation used to sanity-check the fitted model.
# Its keys are exactly the entries of `feature_columns`; the values match the
# first row printed by `transformed_data.show()`, whose recorded
# median_house_value is 452600.0.
single_record = [
    dict(
        housing_median_age=41.0,
        total_rooms=880.0,
        total_bedrooms=129.0,
        population=322.0,
        households=126.0,
        median_income=8.3252,
    )
]

# Turn the list of dicts into a one-row DataFrame. No schema is passed, so
# column names and types are inferred from the dict itself.
single_record_df = spark.createDataFrame(data=single_record)

In [10]:
# Reuse the assembler from training so the `features` vector is built from
# the same input columns (`feature_columns`) the model was fitted on.
new_data = assembler.transform(dataset=single_record_df)

# Inspect the assembled record before scoring it.
new_data.show()

                                                                                

+----------+------------------+-------------+----------+--------------+-----------+--------------------+
|households|housing_median_age|median_income|population|total_bedrooms|total_rooms|            features|
+----------+------------------+-------------+----------+--------------+-----------+--------------------+
|     126.0|              41.0|       8.3252|     322.0|         129.0|      880.0|[41.0,880.0,129.0...|
+----------+------------------+-------------+----------+--------------+-----------+--------------------+



In [11]:
# Score the single record: the fitted model appends a `prediction` column
# holding the estimated median house value.
(
    model
    .transform(new_data)
    .show()
)

+----------+------------------+-------------+----------+--------------+-----------+--------------------+------------------+
|households|housing_median_age|median_income|population|total_bedrooms|total_rooms|            features|        prediction|
+----------+------------------+-------------+----------+--------------+-----------+--------------------+------------------+
|     126.0|              41.0|       8.3252|     322.0|         129.0|      880.0|[41.0,880.0,129.0...|428551.23479640554|
+----------+------------------+-------------+----------+--------------+-----------+--------------------+------------------+

