In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/hu/spark-3.2.2-bin-hadoop3.2"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

'/home/hu/spark-3.2.2-bin-hadoop3.2'

In [5]:
from pyspark.sql import SparkSession

df_ml = SparkSession.builder.appName('iris').getOrCreate()

22/11/18 15:47:45 WARN Utils: Your hostname, hu-ThinkPad-W520 resolves to a loopback address: 127.0.1.1; using 192.168.1.96 instead (on interface wlp3s0)
22/11/18 15:47:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/18 15:47:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
dataset = df_ml.read.csv('iris.csv', header = True, inferSchema=True)
dataset.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.6|        3.4|         1.4|        0.3| Setosa|
|         5.0|        3.4|         1.5|        0.2| Setosa|
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.9|        3.1|         1.5|        0.1| Setosa|
|         5.4|        3.7|         1.5|        0.2| Setosa|
|         4.8|        3.4|         1.6|        0.2| Setosa|
|         4.8|        3.0|         1.4|        0.1| Setosa|
|         4.3|        3.0|         1.1| 

To work with regression models we have to use *VectorAssembler* to convert the independent variables into a vector that includes them  

vectorize all numerical columns into a single feature column

In [7]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#import sys

In [8]:
feature_cols = dataset.columns[:-1]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

Later they are integrated into the dataset that was already loaded using the *transform()* function.

In [9]:
dataset_feature = assembler.transform(dataset)
dataset_feature.show()

+------------+-----------+------------+-----------+-------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|variety|         features|
+------------+-----------+------------+-----------+-------+-----------------+
|         5.1|        3.5|         1.4|        0.2| Setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2| Setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2| Setosa|[4.7,3.2,1.3,0.2]|
|         4.6|        3.1|         1.5|        0.2| Setosa|[4.6,3.1,1.5,0.2]|
|         5.0|        3.6|         1.4|        0.2| Setosa|[5.0,3.6,1.4,0.2]|
|         5.4|        3.9|         1.7|        0.4| Setosa|[5.4,3.9,1.7,0.4]|
|         4.6|        3.4|         1.4|        0.3| Setosa|[4.6,3.4,1.4,0.3]|
|         5.0|        3.4|         1.5|        0.2| Setosa|[5.0,3.4,1.5,0.2]|
|         4.4|        2.9|         1.4|        0.2| Setosa|[4.4,2.9,1.4,0.2]|
|         4.9|        3.1|         1.5|        0.1| Setosa|[4.9,

In [10]:
# convert text labels into indexed label
dataset_feature_label = dataset_feature.select(['features', 'variety'])
label_indexer = StringIndexer(inputCol='variety', outputCol='label').fit(dataset_feature_label)
dataset_feature_label_indexed = label_indexer.transform(dataset_feature_label)

# only select the features and indexed label column
dataset_feature_label_indexed = dataset_feature_label_indexed.select(['features', 'label'])
dataset_feature_label_indexed.show()

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  0.0|
|[4.9,3.0,1.4,0.2]|  0.0|
|[4.7,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
|[5.0,3.6,1.4,0.2]|  0.0|
|[5.4,3.9,1.7,0.4]|  0.0|
|[4.6,3.4,1.4,0.3]|  0.0|
|[5.0,3.4,1.5,0.2]|  0.0|
|[4.4,2.9,1.4,0.2]|  0.0|
|[4.9,3.1,1.5,0.1]|  0.0|
|[5.4,3.7,1.5,0.2]|  0.0|
|[4.8,3.4,1.6,0.2]|  0.0|
|[4.8,3.0,1.4,0.1]|  0.0|
|[4.3,3.0,1.1,0.1]|  0.0|
|[5.8,4.0,1.2,0.2]|  0.0|
|[5.7,4.4,1.5,0.4]|  0.0|
|[5.4,3.9,1.3,0.4]|  0.0|
|[5.1,3.5,1.4,0.3]|  0.0|
|[5.7,3.8,1.7,0.3]|  0.0|
|[5.1,3.8,1.5,0.3]|  0.0|
+-----------------+-----+
only showing top 20 rows



In [11]:
# set regularization rate
reg = 0.01

In [12]:
# use Logistic Regression to train on the training set
train, test = dataset_feature_label_indexed.randomSplit([0.75, 0.25])
lr = LogisticRegression(regParam=reg)
#model = lr.fit(train)

In [19]:
# k folder cross validation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

#modelEvaluator=RegressionEvaluator()
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
pipeline = Pipeline(stages=[lr])
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).addGrid(lr.elasticNetParam, [0, 1]).build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(test)                          

In [20]:
#Print the summery of cross validation
trainingSummary = cvModel.bestModel.summary
trainingSummary.objectiveHistory
trainingSummary.totalIterations

61

In [21]:
# predict on the test set
#prediction = model.transform(test)
prediction = cvModel.transform(test)
prediction.show(10)

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[4.4,3.0,1.3,0.2]|  0.0|[4.71991255911337...|[0.98278420914390...|       0.0|
|[4.5,2.3,1.3,0.3]|  0.0|[4.71991255911337...|[0.95289140284613...|       0.0|
|[4.6,3.2,1.4,0.2]|  0.0|[4.45095390056219...|[0.98324293341434...|       0.0|
|[4.6,3.4,1.4,0.3]|  0.0|[4.45095390056219...|[0.98748778586317...|       0.0|
|[4.6,3.6,1.0,0.2]|  0.0|[5.52678853476693...|[0.99679787624147...|       0.0|
|[4.7,3.2,1.3,0.2]|  0.0|[4.71991255911337...|[0.98714386713815...|       0.0|
|[4.7,3.2,1.6,0.2]|  0.0|[3.91303658345982...|[0.97164311881975...|       0.0|
|[4.8,3.0,1.4,0.1]|  0.0|[4.45095390056219...|[0.97759050539651...|       0.0|
|[4.8,3.0,1.4,0.3]|  0.0|[4.45095390056219...|[0.97759034514026...|       0.0|
|[4.9,3.1,1.5,0.1]|  0.0|[4.18199524201100...|[0.974

In [22]:
# evaluate the accuracy of the model using the test set
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
accuracy = evaluator.evaluate(prediction)
print('Prediction Accuracy is ' + str(accuracy * 100) + '%')

Prediction Accuracy is 100.0%
