# Predicting Songs' release years using Linear Regression
We use a subset of the [Million Song Dataset](http://labrosa.ee.columbia.edu/millionsong/) from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/YearPredictionMSD). Our goal is to train a linear regression model that predicts the release year of a song from a set of audio features.

# 1. ETL

In [3]:
# load testing library
from databricks_test_helper import Test
import os.path
file_name = os.path.join('databricks-datasets', 'cs190', 'data-001', 'millionsong.txt')

raw_data_df = sqlContext.read.load(file_name, 'text')
print raw_data_df.count()

In [4]:
from pyspark.sql.functions import lit
from pyspark.mllib.regression import LabeledPoint
# Split the first column of every row on commas; the first field becomes the
# label and the remaining fields become the feature vector.
parsedDF = raw_data_df.rdd.map(lambda row: row[0].split(','))
lp_df = (parsedDF
         .map(lambda fields: LabeledPoint(fields[0], fields[1:]))
         .toDF())
lp_df.show(5)


In [5]:
import matplotlib.pyplot as plt
def prepare_plot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                 gridWidth=1.0):
    """Create a blank, frameless figure with a grid, ready for plotting.

    Closes the current pyplot figure before creating a new one.

    Args:
        xticks: tick positions for the x axis.
        yticks: tick positions for the y axis.
        figsize: figure size passed to ``plt.subplots``.
        hideLabels: when true, both axes get empty tick labels.
        gridColor: colour of the grid lines.
        gridWidth: line width of the grid lines.

    Returns:
        The ``(fig, ax)`` pair produced by ``plt.subplots``.
    """
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels:
            axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    # Explicit loop instead of map(): map() is lazy on Python 3, so using it
    # only for its side effect would leave the spines visible there.
    for position in ('bottom', 'top', 'left', 'right'):
        ax.spines[position].set_visible(False)
    return fig, ax
  
import numpy as np
minyear = lp_df.groupBy().min('label').collect()[0][0]
print "Earliest year", minyear
lp_df_year_adjusted = lp_df.withColumn('shifted_year',lp_df.label-lit(minyear)).drop('label').withColumnRenamed("shifted_year",'label')
print lp_df_year_adjusted.take(5)


new_data = (lp_df_year_adjusted
             .rdd
             .map(lambda lp: (lp.label, 1))
             .reduceByKey(lambda x, y: x + y)
             .collect())
x, y = zip(*new_data)

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 120, 20), np.arange(0, 120, 20))
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
ax.set_xlabel('Year (shifted)'), ax.set_ylabel('Count')
display(fig)
pass

In [6]:
import matplotlib.pyplot as plt
import matplotlib.cm as cm

# Sample 50 feature vectors without replacement; the fixed seed (47) makes the
# sample repeatable. Drop the seed argument of takeSample to get a different
# random sample on every run.
data_values = (lp_df_year_adjusted
               .rdd
               .map(lambda point: point.features.toArray())
               .takeSample(False, 50, 47))

# Draw the sample as a grey-scale image, one row per sampled observation.
fig, ax = prepare_plot(np.arange(.5, 11, 1), np.arange(.5, 49, 1), figsize=(8, 7),
                       hideLabels=True, gridColor='#eeeeee', gridWidth=1.1)
image = plt.imshow(data_values, interpolation='nearest', aspect='auto', cmap=cm.Greys)
# Tick labels are hidden, so write the column numbers 0..11 by hand.
for x, y, s in zip(np.arange(-.125, 12, 1), np.repeat(-.75, 12), map(str, range(12))):
    plt.text(x, y, s, color='#999999', size='10')
plt.text(4.7, -3, 'Feature', color='#999999', size='11')
ax.set_ylabel('Observation')
display(fig)

# 2. Building a linear regression model from scratch

In [8]:
seed = 42
weights = [.8, .1, .1]
train,validation,test = lp_df_year_adjusted.randomSplit(weights,seed)
train.cache()
validation.cache()
test.cache()

# print n_train, n_val, n_test, n_train + n_val + n_test

print "train, test, validation",train.count(),validation.count(), test.count()

In [9]:
from pyspark.sql.functions import sqrt, sum, count
from pyspark.ml.evaluation import RegressionEvaluator

def calc_RMSE(pl_df):
  """Return the root-mean-square error between pl_df's `prediction` and `label` columns.

  `sqrt`, `sum` and `count` are the column functions imported from
  pyspark.sql.functions, so the whole expression is evaluated by Spark.
  """
  residual = pl_df.prediction - pl_df.label
  mean_squared_error = sum(residual**2) / count(pl_df.label)
  first_row = pl_df.select(sqrt(mean_squared_error)).collect()[0]
  return first_row[0]

# spark alternative:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
preds_and_labels = [(1., 3.), (2., 1.), (2., 2.)]
preds_and_labels_df = sqlContext.createDataFrame(preds_and_labels, ["prediction", "label"])
print calc_RMSE(preds_and_labels_df)
print evaluator.evaluate(preds_and_labels_df)


In [11]:
from pyspark.ml.evaluation import RegressionEvaluator
average_train_year = train.groupBy().avg('label').collect()[0][0]
train_lp = train.select(train.label).withColumn('prediction',lit(average_train_year))
test_lp = test.select(test.label).withColumn('prediction',lit(average_train_year))
validation_lp = validation.select(validation.label).withColumn('prediction',lit(average_train_year))
print 'Baseline Train RMSE = {0:.3f}'.format(evaluator.evaluate(train_lp))
print 'Baseline Validation RMSE = {0:.3f}'.format(calc_RMSE(test_lp))
print 'Baseline Test RMSE = {0:.3f}'.format(calc_RMSE(validation_lp))



In [12]:
from pyspark.mllib.linalg import DenseVector

def calc_delta(x, w, b):
  """Return (residual * features, residual) for one labelled point.

  The residual is the linear prediction w.x.features + b minus x.label.
  lr_gd sums these pairs over the data to update the weights and the intercept.
  """
  prediction = w.dot(x.features) + b
  residual = prediction - x.label
  return (residual * x.features, residual)

def predict_rdd(x, w, b):
  """Return (prediction, label) for one point, with prediction = float(w.x.features + b)."""
  prediction = float(w.dot(x.features) + b)
  return prediction, x.label

def lr_gd(train_data, num_iters):
  """Fit a linear model (weights plus intercept) with batch gradient descent.

  Args:
    train_data: DataFrame whose rows expose `features` and `label`.
    num_iters: number of gradient steps to take.

  Returns:
    (w, b, error): the weight array, the intercept, and a list holding the
    training RMSE measured before each of the num_iters updates.
  """
  w = np.zeros(len(train_data.first().features))
  b = 0
  n = train_data.count()
  error = []
  alpha = 1.0
  for i in range(num_iters):
    # Training RMSE of the current model, recorded before it is updated.
    pred_label = train_data.rdd.map(lambda x: predict_rdd(x, w, b))
    pred_label_df = sqlContext.createDataFrame(pred_label, ["prediction", "label"])
    error.append(calc_RMSE(pred_label_df))
    # Sum the per-point (weight term, intercept term) pairs. The reducer indexes
    # its arguments instead of using tuple parameters: that syntax is
    # Python-2-only and its names shadowed the intercept `b`.
    gradient_w, gradient_b = (train_data.rdd
                              .map(lambda x: calc_delta(x, w, b))
                              .reduce(lambda s, t: (s[0] + t[0], s[1] + t[1])))
    # Step size shrinks with 1/sqrt(iteration) and is averaged over the n points.
    alpha_i = alpha / (n * np.sqrt(i + 1))
    w -= alpha_i * gradient_w
    b -= alpha_i * gradient_b
  return w, b, error


# Run 50 iterations of the hand-written gradient descent on the training set.
num_iters = 50
weights_LR0, b50, e50 = lr_gd(train, num_iters)

# Score the validation set with the learned weights and intercept.
preds_and_labels = validation.rdd.map(lambda point: predict_rdd(point, weights_LR0, b50))
preds_and_labels_df = sqlContext.createDataFrame(preds_and_labels, ["prediction", "label"])
rmse_val_LR0 = calc_RMSE(preds_and_labels_df)
print "Validation Error ->" , rmse_val_LR0



In [13]:
from matplotlib.colors import ListedColormap, Normalize
from matplotlib.cm import get_cmap
# Colour every point by its normalised error; the first six iterations are
# left out of the plot.
cmap = get_cmap('YlOrRd')
norm = Normalize()
clrs = cmap(np.asarray(norm(e50[6:])))[:, 0:3]

fig, ax = prepare_plot(np.arange(0, 60, 10), np.arange(17, 22, 1))
ax.set_ylim(17.8, 21.2)
plt.scatter(range(num_iters - 6), e50[6:], s=14**2, c=clrs, edgecolors='#888888', alpha=0.75)
# Relabel the x ticks so they start at 6, matching the skipped iterations.
ax.set_xticklabels(list(map(str, range(6, 66, 10))))
ax.set_xlabel('Iteration')
ax.set_ylabel(r'Training Error')
display(fig)

# 3. Spark MLlib LinearRegression

In [15]:
from pyspark.ml.regression import LinearRegression
# Values to use when training the linear regression model

num_iters = 500  # iterations
reg = 1e-1  # regParam
alpha = .2  # elasticNetParam
use_intercept = True  # intercept
lin_reg = LinearRegression(maxIter = num_iters , elasticNetParam = alpha ,regParam = reg, fitIntercept = use_intercept)
first_model = lin_reg.fit(train)

#coeffsLR1 stores the model coefficients; interceptLR1 stores the model intercept
coeffs_LR1 =first_model.coefficients
intercept_LR1 = first_model.intercept
print coeffs_LR1, intercept_LR1


In [16]:
# Predict on the validation set with the fitted model and measure its RMSE.
val_pred_df = first_model.transform(validation)
rmse_val_LR1 = calc_RMSE(val_pred_df)

# Compare against the baseline and the hand-written model.
print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 (My implementation) = {1:.3f}' +
       '\n\tLR1 (PySpark) = {2:.3f}').format(
           calc_RMSE(validation_lp), rmse_val_LR0, rmse_val_LR1)

In [17]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

reg_params = [1.0, 2.0, 4.0, 8.0, 16.0, 32.0]
alpha_params = [0.0, .1, .2, .4, .8, 1.0]
lr = LinearRegression(maxIter=50,fitIntercept = True)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, alpha_params) \
    .addGrid(lr.regParam, reg_params) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2) 

cvModel = crossval.fit(train)
rmse_val_LR_grid = evaluator.evaluate(cvModel.transform(validation))
print "Validation error of the best model ->",rmse_val_LR_grid

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 (My implementation) = {1:.3f}' +
       '\n\tLR1 (PySpark) = {2:.3f}'+'\n\tLRGrid (Hyper Parameter Optimized) = {2:.3f}').format(calc_RMSE(validation_lp), rmse_val_LR0, rmse_val_LR1,rmse_val_LR_grid)


In [18]:
def addf(x):
  """Return a LabeledPoint holding x's features followed by all pairwise products.

  Every ordered pair is included, so both a*b and b*a appear, as do the squares.
  """
  features = x['features']
  products = []
  for a in features:
    for b in features:
      products.append(a * b)
  return LabeledPoint(x['label'], np.append(features, products))

train2way = train.map(lambda lp:addf(lp)).toDF()
validation2way = validation.map(lambda lp:addf(lp)).toDF()

num_iters = 500
reg = 1e-10
alpha = .2
use_intercept = True

lin_reg = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha, fitIntercept=use_intercept)
model_interact = lin_reg.fit(train2way)
val_2way_predict = model_interact.transform(validation2way)
rmse_val_2way = calc_RMSE(val_2way_predict)

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 (my implementation) = {1:.3f}\n\tLR1 (PySpark) = {2:.3f}\n\tLRGrid (Hyper Parameter Optimized) = ' + '{3:.3f}\n\tLRInteract (2way interactions) = {4:.3f}').format(evaluator.evaluate(train_lp), rmse_val_LR0, rmse_val_LR1,
                                                 rmse_val_LR_grid, rmse_val_2way)
print "\nAdding two way interactions help"

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import PolynomialExpansion
num_iters = 500
reg = 1e-10
alpha = .2
use_intercept = True


pe2_pipe = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")
lr_pipe = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha,
                                     fitIntercept=use_intercept, featuresCol='polyFeatures')
#defining the pipeline and its 
pl = Pipeline(stages=[pe2_pipe,lr_pipe])

pl_model = pl.fit(train)

print "Validation set RMSE using the pipeline ->", evaluator.evaluate(pl_model.transform(validation))

