### Examples of Pyspark ML

In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Missing').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/18 09:45:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
### Read in the dataset
training = spark.read.csv('DATA/test4.csv', header=True, inferSchema=True)

                                                                                

In [3]:
training.show()

+-------+---+----------+------+
|   name|age|expereince|salary|
+-------+---+----------+------+
|  Emily| 72|        14|159850|
|Jessica| 56|         6|211460|
| Amanda| 22|         2|124570|
| Amanda| 30|         9|164512|
|Michael| 62|         4|177736|
|  Sarah| 25|         1|165257|
| Ashley| 28|        23|137139|
| Amanda| 43|        15|205704|
| Amanda| 32|         9|182715|
|  Emily| 26|        16|208207|
|Matthew| 57|         6|257795|
|  Emily| 77|        22|239294|
|Michael| 47|        20|120671|
|Matthew| 59|        25|209227|
|  Jacob| 62|        14|270384|
|   John| 38|        21|281512|
|  Emily| 24|         7|201611|
| Ashley| 73|        22|121666|
|Matthew| 49|         4|243129|
|Jessica| 53|        20|112919|
+-------+---+----------+------+
only showing top 20 rows



In [4]:
# Slicing the df
training.collect()[2:6]

                                                                                

[Row(name='Amanda', age=22, expereince=2, salary=124570),
 Row(name='Amanda', age=30, expereince=9, salary=164512),
 Row(name='Michael', age=62, expereince=4, salary=177736),
 Row(name='Sarah', age=25, expereince=1, salary=165257)]

In [5]:
# Printing Schema 
training.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- expereince: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [8]:
# Importing Vector Assembler
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['age', 'expereince'], outputCol='independent features')

In [9]:
output = featureassembler.transform(training)

In [11]:
output.show(3)

+-------+---+----------+------+--------------------+
|   name|age|expereince|salary|independent features|
+-------+---+----------+------+--------------------+
|  Emily| 72|        14|159850|         [72.0,14.0]|
|Jessica| 56|         6|211460|          [56.0,6.0]|
| Amanda| 22|         2|124570|          [22.0,2.0]|
+-------+---+----------+------+--------------------+
only showing top 3 rows



In [12]:
output.columns

['name', 'age', 'expereince', 'salary', 'independent features']

In [13]:
finalized_data = output.select('independent features', 'salary')

In [14]:
finalized_data.show(3)

+--------------------+------+
|independent features|salary|
+--------------------+------+
|         [72.0,14.0]|159850|
|          [56.0,6.0]|211460|
|          [22.0,2.0]|124570|
+--------------------+------+
only showing top 3 rows



In [15]:
## ML regression
from pyspark.ml.regression import LinearRegression
## Train test split
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol='independent features', labelCol='salary')
regressor = regressor.fit(train_data)

23/01/18 10:23:54 WARN Instrumentation: [1e4821b2] regParam is zero, which might cause numerical instability and overfitting.


[Stage 7:>                                                          (0 + 5) / 5]

23/01/18 10:23:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/01/18 10:23:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/01/18 10:23:58 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

In [16]:
## Coefficients
regressor.coefficients

DenseVector([-8.4975, 3.7774])

In [17]:
## Intercepts
regressor.intercept

187975.46140886357

In [20]:
## Prediction 
pred_results=regressor.evaluate(test_data)

                                                                                

In [21]:
pred_results.predictions.show()

[Stage 10:>                                                         (0 + 1) / 1]                                                                                

+--------------------+------+------------------+
|independent features|salary|        prediction|
+--------------------+------+------------------+
|          [18.0,1.0]| 87385|187826.28466400618|
|          [18.0,1.0]|110685|187826.28466400618|
|          [18.0,1.0]|111719|187826.28466400618|
|          [18.0,1.0]|120056|187826.28466400618|
|          [18.0,1.0]|120234|187826.28466400618|
|          [18.0,1.0]|122866|187826.28466400618|
|          [18.0,1.0]|129809|187826.28466400618|
|          [18.0,1.0]|132495|187826.28466400618|
|          [18.0,1.0]|139245|187826.28466400618|
|          [18.0,1.0]|153239|187826.28466400618|
|          [18.0,1.0]|154343|187826.28466400618|
|          [18.0,1.0]|167013|187826.28466400618|
|          [18.0,1.0]|174737|187826.28466400618|
|          [18.0,1.0]|189779|187826.28466400618|
|          [18.0,1.0]|201507|187826.28466400618|
|          [18.0,1.0]|204241|187826.28466400618|
|          [18.0,1.0]|221052|187826.28466400618|
|          [18.0,1.0

In [22]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(56296.5176771802, 4225077951.5808015)