# BDPP Course lab 2
### Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.

Before we start, I suggest you take a look at the following programming guides:
- [pyspark SQL and DataFrames](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)
- [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html)
    - [Extracting, transforming and selecting features](https://spark.apache.org/docs/latest/ml-features)
    - [Classification and regression](https://spark.apache.org/docs/latest/ml-classification-regression.html)
    - [Model selection and tuning](https://spark.apache.org/docs/latest/ml-tuning.html)

As a dataset, we use the California Housing Prices dataset from the StatLib repository. This dataset was based on data from the 1990 California census. The dataset has the following columns
1. `longitude`: a measure of how far west a house is (a higher value is farther west)
2. `latitude`: a measure of how far north a house is (a higher value is farther north)
3. `housing_,median_age`: the median age of a house within a block (a lower number is a newer building)
4. `total_rooms`: total number of rooms within a block
5. `total_bedrooms`: total number of bedrooms within a block
6. `population`: total number of people residing within a block
7. `households`: total number of households, a group of people residing within a home unit, for a block
8. `median_income`: median income for households within a block of houses
9. `median_house_value`: median house value for households within a block
10. `ocean_proximity`: location of the house w.r.t ocean/sea

---
# 1. Get the data
Let's start the lab by loading the dataset. You can find the dataset at `data/housing.csv`. To infer column types automatically, when you are reading the file, you need to set `inferSchema` to true. Moreover, you need to enable the `header` option to read the columns' names from the file.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("lab2") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
#housing = spark.read.load("gs://ajmal1/housing.csv",format="csv", sep=",", inferSchema="true", header="true")
traindata = spark.read.load("gs://ajmal1/data_frame.csv",format="csv", sep=",", inferSchema=True, header=True)


Py4JJavaError: An error occurred while calling o75.load.
: java.io.IOException: No FileSystem for scheme: gs
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [5]:
type(housing)

pyspark.sql.dataframe.DataFrame

---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step, we are going to take a look at the data in a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Statistical summary of the attributes
* Breakdown of the data by the categorical attribute variable
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset

In [6]:
housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Print the number of records in the dataset.

In [7]:
housing.count()

20640

## 2.2. Look at the data
Print the first five records of the dataset.

In [6]:
housing.take(5)

[Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.22, latitude=37.86, housing_median_age=21.0, total_rooms=7099.0, total_bedrooms=1106.0, population=2401.0, households=1138.0, median_income=8.3014, median_house_value=358500.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.24, latitude=37.85, housing_median_age=52.0, total_rooms=1467.0, total_bedrooms=190.0, population=496.0, households=177.0, median_income=7.2574, median_house_value=352100.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1274.0, total_bedrooms=235.0, population=558.0, households=219.0, median_income=5.6431, median_house_value=341300.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.25, latitude=37.85, housing_median_age=52.0, total_rooms=1627.0, total_bedrooms=280.0,

Print the number of records with a population of more than 10000.

In [8]:
housing.filter(housing['population']> 10000).count()

23

In [9]:
housing.filter(housing['population']> 10000).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.92|   37.53|               7.0|    28258.0|        3864.0|   12203.0|    3701.0|       8.4045|          451100.0|      <1H OCEAN|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|      <1H OCEAN|
|  -117.87|   34.04|               7.0|    27700.0|        4179.0|   15037.0|    4072.0|       6.6288|          339700.0|      <1H OCEAN|
|  -117.88|   33.96|              16.0|    19059.0|        3079.0|   10988.0|    3061.0|       5.5469|          265200.0|      <1H OCEAN|
|  -118.78|   34.16|              

## 2.3. Statistical summary
Print a summary of the table statistics for the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. You can use the `describe` command.

In [11]:
housing.describe(['housing_median_age', 'total_rooms', 'median_house_value', 'population']).show()

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|median_house_value|        population|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|206855.81690891474|1425.4767441860465|
| stddev| 12.58555761211163|2181.6152515827944|115395.61587441359|  1132.46212176534|
|    min|               1.0|               2.0|           14999.0|               3.0|
|    max|              52.0|           39320.0|          500001.0|           35682.0|
+-------+------------------+------------------+------------------+------------------+



Print the maximum age (`housing_median_age`), the minimum number of rooms (`total_rooms`), and the average of house values (`median_house_value`).

In [12]:
from pyspark.sql.functions import *

housing.groupBy().max('housing_median_age').show()
housing.groupBy().min('total_rooms').show()
housing.groupBy().avg('median_house_value').show()

+-----------------------+
|max(housing_median_age)|
+-----------------------+
|                   52.0|
+-----------------------+

+----------------+
|min(total_rooms)|
+----------------+
|             2.0|
+----------------+

+-----------------------+
|avg(median_house_value)|
+-----------------------+
|     206855.81690891474|
+-----------------------+



## 2.4. Breakdown the data by categorical data
Print the number of houses in different areas (`ocean_proximity`), and sort them in descending order. You can use [groupBy](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame.groupBy) and [orderBy](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame.orderBy) functions.

In [28]:
housing.groupBy("ocean_proximity").count().orderBy(["count"], ascending=False).show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



Your output should look like this:
```
+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+
```

Print the average value of the houses (`median_house_value`) in different areas (`ocean_proximity`), and call the new column `avg_value` when printing it. You can use [agg](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame.agg) function.

In [50]:
# TODO: Replace <FILL IN> with appropriate code

from pyspark.sql.functions import avg

# housing.groupBy("ocean_proximity").avg("median_house_value").show()

housing \
    .groupBy("ocean_proximity") \
    .agg(avg("median_house_value").alias("avg_value")) \
    .orderBy(["avg_value"], ascending=False) \
    .show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|       NEAR BAY|259212.31179039303|
|     NEAR OCEAN|249433.97742663656|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



Your output should look like this:
```
+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|       NEAR BAY|259212.31179039303|
|     NEAR OCEAN|249433.97742663656|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+
```

Rewrite the above question in SQL.

In [57]:
housing.createOrReplaceTempView("df")

spark.sql(\
          "SELECT ocean_proximity, \
          AVG(median_house_value) as avg_value \
          FROM df \
          GROUP BY ocean_proximity \
          ORDER BY avg_value DESC").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|       NEAR BAY|259212.31179039303|
|     NEAR OCEAN|249433.97742663656|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



You should get the same output as the previous question.

## 2.5. Correlation among attributes
Print the correlation among the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. To do so, first, you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the [VectorAssembler](https://spark.apache.org/docs/latest/ml-features#vectorassembler) Transformer.

In [59]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["housing_median_age", "total_rooms", "median_house_value", "population"],
    outputCol="features")

housingAttrs = assembler.transform(housing)

housingAttrs.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|[41.0,880.0,45260...|
|[21.0,7099.0,3585...|
|[52.0,1467.0,3521...|
|[52.0,1274.0,3413...|
|[52.0,1627.0,3422...|
+--------------------+
only showing top 5 rows



Your output should look like this:
```
+--------------------+
|            features|
+--------------------+
|[41.0,880.0,45260...|
|[21.0,7099.0,3585...|
|[52.0,1467.0,3521...|
|[52.0,1274.0,3413...|
|[52.0,1627.0,3422...|
+--------------------+
only showing top 5 rows
```

In [70]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

r1 = Correlation.corr(dataset=housingAttrs, column="features", method="pearson").collect()[0]

print("correlation matrix:\n" + str(r1[0]))

correlation matrix:
DenseMatrix([[ 1.        , -0.3612622 ,  0.10562341, -0.29624424],
             [-0.3612622 ,  1.        ,  0.13415311,  0.85712597],
             [ 0.10562341,  0.13415311,  1.        , -0.02464968],
             [-0.29624424,  0.85712597, -0.02464968,  1.        ]])


Your output should look like this:
```
correlation matrix:
DenseMatrix([[ 1.        , -0.3612622 ,  0.10562341, -0.29624424],
             [-0.3612622 ,  1.        ,  0.13415311,  0.85712597],
             [ 0.10562341,  0.13415311,  1.        , -0.02464968],
             [-0.29624424,  0.85712597, -0.02464968,  1.        ]])
```

## 2.6. Combine and make new attributes
Now, let's try out various attribute combinations. In the given dataset, the total number of rooms in a block is not very useful if we don't know how many households there are. What we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. And the population per household seems like also an interesting attribute combination to look at. To do so, add the three new columns to the dataset as below. We will call the new dataset the `housingExtra`.
```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```

In [77]:
housingCol1 = housing.withColumn("rooms_per_household", col("total_rooms") / col("households"))
housingCol2 = housingCol1.withColumn("bedrooms_per_room", col("total_bedrooms") / col("total_rooms"))
housingExtra = housingCol2.withColumn("population_per_household", col("population") / col("households"))

housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows



Your output should look like this:
```
+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows
```

---
## 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `median_house_value` to `label`.

In [84]:
renamedHousing = housingExtra.withColumnRenamed("median_house_value", "label")
renamedHousing.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'label',
 'ocean_proximity',
 'rooms_per_household',
 'bedrooms_per_room',
 'population_per_household']

Now, we want to separate the numerical attributes from the categorical attribute (`ocean_proximity`) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors. 

In [82]:
# label columns
colLabel = "label"

# categorical columns
colCat = "ocean_proximity"

#numerical columns
colNum = list(filter(lambda x: x != colLabel and x !=colCat  , renamedHousing.columns))

In [85]:
colNum

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'rooms_per_household',
 'bedrooms_per_room',
 'population_per_household']

## 3.1. Prepare continuse attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values in the numerical attributes. To do so, we can print the number of missing values of each continuous attributes, listed in `colNum`.

In [89]:
from pyspark.sql.functions import when, count, col

renamedHousing.agg(*[count(when(isnull(c), c)).alias(c) for c in colNum]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|rooms_per_household|bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                  0|              207|                       0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+



Your output should look like this:
```
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|rooms_per_household|bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                  0|              207|                       0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
```

As we observed above, the `total_bedrooms` and `bedrooms_per_room` attributes have some missing values. One way to take care of missing values is to use the `Imputer` Transformer, which completes missing values in a dataset, either using the mean or the median of the columns in which the missing values are located. To use it, you need to create an [Imputer](https://spark.apache.org/docs/latest/ml-features#imputer) instance, specifying that you want to replace each attribute's missing values with the "median" of that attribute.

In [92]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=["total_bedrooms", "bedrooms_per_room"], outputCols=["total_bedrooms", "bedrooms_per_room"])
imputer.setStrategy("median")

model = imputer.fit(renamedHousing)

imputedHousing = model.transform(renamedHousing)
imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows



Your output should look like this:
```
+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows
```

In [93]:
######## Just checking if the new dataset does not have any missing values ##########
imputedHousing.agg(*[count(when(isnull(c), c)).alias(c) for c in colNum]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|rooms_per_household|bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                  0|                0|                       0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+-------------------+-----------------+------------------------+



### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from about 6 to 39,320, while the median incomes only range from 0 to 15. Note that scaling the label attributes is generally not required.

One way to get all attributes to have the same scale is to use standardization. In standardization, for each value, first, it subtracts the mean value (so standardized values always have a zero mean). Then it divides by the variance so that the resulting distribution has a unit variance. To do this, we can use the `StandardScaler` Estimator. To use `StandardScaler`, again, we need to convert all the numerical attributes into a big vector of features using `VectorAssembler`, and then call `StandardScaler` on that vector.

In [102]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

assembler = VectorAssembler(inputCols=colNum, outputCol="features")

featuredHousing = assembler.transform(imputedHousing)

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=True)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(featuredHousing)

# Normalize each feature to have unit standard deviation.
scaledHousing = scalerModel.transform(featuredHousing)
scaledHousing.select(["features", "scaledFeatures"]).show(5)

+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[-122.23,37.88,41...|[-1.3278030546902...|
|[-122.22,37.86,21...|[-1.3228118684350...|
|[-122.24,37.85,52...|[-1.3327942409452...|
|[-122.25,37.85,52...|[-1.3377854272003...|
|[-122.25,37.85,52...|[-1.3377854272003...|
+--------------------+--------------------+
only showing top 5 rows



Your output should look like this:
```
+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[-122.23,37.88,41...|[-1.3278030546902...|
|[-122.22,37.86,21...|[-1.3228118684350...|
|[-122.24,37.85,52...|[-1.3327942409452...|
|[-122.25,37.85,52...|[-1.3377854272003...|
|[-122.25,37.85,52...|[-1.3377854272003...|
+--------------------+--------------------+
only showing top 5 rows
```

In [103]:
import numpy as np
median_income = [row['scaledFeatures'] for row in scaledHousing.collect()]
print("std: " + str(np.std(median_income)) + " mean: " + str(np.mean(median_income)))

Exception ignored in: <function JavaWrapper.__del__ at 0x115733d90>
Traceback (most recent call last):
  File "/Users/Lucas/anaconda3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'StandardScaler' object has no attribute '_java_obj'


std: 0.9999757749003724 mean: 6.392693906922685e-15


Your output should look like this:
```
std: 0.9999757749003724 mean: 6.392693906922685e-15
```
__Note__ that std ≈ 1, and mean ≈ 0, so the transformed data has a standard normal distribution.

## 3.2. Prepare categorical attributes
After imputing and scaling the continuous attributes, we should take care of the categorical attributes. Let's first print the number of distinct values of the categorical attribute `ocean_proximity`.

In [106]:
renamedHousing.select(["ocean_proximity"]).distinct().count()

5

You should het `5` as output.

### String indexer
Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute `ocean_proximity` to numbers. To do so, we can use the [StringIndexer](https://spark.apache.org/docs/latest/ml-features#stringindexer) that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.

In [107]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_num")
idxHousing = indexer.fit(renamedHousing).transform(renamedHousing)
idxHousing.select(["ocean_proximity", "ocean_proximity_num"]).show(5)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_num|
+---------------+-------------------+
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
+---------------+-------------------+
only showing top 5 rows



Your output should look like this:
```
+---------------+-------------------+
|ocean_proximity|ocean_proximity_num|
+---------------+-------------------+
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
+---------------+-------------------+
only showing top 5 rows
```

Now we can use this numerical data in any Machine Learning algorithm. You can look at the mapping that this encoder has learned using the `labels` method: "<1H OCEAN" is mapped to 0, "INLAND" is mapped to 1, etc.

In [108]:
indexer.fit(renamedHousing).labels

['<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'NEAR BAY', 'ISLAND']

your output should look like this:
```
['<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'NEAR BAY', 'ISLAND']
```

### One-hot encoding
Now, convert the label indices built in the last step into one-hot vectors. To do this, you can take advantage of the [OneHotEncoderEstimator](https://spark.apache.org/docs/latest/ml-features#onehotencoderestimator) Estimator.

In [114]:
from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(inputCols=["ocean_proximity_num"], outputCols=["ocean_proximity_num_vec"])

model = encoder.fit(idxHousing)
ohHousing = model.transform(idxHousing)
ohHousing.select("ocean_proximity_num_vec").take(5)

[Row(ocean_proximity_num_vec=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_num_vec=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_num_vec=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_num_vec=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_num_vec=SparseVector(4, {3: 1.0}))]

---
# 4. Pipeline
As you can see, many data transformation steps need to be executed in the right order. For example, you called the `Imputer`, `VectorAssembler`, and `StandardScaler` from left to right. However, we can use the `Pipeline` class to define a sequence of Transformers/Estimators, and run them in order. A `Pipeline` is an `Estimator`; thus, after a Pipeline's `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numPipeline` to call the numerical transformers you built above (`imputer`, `assembler`, and `scaler`) in the right order from left to right, as well as a pipeline called `catPipeline` to call the categorical transformers (`indexer` and `encoder`). Then, put these two pipelines `numPipeline` and `catPipeline` into one pipeline.

In [116]:
from pyspark.ml import Pipeline

numPipeline = Pipeline(stages=[imputer, assembler, scaler])
catPipeline = Pipeline(stages=[indexer, encoder])

pipeline = Pipeline(stages=[numPipeline, catPipeline])

newHousing = pipeline.fit(renamedHousing).transform(renamedHousing)
newHousing.show(1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|            features|      scaledFeatures|ocean_proximity_num|ocean_proximity_num_vec|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090

Your output should look like this:
```
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+---------------+-------------------+-------------------+------------------------+--------+--------------------+--------------------+-------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|   label|            features|      scaledFeatures|ocean_proximity_num|ocean_proximity_num_vec|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+---------------+-------------------+-------------------+------------------------+--------+--------------------+--------------------+-------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|452600.0|[-122.23,37.88,41...|[-1.3278030546902...|                3.0|          (4,[3],[1.0])|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+---------------+-------------------+-------------------+------------------------+--------+--------------------+--------------------+-------------------+-----------------------+
only showing top 1 row
```

Now, use `VectorAssembler` to put all attributes of the final dataset `newHousing` into a big vector (called `final_features`), and finally rename the new column `features`.

In [151]:
len(newHousing.columns)

17

In [152]:
newHousing.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'label',
 'ocean_proximity',
 'rooms_per_household',
 'bedrooms_per_room',
 'population_per_household',
 'features',
 'scaledFeatures',
 'ocean_proximity_num',
 'ocean_proximity_num_vec']

In [159]:
##### SHOULD BE THE FIRST ONE #####

# va2 = VectorAssembler(inputCols=[column for column in newHousing.columns if column != "ocean_proximity"], outputCol="final_features")
va2 = VectorAssembler(inputCols=[column for column in newHousing.columns if column != "ocean_proximity" and column != "ocean_proximity_num"], outputCol="final_features")
# va2 = VectorAssembler(inputCols=["scaledFeatures"], outputCol="final_features")

temp1 = va2.transform(newHousing)

dataset = temp1.withColumn('features', temp1.final_features).select("features","label")
dataset.show(5)

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-122.23,37.88,41...|452600.0|
|[-122.22,37.86,21...|358500.0|
|[-122.24,37.85,52...|352100.0|
|[-122.25,37.85,52...|341300.0|
|[-122.25,37.85,52...|342200.0|
+--------------------+--------+
only showing top 5 rows



Your output should look like this:
```
+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-1.3278030546902...|452600.0|
|[-1.3228118684350...|358500.0|
|[-1.3327942409452...|352100.0|
|[-1.3377854272003...|341300.0|
|[-1.3377854272003...|342200.0|
+--------------------+--------+
only showing top 5 rows
```

---
# 5. Make a model
Here we going to make four different regression models:
* Linear regression model
* Decision tree regression
* Random forest regression
* Gradient-booster forest regression

__Note:__ Based on how you set the model parameters, you may end up getting different results. The sample outputs provided in this section are based on default parameters.

But, before giving the data to train a Machine Learning model, let's first split the data into a training dataset (`trainSet`) with 80% of the whole data, and test dataset (`testSet`) with 20% of it.

In [160]:
# TODO: Replace <FILL IN> with appropriate code

# trainSet, testSet = dataset.<FILL IN>
trainSet, testSet = dataset.randomSplit([0.8, 0.2])

In [161]:
trainSet.count()

16567

In [162]:
testSet.count()

4073

## 5.1. Linear regression model
Now, train a Linear Regression model using the `LinearRegression` class. Then, print the coefficients and intercept of the model, as well as the summary of the model over the training set by calling the `summary` method.

In [173]:
# TODO: Replace <FILL IN> with appropriate code

from pyspark.ml.regression import LinearRegression

# lr = <FILL IN>
# lrModel = lr.<FILL IN>
# trainingSummary = lrModel.summary

lr = LinearRegression()
lrModel = lr.fit(trainSet)
trainingSummary = lrModel.summary



# print("Coefficients: %s" % str(<FILL IN>))
# print("Intercept: %s" % str(<FILL IN>))
# print("RMSE: %s" % str(<FILL IN>))

print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
print("RMSE: %s" % str(trainingSummary.rootMeanSquaredError))

Coefficients: [0.1898439580724977,0.25588888676456223,-0.011032340151326994,-0.0001720809519210081,-0.0009287373835141284,0.0004644913034230214,3.513043713091677e-05,-0.17222597725898617,1.000015235810978,0.05460859797447862,-2.2946659176292368,-0.009964447512404544,0.1898439580724977,0.25588888676456223,-0.011032340151326994,-0.0001720809519210081,-0.0009287373835141284,0.0004644913034230214,3.513043713091677e-05,-0.17222597725898617,0.05460859797447862,-2.2946659176292368,-0.009964447512404544,0.38035863792846397,0.5465669290600951,-0.1388480430772074,-0.37541434377514493,-0.3895048971482989,0.5260188506215655,0.01343140285009548,-0.32719854742666654,0.13511119387469675,-0.13240123971780904,-0.10349122764207322,-176.68945960787937,-176.2154953202055,-175.7912904548835,-173.08242259784765]
Intercept: 202.9512084638021
RMSE: 3.329555328712852


In [174]:
len(lrModel.coefficients)

38

In [175]:
type(lrModel)

pyspark.ml.regression.LinearRegressionModel

Your output should be similar to this:
```
Coefficients: [-56639.642291914686,-57319.32110177786,14002.699108121004,3956.096007902445,2052.9245835397637,-44732.96113748317,43938.699408553475,78191.94760029572,8401.054030871619,16148.787588252462,716.5079271705835,-171681.91204667607,-205191.90252085018,-167767.18573795864,-176316.02615341634]
Intercept: 389569.42823539214
RMSE: 68419.64567910614
```

Now, use `RegressionEvaluator` to measure the root-mean-square-error (RMSE) of the model on the test dataset.

In [192]:
from pyspark.ml.evaluation import RegressionEvaluator
##  make predictions on the test data
predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

## select (prediction, true label) and compute test error.
evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

print( "Root Mean Squared Error (RMSE) on test data = "+ str(rmse))

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 85801.52487999355| 85800.0|[-124.3,41.8,19.0...|
|103601.43026349347|103600.0|[-124.3,41.84,17....|
| 68401.02336265711| 68400.0|[-124.21,41.77,17...|
| 90099.47214626997| 90100.0|[-124.19,40.73,21...|
|116100.76426560341|116100.0|[-124.17,40.75,13...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 3.3871250048891177


Your output should be similar to this:
```
+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|202426.15921405185|94600.0|[-2.3859345407710...|
|178061.72929810936|79000.0|[-2.3460050507303...|
|190967.64201847612|90100.0|[-2.3060755606895...|
|169128.68031587286|69000.0|[-2.3060755606895...|
|147641.06793444577|67000.0|[-2.3010843744344...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 65751.16868231572
```

## 5.2. Decision tree regression
Repeat what you have done on the Regression Model to build a Decision Tree model. Use the `DecisionTreeRegressor` to make a model and then measure its RMSE on the test dataset.

In [191]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor()

dtModel = dt.fit(trainSet)

predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

## select (prediction, true label) and compute test error.
evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print( "Root Mean Squared Error (RMSE) on test data = "+ str(rmse))

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 84107.53295668549| 85800.0|[-124.3,41.8,19.0...|
|111931.88679245283|103600.0|[-124.3,41.84,17....|
| 68501.27450980392| 68400.0|[-124.21,41.77,17...|
| 95695.24733268672| 90100.0|[-124.19,40.73,21...|
|111931.88679245283|116100.0|[-124.17,40.75,13...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 6159.283581340476


Your output should be similar to this:
```
+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|176434.89089848308|94600.0|[-2.3859345407710...|
|144694.84987277354|79000.0|[-2.3460050507303...|
|176434.89089848308|90100.0|[-2.3060755606895...|
| 137376.0736196319|69000.0|[-2.3060755606895...|
|144694.84987277354|67000.0|[-2.3010843744344...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 64764.20678010809
```

## 5.3. Random forest regression
Let's try the test error on a Random Forest Model. You can use the `RandomForestRegressor` to make a Random Forest model.

In [190]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor()

rfModel = rf.fit(trainSet)

predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

## select (prediction, true label) and compute test error.
evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print( "Root Mean Squared Error (RMSE) on test data = "+ str(rmse))

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
|107238.67022378262| 85800.0|[-124.3,41.8,19.0...|
| 124281.1361730535|103600.0|[-124.3,41.84,17....|
|116074.46870678128| 68400.0|[-124.21,41.77,17...|
|135390.12083428708| 90100.0|[-124.19,40.73,21...|
|149488.96440911625|116100.0|[-124.17,40.75,13...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 26345.111570706864


Your output should be similar to this:
```
+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|196230.80181972618|94600.0|[-2.3859345407710...|
|155845.55604907026|79000.0|[-2.3460050507303...|
|167298.00808672755|90100.0|[-2.3060755606895...|
| 166686.4796195291|69000.0|[-2.3060755606895...|
| 142733.3221992184|67000.0|[-2.3010843744344...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 63263.050055074156
```

## 5.4. Gradient-boosted tree regression
Finally, we want to build a Gradient-boosted Tree Regression model and test the RMSE of the test data. Use the `GBTRegressor` to make the model.

In [193]:
from pyspark.ml.regression import GBTRegressor
gb = GBTRegressor()

gbModel = gb.fit(trainSet)

predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

## select (prediction, true label) and compute test error.
evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print( "Root Mean Squared Error (RMSE) on test data = "+ str(rmse))

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 83846.08683118374| 85800.0|[-124.3,41.8,19.0...|
|111868.51892276126|103600.0|[-124.3,41.84,17....|
| 67812.30873562481| 68400.0|[-124.21,41.77,17...|
| 93488.14221226449| 90100.0|[-124.19,40.73,21...|
|114013.55186304271|116100.0|[-124.17,40.75,13...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 5578.952326557389


Your output should be similar to this:
```
+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|115613.11054178125|94600.0|[-2.3859345407710...|
| 81328.79640266801|79000.0|[-2.3460050507303...|
|147905.92250882403|90100.0|[-2.3060755606895...|
| 85928.30116721884|69000.0|[-2.3060755606895...|
| 81399.31430268366|67000.0|[-2.3010843744344...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 53622.69614407029
```

---
# 6. Hyperparameter tuning
An important task in Machine Learning is model selection or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression or for entire Pipelines, which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as [CrossValidator](https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation). These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example, with `k=3` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the three different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset.

Below, use the `CrossValidator` to select the best Random Forest model. To do so, you need to define a grid of parameters. Let's say we want to search among the different number of trees (1, 5, and 10), and various tree depth (5, 10, and 15).

In [194]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [1, 5, 10]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

evaluator = RegressionEvaluator().setMetricName("rmse")

cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator,numFolds=3)
cvModel = cv.fit(trainSet)

predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print( "Root Mean Squared Error (RMSE) on test data = "+ str(rmse))

+------------------+--------+--------------------+
|        prediction|   label|            features|
+------------------+--------+--------------------+
| 89138.24928639391| 85800.0|[-124.3,41.8,19.0...|
|100434.17475728155|103600.0|[-124.3,41.84,17....|
| 64777.36943907157| 68400.0|[-124.21,41.77,17...|
| 89138.24928639391| 90100.0|[-124.19,40.73,21...|
|113250.37037037036|116100.0|[-124.17,40.75,13...|
+------------------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 5996.397739384252


Your output should look like this:
```
+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|          117870.0|94600.0|[-2.3859345407710...|
| 94230.00865800866|79000.0|[-2.3460050507303...|
|138148.57142857142|90100.0|[-2.3060755606895...|
|120944.28571428572|69000.0|[-2.3060755606895...|
| 77710.73015873015|67000.0|[-2.3010843744344...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 51427.20882245195
```

---
# 7. Custom transformer
At the end of part two, we added extra columns to the `housing` dataset. Here, we are going to implement a Transformer to do the same task. The Transformer should take the name of two input columns `inputCol1` and `inputCol2`, as well as the name of output column `outputCol`. It, then, computes `inputCol1` divided by `inputCol2`, and adds its result as a new column to the dataset. You can get help from [here](https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml).

First, define the given parameters of the Transformer and implement a method to validate their schemas (`StructType`).

In [204]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol

class MyTransformer(Transformer, HasInputCol, HasOutputCol):

    def __init__(self, inputCol1, inputCol2, outputCol):
        self.inputCol1 = inputCol1
        self.inputCol2 = inputCol2
        self.outputCol = outputCol
        
    def transform(self, dataset):
        return dataset.withColumn(self.outputCol, dataset[self.inputCol1] / dataset[self.inputCol2])


In [205]:
mytransformer = MyTransformer("total_rooms",'households','rooms_per_household')
mytransformer.transform(housing).select("rooms_per_household").show(5)

+-------------------+
|rooms_per_household|
+-------------------+
|  6.984126984126984|
|  6.238137082601054|
|  8.288135593220339|
| 5.8173515981735155|
|  6.281853281853282|
+-------------------+
only showing top 5 rows



Your output should look like this:
```
+-------------------+
|rooms_per_household|
+-------------------+
|  6.984126984126984|
|  6.238137082601054|
|  8.288135593220339|
| 5.8173515981735155|
|  6.281853281853282|
+-------------------+
only showing top 5 rows
```