In [1]:
# import the pyspark

from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('LinearRegression').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/17 01:47:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
#loading dataset 'tips'
df = spark.read.csv('tips.csv', header=True, inferSchema=True)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [6]:
# print the dataframe
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [7]:
# print the schema of df
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [8]:
# check columns
df.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

There are three categorical columns 'sex','smoker','day'and 'time'
we need to convert it to numerical values

In [9]:
# import StringIndexer for handling categorical features

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCols=['sex','smoker','day','time'],outputCols=['n_sex','n_smoker','n_day','n_time'])
new_df = indexer.fit(df).transform(df)
new_df.show()

                                                                                

+----------+----+------+------+---+------+----+-----+--------+-----+------+
|total_bill| tip|   sex|smoker|day|  time|size|n_sex|n_smoker|n_day|n_time|
+----------+----+------+------+---+------+----+-----+--------+-----+------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|  1.0|     0.0|  1.0|   0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|  0.0|     0.0|  1.0|   0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|  0.0|     0.0|  1.0|   0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|  0.0|     0.0|  1.0|   0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|  1.0|     0.0|  1.0|   0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|  0.0|     0.0|  1.0|   0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|  0.0|     0.0|  1.0|   0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|  0.0|     0.0|  1.0|   0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|  0.0|     0.0|  1.0|   0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|  0.0|     0.0|  1.0|   0.0|
|     10.27|

In [10]:
new_df.columns

['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'n_sex',
 'n_smoker',
 'n_day',
 'n_time']

In [11]:
# select the independent feature and group them together
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['tip','size','n_sex','n_smoker','n_day','n_time'],
                                   outputCol='Independent features')

output = featureassembler.transform(new_df)

In [12]:
output.select('Independent features').show()

+--------------------+
|Independent features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [13]:
# select independent and dependent columns
final_df = output.select(['Independent features', 'total_bill'])
final_df.show()

+--------------------+----------+
|Independent features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [21]:
# Import LinearRegression

from pyspark.ml.regression import LinearRegression

train_data,test_data = final_df.randomSplit([0.75,0.25])
reg = LinearRegression(featuresCol='Independent features',labelCol='total_bill')
reg_train = reg.fit(train_data)

22/07/17 02:33:09 WARN Instrumentation: [c1fdfd45] regParam is zero, which might cause numerical instability and overfitting.


In [22]:
reg_train.coefficients

DenseVector([3.0952, 3.5902, 0.2815, 2.3743, -0.556, -0.0408])

In [23]:
reg_train.intercept

0.6081708404376609

In [24]:
# Prediction

pred = reg_train.evaluate(test_data)

In [25]:
pred.predictions.show()

+--------------------+----------+------------------+
|Independent features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.25,2.0])|     10.51| 11.65769422796822|
|(6,[0,1],[1.47,2.0])|     10.77|12.338642925817396|
|(6,[0,1],[2.01,2.0])|     20.23|14.010062456901737|
|(6,[0,1],[2.31,3.0])|     18.69|18.528877210662554|
| (6,[0,1],[3.0,4.0])|     20.45|24.254828292428726|
|(6,[0,1],[3.15,3.0])|     20.08|21.128863147904866|
|(6,[0,1],[3.18,2.0])|     19.82|17.631471440917817|
| (6,[0,1],[5.0,3.0])|     31.27| 26.85502265254567|
|(6,[0,1],[5.92,3.0])|     29.03| 29.70262629809677|
|[1.0,2.0,1.0,1.0,...|      5.75|11.871691145041417|
|[1.5,2.0,0.0,1.0,...|     11.59|14.805760412963254|
|[1.5,2.0,1.0,0.0,...|      8.35|11.560211374859213|
|[1.63,2.0,1.0,0.0...|     11.87|11.962590150860999|
|[1.71,2.0,0.0,0.0...|     10.27| 12.52550142039647|
|[1.98,2.0,0.0,1.0...|     11.02|16.291466662816003|
|[2.0,2.0,0.0,0.0,...|     13.03|12.8262967517

In [26]:
# Performance metric
pred.r2, pred.meanAbsoluteError, pred.meanSquaredError

(0.42586360457168304, 4.6976807338222715, 38.97004654967041)