In [1]:
import findspark
findspark.init('/usr/local/spark23/')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession \
        .builder \
        .appName('hellospark') \
        .getOrCreate()

In [4]:
df = spark.read.format('csv') \
        .options(header='true', inferSchema='true') \
        .load('/home/hadoop/project/pydone/spark/dataset/Advertising.csv')

In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [6]:
df.count()

200

In [7]:
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+
|summary|               _c0|               TV|             Radio|         Newspaper|             Sales|
+-------+------------------+-----------------+------------------+------------------+------------------+
|  count|               200|              200|               200|               200|               200|
|   mean|             100.5|         147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
| stddev|57.879184513951124|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
|    min|                 1|              0.7|               0.0|               0.3|               1.6|
|    max|               200|            296.4|              49.6|             114.0|              27.0|
+-------+------------------+-----------------+------------------+------------------+------------------+



Convert the data to dense vector (features and label)

In [8]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [9]:
#function to convert data to dense vector

def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [35]:
##convert data to dense vector
transformed= transData(df)
transformed.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,230.1,37.8,6...| 22.1|
|[2.0,44.5,39.3,45.1]| 10.4|
|[3.0,17.2,45.9,69.3]|  9.3|
|[4.0,151.5,41.3,5...| 18.5|
|[5.0,180.8,10.8,5...| 12.9|
+--------------------+-----+
only showing top 5 rows



In [12]:
#split data
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])

In [13]:
trainingData.show(5)
testData.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[3.0,17.2,45.9,69.3]|  9.3|
|[5.0,180.8,10.8,5...| 12.9|
|[8.0,120.2,19.6,1...| 13.2|
|[10.0,199.8,2.6,2...| 10.6|
|[11.0,66.1,5.8,24.2]|  8.6|
+--------------------+-----+
only showing top 5 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,230.1,37.8,6...| 22.1|
|[2.0,44.5,39.3,45.1]| 10.4|
|[4.0,151.5,41.3,5...| 18.5|
| [6.0,8.7,48.9,75.0]|  7.2|
|[7.0,57.5,32.8,23.5]| 11.8|
+--------------------+-----+
only showing top 5 rows



In [14]:
# import linear regression
from pyspark.ml.regression import LinearRegression

lr = LinearRegression()

In [15]:
#train model
model = lr.fit(trainingData)

In [33]:
#model summary
summary = model.summary

In [34]:
summary.r2

0.9030491914562739

In [36]:
#do prediction
predictions = model.transform(testData)

In [37]:
#show prediction columns
predictions.columns

['features', 'label', 'prediction']

In [38]:
# Select few rows to display.
predictions.select("features","label","prediction").show()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[1.0,230.1,37.8,6...| 22.1|20.904771023439174|
|[2.0,44.5,39.3,45.1]| 10.4|12.453696002954887|
|[4.0,151.5,41.3,5...| 18.5|17.846289704248306|
| [6.0,8.7,48.9,75.0]|  7.2|12.799930618817228|
|[7.0,57.5,32.8,23.5]| 11.8| 11.68872108942375|
|   [9.0,8.6,2.1,1.0]|  4.8| 3.632932510358961|
|[12.0,214.7,24.0,...| 17.4|17.151230769537676|
|[13.0,23.8,35.1,6...|  9.2|10.889998215404841|
|[15.0,204.1,32.9,...| 19.0|18.623337640571073|
|[17.0,67.8,36.6,1...| 12.5|13.563949303145977|
|[18.0,281.4,39.6,...| 24.4|23.483112276611152|
|[19.0,69.2,20.5,1...| 11.3| 9.926079913074105|
|[26.0,262.9,3.5,1...| 12.0|15.725092163069117|
|[27.0,142.9,29.3,...| 15.0|14.882104487799701|
| [35.0,95.7,1.4,7.4]|  9.5| 7.551096867736685|
|[36.0,290.7,4.1,8.5]| 12.8| 17.02390982149156|
|[38.0,74.7,49.4,4...| 14.7|15.685842843594225|
|[39.0,43.1,26.7,3...| 10.1| 9.986347649

In [28]:
#regresion evaluator

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="label",
                                predictionCol="prediction",
                                metricName="rmse")

rmse = evaluator.evaluate(predictions)
print("RMSE = ", rmse)

RMSE =  1.694430830499316


In [39]:
#compare prediction label and datatest label
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()

import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))


r2_score: 0.8836839904670508
