In [15]:
!pip install pyspark



In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [17]:
spark = (SparkSession.builder
         .master("local")
         .appName("Colab")
         .config("spark.ui.post", "4050")
         .getOrCreate()
)

In [18]:
df = spark.read.csv("/content/housing.csv", header = True, inferSchema = True)

In [19]:
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [20]:
df = df.withColumn("id", monotonically_increasing_id())

In [21]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity| id|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|  0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|  1|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|  2|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|  3|
|  -12

In [22]:
df = df[["id"] + df.columns[:-1]]

In [23]:
df.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [24]:
df.count()

20640

In [25]:
df.select("total_rooms").agg({"total_rooms" : "avg"}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [27]:
df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [30]:
df.groupBy("ocean_proximity").agg({col : 'avg' for col in df.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

In [31]:
def squared(value):
  return value ** 2

squared_udf = udf(squared, FloatType())

df.withColumn("total_rooms_squared", squared_udf("total_rooms")).show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

In [32]:
df.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [33]:
train, test = df.randomSplit([0.7, 0.3])

train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [35]:
numerical_features_list = train.columns
numerical_features_list.remove("median_house_value")
numerical_features_list.remove("id")
numerical_features_list.remove("ocean_proximity")

In [36]:
numerical_features_list

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [37]:
from pyspark.ml.feature import *

In [38]:
imputer = Imputer(inputCols=numerical_features_list,
                  outputCols=numerical_features_list)

imputer = imputer.fit(train)

train = imputer.transform(train)
test = imputer.transform(test)

In [39]:
train.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  5| 

In [41]:
numerical_vector_assembler = VectorAssembler(inputCols = numerical_features_list,
                                               outputCol = "numerical_feature_vector")

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

In [42]:
train.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...

In [45]:
scaler = StandardScaler(inputCol = 'numerical_feature_vector',
                        outputCol = 'scaled_numerical_feature_vector',
                        withStd = True, withMean = True)

In [46]:
scaler = scaler.fit(train)

In [47]:
train = scaler.transform(train)
test = scaler.transform(test)

train.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3305720659827...|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3255778554684...|
|  2|

In [49]:
train.select("scaled_numerical_feature_vector").take(3)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3306, 1.054, 0.991, -0.8095, -0.9788, -0.9802, -0.9816, 2.3694])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3256, 1.0446, -0.5974, 2.0753, 1.3719, 0.8768, 1.6938, 2.3568])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3356, 1.0399, 1.8646, -0.5372, -0.8321, -0.8248, -0.8467, 1.8026]))]

In [50]:
indexer = StringIndexer(inputCol = "ocean_proximity",
                        outputCol = "ocean_category_index")

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3305720659827...|                 3.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          3585

In [51]:
set(train.select("ocean_category_index").collect())

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [55]:
one_hot_encoder = OneHotEncoder(inputCol = "ocean_category_index",
                                outputCol = "ocean_category_one_hot")

one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+------------------+----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_hot|ocean_category_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+------------------+----------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3305720659827...|                

In [57]:
assembler = VectorAssembler(inputCols = ["scaled_numerical_feature_vector", "ocean_category_one_hot"],
                            outputCol = "final_feature_vector")

train = assembler.transform(train)
test = assembler.transform(test)

train.show(3)

IllegalArgumentException: Output column final_feature_vector already exists.

In [58]:
train.select("final_feature_vector").take(2)

[Row(final_feature_vector=DenseVector([-1.3306, 1.054, 0.991, -0.8095, -0.9788, -0.9802, -0.9816, 2.3694, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3256, 1.0446, -0.5974, 2.0753, 1.3719, 0.8768, 1.6938, 2.3568, 0.0, 0.0, 0.0, 1.0]))]

In [59]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = "final_feature_vector",
                      labelCol = "median_house_value")

lr

LinearRegression_f05a9e34cb07

In [60]:
lr = lr.fit(train)

lr

LinearRegressionModel: uid=LinearRegression_f05a9e34cb07, numFeatures=12

In [63]:
pred_train_df = lr.transform(train).withColumnRenamed("prediction", "predicted_median_house_value")

In [64]:
pred_train_df.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_hot|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         

In [65]:
pred_test_df = lr.transform(test).withColumnRenamed("prediction", "predicted_median_house_value")

In [68]:
pred_test_df.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_hot|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+------------------+----------------------+--------------------+----------------------------+
|  4|  -122.25|   37.85|              52.0|     1627.0|         

In [69]:
pred_test_pd_df = pred_test_df.toPandas()

In [70]:
pred_test_pd_df.head(5)

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_hot,ocean_category_one_hot,final_feature_vector,predicted_median_house_value
0,4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY,"[-122.25, 37.85, 52.0, 1627.0, 280.0, 565.0, 2...","[-1.340560487011272, 1.0399217795119462, 1.864...",3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)","[-1.340560487011272, 1.0399217795119462, 1.864...",253894.142901
1,12,-122.26,37.85,52.0,2491.0,474.0,1098.0,468.0,3.075,213500.0,NEAR BAY,"[-122.26, 37.85, 52.0, 2491.0, 474.0, 1098.0, ...","[-1.3455546975255341, 1.0399217795119462, 1.86...",3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)","[-1.3455546975255341, 1.0399217795119462, 1.86...",229256.017749
2,13,-122.26,37.84,52.0,696.0,191.0,345.0,174.0,2.6736,191300.0,NEAR BAY,"[-122.26, 37.84, 52.0, 696.0, 191.0, 345.0, 17...","[-1.3455546975255341, 1.0352359845875512, 1.86...",3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)","[-1.3455546975255341, 1.0352359845875512, 1.86...",209230.957791
3,14,-122.26,37.85,52.0,2643.0,626.0,1212.0,620.0,1.9167,159200.0,NEAR BAY,"[-122.26, 37.85, 52.0, 2643.0, 626.0, 1212.0, ...","[-1.3455546975255341, 1.0399217795119462, 1.86...",3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)","[-1.3455546975255341, 1.0399217795119462, 1.86...",201716.909687
4,15,-122.26,37.85,50.0,1120.0,283.0,697.0,264.0,2.125,140000.0,NEAR BAY,"[-122.26, 37.85, 50.0, 1120.0, 283.0, 697.0, 2...","[-1.3455546975255341, 1.0399217795119462, 1.70...",3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)","[-1.3455546975255341, 1.0399217795119462, 1.70...",183474.345256


In [71]:
predictions_and_actuals = pred_test_df[["predicted_median_house_value",
                                        "median_house_value"]]

In [72]:
predictions_and_actuals_rdd = predictions_and_actuals.rdd

In [73]:
predictions_and_actuals_rdd.take(2)

[Row(predicted_median_house_value=253894.14290103846, median_house_value=342200.0),
 Row(predicted_median_house_value=229256.0177492994, median_house_value=213500.0)]

In [74]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)\

predictions_and_actuals_rdd.take(2)

[(253894.14290103846, 342200.0), (229256.0177492994, 213500.0)]

In [76]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

print(metrics.meanSquaredError)



4581459336.678994
