## 1. Create Spark Program on Jupyter

Import the SparkSession module from pyspark.sql and build a **SparkSession** with the __`builder()`__ method. Afterwards, you can set the master URL to connect to, the application name, add some additional configuration like the executor memory and then lastly, use __`getOrCreate()`__ to either get the current Spark session or to create one if there is none running

In [1]:
# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

* __Longitude__ refers to the angular distance of a geographic place north or south of the earth’s equator for each block group;
* __Latitude__ refers to the angular distance of a geographic place east or west of the earth’s equator for each block group;
* __Housing median age__ is the median age of the people that belong to a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values;
* __Total rooms__ is the total number of rooms in the houses per block group;
* __Total bedrooms__ is the total number of bedrooms in the houses per block group;
* __Population__ is the number of inhabitants of a block group;
* __Households__ refers to units of houses and their occupants per block group;
* __Median income__ is used to register the median income of people that belong to a block group; And,
* __Median house value__ is the dependent variable and refers to the median house value per block group.

The __Median house value__ is the dependent variable and will be assigned the role of the target variable in your ML model.

Next, you’ll use the __`textFile()`__ method to read in the data from the folder that you downloaded it to RDDs. This method takes an URI for the file, which is in this case the local path of your machine, and reads it as a collection of lines. For all convenience, you’ll not only read in the .data file, but also the .domain file that contains the header. This will allow you to double check the order of the variables.

In [2]:
# Load in the data
rdd = sc.textFile('data/CaliforniaHousing/cal_housing.data')

# Load in the header
header = sc.textFile('data/CaliforniaHousing/cal_housing.domain')

## 2. Data Exploration
Important to understand here is that, because Spark’s execution is __“lazy”__ execution, nothing has been executed yet. Your data hasn’t been actually read in. The rdd and header variables are actually just concepts in your mind. You have to push Spark to work for you, so let’s use the __`collect()`__ method to look at the header:

In [3]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

The `collect()` method brings the entire RDD to a single machine, and you’ll get to see the above result.

__Tip:__ be careful when using `collect()`! Running this line of code can possibly cause the driver to run out of memory. That’s why the following approach with `the take()` method is a safer approach if you want to just print a few elements of the RDD. In general, it’s a good principle to limit your result set whenever possible, just like when you’re using SQL.

In [5]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

You’ll use the __`map()`__ function again and another lambda function in which you’ll map each entry to a field in a Row.

With this SchemaRDD in place, you can easily convert the RDD to a DataFrame with the __`toDF()`__ method.

In [6]:
# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

Now that you have your DataFrame df, you can inspect it with the methods that you have also used before, namely `first()` and `take()`, but also with `head()` and `show()`:

In [7]:
# Show the top 20 rows 
df.show()

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|         3|               2|       1|        -|               0|           0|         2|            .|         2|
|         2|               2|       1|        -|               0|           0|         2|            .|         2|
|         4|               2|       1|        -|               0|           0|         2|            .|         2|
|         5|               2|       1|        -|               0|           0|         2|            .|         2|
|         5|               2|       1|        -|               0|           0|         2|            .|         2|
|         5|               2|       1|        -|               0|           0|  

In [9]:
# Show the columns of your DataFrame
df.columns

['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

In [10]:
# Print the data types of all `df` columns
# df.dtypes

# Print the schema of `df`
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



Since all the columns are in **String** type, we need to convert them to **Float**. We'll write a function to convert data

In [11]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [12]:
# Print the schema of `df`
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



In [13]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
|       2.0|         null|
+----------+-------------+
only showing top 10 rows



In [15]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|             2.0| 8207|
|             1.0|12433|
+----------------+-----+



In [16]:
df.describe().show()

+-------+-----------------+-------------------+--------+---------+----------------+------------+------------------+-------------+-----------------+
|summary|       households|   housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|        population|totalBedRooms|       totalRooms|
+-------+-----------------+-------------------+--------+---------+----------------+------------+------------------+-------------+-----------------+
|  count|            20640|              20640|   20640|        0|           20640|       20640|             20640|            0|            20640|
|   mean| 4.54312015503876| 1.3976259689922481|     1.0|     null|             0.0|         0.0|3.8197674418604652|         null|5.166036821705426|
| stddev|2.873140433308782|0.48941920984696774|     0.0|     null|             0.0|         0.0| 2.923293251863948|         null| 3.14998088304445|
|    min|              0.0|                1.0|     1.0|     null|             0.0|         0.0|               0

## 3. Data Processing
### Preprocessing The Target Values
First, let’s start with the `medianHouseValue`, your dependent variable. To facilitate your working with the target values, you will express the house values in units of `100,000`. That means that a target such as `452600.000000` should become `4.526`:

In [17]:
## DISCLAIMERS: THIS IS NOT NECESSARY SINCE THE MEDIANHOUSEVALUE IS ALREADY PREPROCESSED

# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=3.0, housingMedianAge=2.0, latitude=1.0, longitude=None, medianHouseValue=0.0, medianIncome=0.0, population=2.0, totalBedRooms=None, totalRooms=2.0),
 Row(households=2.0, housingMedianAge=2.0, latitude=1.0, longitude=None, medianHouseValue=0.0, medianIncome=0.0, population=2.0, totalBedRooms=None, totalRooms=2.0)]

### Feature Engineering
You’re going to add the following columns to the data set:

* `Rooms per household` which refers to the number of rooms in households per block group;
* `Population per household` which basically gives you an indication of how many people live in households per block group; And
* `Bedrooms per room` which will give you an idea about how many rooms are bedrooms per block group;

As you’re working with DataFrames, you can best use the `select()` method to select the columns that you’re going to be working with, namely `totalRooms`, `households`, and `population`

In [18]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=3.0, housingMedianAge=2.0, latitude=1.0, longitude=None, medianHouseValue=0.0, medianIncome=0.0, population=2.0, totalBedRooms=None, totalRooms=2.0, roomsPerHousehold=0.6666666666666666, populationPerHousehold=0.6666666666666666, bedroomsPerRoom=None)

In [19]:
# Re-order and select columns for the analysis
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

### Standardization
Now that you have re-ordered the data, you’re ready to normalize the data. Or almost, at least. There is just one more step that you need to go through: separating the features from the target variable. In essence, this boils down to isolating the first column in your DataFrame from the rest of the columns.

In this case, you’ll use the `map()` function that you use with RDDs to perform this action. You also see that you make use of the `DenseVector()` function. A dense vector is a local vector that is backed by a double array that represents its entry values. In other words, it's used to store arrays of values for use in PySpark.

Next, you go back to making a DataFrame out of the `input_data` and you re-label the columns by passing a list as a second argument. This list consists of the column names `"label"` and `"features"`:

In [20]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

Next, you can finally scale the data. You can use Spark ML to do this: this library will make machine learning on big data scalable and easy. You’ll find tools such as ML algorithms and everything you need to build practical ML pipelines. In this case, you don’t need to do that much preprocessing so a pipeline would maybe be overkill, but if you want to look into it, definitely consider visiting the [this page](https://spark.apache.org/docs/latest/ml-pipeline.html)

The input columns are the features, and the output column with the rescaled that will be included in the `scaled_df` will be named `"features_scaled"`

Let’s take a look at your DataFrame and the result. You see that, indeed, a third column `features_scaled` was added to your DataFrame, which you can use to compare with features:

In [21]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=0.0, features=DenseVector([nan, 2.0, 3.0, 0.0, 0.6667, 0.6667, nan]), features_scaled=DenseVector([nan, 0.6842, 1.0442, 0.0, nan, nan, nan])),
 Row(label=0.0, features=DenseVector([nan, 2.0, 2.0, 0.0, 1.0, 1.0, nan]), features_scaled=DenseVector([nan, 0.6842, 0.6961, 0.0, nan, nan, nan]))]

## Building A Machine Learning Model With Spark ML
With all the preprocessing done, it’s finally time to start building your Linear Regression model! Just like always, you first need to split the data into training and test sets. Luckily, this is no issue with the `randomSplit()` method:

In [22]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

__Note__ that the argument `elasticNetParam` corresponds to `α` or the vertical intercept and that the `regParam` or the regularization paramater corresponds to `λ`. Go [here](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression) for more information.

In [23]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

With your model in place, you can generate predictions for your test data: use the `transform()` method to predict the labels for your `test_data`. Then, you can use RDD operations to extract the predictions as well as the true labels from the DataFrame and zip these two values together in a list called `predictionAndLabel`

In [24]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(nan, 0.0), (nan, 0.0), (nan, 0.0), (nan, 0.0), (nan, 0.0)]

## Evaluating the Model
Looking at predicted values is one thing, but another and better thing is looking at some metrics to get a better idea of how good your model actually is. You can first start by printing out the coefficients and the intercept of the model:

In [25]:
# Coefficients for the model
linearModel.coefficients

# Intercept for the model
linearModel.intercept

0.0

In [26]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError

# Get the R2
linearModel.summary.r2

nan

## Before You Go…
Before you go, make sure to stop the SparkSession with the following line of code:

In [27]:
spark.stop()