In [1]:
from pyspark import SparkContext
sc = SparkContext()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark regression example") \
    .config("spark.some.config.option","some-value") \
    .getOrCreate()

In [3]:
regressionDataFrame = spark.read.csv('Advertising.csv', header=True, inferSchema = True)

In [4]:
regressionDataFrame.show(5)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



In [5]:
regressionDataFrame = regressionDataFrame.drop('_c0')
regressionDataFrame.show(5)

+-----+-----+---------+-----+
|   TV|radio|newspaper|sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows



In [6]:
regressionDataFrame.columns

['TV', 'radio', 'newspaper', 'sales']

In [7]:
regressionDataFrame.groupBy(regressionDataFrame.TV > 100).count().show(5)

+----------+-----+
|(TV > 100)|count|
+----------+-----+
|      true|  130|
|     false|   70|
+----------+-----+



In [9]:
regressionDataFrame.filter(regressionDataFrame.TV > 100).show(5)

+-----+-----+---------+-----+
|   TV|radio|newspaper|sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
|120.2| 19.6|     11.6| 13.2|
|199.8|  2.6|     21.2| 10.6|
+-----+-----+---------+-----+
only showing top 5 rows



In [10]:
regressionDataFrame.select(regressionDataFrame.TV > 100).show(5)

+----------+
|(TV > 100)|
+----------+
|      true|
|     false|
|     false|
|      true|
|      true|
+----------+
only showing top 5 rows



In [11]:
regressionDataFrame.describe

<bound method DataFrame.describe of DataFrame[TV: double, radio: double, newspaper: double, sales: double]>

In [13]:
from pyspark.sql.functions import mean,min,max

regressionDataFrame.select([mean('TV'), min('TV'), max('TV')]).show()

+--------+-------+-------+
| avg(TV)|min(TV)|max(TV)|
+--------+-------+-------+
|147.0425|    0.7|  296.4|
+--------+-------+-------+



In [15]:
regressionDataFrame.crosstab('TV', 'radio').show()

+--------+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|TV_radio|0.0|0.3|0.4|0.8|1.3|1.4|1.5|1.6|1.9|10.0|10.1|10.6|10.8|11.0|11.6|11.7|11.8|12.0|12.1|12.6|13.9|14.0|14.3|14.5|14.7|14.8|15.4|15.5|15.8|15.9|16.0|16.7|16.9|17.0|17.2|17.4|18.1|18.4|1

In [16]:
regressionDataRDD = regressionDataFrame.rdd.map(list)

In [17]:
regressionDataRDD.take(5)

[[230.1, 37.8, 69.2, 22.1],
 [44.5, 39.3, 45.1, 10.4],
 [17.2, 45.9, 69.3, 9.3],
 [151.5, 41.3, 58.5, 18.5],
 [180.8, 10.8, 58.4, 12.9]]

In [18]:
from pyspark.mllib.regression import LabeledPoint
regressionDataLabelPoint = regressionDataRDD.map(lambda data : LabeledPoint(data[3], data[0:3]))

In [19]:
regressionDataLabelPoint.take(5)

[LabeledPoint(22.1, [230.1,37.8,69.2]),
 LabeledPoint(10.4, [44.5,39.3,45.1]),
 LabeledPoint(9.3, [17.2,45.9,69.3]),
 LabeledPoint(18.5, [151.5,41.3,58.5]),
 LabeledPoint(12.9, [180.8,10.8,58.4])]

In [20]:
regressionLabelPointSplit = regressionDataLabelPoint.randomSplit([0.7,0.3])

In [23]:
regressionLabelPointTrainData = regressionLabelPointSplit[0]
regressionLabelPointTestData = regressionLabelPointSplit[1]

In [None]:
from pyspark.mllib.regression import LinearRegressionWithSGD as lrSGD

ourModelWithLinearRegression = lrSGD.train(data = regressionLabelPointTrainData, iterations = 100, step = 0.002, intercept = True)