# Task 4

Next, we will use Spark functions to improve the quality of our predictions on the California housing data. For convenience, much of the code that loads the data and runs the model is copied from the previous part. This activity is free-form and will be graded for effort rather than accuracy. Here are some feature-engineering ideas that may improve your predictions:

1. Try different types of regression, which are listed here: https://spark.apache.org/docs/2.1.1/ml-classification-regression.html
2. Think of other features that may be informative for this data. Some of the most important could be rooms, population, or bedrooms per household, which are more helpful than the raw totals of rooms, population, or bedrooms per block group.
3. Try scaling the features with `StandardScaler`.


## Housing Data Set

The California Housing data set appeared in a 1997 paper titled *Sparse Spatial Autoregressions*, written by R. Kelley Pace and Ronald Barry and published in the journal *Statistics & Probability Letters*. The researchers built the data set from the 1990 California census data.

The data contains one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). In this sample a block group on average includes 1425.5 individuals living in a geographically compact area.

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

<pre><strong>Longitude:</strong> the angular distance of a block group east or west of the prime meridian
<strong>Latitude:</strong> the angular distance of a block group north or south of the earth’s equator
<strong>Housing Median Age:</strong> the median age of the houses in a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values
<strong>Total Rooms:</strong> the total number of rooms in the houses of a block group
<strong>Total Bedrooms:</strong> the total number of bedrooms in the houses of a block group
<strong>Population:</strong> the number of inhabitants of a block group
<strong>Households:</strong> the number of households (housing units and their occupants) in a block group
<strong>Median Income:</strong> the median income of the people who belong to a block group
<strong>Median House Value:</strong> the dependent variable; the median house value of a block group
</pre>

We also learn that all block groups with zero entries for the independent and dependent variables have been excluded from the data.

The median house value is the dependent variable and will serve as the target variable in our ML model.

In [42]:
!pip install pyspark



In [89]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

import matplotlib.pyplot as plt

spark = SparkSession.builder.master("local[2]").appName("Linear-Regression-California-Housing").getOrCreate()
path = '../input/hausing-data/cal_housing.data'

schema = StructType([
    StructField("long", FloatType(), nullable=True),
    StructField("lat", FloatType(), nullable=True),
    StructField("medage", FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)

housing_df = spark.read.csv(path=path, schema=schema).cache()
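
Because the file has no header row, the schema above is applied by position. As a rough illustration of that mapping, the plain-Python sketch below parses one comma-separated record into named float fields. The sample line is an assumption rebuilt from the first row displayed by `housing_df.show(5)`, and `parse_record` is a hypothetical helper, not part of Spark.

```python
# Column names in the same order as the StructType schema above.
COLUMNS = ["long", "lat", "medage", "totrooms", "totbdrms",
           "pop", "houshlds", "medinc", "medhv"]

def parse_record(line):
    """Hypothetical helper: split one CSV line and name the fields by position."""
    values = [float(v) for v in line.strip().split(",")]
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(values)}")
    return dict(zip(COLUMNS, values))

# Assumed sample line, reconstructed from the first displayed row.
sample = "-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0"
record = parse_record(sample)
print(record["medinc"], record["medhv"])
```

If the columns in the file were in a different order, every field would still parse as a float, so a quick look at `show(5)` is a useful sanity check.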

In [90]:
housing_df.show(5)

+-------+-----+------+--------+--------+------+--------+------+--------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|
+-------+-----+------+--------+--------+------+--------+------+--------+
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|452600.0|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|358500.0|
|-122.24|37.85|  52.0|  1467.0|   190.0| 496.0|   177.0|7.2574|352100.0|
|-122.25|37.85|  52.0|  1274.0|   235.0| 558.0|   219.0|5.6431|341300.0|
|-122.25|37.85|  52.0|  1627.0|   280.0| 565.0|   259.0|3.8462|342200.0|
+-------+-----+------+--------+--------+------+--------+------+--------+
only showing top 5 rows



In [91]:
housing_df = housing_df.withColumn("roomsperhouse",col('totrooms')/col('houshlds'))
housing_df = housing_df.withColumn("bedroomsperhouse",col('totbdrms')/col('houshlds'))
housing_df = housing_df.withColumn("popperhouse",col('pop')/col('houshlds'))
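
The three `withColumn` calls above divide each block-group total by the number of households. As a quick hand check, the plain-Python sketch below repeats that arithmetic for the first row of the predictions table printed further down (`totrooms=2672`, `totbdrms=552`, `pop=1298`, `houshlds=478`); the function name `per_household` is made up for this illustration.

```python
def per_household(totrooms, totbdrms, pop, houshlds):
    """Divide each block-group total by the number of households."""
    return {
        "roomsperhouse": totrooms / houshlds,
        "bedroomsperhouse": totbdrms / houshlds,
        "popperhouse": pop / houshlds,
    }

# Values copied from the first row of the predictions output.
ratios = per_household(totrooms=2672.0, totbdrms=552.0, pop=1298.0, houshlds=478.0)

for name, value in ratios.items():
    print(f"{name}: {value:.6f}")
```

The results can be compared with the `roomsperhouse`, `bedroomsperhouse`, and `popperhouse` columns of that same row.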

In [92]:
feature_cols = ['roomsperhouse', 'bedroomsperhouse','popperhouse','medinc','pop','totbdrms','houshlds']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(housing_df)

standardScaler = StandardScaler(inputCol="features", outputCol="scaled")
assembled_df = standardScaler.fit(assembled_df).transform(assembled_df)

train_data, test_data = assembled_df.randomSplit([.8,.2])
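
With its default settings (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The plain-Python sketch below reproduces that arithmetic on the `medinc` values of the five rows shown by `housing_df.show(5)`. It illustrates the computation only and is not Spark's implementation; in the notebook the statistics come from every row passed to `fit`, not from five values.

```python
import statistics

# medinc values of the five rows shown by housing_df.show(5)
medinc = [8.3252, 8.3014, 7.2574, 5.6431, 3.8462]

# Sample (n - 1) standard deviation of the feature
std = statistics.stdev(medinc)

# Scale to unit standard deviation; the mean is left in place (no centering)
scaled = [x / std for x in medinc]

print("std:", round(std, 4))
print("scaled:", [round(x, 4) for x in scaled])
print("std of scaled values:", round(statistics.stdev(scaled), 4))
```

Tree-based models such as `RandomForestRegressor` split on thresholds, so rescaling a feature by a positive constant generally does not change their predictions. Scaling tends to matter more for regularized linear models.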

In [95]:
rf = RandomForestRegressor(featuresCol="scaled",labelCol='medhv')
rfModel = rf.fit(train_data)
predictions = rfModel.transform(test_data)
predictions.show(20)

+-------+-----+------+--------+--------+------+--------+------+--------+------------------+------------------+------------------+--------------------+--------------------+------------------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|     roomsperhouse|  bedroomsperhouse|       popperhouse|            features|              scaled|        prediction|
+-------+-----+------+--------+--------+------+--------+------+--------+------------------+------------------+------------------+--------------------+--------------------+------------------+
| -124.3| 41.8|  19.0|  2672.0|   552.0|1298.0|   478.0|1.9797| 85800.0| 5.589958158995816|1.1548117154811715| 2.715481171548117|[5.58995815899581...|[2.25932375949100...| 117945.6901407779|
| -124.3|41.84|  17.0|  2677.0|   531.0|1244.0|   456.0|3.0313|103600.0| 5.870614035087719|1.1644736842105263|2.7280701754385963|[5.87061403508771...|[2.37275796974081...|130713.06259105576|
|-124.27|40.69|  36.0|  2349.0|   528.0|1194.

In [96]:
evaluator = RegressionEvaluator(
    labelCol="medhv", predictionCol="prediction", metricName="rmse")

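# The evaluator is configured with metricName="rmse", so this value is the
# root mean squared error, in the same units as medhv.
rmse = evaluator.evaluate(predictions)

print("RMSE:", rmse)

RMSE: 73605.18643688427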
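
The evaluator above is configured with `metricName="rmse"`, so the printed figure is a root mean squared error, expressed in the same units as `medhv`. To make the difference from the mean squared error concrete, the plain-Python sketch below computes both for the two complete rows of the predictions output. It is a two-row illustration only, not a re-computation of the notebook's score.

```python
import math

# (medhv, prediction) pairs from the two complete rows of the predictions output
pairs = [
    (85800.0, 117945.6901407779),
    (103600.0, 130713.06259105576),
]

squared_errors = [(pred - label) ** 2 for label, pred in pairs]
mse = sum(squared_errors) / len(squared_errors)  # units: medhv squared
rmse = math.sqrt(mse)                            # units: same as medhv

print("MSE :", round(mse, 2))
print("RMSE:", round(rmse, 2))
```

Because the RMSE is on the scale of the target, it can be read directly as a typical prediction error in house-value units, which the MSE cannot.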


In [69]:
spark.stop()

### Credits:
This notebook is heavily inspired by the one by fatmakursun, which you can find here: https://www.kaggle.com/fatmakursun/pyspark-ml-tutorial-for-beginners. Some blocks of code, for example loading the dataset, are taken directly from it.