In [None]:
# Imports

In [None]:
%pylab inline
%matplotlib inline
from matplotlib import pyplot as plt
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.linalg import Vectors
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.ml.regression import LinearRegression

In [None]:
# Load the data into Resilient Distributed Datasets (RDD)

In [None]:
# Read the CSV as an RDD of text lines and split each line into its fields.
# `sc` is expected to be supplied by the notebook environment.
path = "/regression-models/Movie_TMDb/tmdb-movies-final-features-no-header.csv"
raw_data = sc.textFile(path)
records = raw_data.map(lambda line: line.split(","))

# Bring a record count and the first parsed row back for a quick sanity check.
num_data = raw_data.count()
first = records.first()
print('First record: ', first)
print('Total number of records: ', num_data)


In [None]:
# Cache the results

In [None]:
# Ask Spark to keep the parsed records around: later cells map over `records`
# several times (mappings, labeled points, target histogram).
records.cache()

In [None]:
# To encode each categorical feature as a binary (one-hot) vector,
# we first need a mapping from every distinct feature value to the index of its nonzero entry
# in that binary vector.

In [None]:
def get_mappings(rdd, idx):
    """Build a value -> index dictionary for one column of ``rdd``.

    Each record's field at position ``idx`` is extracted, duplicates are
    dropped, and every remaining value is paired with the integer assigned by
    ``zipWithIndex``. The pairs are collected and returned as a dict.
    """
    print('index:', idx)
    column_values = rdd.map(lambda fields: fields[idx])
    indexed_values = column_values.distinct().zipWithIndex()
    return indexed_values.collectAsMap()

In [None]:
# We have two categorical features
# 1. genres, is at index : 0
# 2. release_year is at index :  1

In [None]:
# Preview the value -> index mapping built for column 0 (genres).
# The message previously misspelled "feature" as "feasture".
print("Mapping of first categorical feature column: %s" % get_mappings(records, 0))

In [None]:
# Apply Mapping function to each categorical column (0, 1)

In [None]:
# One value -> index dictionary per categorical column (columns 0 and 1).
mappings = [get_mappings(records, column) for column in range(2)]

# The one-hot block needs one slot per distinct value across both columns.
cat_len = sum(len(mapping) for mapping in mappings)
# Fields 2..7 of a record are counted as the numerical features.
num_len = len(records.first()[2:8])
total_len = cat_len + num_len

In [None]:
# We now have the mappings for each variable, 
# and we can see how many values in total we need for our binary vector representation:

In [None]:
# Report how wide each part of the encoded feature vector is.
length_report = [
    "Feature vector length for categorical features: %d" % cat_len,
    "Feature vector length for numerical features: %d" % num_len,
    "Total feature vector length: %d" % total_len,
]
print("\n".join(length_report))

In [None]:
# The next step is to use our extracted mappings to convert the categorical features to binary-encoded features.

In [None]:
def extract_features(record):
    """Turn one parsed record into a dense feature vector.

    The two categorical fields (``record[0:2]``) are one-hot encoded with the
    module-level ``mappings``; each column writes into its own segment of a
    zero vector of length ``cat_len``. The numerical fields (``record[2:8]``)
    are converted to floats and appended.

    The previous version encoded only ``record[0:1]`` and read the numerical
    values from ``record[1:7]``, which disagreed with how ``mappings`` (columns
    0 and 1) and ``num_len`` (``[2:8]``) are built. It also printed once per
    record, which has been removed.

    Args:
        record: sequence of string fields for one row.

    Returns:
        1-D ``numpy`` array: one-hot block followed by the numerical block.

    Raises:
        KeyError: if a categorical value is missing from its mapping.
        ValueError: if a numerical field cannot be parsed as a float.
    """
    cat_vec = np.zeros(cat_len)
    offset = 0  # start of the current column's segment inside cat_vec
    for mapping, field in zip(mappings, record[0:2]):
        cat_vec[mapping[field] + offset] = 1
        offset += len(mapping)
    num_vec = np.array([float(field) for field in record[2:8]])
    return np.concatenate((cat_vec, num_vec))



In [None]:
# The extract_label function simply converts the last column variable (Revenue) into a float

In [None]:
def extract_label(record):
    """Return the last field of ``record`` converted to a float."""
    last_field = record[-1]
    return float(last_field)

In [None]:
#### 2.1 Decision Tree #######

In [None]:
# decision tree models typically work on raw features

In [None]:
def extract_features_dt(record):
    """Return the first six fields of ``record`` as a float array.

    The fields are converted eagerly with a list comprehension. On Python 3
    ``map`` returns a lazy iterator, so ``np.array(map(float, ...))`` produced
    a 0-d object array wrapping the iterator instead of a numeric vector.

    Args:
        record: sequence whose first six fields are parseable as floats.

    Returns:
        1-D ``numpy`` float array built from ``record[0:6]``.

    Raises:
        ValueError: if one of the fields cannot be parsed as a float.
    """
    return np.array([float(field) for field in record[0:6]])

In [None]:
# Extract the Data so that we are ready for training and prediction on Decision Tree model

In [None]:
# Pair every record's label with its encoded feature vector.
data_dt = records.map(
    lambda record: LabeledPoint(extract_label(record), extract_features(record))
)

In [None]:
# Show the Label and Feature Vector from the Dataset 
# 2.1.1 Decision Tree Categorical features

In [None]:
# Inspect the first labeled point next to the raw fields it was built from.
first_point_dt = data_dt.first()
dt_summary = [
    "Raw data: " + str(first[2:]),
    "Decision Tree Label: " + str(first_point_dt.label),
    "Decision Tree feature vector: " + str(first_point_dt.features),
    "Decision Tree feature vector length: " + str(len(first_point_dt.features)),
]
print("\n".join(dt_summary))

In [None]:
# Split the data into training and test sets (30% held out for testing)

In [None]:
# A fixed seed makes the 70/30 split (and every metric derived from it)
# repeatable across kernel restarts; without it each run draws a new split.
(trainingData_dt, testData_dt) = data_dt.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.

In [None]:
# Fit a depth-5 regression tree; the empty categoricalFeaturesInfo means all
# features are treated as continuous.
model_dt = DecisionTree.trainRegressor(
    trainingData_dt,
    categoricalFeaturesInfo={},
    impurity='variance',
    maxDepth=5,
    maxBins=3000
)

# Predict on the held-out features, then pair each true label with its prediction.
test_features_dt = testData_dt.map(lambda point: point.features)
predictions_dt = model_dt.predict(test_features_dt)
test_labels_dt = testData_dt.map(lambda point: point.label)
labelsAndPredictions_dt = test_labels_dt.zip(predictions_dt)

In [None]:
# dt_model = DecisionTree.trainRegressor(data_dt,{})
# preds = dt_model.predict(data_dt.map(lambda p: p.features))
# actual = data.map(lambda p: p.label)
# true_vs_predicted_dt = actual.zip(preds)

In [None]:
# Dump the fitted tree's structure under a short heading.
tree_report = ['Learned regression tree model:', model_dt.toDebugString()]
print("\n".join(tree_report))


In [None]:
def squared_log_error(pred, actual):
    """Return the squared log error ``(log(pred + 1) - log(actual + 1)) ** 2``.

    This is the per-sample term of RMSLE; callers average it and take the
    square root. ``np.log1p`` is used instead of ``np.log(x + 1)`` so that
    values very close to zero are not rounded away when 1 is added.

    Args:
        pred: predicted value (scalar or array); must be greater than -1 for
            the logarithm to be defined.
        actual: true value, same constraints as ``pred``.

    Returns:
        The squared difference of the two ``log1p`` values. The expression is
        symmetric in its two arguments.
    """
    return (np.log1p(pred) - np.log1p(actual)) ** 2

def abs_error(actual, pred):
    """Return the absolute error ``|pred - actual|``."""
    difference = pred - actual
    return np.absolute(difference)
def squared_error(actual, pred):
    """Return the squared error ``(pred - actual) ** 2`` (the per-sample term of MSE)."""
    residual = pred - actual
    return residual ** 2

In [None]:
# 2.1.2) Decision Tree Log

In [None]:
# Decision-tree error metrics, each averaged over the (label, prediction)
# pairs of the test split.
squared_log_errors_dt = labelsAndPredictions_dt.map(
    lambda pair: squared_log_error(pair[0], pair[1]))
rmsle_dt = np.sqrt(squared_log_errors_dt.mean())

squared_errors_dt = labelsAndPredictions_dt.map(
    lambda pair: squared_error(pair[0], pair[1]))
mse = squared_errors_dt.mean()

abs_errors_dt = labelsAndPredictions_dt.map(
    lambda pair: abs_error(pair[0], pair[1]))
mae = abs_errors_dt.mean()


In [None]:
# Report the three decision-tree metrics, one per line.
dt_metric_report = [
    "Decision Tree Model - Root Mean Squared Log Error: %2.4f" % rmsle_dt,
    "Decision Tree Model - Mean Squared Error: %2.4f" % mse,
    "Decision Tree Model - Mean Absolute Error: %2.4f" % mae,
]
print("\n".join(dt_metric_report))

In [None]:
# Distribution of the target (last column), normalized to a density.
targets = records.map(lambda r: float(r[-1])).collect()
# `normed=` was deprecated and later removed from matplotlib's hist();
# `density=True` is the replacement. Calls go through `plt` explicitly rather
# than the names injected by `%pylab`.
plt.hist(targets, bins=20, color='lightblue', density=True)
fig = plt.gcf()
fig.set_size_inches(12, 8)

In [None]:
# 2.1.3) Decision Tree Max Bins
# 2.1.4) Decision Tree Max Depth

In [None]:
# (RMSLE vs MaxDepth) and (RMSLE vs MaxBins) evaluation metric

In [None]:
def evaluate_dt(trainData, testData, maxDepthValue, maxBinsValue):
    """Train a regression tree and return its RMSLE on ``testData``.

    The tree is fitted on ``trainData`` with variance impurity, the given
    ``maxDepthValue`` and ``maxBinsValue``, and an empty
    ``categoricalFeaturesInfo``. Predictions for the test features are zipped
    with the test labels and scored with ``squared_log_error``.
    """
    tree = DecisionTree.trainRegressor(
        trainData,
        categoricalFeaturesInfo={},
        impurity='variance',
        maxDepth=maxDepthValue,
        maxBins=maxBinsValue
    )

    held_out_features = testData.map(lambda point: point.features)
    held_out_labels = testData.map(lambda point: point.label)
    scored = held_out_labels.zip(tree.predict(held_out_features))

    mean_squared_log_error = scored.map(
        lambda pair: squared_log_error(pair[0], pair[1])).mean()
    return np.sqrt(mean_squared_log_error)

In [None]:
# 2.1.3) Decision Tree Max Bins

In [None]:
# Sweep maxBins while holding maxDepth fixed at 5.
constantMaxDepthAndBinsParams = [32, 64, 80, 100, 200, 400]

metrics = []
for param in constantMaxDepthAndBinsParams:
    metrics.append(evaluate_dt(trainingData_dt, testData_dt, 5, param))

print(constantMaxDepthAndBinsParams)
print(metrics)

In [None]:
# Decision tree: RMSLE as a function of maxBins.
ax = plt.gca()
ax.plot(constantMaxDepthAndBinsParams, metrics)
ax.set_xlabel('Max Bins')
ax.set_ylabel('RMSLE')

In [None]:
# 2.1.4) Decision Tree Max Depth

In [None]:
# Sweep maxDepth while holding maxBins fixed at 150.
maxDepthAndConstantBinsParams = [3, 4, 5, 6, 7, 8]

metricsMaxDepth = []
for param in maxDepthAndConstantBinsParams:
    metricsMaxDepth.append(evaluate_dt(trainingData_dt, testData_dt, param, 150))

print(maxDepthAndConstantBinsParams)
print(metricsMaxDepth)

In [None]:
# Decision tree: RMSLE as a function of maxDepth.
ax = plt.gca()
ax.plot(maxDepthAndConstantBinsParams, metricsMaxDepth)
ax.set_xlabel('Max Depth')
ax.set_ylabel('RMSLE')

In [None]:
# 2.2. Gradient boost tree 

In [None]:
# Train a GradientBoostedTrees model on the same split used for the decision tree.
#  Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
#         (b) Use more iterations in practice.
model_GBT = GradientBoostedTrees.trainRegressor(
    trainingData_dt,
    categoricalFeaturesInfo={}, numIterations=3, maxDepth=5, maxBins=64)

# Pair every held-out label with the ensemble's prediction.
predictions_GBT = model_GBT.predict(testData_dt.map(lambda x: x.features))
labelsAndPredictions_GBT = testData_dt.map(lambda lp: lp.label).zip(predictions_GBT)

# The heading previously read "lmodel" (typo).
print('Learned regression Gradient boosted tree model:')
print(model_GBT.toDebugString())

# Same three metrics as for the decision tree, averaged over the test pairs.
rmsle_gbt = np.sqrt(labelsAndPredictions_GBT.map(lambda lp: squared_log_error(lp[0], lp[1])).mean())
mse_gbt = labelsAndPredictions_GBT.map(lambda lp: squared_error(lp[0], lp[1])).mean()
mae_gbt = labelsAndPredictions_GBT.map(lambda lp: abs_error(lp[0], lp[1])).mean()

print("GradientBoostedTrees Model - Root Mean Squared Log Error: %2.4f" % rmsle_gbt)
print("GradientBoostedTrees Model - Mean Squared Error: %2.4f" % mse_gbt)
print("GradientBoostedTrees Model - Mean Absolute Error: %2.4f" % mae_gbt)

In [None]:
# 2.2.1) Gradient boost tree iterations

In [None]:
def evaluate_gbt(trainData, testData, numIterationsValue, maxDepthValue, maxBinsValue):
    """Train a gradient-boosted-trees regressor and return its RMSLE on ``testData``.

    The ensemble is fitted on ``trainData`` with the given number of
    iterations, tree depth and bin count, and an empty
    ``categoricalFeaturesInfo``. Predictions for the test features are zipped
    with the test labels and scored with ``squared_log_error``.
    """
    ensemble = GradientBoostedTrees.trainRegressor(
        trainData,
        categoricalFeaturesInfo={},
        numIterations=numIterationsValue,
        maxDepth=maxDepthValue,
        maxBins=maxBinsValue
    )

    held_out_features = testData.map(lambda point: point.features)
    held_out_labels = testData.map(lambda point: point.label)
    scored = held_out_labels.zip(ensemble.predict(held_out_features))

    mean_squared_log_error = scored.map(
        lambda pair: squared_log_error(pair[0], pair[1])).mean()
    return np.sqrt(mean_squared_log_error)

In [None]:
# Sweep the number of boosting iterations with maxDepth=5 and maxBins=32.
numInterationsParams = [2, 3, 4]

metrics_gbt_iterations = [evaluate_gbt(trainingData_dt, testData_dt, param, 5, 32)
                          for param in numInterationsParams]
print(numInterationsParams)
print(metrics_gbt_iterations)

# Plotting: the x axis is linear, so the label no longer claims a log scale.
plt.plot(numInterationsParams, metrics_gbt_iterations)
plt.xlabel('Iterations')
plt.ylabel('RMSLE')

In [None]:
# 2.2.2) Gradient boost tree Max Bins

In [None]:
# Sweep maxBins with numIterations=3 and maxDepth=5.
maxBinsParams = [32, 64, 100]

metrics_gbt_maxBins = []
for param in maxBinsParams:
    metrics_gbt_maxBins.append(evaluate_gbt(trainingData_dt, testData_dt, 3, 5, param))

print(maxBinsParams)
print(metrics_gbt_maxBins)

# Gradient-boosted trees: RMSLE as a function of maxBins.
ax = plt.gca()
ax.plot(maxBinsParams, metrics_gbt_maxBins)
ax.set_xlabel('Max Bins')
ax.set_ylabel('RMSLE')

In [None]:
# 2.2.3) Gradient boost tree Max Depth

In [None]:
# Sweep maxDepth with numIterations=3 and maxBins=32.
maxDepthParams = [4, 5, 6]

metrics_gbt_maxDepth = []
for param in maxDepthParams:
    metrics_gbt_maxDepth.append(evaluate_gbt(trainingData_dt, testData_dt, 3, param, 32))

print(maxDepthParams)
print(metrics_gbt_maxDepth)

# Gradient-boosted trees: RMSLE as a function of maxDepth.
ax = plt.gca()
ax.plot(maxDepthParams, metrics_gbt_maxDepth)
ax.set_xlabel('Max Depths')
ax.set_ylabel('RMSLE')

In [None]:
# 2.3. Linear regression model

In [None]:
# we can proceed with extracting feature vectors and labels from our data records

In [None]:
# Labeled points for the linear model: label plus encoded feature vector.
data_linear = records.map(
    lambda record: LabeledPoint(extract_label(record), extract_features(record))
)

In [None]:
# Let's inspect the first record in the extracted feature RDD:

In [None]:
# Inspect the first labeled point next to the raw fields it was built from.
first_point = data_linear.first()
linear_summary = [
    "Raw data: " + str(first[2:]),
    "Label: " + str(first_point.label),
    "Linear Model feature vector:\n" + str(first_point.features),
    "Linear Model feature vector length: " + str(len(first_point.features)),
]
print("\n".join(linear_summary))

In [None]:
# A fixed seed makes the 70/30 split (and the metrics computed from it)
# repeatable across kernel restarts; without it each run draws a new split.
(trainingData_linear, testData_linear) = data_linear.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Fit a linear regression with SGD (5 iterations, step size 0.01).
model_LR = LinearRegressionWithSGD.train(trainingData_linear, iterations=5, step=0.01)

# Preview a few (label, prediction) tuples from the test split.
true_vs_predicted_LR = testData_linear.map(
    lambda point: (point.label, model_LR.predict(point.features)))
print("Linear Model predictions: " + str(true_vs_predicted_LR.take(5)))

# Predict on the held-out features, then pair each true label with its prediction.
test_features_linear = testData_linear.map(lambda point: point.features)
predictions_linear = model_LR.predict(test_features_linear)
test_labels_linear = testData_linear.map(lambda point: point.label)
labelsAndPredictions_linear = test_labels_linear.zip(predictions_linear)

In [None]:
# 2.3.1) Linear regression Cross Validation

In [None]:
# Count each split once and reuse the numbers: every count() is a separate
# Spark job, and the original ran each of them twice.
training_count_linear = trainingData_linear.count()
test_count_linear = testData_linear.count()

print('Training data size', training_count_linear)
print('Test data size', test_count_linear)

print('Total data size', training_count_linear + test_count_linear)


In [None]:
def evaluate(train, test, iterations, step, regParam, regType, intercept):
    """Train an SGD linear regression and return its RMSLE on ``test``.

    ``iterations`` and ``step`` are passed positionally to
    ``LinearRegressionWithSGD.train``; ``regParam``, ``regType`` and
    ``intercept`` are forwarded by keyword. Each test point is mapped to a
    ``(label, prediction)`` tuple and scored with ``squared_log_error``.
    """
    fitted = LinearRegressionWithSGD.train(
        train, iterations, step,
        regParam=regParam, regType=regType, intercept=intercept)

    label_and_prediction = test.map(
        lambda point: (point.label, fitted.predict(point.features)))
    mean_squared_log_error = label_and_prediction.map(
        lambda pair: squared_log_error(pair[0], pair[1])).mean()
    return np.sqrt(mean_squared_log_error)

In [None]:
# 2.3.2.1) Intercept
# Compare fitting with and without an intercept (3 iterations, step 0.01, no regularization).
params_intercept = [True, False]
metrics_intercept = []
for param in params_intercept:
    metrics_intercept.append(
        evaluate(trainingData_linear, testData_linear, 3, 0.01, 0.0, 'l2', param))

print(params_intercept)
print(metrics_intercept)

# Linear regression: RMSLE for each intercept setting.
ax = plt.gca()
ax.plot(params_intercept, metrics_intercept)
ax.set_xlabel('Intercept')
ax.set_ylabel('RMSLE')

In [None]:
# 2.3.2.2) Iterations
# Sweep the number of SGD iterations (step 0.01, no regularization, no intercept).
params_iterations = [1, 3, 6]
metrics_iterations = []
for param in params_iterations:
    metrics_iterations.append(
        evaluate(trainingData_linear, testData_linear, param, 0.01, 0.0, 'l2', False))

print(params_iterations)
print(metrics_iterations)

# Linear regression: RMSLE as a function of the iteration count.
ax = plt.gca()
ax.plot(params_iterations, metrics_iterations)
ax.set_xlabel('Iterations')
ax.set_ylabel('RMSLE')

In [None]:
# 2.3.2.3) Step size
# Sweep the SGD step size (3 iterations, no regularization, no intercept).
params_step = [0.01, 0.025, 0.05, 0.1, 1.0]
metrics_step = []
for param in params_step:
    metrics_step.append(
        evaluate(trainingData_linear, testData_linear, 3, param, 0.0, 'l2', False))

print(params_step)
print(metrics_step)

# Linear regression: RMSLE as a function of the step size.
ax = plt.gca()
ax.plot(params_step, metrics_step)
ax.set_xlabel('Step')
ax.set_ylabel('RMSLE')

In [None]:
# 2.3.2.4) L1 Regularization
# Sweep the L1 penalty strength (3 iterations, step 0.01, no intercept).
params_l1 = [0.0, 0.01, 0.1, 1.0, 10.0, 100.0]
params = params_l1  # alias kept: the original bound both names to this list
metrics_l1 = []
for param in params_l1:
    metrics_l1.append(
        evaluate(trainingData_linear, testData_linear, 3, 0.01, param, 'l1', False))

print(params_l1)
print(metrics_l1)

# Linear regression: RMSLE as a function of the L1 penalty.
ax = plt.gca()
ax.plot(params_l1, metrics_l1)
ax.set_xlabel('L1 Regularization')
ax.set_ylabel('RMSLE')


In [None]:
# 2.3.2.5) L2 Regularization
# Sweep the L2 penalty strength (5 iterations, step 0.01, no intercept).
params_l2 = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0]
params = params_l2  # alias kept: the original bound both names to this list
metrics_l2 = []
for param in params_l2:
    metrics_l2.append(
        evaluate(trainingData_linear, testData_linear, 5, 0.01, param, 'l2', False))

print(params_l2)
print(metrics_l2)

# Linear regression: RMSLE as a function of the L2 penalty.
ax = plt.gca()
ax.plot(params_l2, metrics_l2)
ax.set_xlabel('L2 Regularization')
ax.set_ylabel('RMSLE')

In [None]:
# 2.3.3) Linear regression Log
# Error metrics for the linear model, each averaged over the (label, prediction)
# pairs of the test split.
squared_log_errors_lr = labelsAndPredictions_linear.map(
    lambda pair: squared_log_error(pair[0], pair[1]))
rmsle_lr = np.sqrt(squared_log_errors_lr.mean())

squared_errors_lr = labelsAndPredictions_linear.map(
    lambda pair: squared_error(pair[0], pair[1]))
mse_lr = squared_errors_lr.mean()

abs_errors_lr = labelsAndPredictions_linear.map(
    lambda pair: abs_error(pair[0], pair[1]))
mae_lr = abs_errors_lr.mean()

lr_metric_report = [
    "LinearRegression Root Mean Squared Log Error: %2.4f" % rmsle_lr,
    "LinearRegression - Mean Squared Error: %2.4f" % mse_lr,
    "LinearRegression - Mean Absolute Error: %2.4f" % mae_lr,
]
print("\n".join(lr_metric_report))