## Overview

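Regression problem: predict `median_house_value` from the other housing and demographic columns of the California housing training set (the `longitude` and `latitude` columns are dropped).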

Algorithms Used:

- Linear regression with Spark ML (DataFrame-based `pyspark.ml` API). Reference: https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning
- Linear regression trained with stochastic gradient descent using MLlib (RDD-based `pyspark.mllib` API). Reference: https://spark.apache.org/docs/1.6.1/mllib-linear-methods.html#regression
- Random forest regression with MLlib (RDD-based `pyspark.mllib` API). Reference: https://spark.apache.org/docs/latest/mllib-ensembles.html#regression

## Loading the file into a DataFrame

In [3]:
# Location and format of the uploaded training data (Databricks FileStore path).
file_location = "/FileStore/tables/california_housing_train.csv"
file_type = "csv"

# CSV reader options; the DataFrameReader receives them as strings.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# These options only matter for CSV sources; other formats ignore them.
df = (
    spark.read
    .format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

# The geographic coordinates are not used anywhere below, so drop them up front.
df = df.drop('longitude', 'latitude')
display(df)

housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0
19.0,7650.0,1901.0,1129.0,463.0,1.82,80100.0
17.0,720.0,174.0,333.0,117.0,1.6509,85700.0
14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0
20.0,1454.0,326.0,624.0,262.0,1.925,65500.0
29.0,1387.0,236.0,671.0,239.0,3.3438,74000.0
25.0,2907.0,680.0,1841.0,633.0,2.6768,82400.0
41.0,812.0,168.0,375.0,158.0,1.7083,48500.0
34.0,4789.0,1175.0,3134.0,1056.0,2.1782,58400.0
46.0,1497.0,309.0,787.0,271.0,2.1908,48100.0


In [4]:
# Show the column names and the types inferred by the CSV reader (inferSchema is enabled above).
df.printSchema()

## Linear Regression

In [6]:
# Reference: https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning 

# NOTE(review): the wildcard import may shadow Python builtins (e.g. sum/min/max) in the notebook
# namespace; of its names, only `col` appears to be used in this notebook.
from pyspark.sql.functions import *

# Row count per housing_median_age value, highest age first.
(
    df.groupBy("housing_median_age")
    .count()
    .orderBy(col("housing_median_age").desc())
    .show()
)

In [7]:
# Summary statistics (count, mean, stddev, min, max) for the DataFrame's numeric and string columns.
df.describe().show()

In [8]:
# Express the target in units of 100,000 (e.g. 66900.0 becomes 0.669).
# NOTE: this rebinds `df`, so running this cell a second time divides the target again.
df = df.withColumn("median_house_value", df["median_house_value"] / 100000)
df.head(2)

In [9]:
# Label first, then the feature columns: the conversion cell that follows treats
# position 0 as the label and every other position as a feature.
df = df.select([
    "median_house_value",
    "total_bedrooms",
    "population",
    "households",
    "median_income",
    "housing_median_age",
    "total_rooms",
])

In [10]:
from pyspark.ml.linalg import DenseVector

def _label_and_features(row):
    """Split a row into (label, feature vector): column 0 is the label, the rest are features."""
    label, *features = row
    return (label, DenseVector(features))

input_data = df.rdd.map(_label_and_features)

# Rebinds `df` to a two-column DataFrame: "label" (float) and "features" (pyspark.ml DenseVector).
df = spark.createDataFrame(input_data, ["label", "features"])

In [11]:
from pyspark.ml.feature import StandardScaler

# Scale each feature by its standard deviation (StandardScaler defaults: withStd=True, withMean=False)
# into a new "features_scaled" column.
# NOTE(review): the scaler is fit on all rows before the train/test split below.
standardScaler = StandardScaler(outputCol="features_scaled", inputCol="features")
scaler = standardScaler.fit(df)
scaled_df = scaler.transform(df)

# Peek at two rows showing the raw and scaled vectors side by side.
scaled_df.take(2)

In [12]:
# 80/20 split with a fixed seed so the partition is reproducible.
training_data, test_data = scaled_df.randomSplit(weights=[.8, .2], seed=1234)

n_training_rows = training_data.count()
n_test_rows = test_data.count()
print("Number of training set rows: %d" % n_training_rows)
print("Number of test set rows: %d" % n_test_rows)

In [13]:
# Build the model for Linear Regression (elastic net: regParam=0.3, elasticNetParam=0.01,
# i.e. almost entirely an L2 penalty).
# featuresCol must name the StandardScaler output; without it the estimator uses its default
# "features" column and the scaling cell above has no effect on the model. The fitted
# coefficients are therefore expressed per unit of *scaled* feature.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features_scaled", labelCol="label",
                      maxIter=100, regParam=0.3, elasticNetParam=0.01)
linearModel = lr.fit(training_data)

In [14]:
# Score the held-out rows with the fitted model.
predicted = linearModel.transform(test_data)

def _first_field(row):
    return row[0]

predictions, labels = (
    predicted.select(column).rdd.map(_first_field) for column in ("prediction", "label")
)
# RDD.zip pairs elements positionally and needs identical partitioning; both RDDs are
# projections of the same `predicted` DataFrame.
predictionAndLabel = predictions.zip(labels).collect()
predictionAndLabel[:5]

In [15]:
# A notebook cell renders only its final expression, so return coefficients and intercept
# together as one (coefficients, intercept) tuple; otherwise the coefficients are never shown.
linearModel.coefficients, linearModel.intercept

In [16]:
# `linearModel.summary` describes the fit on the TRAINING rows; evaluate() recomputes the same
# metrics on the held-out rows, which is what tells us how well the model generalizes.
training_summary = linearModel.summary
test_summary = linearModel.evaluate(test_data)

# Get the RMSE
print("Training Root Mean Squared Error is %f" % training_summary.rootMeanSquaredError)
print("Test Root Mean Squared Error is %f" % test_summary.rootMeanSquaredError)

# Get the R2 (coefficient of determination)
print("Training R Squared is %f" % training_summary.r2)
print("Test R Squared is %f" % test_summary.r2)

In [17]:
# Seed for the RDD-based (MLlib) train/test split.
RANDOM_SEED = 13579
# Share of rows assigned to the MLlib training split; the remainder becomes the test split.
TRAINING_DATA_RATIO = 0.7

# Random forest hyper-parameters: number of trees, maximum tree depth, maximum bins per feature.
RF_NUM_TREES, RF_MAX_DEPTH, RF_MAX_BINS = 3, 4, 32

In [18]:
## Data Preprocessing and Test Train Split
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# By this point `df` has been rebuilt as a two-column DataFrame ("label", "features"), so read
# both fields by name. Positional access (row[-1] / row[0:-1]) would use the feature vector as
# the label and the label as the only feature. The pyspark.ml vector is converted to the
# pyspark.mllib vector type that LabeledPoint and the MLlib trainers expect.
# The label here is median_house_value / 100000, as rescaled earlier in the notebook.
transformed_df = df.rdd.map(
    lambda row: LabeledPoint(row["label"], Vectors.dense(row["features"].toArray()))
)

splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data1, test_data1 = transformed_df.randomSplit(splits, RANDOM_SEED)

print("Number of training set rows: %d" % training_data1.count())
print("Number of test set rows: %d" % test_data1.count())

In [19]:
# Reference : https://spark.apache.org/docs/1.6.1/mllib-linear-methods.html#regression

# Build the model for Linear Regression with Stochastic Gradient Descent.
# LinearRegressionWithSGD is deprecated since Spark 2.0 in favor of pyspark.ml's LinearRegression,
# and by default it fits no intercept term.
from pyspark.mllib.regression import LinearRegressionWithSGD

SGD_ITERATIONS = 100
# NOTE(review): presumably kept tiny so SGD stays stable on the unscaled features — confirm.
SGD_STEP = 0.00000001

model_lr = LinearRegressionWithSGD.train(
    training_data1,
    iterations=SGD_ITERATIONS,
    step=SGD_STEP,
)

In [20]:
## Calculating the mean squared error value for the model

# Predict on the test feature vectors and pair each true label with its prediction.
predictions_lr = model_lr.predict(test_data1.map(lambda point: point.features))
labels_and_predictions_lr = test_data1.map(lambda point: point.label).zip(predictions_lr)

# MSE = (sum of squared residuals) / (number of test points).
squared_errors_lr = labels_and_predictions_lr.map(lambda pair: (pair[0] - pair[1]) ** 2)
MSE = squared_errors_lr.reduce(lambda total, value: total + value) / labels_and_predictions_lr.count()
print("Mean Squared Error = " + str(MSE))


## Random Forest Regression

In [22]:
# Reference : https://spark.apache.org/docs/latest/mllib-ensembles.html#regression

# Build the model for RandomForest Regression using the hyper-parameters from the config cell.
# categoricalFeaturesInfo={} treats every feature as continuous; 'variance' is the regression
# impurity. The seed makes the bootstrap and feature-subset sampling reproducible across runs.
from pyspark.mllib.tree import RandomForest, RandomForestModel

model_rf = RandomForest.trainRegressor(training_data1, categoricalFeaturesInfo={},
                                    numTrees=RF_NUM_TREES, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=RF_MAX_DEPTH, maxBins=RF_MAX_BINS,
                                    seed=RANDOM_SEED)

In [23]:
## Calculating the mean squared error value for the model

# MLlib tree models must predict on a whole RDD (the call goes through the JVM), so predict on
# the feature RDD and zip the result back onto the labels.
predictions_rf = model_rf.predict(test_data1.map(lambda x: x.features))
labels_and_predictions_rf = test_data1.map(lambda x: x.label).zip(predictions_rf)
testMSE = labels_and_predictions_rf.map(lambda lp: (lp[0] - lp[1]) ** 2).sum() / \
    float(test_data1.count())

# Report the MSE in the label's own units, unrescaled, so it is directly comparable with the
# linear-regression MSE printed earlier.
print('Test Mean Squared Error = ' + str(testMSE))