We are going to train a linear regression model to predict the release year of a song given a set of audio features. We are using a subset of the [Million Song Dataset](http://labrosa.ee.columbia.edu/millionsong/) from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/YearPredictionMSD) which is hosted on databricks cloud. Before we start coding, check the SparkContext version.

In [2]:
print sc.version
print type(sqlContext)
sqlContext = sqlContext

In [3]:
# load data
file_name = '/databricks-datasets/cs190/data-001/millionsong.txt'
raw_data_df = sqlContext.read.load(file_name, 'text')
print raw_data_df.show()

In the dataset, for each row the first value is the label and the remaining values are the features.

In [5]:
# view data and data points.
num_points = raw_data_df.count()
print num_points
sample_points = raw_data_df.take(5)
print sample_points

In MLlib, labeled training instances are stored using the LabeledPoint object. So, we write a function that takes in a DataFrame and parse each row in the DataFrame into a LabeledPoint.

In [7]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

In [8]:
from pyspark.sql import functions as sql_functions

def parse_points(df):
  """Converts a DataFrame of comma separated unicode strings into a DataFrame of LabeledPoints. 
  
  Args:
    df: DataFrame where each row is a comma separated unicode string. The first element in the string
        is the label and the remaining elements are the features.
  
  Returns:
    DataFrame: Each row is converted into a `LabeledPoint`, which consists of a label and
               features.
  """
  return (df.select(sql_functions.split(df.value, ',')).alias('value')
          .map(lambda row: LabeledPoint(float(row[0][0]), list(row[0][1:])))
          .toDF(['features', 'label']))

parsed_points_df = parse_points(raw_data_df)
print(parsed_points_df)
first_point_features = parsed_points_df.first().features
first_point_label = parsed_points_df.first().label
print first_point_label, first_point_features

d = len(first_point_features)
print(d)

In [9]:
# find range of song years and shift labels such that they start from zero.
content_stats = (parsed_points_df
                 .selectExpr('min(label)', 'max(label)')
                 .collect())
print(content_stats)
min_year = content_stats[0][0]
max_year = content_stats[0][1]

print min_year,max_year

# create new dataframe with shifted labels.
parsed_data_df = (parsed_points_df.select(parsed_points_df.features, parsed_points_df.label - min_year)
                  .withColumnRenamed('(label - ' + str(min_year) + ')', 'label'))

print parsed_data_df.first()

Now lets split the dataset into training, validation and test sets. Once that is done we cache each of these datasets because we will be accessing those multiple times.

In [11]:
# split and cache the datasets.
weights = [.8,.1,.1]
seed= 23
parsed_train_data_df, parsed_val_data_df, parsed_test_data_df = parsed_data_df.randomSplit(weights, seed)
parsed_train_data_df.cache()
parsed_val_data_df.cache()
parsed_test_data_df.cache()
n_train = parsed_train_data_df.count()
n_val = parsed_val_data_df.count()
n_test = parsed_test_data_df.count()

print n_train, n_val, n_test, n_train+n_val+n_test
print parsed_data_df.count()

Baseline model is one where we always make the same prediction independent of the given data point, using the average label in the training set as the constant prediction value. Lets compute the average song year for the training set.

In [13]:
# calculate average label.
average_train_year = (parsed_train_data_df
                     .selectExpr('avg(label)').first()[0])
print average_train_year

We will use root mean squared error (RMSE) for evaluation purposes. We will write a function to calculate the root mean squared error of a given dataset.

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator()

def calc_RMSE(dataset):
  """ Calculates root mean squared error for a dataset of (prediction, label) tuples.
  
  Args:
    dataset (Dataframe of (float, float)): A Dataframe consisting of (prediction, label) tuples.
  
  Returns:
    float: The square root of the mean of the squared errors.
  """
  return evaluator.evaluate(dataset)

In [16]:
# lets test the function.
preds_labels = [(1.,2.), (2.,2.), (2.,3.)]
preds_labels_df = sqlContext.createDataFrame(preds_labels, ["prediction", "label"])

pred_RMSE = calc_RMSE(preds_labels_df)
# RMSE = sqrt[((1-2)^2 + (2-2)^2 + (2-3)^2) / 3] = 0.81
print pred_RMSE

Lets calcuate the training, validation and test RMSE of our baseline model. To do this, we first create DataFrame of (prediction, label) tuples for each dataset and then call calc_RMSE().

In [18]:
preds_labels_train = parsed_train_data_df.map(lambda x : (average_train_year, x.label))
preds_labels_train_df = sqlContext.createDataFrame(preds_labels_train, ["prediction", "label"])
rmse_train_base = calc_RMSE(preds_labels_train_df)

preds_labels_val = parsed_val_data_df.map(lambda x : (average_train_year, x.label))
preds_labels_val_df = sqlContext.createDataFrame(preds_labels_val, ["prediction", "label"])
rmse_val_base = calc_RMSE(preds_labels_val_df)

preds_labels_test = parsed_test_data_df.map(lambda x : (average_train_year, x.label))
preds_labels_test_df = sqlContext.createDataFrame(preds_labels_test, ["prediction", "label"])
rmse_test_base = calc_RMSE(preds_labels_test_df)

print 'Baseline Train RMSE = {0:.3f}'.format(rmse_train_base)
print 'Baseline Validation RMSE = {0:.3f}'.format(rmse_val_base)
print 'Baseline Test RMSE = {0:.3f}'.format(rmse_test_base)

Now let's see if we can do better via linear regression, training a model via gradient descent (we'll omit the intercept for now). Recall that the gradient descent update for linear regression is:
\\[ \scriptsize \mathbf{w}_{i+1} = \mathbf{w}_i - \alpha_i \sum_j (\mathbf{w}_i^\top\mathbf{x}_j  - y_j) \mathbf{x}_j \,.\\]
where \\( \scriptsize i \\) is the iteration number of the gradient descent algorithm, and \\( \scriptsize j \\) identifies the observation.

First, we implement a function that computes the summand for this update, i.e., the summand equals \\( \scriptsize (\mathbf{w}^\top \mathbf{x} - y) \mathbf{x} \, ,\\) and test out this function on two examples.

In [20]:
from pyspark.mllib.linalg import DenseVector

def gradient_summand(weights, lp):
  """Calculates the gradient summand for a given weight and LabeledPoint.
  
  Args:
    weights(DenseVector): An array of model weights(betas).
    lp(LabeledPoint): The LabeledPoint for a single observation.
  
  Returns:
    DenseVector: An array of values the same length as weights. The gradient summand.
  """
  gradient_summand = ((weights.transpose().dot(lp.features)) - lp.label) * (lp.features)
  return gradient_summand

In [21]:
# test gradient summand function.

example_weights = DenseVector([1,1,1])
example_lp = LabeledPoint(3.0, [2, 3, 4])
summand_example = gradient_summand(example_weights, example_lp)
print summand_example

Next, implement a `get_labeled_predictions` function that takes in weights and an observation's `LabeledPoint` and returns a _(prediction, label)_ tuple. We can predict by computing the dot product between weights and an observation's features.

In [23]:
def get_labeled_prediction(weights, observation):
  """Calculates predictions and returns a (prediction, label) tuple.
  
  Args:
    weights: an array with one weight for each feature in data.
    observation(LabeledPoint): A LabeledPoint that contain the correct label and the feature for data point.
  
  Returns:
    tuple: A (prediction, label) tuple.
  """
  predictions = weights.dot(observation.features)
  label = observation.label
  return(float(predictions), float(label))

In [24]:
# test get_labeled_predictions

weights = np.array([1.0, 1.5])
prediction_example = sc.parallelize([LabeledPoint(2, np.array([.5, 1.0])),
                                     LabeledPoint(1, np.array([.5, .5]))])
preds_and_labels_example = prediction_example.map(lambda lp:get_labeled_predictions(weights, lp))
print preds_and_labels_example.collect()

Next we implement a gradient descent function for linear regression.

In [26]:
def linreg_gradient_descent(train_data, num_iters):
  """Calculates the weights and error for a linear regression model trained with gradient descent.
  
  Args:
    train_data: labeled data for use in training the model.
    num_iters: number of iterations of gradient descent to perform.
  
  Returns:
    (np.ndarray, np.ndarray): A tuple of (weights, training_errors)
  """
  # The length of the training data
  n = train_data.count()
  # The number of features in the training data
  d = len(train_data.first().features)
  w = np.zeros(d)
  alpha = 1.0
  error_train = np.zeros(num_iters)
  for i in range(num_iters):
    preds_and_labels_train = train_data.map(lambda lp: get_labeled_prediction(w, lp))
    preds_and_labels_train_df = sqlContext.createDataFrame(preds_and_labels_train, ["prediction", "label"])
    error_train[i] = calc_RMSE(preds_and_labels_train_df)
    
    gradient = train_data.map(lambda lp: gradient_summand(w, lp)).sum()
    # Update the weights
    alpha_i = alpha / (n * np.sqrt(i+1))
    w -= alpha_i*gradient
  return w, error_train

In [27]:
# test linreg_gradient_descent
example_n = 10
example_d = 3
example_data = (sc
                .parallelize(parsed_train_data_df.take(example_n))
                .map(lambda lp: LabeledPoint(lp.label, lp.features[0:example_d])))
print example_data.take(2)
example_num_iters = 5
example_weights, example_error_train = linreg_gradient_descent(example_data, example_num_iters)
print example_weights

Lets train a linear regression model on all our training data and evaluate its accuracy on the validation set.

In [29]:
num_iters = 50
weights_LR0, error_train_LR0 = linreg_gradient_descent(parsed_train_data_df, num_iters)
preds_and_labels = (parsed_val_data_df
                    .map(lambda lp: get_labeled_prediction(weights_LR0, lp)))
preds_and_labels_df = sqlContext.createDataFrame(preds_and_labels, ["prediction", "label"])
rmse_val_LR0 = calc_RMSE(preds_and_labels_df)

print 'Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(rmse_val_base,
                                                                       rmse_val_LR0)

We have done better than the baseline model, but now we see if we can do better by adding an intercept using regularization and training more iterations. We use LinearRegression to train model with elastic net regularization and an intercept.

In [31]:
from pyspark.ml.regression import LinearRegression

# values to use when training the linear regression model
num_iters = 500
reg = 1e-1
alpha = .2
use_intercept = True

# train a Linear Regression model
lin_reg = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha, fitIntercept=use_intercept)
first_model = lin_reg.fit(parsed_train_data_df)

# coeffsLR1 stores the model coefficients; interceptLR1 stores the model intercept
coeffs_LR1 = first_model.coefficients
intercept_LR1 = first_model.intercept
print coeffs_LR1, intercept_LR1

In [32]:
# make predictions on the parsed_train_data_df
sample_prediction = first_model.transform(parsed_train_data_df)
display(sample_prediction)

In [33]:
# evaluate the accuracy of the model on the validation set.
val_pred_df = first_model.transform(parsed_val_data_df)
rmse_val_LR1 = calc_RMSE(val_pred_df)

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}').format(rmse_val_base, rmse_val_LR0, rmse_val_LR1)

We're already outperforming the baseline on the validation set by almost 3 years on average, but let's see if we can do better. We perform grid search to find a good regularization parameter

In [35]:
best_RMSE = rmse_val_LR1
best_reg_param = reg
best_model = first_model

num_iters = 500  # iterations
alpha = .2  # elasticNetParam
use_intercept = True  # intercept

for reg in [1e-10, 1e-5, 1.0]:
    lin_reg = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha, fitIntercept=use_intercept)
    model = lin_reg.fit(parsed_train_data_df)
    val_pred_df = model.transform(parsed_val_data_df)

    rmse_val_grid = calc_RMSE(val_pred_df)
    print rmse_val_grid

    if rmse_val_grid < best_RMSE:
        best_RMSE = rmse_val_grid
        best_reg_param = reg
        best_model = model

rmse_val_LR_grid = best_RMSE

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n' +
       '\tLRGrid = {3:.3f}').format(rmse_val_base, rmse_val_LR0, rmse_val_LR1, rmse_val_LR_grid)

So far, we've used the features as they were provided.  Now, we will add features that capture the two-way interactions between our existing features.

In [37]:
import itertools

def two_way_interactions(lp):
    """Creates a new `LabeledPoint` that includes two-way interactions.
    
    Args:
        lp (LabeledPoint): The label and features for this observation.

    Returns:
        LabeledPoint: The new `LabeledPoint` should have the same label as `lp`.  Its features
            should include the features from `lp` followed by the two-way interaction features.
    """
    combinations = itertools.product(lp.features, repeat = 2)
    list_combs = []
    final_list = [lp.label]
    for y in lp.features:
      list_combs.append(y)
    for x in combinations:
      prods = np.prod(np.array(x))
      list_combs.append(prods)
    final_list.append(list_combs)
    lbp = LabeledPoint(label=final_list[0], features=final_list[1])
    return lbp

print two_way_interactions(LabeledPoint(0.0, [2, 3]))

In [38]:
# Transform the existing train, validation, and test sets to include two-way interactions.
train_data_interact_df = parsed_train_data_df.map(two_way_interactions).toDF()
val_data_interact_df = parsed_val_data_df.map(two_way_interactions).toDF()
test_data_interact_df = parsed_test_data_df.map(two_way_interactions).toDF()
a=parsed_val_data_df.rdd.first()
print a.features
f=two_way_interactions(LabeledPoint(a.label, a.features)).features
print sum(f)

In [39]:
# lets build the interaction model
num_iters = 500
reg = 1e-10
alpha = .2
use_intercept = True

lin_reg = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha, fitIntercept=use_intercept)
model_interact = lin_reg.fit(train_data_interact_df)
preds_and_labels_interact_df = model_interact.transform(val_data_interact_df)
rmse_val_interact = calc_RMSE(preds_and_labels_interact_df)

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n\tLRGrid = ' +
       '{3:.3f}\n\tLRInteract = {4:.3f}').format(rmse_val_base, rmse_val_LR0, rmse_val_LR1,
                                                 rmse_val_LR_grid, rmse_val_interact)

In [40]:
# evaluate interaction model on test data.
preds_and_labels_test_df = model_interact.transform(test_data_interact_df)
rmse_test_interact = calc_RMSE(preds_and_labels_test_df)

print ('Test RMSE:\n\tBaseline = {0:.3f}\n\tLRInteract = {1:.3f}'
       .format(rmse_test_base, rmse_test_interact))

Our final step is to create the interaction model using a [Pipeline](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline).

In [42]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import PolynomialExpansion

num_iters = 500
reg = 1e-10
alpha = .2
use_intercept = True

polynomial_expansion = PolynomialExpansion(degree=2, inputCol='features', outputCol='polyFeatures')
linear_regression = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha,
                                     fitIntercept=use_intercept, featuresCol='polyFeatures')

pipeline = Pipeline(stages=[polynomial_expansion, linear_regression])
pipeline_model = pipeline.fit(parsed_train_data_df)

predictions_df = pipeline_model.transform(parsed_test_data_df)

evaluator = RegressionEvaluator()
rmse_test_pipeline = evaluator.evaluate(predictions_df, {evaluator.metricName: "rmse"})
print('RMSE for test data set using pipelines: {0:.3f}'.format(rmse_test_pipeline))