In [None]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.linalg import Vectors
import numpy as np

#Plotting 
%pylab inline
%matplotlib inline
from matplotlib import pyplot as plt

#Main model imports
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.ml.regression import LinearRegression

In [None]:
# Read the header-less restaurant training CSV.
# NOTE(review): `sc` is not defined in this notebook -- presumably the
# SparkContext injected by the PySpark kernel; confirm.
path = "Restaurant_train_Remove_Header.csv"
raw_data = sc.textFile(path)

# Plain comma split: every record becomes a list of string fields.
records = raw_data.map(lambda line: line.split(","))

num_data = raw_data.count()
first = records.first()

print('First record: ', first)
print('Total number of records: ', num_data)

In [None]:
# Mark the parsed records RDD for caching; it is read again by the mapping,
# feature-extraction and histogram cells below.
records.cache()

In [None]:
def get_mapping(rdd, idx):
    """Map every distinct value found in column `idx` to an integer index.

    Args:
        rdd: RDD whose elements are indexable records (lists of fields).
        idx: position of the categorical column inside each record.

    Returns:
        dict of {distinct column value: index}, as produced by
        distinct().zipWithIndex().collectAsMap().
    """
    print('index:', idx)
    column_values = rdd.map(lambda fields: fields[idx])
    indexed_values = column_values.distinct().zipWithIndex()
    return indexed_values.collectAsMap()

# Build one {value: index} dictionary per categorical column (columns 0 and 1).
mappings = [get_mapping(records, i) for i in range(0,2)]

# Print the column-1 mapping that will actually be used for encoding, instead of
# launching a second, separate get_mapping job only for display.
print("Categorical feature Type Mapping Output: %s" % mappings[1])

# Width of the one-hot part: one slot per distinct value of each categorical column.
cat_len = sum(len(m) for m in mappings)
# Width of the numeric part: columns 2..38 of a record. `first` is the first
# parsed record fetched when the data was loaded, so no extra Spark job is needed.
num_len = len(first[2:39])
total_len = num_len + cat_len

#We now have the mappings for each variable, and we can see how many values in total we need for our binary vector representation:

print("Feature vector length for categorical features: %d" % cat_len)
print("Feature vector length for numerical features: %d" % num_len)
print("Total feature vector length: %d" % total_len)

In [None]:
#Using the extracted mappings get binary-encoded features from the categorical features.
#Note: Re-used this logic from Bike share assignment.
def extract_features(record):
    """Build the feature vector for one parsed record.

    The vector is the concatenation of
      * a one-hot block of length `cat_len`, with one hot slot per entry of the
        module-level `mappings` list (mapping i encodes record[i]), and
      * `num_len` numeric fields taken right after the categorical columns.

    Its length therefore equals `cat_len + num_len` (`total_len`).

    Args:
        record: list of string fields for one CSV row.

    Returns:
        1-D numpy float array.

    Raises:
        KeyError: a categorical value is missing from its mapping.
        ValueError: a numeric field cannot be parsed as float.
    """
    n_cat = len(mappings)
    cat_vec = np.zeros(cat_len)
    offset = 0  # start of the current mapping's slot range inside cat_vec
    # Walk every mapping, not just the first one: cat_len reserves slots for all
    # of them, so skipping a column would leave its block permanently zero.
    for mapping, field in zip(mappings, record[:n_cat]):
        cat_vec[mapping[field] + offset] = 1
        offset += len(mapping)
    # Same window that num_len was measured on.
    # NOTE(review): the previous slice stopped one column earlier (index 38);
    # confirm that column 38 is a feature and not the label.
    num_vec = np.array([float(field) for field in record[n_cat:n_cat + num_len]])
    return np.concatenate((cat_vec, num_vec))

def extract_label(record):
    """Return the regression target: the record's last field parsed as a float."""
    target_field = record[-1]
    return float(target_field)
def squared_log_error(pred, actual):
    """Squared log error of one prediction: (log(pred + 1) - log(actual + 1))^2.

    This is the per-record term; RMSLE is the square root of its mean.
    The expression is symmetric in its two arguments.
    """
    log_gap = np.log(pred + 1) - np.log(actual + 1)
    return log_gap**2

def abs_error(actual, pred):
    """Absolute error of one prediction: |pred - actual|."""
    residual = pred - actual
    return np.abs(residual)
def squared_error(actual, pred):
    """Squared error of one prediction: (pred - actual)^2 (MSE is its mean)."""
    residual = pred - actual
    return residual**2

In [None]:
#2.1 Task: DecisionTree

In [None]:
# 2.1.1 Decision Tree on the encoded categorical + numeric features
# Turn every record into a LabeledPoint(label, feature vector) and show the first one.

decision_tree_data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))

first_point_decision_tree = decision_tree_data.first()
print("Raw data: " + str(first[2:]))
print("Decision Tree regression output label: " + str(first_point_decision_tree.label))
print("Decision Tree regression feature vector: " + str(first_point_decision_tree.features))
print("Decision Tree regression feature vector length: " + str(len(first_point_decision_tree.features)))

# Fixed seed: without it the 70/30 split -- and every metric computed from it in
# the cells below -- changes on each Restart & Run All.
(trainData_decision_tree, testData_decision_tree) = decision_tree_data.randomSplit([0.7, 0.3], seed=42)
model_decision_tree = DecisionTree.trainRegressor(trainData_decision_tree, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=10, maxBins=64)

# Evaluate model on test instances: predict from the features, then pair each
# label with its prediction as (label, prediction).
predictions_decision_tree = model_decision_tree.predict(testData_decision_tree.map(lambda x: x.features))
labelsAndPredictions_decision_tree = testData_decision_tree.map(lambda lp: lp.label).zip(predictions_decision_tree)

print('Learned regression tree model:')
print(model_decision_tree.toDebugString())


In [None]:
# 2.1.2) Decision Tree Log

In [None]:
# Each element of labelsAndPredictions_decision_tree is a (label, prediction) pair.
rmsle_decision_tree = np.sqrt(labelsAndPredictions_decision_tree.map(lambda lp: squared_log_error(lp[0], lp[1])).mean())
mse = labelsAndPredictions_decision_tree.map(lambda lp: squared_error(lp[0], lp[1])).mean()
mae = labelsAndPredictions_decision_tree.map(lambda lp: abs_error(lp[0], lp[1])).mean()

print("Decision Tree Model - Root Mean Squared Log Error: %2.4f" % rmsle_decision_tree)
print("Decision Tree Model - Mean Squared Error: %2.4f" % mse)
print("Decision Tree Model - Mean Absolute Error: %2.4f" % mae)

# Normalised histogram of the raw target values.
targets = records.map(lambda r: float(r[-1])).collect()
# `density=True` replaces the `normed` keyword, which newer matplotlib no longer
# accepts; go through `plt` explicitly rather than names injected by %pylab.
plt.hist(targets, bins=20, color='lightblue', density=True)
fig = plt.gcf()
fig.set_size_inches(12, 8)

In [None]:
def evaluate_decision_tree(trainData, testData, maxDepthValue, maxBinsValue):
    """Train a variance-impurity regression tree and return its RMSLE on testData.

    Args:
        trainData: RDD of LabeledPoint used for training.
        testData: RDD of LabeledPoint used for evaluation.
        maxDepthValue: value passed as the tree's maxDepth.
        maxBinsValue: value passed as the tree's maxBins.

    Returns:
        Root mean squared log error over (label, prediction) pairs of testData.
    """
    tree = DecisionTree.trainRegressor(
        trainData,
        categoricalFeaturesInfo={},
        impurity='variance',
        maxDepth=maxDepthValue,
        maxBins=maxBinsValue,
    )

    # Score the test features, then pair every label with its prediction.
    test_features = testData.map(lambda point: point.features)
    test_labels = testData.map(lambda point: point.label)
    label_prediction_pairs = test_labels.zip(tree.predict(test_features))

    mean_sle = label_prediction_pairs.map(lambda pair: squared_log_error(pair[0], pair[1])).mean()
    return np.sqrt(mean_sle)

In [None]:
# 2.1.3) Decision Tree Max Bins
# RMSLE for several maxBins values while maxDepth stays fixed at 20.
constantMaxDepthAndBinsParams = [5, 10, 20, 30, 40, 50]

metrics = list(map(
    lambda max_bins: evaluate_decision_tree(trainData_decision_tree, testData_decision_tree, 20, max_bins),
    constantMaxDepthAndBinsParams,
))
print(constantMaxDepthAndBinsParams)
print(metrics)

# RMSLE against maxBins
plt.plot(constantMaxDepthAndBinsParams, metrics)
plt.xlabel('Max bins')
plt.ylabel('RMSLE')

In [None]:
# 2.1.4) Decision Tree Max Depth
# RMSLE for several maxDepth values while maxBins stays fixed at 150.

maxDepthAndConstantBinsParams = [5, 10, 12, 15, 20, 25]

metricsMaxDepth = list(map(
    lambda max_depth: evaluate_decision_tree(trainData_decision_tree, testData_decision_tree, max_depth, 150),
    maxDepthAndConstantBinsParams,
))
print(maxDepthAndConstantBinsParams)
print(metricsMaxDepth)

# RMSLE against maxDepth
plt.plot(maxDepthAndConstantBinsParams, metricsMaxDepth)
plt.xlabel('Max depth')
plt.ylabel('RMSLE')

In [None]:
# 2.2. Gradient boost tree 

In [None]:
# Fit a gradient-boosted regression ensemble on the decision-tree training split
# (5 boosting iterations, depth 15, 32 bins).
model_gradient_tree = GradientBoostedTrees.trainRegressor(
    trainData_decision_tree,
    categoricalFeaturesInfo={},
    numIterations=5,
    maxDepth=15,
    maxBins=32,
)

# Predict from the held-out features and pair each label with its prediction.
predictions_gradient_tree = model_gradient_tree.predict(
    testData_decision_tree.map(lambda point: point.features)
)
labelsAndPredictions_gradient_tree = (
    testData_decision_tree
    .map(lambda point: point.label)
    .zip(predictions_gradient_tree)
)

print('Completed Gradient boosted tree model building:')
print(model_gradient_tree.toDebugString())

# Error metrics over the (label, prediction) pairs.
rmsle_gradient_tree = np.sqrt(
    labelsAndPredictions_gradient_tree.map(lambda pair: squared_log_error(pair[0], pair[1])).mean()
)
mse_gradient_tree = labelsAndPredictions_gradient_tree.map(lambda pair: squared_error(pair[0], pair[1])).mean()
mae_gradient_tree = labelsAndPredictions_gradient_tree.map(lambda pair: abs_error(pair[0], pair[1])).mean()

print("Gradient boosted tree Model - Root Mean Squared Log Error: %2.4f" % rmsle_gradient_tree)
print("Gradient boosted tree Model - Mean Squared Error: %2.4f" % mse_gradient_tree)
print("Gradient boosted tree Model - Mean Absolute Error: %2.4f" % mae_gradient_tree)

In [None]:
# 2.2.1) Gradient boost tree iterations

In [None]:
def evaluate_gradient_tree(trainData, testData, numIterationsValue, maxDepthValue, maxBinsValue):
    """Train a gradient-boosted regression ensemble and return its RMSLE on testData.

    Args:
        trainData: RDD of LabeledPoint used for training.
        testData: RDD of LabeledPoint used for evaluation.
        numIterationsValue: value passed as numIterations.
        maxDepthValue: value passed as maxDepth.
        maxBinsValue: value passed as maxBins.

    Returns:
        Root mean squared log error over (label, prediction) pairs of testData.
    """
    ensemble = GradientBoostedTrees.trainRegressor(
        trainData,
        categoricalFeaturesInfo={},
        numIterations=numIterationsValue,
        maxDepth=maxDepthValue,
        maxBins=maxBinsValue,
    )

    test_features = testData.map(lambda point: point.features)
    test_labels = testData.map(lambda point: point.label)
    label_prediction_pairs = test_labels.zip(ensemble.predict(test_features))

    mean_sle = label_prediction_pairs.map(lambda pair: squared_log_error(pair[0], pair[1])).mean()
    return np.sqrt(mean_sle)

# RMSLE for several boosting-iteration counts (maxDepth 10, maxBins 32 held fixed).
numInterationsParams = [2, 3, 4, 5, 6]

metrics_gradient_tree_iterations = [evaluate_gradient_tree(trainData_decision_tree, testData_decision_tree, param, 10, 32)
           for param in numInterationsParams]
print(numInterationsParams)
print(metrics_gradient_tree_iterations)

# Plotting
plot(numInterationsParams, metrics_gradient_tree_iterations)
# The x-axis is linear (no log scaling is applied), so the label must not claim
# a log scale.
plt.xlabel('Iterations')
plt.ylabel('RMSLE')

In [None]:
# 2.2.2) Gradient boost tree Max Bins

In [None]:
# RMSLE for several maxBins values (3 boosting iterations, maxDepth 5 held fixed).
maxBinsParams = [5, 10, 15, 20 , 25]

metrics_gradient_tree_maxBins = list(map(
    lambda max_bins: evaluate_gradient_tree(trainData_decision_tree, testData_decision_tree, 3, 5, max_bins),
    maxBinsParams,
))
print(maxBinsParams)
print(metrics_gradient_tree_maxBins)

# Plotting: gradient-boosted trees, RMSLE against maxBins
plt.plot(maxBinsParams, metrics_gradient_tree_maxBins)
plt.xlabel('Max Bins')
plt.ylabel('RMSLE')

In [None]:
#2.2.3 - Gradient boost tree Max Depth

In [None]:
# RMSLE for several maxDepth values (3 boosting iterations, maxBins 32 held fixed).
maxDepthParams = [5, 10, 15, 20, 25]

metrics_gradient_tree_maxDepth = list(map(
    lambda max_depth: evaluate_gradient_tree(trainData_decision_tree, testData_decision_tree, 3, max_depth, 32),
    maxDepthParams,
))
print(maxDepthParams)
print(metrics_gradient_tree_maxDepth)

# Plotting: gradient-boosted trees, RMSLE against maxDepth
plt.plot(maxDepthParams, metrics_gradient_tree_maxDepth)
plt.xlabel('Max Depths')
plt.ylabel('RMSLE')

In [None]:
#2.3 Task - Linear regression model

In [None]:
# Extract (label, feature vector) pairs from the parsed records for the linear model.
data_linear_regression = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))

# Let's inspect the first record in the extracted feature RDD:

first_point = data_linear_regression.first()
print("Raw data: " + str(first[2:]))
print("Label: " + str(first_point.label))
print("Linear Model feature vector:\n" + str(first_point.features))
print("Linear Model feature vector length: " + str(len(first_point.features)))

# Fixed seed: without it the 75/25 split -- and every linear-regression metric
# computed from it in the cells below -- changes on each Restart & Run All.
(trainData_linear_regression, testData_linear_regression) = data_linear_regression.randomSplit([0.75, 0.25], seed=42)

In [None]:
# Fit an SGD linear regression on the training split (20 iterations, step 0.01).
model_linear_regression = LinearRegressionWithSGD.train(
    trainData_linear_regression,
    iterations=20,
    step=0.01,
)

# Record-by-record scoring: (label, prediction) tuples, with a small sample printed.
true_vs_predicted_linear_regression = testData_linear_regression.map(
    lambda point: (point.label, model_linear_regression.predict(point.features))
)
print("Linear Model predictions: " + str(true_vs_predicted_linear_regression.take(5)))


# Whole-RDD scoring, zipped with the labels; used by the metrics cell further down.
predictions_linear_regression = model_linear_regression.predict(
    testData_linear_regression.map(lambda point: point.features)
)
labelsAndPredictions_linear_regression = (
    testData_linear_regression
    .map(lambda point: point.label)
    .zip(predictions_linear_regression)
)

In [None]:
#2.3.1 - Linear regression Cross Validation

In [None]:
# Count each split exactly once: every count() launches a Spark job, and the
# totals line previously re-ran both of them.
train_size = trainData_linear_regression.count()
test_size = testData_linear_regression.count()

print('Training data size', train_size)
print('Test data size', test_size)

print('Total data size', train_size + test_size)


def evaluate(train, test, iterations, step, regParam, regType, intercept):
    """Train an SGD linear regression and return its RMSLE on `test`.

    Args:
        train: RDD of LabeledPoint used for training.
        test: RDD of LabeledPoint used for evaluation.
        iterations: number of SGD iterations.
        step: SGD step size.
        regParam: regularization strength.
        regType: regularization type passed through to the trainer (the sweeps
            in this notebook use 'l1' and 'l2').
        intercept: whether the model is trained with an intercept term.

    Returns:
        Root mean squared log error over (label, prediction) pairs of `test`.
    """
    sgd_model = LinearRegressionWithSGD.train(
        train,
        iterations,
        step,
        regParam=regParam,
        regType=regType,
        intercept=intercept,
    )
    label_prediction_pairs = test.map(lambda point: (point.label, sgd_model.predict(point.features)))
    mean_sle = label_prediction_pairs.map(lambda pair: squared_log_error(pair[0], pair[1])).mean()
    return np.sqrt(mean_sle)


In [None]:
#2.3.2.1 - Intercept
# RMSLE with and without an intercept (3 iterations, step 0.01, no regularization).

params_intercept = [True, False]
metrics_intercept = list(map(
    lambda use_intercept: evaluate(trainData_linear_regression, testData_linear_regression, 3, 0.01, 0.0, 'l2', use_intercept),
    params_intercept,
))

print(params_intercept)
print(metrics_intercept)

# RMSLE against the intercept flag
plt.plot(params_intercept, metrics_intercept)
plt.xlabel('Intercept')
plt.ylabel('RMSLE')

In [None]:
#2.3.2.2 - Iterations
# RMSLE for several iteration counts (step 0.01, no regularization, no intercept).

params_iterations = [3, 6, 9, 12]
metrics_iterations = list(map(
    lambda n_iter: evaluate(trainData_linear_regression, testData_linear_regression, n_iter, 0.01, 0.0, 'l2', False),
    params_iterations,
))

print(params_iterations)
print(metrics_iterations)

# RMSLE against the number of iterations
plt.plot(params_iterations, metrics_iterations)
plt.xlabel('Iterations')
plt.ylabel('RMSLE')

In [None]:
#2.3.2.3 -  Step size
# RMSLE for several SGD step sizes (3 iterations, no regularization, no intercept).

params_step = [0.01, 0.05, 0.075, 0.5, 1]
metrics_step = list(map(
    lambda step_size: evaluate(trainData_linear_regression, testData_linear_regression, 3, step_size, 0.0, 'l2', False),
    params_step,
))

print(params_step)
print(metrics_step)

# RMSLE against the step size
plt.plot(params_step, metrics_step)
plt.xlabel('Step')
plt.ylabel('RMSLE')

In [None]:
#2.3.2.4 - L1 Regularization
# RMSLE for several L1 strengths (3 iterations, step 0.01, no intercept).

params_l1 = params = [0.0, 0.1, 0.5, 1.0, 10.0, 100.0]
metrics_l1 = list(map(
    lambda reg_param: evaluate(trainData_linear_regression, testData_linear_regression, 3, 0.01, reg_param, 'l1', False),
    params_l1,
))

print(params_l1)
print(metrics_l1)

# RMSLE against the L1 regularization strength
plt.plot(params_l1, metrics_l1)
plt.xlabel('L1 Regularization')
plt.ylabel('RMSLE')


In [None]:
#2.3.2.5 - L2 Regularization
# RMSLE for several L2 strengths (5 iterations, step 0.01, no intercept).

params_l2 = params = [0.0, 0.01, 0.05, 0.5, 1.0, 5.0]
metrics_l2 = list(map(
    lambda reg_param: evaluate(trainData_linear_regression, testData_linear_regression, 5, 0.01, reg_param, 'l2', False),
    params_l2,
))

print(params_l2)
print(metrics_l2)

# RMSLE against the L2 regularization strength
plt.plot(params_l2, metrics_l2)
plt.xlabel('L2 Regularization')
plt.ylabel('RMSLE')

In [None]:
#2.3.3 - Linear regression Log
# Error metrics of the 20-iteration linear model over its (label, prediction) pairs.

rmsle_lr = np.sqrt(
    labelsAndPredictions_linear_regression.map(lambda pair: squared_log_error(pair[0], pair[1])).mean()
)
mse_lr = labelsAndPredictions_linear_regression.map(lambda pair: squared_error(pair[0], pair[1])).mean()
mae_lr = labelsAndPredictions_linear_regression.map(lambda pair: abs_error(pair[0], pair[1])).mean()

print("Linear regression Log - Root Mean Squared Log Error: %2.4f" % rmsle_lr)
print("Linear regression Log - Mean Squared Error: %2.4f" % mse_lr)
print("Linear regression Log - Mean Absolute Error: %2.4f" % mae_lr)