# ML with PySpark: California Housing
In this notebook, I have done big data processing, analysis and ML with PySpark. Firstly, I have explored and preprocessed the dataset that I loaded in at the first step the help of DataFrames. About the dataset: It appeared in a 1997 paper titled Sparse Spatial Autoregressions, written by Pace, R. Kelley and Ronald Barry and published in the Statistics and Probability Letters journal. The researchers built this data set by using the 1990 California census data.

The data contains one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). In this sample a block group on average includes 1425.5 individuals living in a geographically compact area. You’ll gather this information from this web page or by reading the paper which was mentioned above and which you can find here.

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

__Longitude__ refers to the angular distance of a geographic place north or south of the earth’s equator for each block group;

__Latitude__ refers to the angular distance of a geographic place east or west of the earth’s equator for each block group;

__Housing median__ age is the median age of the people that belong to a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values;

__Total rooms__ is the total number of rooms in the houses per block group;

__Total bedrooms__ is the total number of bedrooms in the houses per block group;

__Population__ is the number of inhabitants of a block group;

__Households__ refers to units of houses and their occupants per block group;

__Median income__ is used to register the median income of people that belong to a block group;

__Median house value__ is the dependent variable and refers to the median house value per block group.




## 1. Running Spark and Data Loading

In [7]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Load in the data
rdd = sc.textFile('C:/Users/NicatZeynalov/Downloads/Documents/cal_housing.data')

# Load in the header
header = sc.textFile('C:/Users/NicatZeynalov/Downloads/Documents/cal_housing.domain')

## 2. Data Exploration

In [8]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

We have to be careful when using collect()! Running this line of code can possibly cause the driver to run out of memory. That’s why the following approach with the take() method is a safer approach if you want to just print a few elements of the RDD. In general, it’s a good principle to limit our result set whenever possible

In [9]:
rdd.take(10)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 '-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000',
 '-122.250000,37.850000,52.000000,1274.000000,235.000000,558.000000,219.000000,5.643100,341300.000000',
 '-122.250000,37.850000,52.000000,1627.000000,280.000000,565.000000,259.000000,3.846200,342200.000000',
 '-122.250000,37.850000,52.000000,919.000000,213.000000,413.000000,193.000000,4.036800,269700.000000',
 '-122.250000,37.840000,52.000000,2535.000000,489.000000,1094.000000,514.000000,3.659100,299200.000000',
 '-122.250000,37.840000,52.000000,3104.000000,687.000000,1157.000000,647.000000,3.120000,241400.000000',
 '-122.260000,37.840000,42.000000,2555.000000,665.000000,1206.000000,595.000000,2.080400,226700.000000',
 '-122.250000,37.840000,52.000000,3549.000000,707.000000,155

In [10]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first 2 lines 
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

Let's convert the RDD to a DataFrame

In [11]:
# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [12]:
#returning the columns of dataframe
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [13]:
#returning the schema of dataframe
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



##### All columns are still of data type string… That’s disappointing!


In [14]:
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [15]:
df.show()

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
|     193.0|            52.0|   37.85|  -122.25|        269700.0|      4.0368|  

Let’s start small and just select two columns from df of which you only want to see 10 rows:

In [16]:
df.select('households', 'medianIncome').show(10)

+----------+------------+
|households|medianIncome|
+----------+------------+
|     126.0|      8.3252|
|    1138.0|      8.3014|
|     177.0|      7.2574|
|     219.0|      5.6431|
|     259.0|      3.8462|
|     193.0|      4.0368|
|     514.0|      3.6591|
|     647.0|        3.12|
|     595.0|      2.0804|
|     714.0|      3.6912|
+----------+------------+
only showing top 10 rows



In [17]:
df.describe().show()

+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|        households|  housingMedianAge|          latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             20640|             20640|             20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean| 499.5396802325581|28.639486434108527| 35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.32975283161136|12.585557612111613|2.1359523806029554| 2.0035317429328914|115395.61

## 3. Data Preprocessing

In [19]:
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

In [20]:
df.show(2)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
only showing top 2 rows



### 3.1 Future Engineering
Now that I have adjusted the values in medianHouseValue, I can also add the additional variables that I read about above. I am going to add the following columns to the data set:

__Rooms per household which refers to the number of rooms in households per block group;__

__Population per household, which basically gives me an indication of how many people live in households per block group;__

__Bedrooms per room which will give me an idea about how many rooms are bedrooms per block group;__

In [22]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col('totalRooms')/col('households'))

#Divide 'population' by 'households'
personPerHousehold = df.select(col('population')/col('households'))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col('totalBedRooms')/col('totalRooms'))

# Add the new columns to `df`
df = df.withColumn('roomsPerHousehold', col('totalRooms')/col('households')).withColumn('personPerHousehold',col('population')/col('households')).withColumn('bedroomsPerRoom',col('totalBedRooms')/col('totalRooms'))

df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, personPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [24]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "personPerHousehold", 
              "bedroomsPerRoom")

### 3.2 Standardization
Now that I have re-ordered the data, I am ready to normalize the data. Or almost, at least. There is just one more step that I need to go through: separating the features from the target variable.

In [25]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [26]:
df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
|2.697|[213.0,413.0,193....|
|2.992|[489.0,1094.0,514...|
|2.414|[687.0,1157.0,647...|
|2.267|[665.0,1206.0,595...|
|2.611|[707.0,1551.0,714...|
|2.815|[434.0,910.0,402....|
|2.418|[752.0,1504.0,734...|
|2.135|[474.0,1098.0,468...|
|1.913|[191.0,345.0,174....|
|1.592|[626.0,1212.0,620...|
|  1.4|[283.0,697.0,264....|
|1.525|[347.0,793.0,331....|
|1.555|[293.0,648.0,303....|
|1.587|[455.0,990.0,419....|
|1.629|[298.0,690.0,275....|
+-----+--------------------+
only showing top 20 rows



 __Next, I can finally scale the data. I can use Spark ML to do this: this library will make machine learning on big data scalable and easy.__ The input columns are the features, and the output column with the rescaled that will be included in the scaled_df will be named "features_scaled":



In [27]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

## 4. Building A Machine Learning Model With Spark ML
With all the preprocessing done, it’s finally time to start building my Linear Regression model! Just like always, I first need to split the data into training and test sets. Luckily, this is no issue with the randomSplit() method:

In [28]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

In [29]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

__Note that__ the argument elasticNetParam corresponds to α or the vertical intercept and that the regParam or the regularization paramater corresponds to λ. Go here for more information. Lastly, I can then inspect the predicted and real values by simply accessing the list with square brackets []:

In [30]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.5755352395152893, 0.14999),
 (1.7341233026776406, 0.225),
 (1.531875281086717, 0.388),
 (1.3628617899920583, 0.394),
 (1.2862982565956895, 0.396)]

## 5. Evaluating the Model
Looking at predicted values is one thing, but another and better thing is looking at some metrics to get a better idea of how good my model actually is


In [31]:
# Coefficients for the model
linearModel.coefficients

# Intercept for the model
linearModel.intercept

0.9963441266477807