## Regression: Boston Housing Data
https://archive.ics.uci.edu/ml/machine-learning-databases/housing/

** Dataset Information: **

506 Boston housing information including value of houses

** Attribute Information: (13 features and 1 class)**

1. CRIM      per capita crime rate by town
2. ZN        proportion of residential land zoned for lots over 25,000 sq.ft.
                 
3. INDUS     proportion of non-retail business acres per town
4. CHAS      Charles River dummy variable (= 1 if tract bounds river; 0 otherwise)
                 
5. NOX       nitric oxides concentration (parts per 10 million)
6. RM        average number of rooms per dwelling
7. AGE       proportion of owner-occupied units built prior to 1940
8. DIS       weighted distances to five Boston employment centres
9. RAD       index of accessibility to radial highways
10. TAX      full-value property-tax rate per $10,000
11. PTRATIO  pupil-teacher ratio by town
12. B        1000(Bk - 0.63)^2 where Bk is the proportion of blacks by town
                 
13. LSTAT    % lower status of the population
14. MEDV     Median value of owner-occupied homes in $1000's

** Objective of this project **

predict value of house (MEDV) based on the other features

## Data

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('boston_housing').getOrCreate()

In [45]:
# Load Data
data = spark.read.csv('boston_housing.csv',inferSchema=True,header=True)
# Inspect Data
data.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MEDV: double (nullable = true)



In [46]:
data.show(5)

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|TAX|PTRATIO|     B|LSTAT|MEDV|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
only showing top 5 rows



In [49]:
data.head()
#for item in data.head():
#    print(item)

Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0, NOX=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1, TAX=296, PTRATIO=15.3, B=396.9, LSTAT=4.98, MEDV=24.0)

## Data preprocessing

** Split Features & Class (or target) **

In [79]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# combine features into a single column
print(data.columns[:-1],'\n')
assembler = VectorAssembler(inputCols=data.columns[:-1],outputCol='features')                            
output = assembler.transform(data)
output.show(5)
output.select('features').show(5)

# features and target
final_data = output.select("features",'MEDV')
final_data.show(5)

['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT'] 

+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|TAX|PTRATIO|     B|LSTAT|MEDV|            features|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+--------------------+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|[0.06905,0.0,2.18...|
+-------+----+-----+----+-----+-----+----+------+---+---+---

** Split Train Test Sets **

In [80]:
seed = 101
train_data,test_data = final_data.randomSplit([0.7,0.3],seed=seed)
train_data.describe().show()
test_data.describe().show()

+-------+------------------+
|summary|              MEDV|
+-------+------------------+
|  count|               372|
|   mean|22.347580645161297|
| stddev| 9.073222327441464|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+

+-------+------------------+
|summary|              MEDV|
+-------+------------------+
|  count|               134|
|   mean|23.047014925373137|
| stddev| 9.548504510694489|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



** Scale Features **

In [81]:
from pyspark.ml.feature import StandardScaler
standardScaler = StandardScaler(inputCol='features', outputCol='features_scaled')
scaler = standardScaler.fit(train_data)
train_data = scaler.transform(train_data)
test_data = scaler.transform(test_data)
train_data.show(2)
test_data.show(2)
train_data.take(2)

## Linear Regression Model

In [73]:
# Train the Model
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol='MEDV')
lrModel = lr.fit(train_data,)
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Py4JJavaError: An error occurred while calling o420.fit.
: java.lang.IllegalArgumentException
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
	at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
	at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2435)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2434)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2434)
	at org.apache.spark.ml.regression.LinearRegressionSummary.numInstances$lzycompute(LinearRegression.scala:696)
	at org.apache.spark.ml.regression.LinearRegressionSummary.numInstances(LinearRegression.scala:696)
	at org.apache.spark.ml.regression.LinearRegressionSummary.<init>(LinearRegression.scala:701)
	at org.apache.spark.ml.regression.LinearRegressionTrainingSummary.<init>(LinearRegression.scala:588)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:225)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:76)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.base/java.lang.Thread.run(Thread.java:844)


In [87]:
spark.stop()

** Scale Features **

In [None]:
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)

## Baseline Models

In [None]:
# Import libraries
from sklearn.model_selection import KFold,cross_val_score,GridSearchCV
from sklearn.metrics import mean_squared_error,r2_score
from sklearn.linear_model import LinearRegression,Ridge,Lasso
from sklearn.neighbors import KNeighborsRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor

In [None]:
models = []
#linear
models.append(('LR', LinearRegression()))
models.append(('RG', Ridge(random_state=seed)))
models.append(('LS', Lasso(random_state=seed)))
#non-linear
models.append(('KNN', KNeighborsRegressor()))
models.append(('DT', DecisionTreeRegressor(random_state=seed)))
models.append(('SVM', SVR()))
#ensemble
models.append(('RF', RandomForestRegressor(n_estimators=100,random_state=seed)))
models.append(('XGB', XGBRegressor(random_state=seed)))

In [None]:
# cross validation
np.random.seed(seed)
kfold = KFold(n_splits=10, random_state=seed)
scoring='neg_mean_squared_error'
names = []
results = []
print('Accuracy: mean +/- std')
for name,model in models:
    cv_results = cross_val_score(model, X_train, y_train, 
                                 cv=kfold, scoring=scoring)
    results.append(cv_results)
    names.append(name)    
    print('{}: {:0.3f} +/- {:0.3f}'.format(name, cv_results.mean(), 
                                             cv_results.std()))

# visualize results
ax = sns.boxplot(data=results)
ax.set_xticklabels(names)
plt.title('Model Comparison')
plt.ylabel('MSE')

## Parameter Tuning
try to improve top 4 algorithms: XGB, RF, DT, RG

#### Model 1: XGBoost

In [None]:
# Default
model = XGBRegressor(n_estimators=100,random_state=seed)
model.fit(X_train,y_train)
print(model.get_params())
print('\n')
print('MSE of test set: {:0.3f}'.format(-1*mean_squared_error(y_test,model.predict(X_test))))

In [None]:
# Grid Search
model = XGBRegressor(random_state=seed)

# parameters
n_estimators = [100, 200, 300]
#max_depth = [2,4,6]
param_grid = dict(n_estimators=n_estimators)#, max_depth=max_depth)
param_grid

# grid search
np.random.seed(seed)
scoring='neg_mean_squared_error'
kfold = KFold(n_splits=10, random_state=seed)
grid_search = GridSearchCV(model, param_grid=param_grid, 
                          cv=kfold, verbose=1, scoring=scoring)
grid_result = grid_search.fit(X_train,y_train)

# results
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

# best
print('\n')
print("Best MSE of training set: {:0.3f} using {}".format(grid_result.best_score_, grid_result.best_params_))
# test set
print('MSE of test set: {:0.1f}'.format(grid_result.score(X_test,y_test)))

In [None]:
# Feature Importance
from xgboost import plot_importance  
model = XGBRegressor(n_estimators=100, random_state=seed)                      
model.fit(X_train,y_train)
print(model.feature_importances_)
plot_importance(model)

#### Model 2: RandomForest

In [None]:
# Default
model = RandomForestRegressor(n_estimators=100,random_state=seed)
model.fit(X_train,y_train)
print(model.get_params())
print('\n')
print('MSE of test set: {:0.3f}'.format(-1*mean_squared_error(y_test,model.predict(X_test))))

In [None]:
# Grid Search
model = RandomForestRegressor(random_state=seed)

# parameters
n_estimators = [50,100,150]
#max_depth = [2,4,6]
param_grid = dict(n_estimators=n_estimators)#, max_depth=max_depth)
param_grid

# grid search
np.random.seed(seed)
scoring='neg_mean_squared_error'
kfold = KFold(n_splits=10, random_state=seed)
grid_search = GridSearchCV(model, param_grid=param_grid, 
                          cv=kfold, verbose=1, scoring=scoring)
grid_result = grid_search.fit(X_train,y_train)

# results
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

# best
print('\n')
print("Best MSE of training set: {:0.3f} using {}".format(grid_result.best_score_, grid_result.best_params_))
# test set
print('MSE of test set: {:0.1f}'.format(grid_result.score(X_test,y_test)))

In [None]:
# Feature Importance
model = RandomForestRegressor(n_estimators=100,random_state=seed)
model.fit(X_train, y_train)
pd.DataFrame(data=model.feature_importances_,
             index=df.drop('MEDV',axis=1).columns).plot(kind='bar',legend=False,
                                                        title='Feature Importances')

#### Model 3: Decision Tree

In [None]:
# Default
model = DecisionTreeRegressor(random_state=seed)
model.fit(X_train,y_train)
print(model.get_params())
print('\n')
print('MSE of test set: {:0.3f}'.format(-1*mean_squared_error(y_test,model.predict(X_test))))

In [None]:
# Grid Search
model = DecisionTreeRegressor(random_state=seed)

# parameters
#n_estimators = [50,100,150]
max_depth = [2,4,8,16]
param_grid = dict(max_depth=max_depth)
param_grid

# grid search
np.random.seed(seed)
scoring='neg_mean_squared_error'
kfold = KFold(n_splits=10, random_state=seed)
grid_search = GridSearchCV(model, param_grid=param_grid, 
                          cv=kfold, verbose=1, scoring=scoring)
grid_result = grid_search.fit(X_train,y_train)

# results
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

# best
print('\n')
print("Best MSE of training set: {:0.3f} using {}".format(grid_result.best_score_, grid_result.best_params_))
# test set
print('MSE of test set: {:0.1f}'.format(grid_result.score(X_test,y_test)))

#### Model 4: Ridge Regression

In [None]:
# Default
model = Ridge(random_state=seed)
model.fit(X_train,y_train)
print(model.get_params())
print('\n')
print('MSE of test set: {:0.3f}'.format(-1*mean_squared_error(y_test,model.predict(X_test))))

In [None]:
# Grid Search
model = Ridge(random_state=seed)

# parameters
alpha = [10,1,0.1,0.001]
param_grid = dict(alpha=alpha)
param_grid

# grid search
np.random.seed(seed)
scoring='neg_mean_squared_error'
kfold = KFold(n_splits=10, random_state=seed)
grid_search = GridSearchCV(model, param_grid=param_grid, 
                          cv=kfold, verbose=1, scoring=scoring)
grid_result = grid_search.fit(X_train,y_train)

# results
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

# best
print('\n')
print("Best MSE of training set: {:0.3f} using {}".format(grid_result.best_score_, grid_result.best_params_))
# test set
print('MSE of test set: {:0.1f}'.format(grid_result.score(X_test,y_test)))

## Final Model: XGB (n_estimators=100)

** Final Model **

In [None]:
model = XGBRegressor(n_estimators=100,random_state=seed)
model.fit(X_train, y_train)
print('MSE of train set: {:0.3f}'.format(mean_squared_error(y_train,model.predict(X_train))))
print('R^2 of train set: {:0.3f}'.format(r2_score(y_train,model.predict(X_train))))

** Save and load the final model **

In [None]:
#save model to disk
import pickle
filename='model.sav'
pickle.dump(model, open(filename, 'wb'))

In [None]:
#load the model from disk
model = pickle.load(open(filename,'rb'))
accuracy = model.score(X_train, y_train)
print('MSE of train set: {:0.3f}'.format(mean_squared_error(y_train,model.predict(X_train))))
print('R^2 of train set: {:0.3f}'.format(r2_score(y_train,model.predict(X_train))))

## Model Evaluation with Test set

In [None]:
# predict class with test set (new, unseen)
y_pred = model.predict(X_test)
print('MSE of test set: {:0.3f}'.format(mean_squared_error(y_test,model.predict(X_test))))
print('R^2 of test set: {:0.3f}'.format(r2_score(y_test,model.predict(X_test))))

In [None]:
plt.figure(figsize=(5,5))
plt.scatter(y_test,y_pred)
plt.title('MEDV: True vs Predicted')
plt.xlabel('True')
plt.ylabel('Predicted')

In [None]:
sns.distplot((y_test-y_pred.ravel()), bins=50, kde=False, axlabel='Error (True - Pred)')
plt.title('MEDV: Error Distribution')

## Summary

- Best model: ** XGBoost ** w/default param


- MSE / R2 score of test set: ** 14.3 / 0.87 **