# Regression using in Spark

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np

In [2]:
spark = SparkSession \
        .builder \
        .appName("PySpark regression example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

### Load dataset

In [4]:
df = spark.read.format('com.databricks.spark.csv'). \
                        options(header='true', \
                                inferschema='true'). \
                        load("./data/Advertising.csv");

In [5]:
df.show(5, True)
df.printSchema()

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows

root
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [6]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|               TV|             Radio|         Newspaper|             Sales|
+-------+-----------------+------------------+------------------+------------------+
|  count|              200|               200|               200|               200|
|   mean|         147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
| stddev|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
|    min|              0.7|               0.0|               0.3|               1.6|
|    max|            296.4|              49.6|             114.0|              27.0|
+-------+-----------------+------------------+------------------+------------------+



In [10]:
# Convert the data to dense vector
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]])\
                        .toDF(['features','label'])

In [20]:
# Tansform the dataset to DataFrame
transformed = transData(df)
transformed.show(5)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[230.1,37.8,69.2]| 22.1|
| [44.5,39.3,45.1]| 10.4|
| [17.2,45.9,69.3]|  9.3|
|[151.5,41.3,58.5]| 18.5|
|[180.8,10.8,58.4]| 12.9|
+-----------------+-----+
only showing top 5 rows



In [22]:
# Deal with Categorical Variables
featureIndexer = VectorIndexer(inputCol="features", \
                              outputCol="indexedFeatures", \
                              maxCategories=4).fit(transformed)
data = featureIndexer.transform(transformed)

In [23]:
data.show(5)

+-----------------+-----+-----------------+
|         features|label|  indexedFeatures|
+-----------------+-----+-----------------+
|[230.1,37.8,69.2]| 22.1|[230.1,37.8,69.2]|
| [44.5,39.3,45.1]| 10.4| [44.5,39.3,45.1]|
| [17.2,45.9,69.3]|  9.3| [17.2,45.9,69.3]|
|[151.5,41.3,58.5]| 18.5|[151.5,41.3,58.5]|
|[180.8,10.8,58.4]| 12.9|[180.8,10.8,58.4]|
+-----------------+-----+-----------------+
only showing top 5 rows



In [27]:
# Split the data into training and test sets
(trainingData, testData) = data.randomSplit([0.6, 0.4])

In [28]:
trainingData.show(5)
testData.show(5)

+---------------+-----+---------------+
|       features|label|indexedFeatures|
+---------------+-----+---------------+
| [0.7,39.6,8.7]|  1.6| [0.7,39.6,8.7]|
| [5.4,29.9,9.4]|  5.3| [5.4,29.9,9.4]|
|[7.3,28.1,41.4]|  5.5|[7.3,28.1,41.4]|
|[7.8,38.9,50.6]|  6.6|[7.8,38.9,50.6]|
|[8.7,48.9,75.0]|  7.2|[8.7,48.9,75.0]|
+---------------+-----+---------------+
only showing top 5 rows

+----------------+-----+----------------+
|        features|label| indexedFeatures|
+----------------+-----+----------------+
|  [4.1,11.6,5.7]|  3.2|  [4.1,11.6,5.7]|
|  [8.4,27.2,2.1]|  5.7|  [8.4,27.2,2.1]|
|   [8.6,2.1,1.0]|  4.8|   [8.6,2.1,1.0]|
|[16.9,43.7,89.4]|  8.7|[16.9,43.7,89.4]|
| [17.2,4.1,31.6]|  5.9| [17.2,4.1,31.6]|
+----------------+-----+----------------+
only showing top 5 rows



In [29]:
# Fit and Pipeline Architecture
lr = LinearRegression()
pipeline = Pipeline(stages=[featureIndexer, lr])

model = pipeline.fit(trainingData)

In [48]:
def modelsummary(model):
    print ("Note: the last rows are the information for Intercept")
    print ("##","-------------------------------------------------")
    print ("##"," Estimate | Std.Error | t Values | P-value")
    coef = np.append(list(model.coefficients),model.intercept)
    Summary=model.summary
    for i in range(len(Summary.pValues)):
        print ("##",'{:10.6f}'.format(coef[i]),\
        '{:10.6f}'.format(Summary.coefficientStandardErrors[i]),\
        '{:8.3f}'.format(Summary.tValues[i]),\
        '{:10.6f}'.format(Summary.pValues[i]))
    print ("##",'---')
    print ("##","Mean squared error: % .6f" \
    % Summary.meanSquaredError, ", RMSE: % .6f" \
    % Summary.rootMeanSquaredError )
    print ("##","Multiple R-squared: %f" % Summary.r2, ", \
    Total iterations: %i"% Summary.totalIterations)

In [49]:
modelsummary(model.stages[-1])

Note: the last rows are the information for Intercept
## -------------------------------------------------
##  Estimate | Std.Error | t Values | P-value
##   0.047075   0.001852   25.423   0.000000
##   0.186401   0.011638   16.016   0.000000
##  -0.001213   0.007855   -0.154   0.877517
##   2.546243   0.448258    5.680   0.000000
## ---
## Mean squared error:  3.188372 , RMSE:  1.785601
## Multiple R-squared: 0.879481 ,     Total iterations: 1


In [51]:
## Predictions
predictions = model.transform(testData)


In [53]:
predictions.show(5)

+----------------+-----+----------------+------------------+
|        features|label| indexedFeatures|        prediction|
+----------------+-----+----------------+------------------+
|  [4.1,11.6,5.7]|  3.2|  [4.1,11.6,5.7]| 4.894589192046976|
|  [8.4,27.2,2.1]|  5.7|  [8.4,27.2,2.1]| 8.009237584899793|
|   [8.6,2.1,1.0]|  4.8|   [8.6,2.1,1.0]| 3.341316634916189|
|[16.9,43.7,89.4]|  8.7|[16.9,43.7,89.4]|11.379084624571089|
| [17.2,4.1,31.6]|  5.9| [17.2,4.1,31.6]| 4.081840259221819|
+----------------+-----+----------------+------------------+
only showing top 5 rows



In [55]:
evaluator = RegressionEvaluator(labelCol="label",
                               predictionCol="prediction",
                               metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" %rmse )

RMSE on test data = 1.53375


In [56]:
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()
import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))

  return f(*args, **kwds)
  return f(*args, **kwds)
  return f(*args, **kwds)
  return f(*args, **kwds)


r2_score: 0.9159748899328883
