In [2]:
import os,sys
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Tell Spark which Python executable to use for its workers and pass the
# driver's module search path along. Both variables are set before the
# session is built.
# NOTE(review): "python2" is hard-coded -- confirm it matches the interpreter
# running this driver, since PySpark expects driver and worker versions to
# agree.
os.environ["PYSPARK_PYTHON"] = "python2"
os.environ['PYTHONPATH'] = ':'.join(sys.path)

# Reuse an existing session when one is already running, otherwise start one.
spark = (
    SparkSession.builder
    .appName("Spark ML App")
    .getOrCreate()
)

# Load the advertising CSV: the first row supplies the column names and the
# column types are inferred from the data.
data = (
    spark.read.format("csv")
    .options(header='true', inferschema='true')
    .load("resources/advertising.csv")
)

data.printSchema()

# describe() runs a Spark job, so build the summary exactly once and print
# that result (an unprinted, discarded summary would only cost a second job).
print(data.describe().toPandas().transpose())

# Hold out roughly 10% of the rows for testing. The fixed seed makes the
# split repeatable from run to run.
(trainingData, testingData) = data.randomSplit([0.9, 0.1], seed=42)

# The regression and the evaluator read the target from a column named
# "label". Use the header's exact spelling ("Sales"): withColumnRenamed
# silently does nothing when no column matches, which is what "sales" would
# hit if spark.sql.caseSensitive were enabled.
# Only the training frame is renamed; the test frame keeps its original
# column name.
trainingData = trainingData.withColumnRenamed("Sales", "label")



# Stage 1: collect the three advertising-spend columns into one vector column.
feature_columns = ["TV", "Radio", "Newspaper"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Stage 2: standardize the assembled vector (mean-centred, scaled by std).
standardizer = StandardScaler(
    inputCol='features',
    outputCol='std_features',
    withMean=True,
    withStd=True,
)

# Stage 3: linear regression fitted on the standardized features.
lr = LinearRegression(labelCol='label', featuresCol='std_features')

pipeline = Pipeline(stages=[assembler, standardizer, lr])
lrModel = pipeline.fit(trainingData)

# The fitted regression is the last of the three pipeline stages.
fitted_lr = lrModel.stages[-1]

# Coefficients and intercept of the fitted linear regression
print("Coefficients: %s" % str(fitted_lr.coefficients))
print("Intercept: %s" % str(fitted_lr.intercept))

# Training-set diagnostics reported by the fitted regression
trainingSummary = fitted_lr.summary
print("numIterations: %d" % trainingSummary.totalIterations)
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

# Score the held-out rows and show the first five predictions next to the
# observed target.
# NOTE(review): the test frame's column is spelled "Sales" in the schema;
# "sales" presumably resolves through case-insensitive column lookup.
lrPredictions = lrModel.transform(testingData)
lrPredictions.select("prediction", "sales", "std_features").show(5)

# Candidate hyper-parameters for the regression stage:
# 2 regParam x 2 fitIntercept x 3 elasticNetParam values = 12 combinations.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr.regParam, [0.1, 0.01])
grid_builder.addGrid(lr.fitIntercept, [False, True])
grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
paramGrid = grid_builder.build()

# Candidates are scored by r2 between the "prediction" and "label" columns.
r2_evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol='label',
    predictionCol='prediction',
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=r2_evaluator,
    numFolds=3,
)

# Fit the whole pipeline under 3-fold cross-validation and keep the best
# parameter combination.
cvModel = crossval.fit(trainingData)

# Average metric per candidate, followed by the candidates themselves.
print(cvModel.avgMetrics)
#print( cvModel.bestModel.stages[2].summary.r2)

for param_map in paramGrid:
    print(param_map)

# Score the held-out rows with the cross-validated model.
cvPrediction = cvModel.transform(testingData)
cvPrediction.select("prediction", "sales", "std_features").show(5)



root
 |-- _c0: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)

               0                   1                   2    3      4
summary    count                mean              stddev  min    max
_c0          200               100.5  57.879184513951124    1    200
TV           200            147.0425   85.85423631490805  0.7  296.4
Radio        200  23.264000000000024  14.846809176168728  0.0   49.6
Newspaper    200  30.553999999999995   21.77862083852283  0.3  114.0
Sales        200  14.022500000000003   5.217456565710477  1.6   27.0
Coefficients: [3.86518373846,2.7428706721,-0.0331421842817]
Intercept: 14.029885057471262
numIterations: 1
+--------------------+
|           residuals|
+--------------------+
|  1.6190690838803974|
| -1.8628242324933293|
|  0.9738869841311555|
| -0.3860229966918638|
|   -5.13125885287361|
|  0.0979796476748831|
|  1.0166725