In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.ml.feature import VectorAssembler

# IrisDataSet.csv

In [2]:
spark = SparkSession.builder.appName('actividad08').getOrCreate()

23/06/07 10:52:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/07 10:52:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df = spark.read.csv('IrisDataSet.csv', header=True).cache()

In [4]:
df.show(10)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
+---+-------------+------------+-------------+------------+-----

In [5]:
from pyspark.ml.feature import StringIndexer

In [6]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import col

# pasamos de sring a float los valores de input
df = df.select(col('SepalLengthCm').cast('float'),
                   col('SepalWidthCm').cast('float'),
                   col('PetalLengthCm').cast('float'),
                   col('PetalWidthCm').cast('float'),
                    col('Species'))


# Cambiamos el output a valores numericos -- 2 - Transformar los datos correspondientes a la columna especies usando StringIndexer
indexer = StringIndexer(inputCol="Species", outputCol="label")
df = indexer.fit(df).transform(df)

df = df.drop("Species")

df.show(1000)

+-------------+------------+-------------+------------+-----+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|label|
+-------------+------------+-------------+------------+-----+
|          5.1|         3.5|          1.4|         0.2|  0.0|
|          4.9|         3.0|          1.4|         0.2|  0.0|
|          4.7|         3.2|          1.3|         0.2|  0.0|
|          4.6|         3.1|          1.5|         0.2|  0.0|
|          5.0|         3.6|          1.4|         0.2|  0.0|
|          5.4|         3.9|          1.7|         0.4|  0.0|
|          4.6|         3.4|          1.4|         0.3|  0.0|
|          5.0|         3.4|          1.5|         0.2|  0.0|
|          4.4|         2.9|          1.4|         0.2|  0.0|
|          4.9|         3.1|          1.5|         0.1|  0.0|
|          5.4|         3.7|          1.5|         0.2|  0.0|
|          4.8|         3.4|          1.6|         0.2|  0.0|
|          4.8|         3.0|          1.4|         0.1|  0.0|
|       

In [7]:
df.dtypes

[('SepalLengthCm', 'float'),
 ('SepalWidthCm', 'float'),
 ('PetalLengthCm', 'float'),
 ('PetalWidthCm', 'float'),
 ('label', 'double')]

In [8]:
# Utilizamos el assembler 1 -- Transformar en vector los datos numéricos usando VectorAssembler
assembler = VectorAssembler(
    inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
    outputCol="Features"
)

output = assembler.transform(df)

output.show(truncate=False)

+-------------+------------+-------------+------------+-----+----------------------------------------------------------------------------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|label|Features                                                                    |
+-------------+------------+-------------+------------+-----+----------------------------------------------------------------------------+
|5.1          |3.5         |1.4          |0.2         |0.0  |[5.099999904632568,3.5,1.399999976158142,0.20000000298023224]               |
|4.9          |3.0         |1.4          |0.2         |0.0  |[4.900000095367432,3.0,1.399999976158142,0.20000000298023224]               |
|4.7          |3.2         |1.3          |0.2         |0.0  |[4.699999809265137,3.200000047683716,1.2999999523162842,0.20000000298023224]|
|4.6          |3.1         |1.5          |0.2         |0.0  |[4.599999904632568,3.0999999046325684,1.5,0.20000000298023224]              |
|5.0          |3.6         

In [9]:
# Vamos con las predicciones

# Lo primero es renombrar la columna label y volvemos a la variable df
df = output.withColumnRenamed("label", "Species")
df.show()

+-------------+------------+-------------+------------+-------+--------------------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|            Features|
+-------------+------------+-------------+------------+-------+--------------------+
|          5.1|         3.5|          1.4|         0.2|    0.0|[5.09999990463256...|
|          4.9|         3.0|          1.4|         0.2|    0.0|[4.90000009536743...|
|          4.7|         3.2|          1.3|         0.2|    0.0|[4.69999980926513...|
|          4.6|         3.1|          1.5|         0.2|    0.0|[4.59999990463256...|
|          5.0|         3.6|          1.4|         0.2|    0.0|[5.0,3.5999999046...|
|          5.4|         3.9|          1.7|         0.4|    0.0|[5.40000009536743...|
|          4.6|         3.4|          1.4|         0.3|    0.0|[4.59999990463256...|
|          5.0|         3.4|          1.5|         0.2|    0.0|[5.0,3.4000000953...|
|          4.4|         2.9|          1.4|         0.2|    0.0|[4

In [10]:
# Dividimos el dataframe para las predicciones de despues:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=123)

In [11]:
# 3 - Tree Classifier

from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol='Species',
                           featuresCol='Features',
                           maxDepth=5)

train_df.show()


+-------------+------------+-------------+------------+-------+--------------------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|            Features|
+-------------+------------+-------------+------------+-------+--------------------+
|          4.3|         3.0|          1.1|         0.1|    0.0|[4.30000019073486...|
|          4.4|         2.9|          1.4|         0.2|    0.0|[4.40000009536743...|
|          4.4|         3.2|          1.3|         0.2|    0.0|[4.40000009536743...|
|          4.5|         2.3|          1.3|         0.3|    0.0|[4.5,2.2999999523...|
|          4.6|         3.1|          1.5|         0.2|    0.0|[4.59999990463256...|
|          4.6|         3.4|          1.4|         0.3|    0.0|[4.59999990463256...|
|          4.6|         3.6|          1.0|         0.2|    0.0|[4.59999990463256...|
|          4.7|         3.2|          1.3|         0.2|    0.0|[4.69999980926513...|
|          4.7|         3.2|          1.6|         0.2|    0.0|[4

In [12]:
modelTC = dt.fit(train_df)

In [13]:
predictions = modelTC.transform(test_df)
predictions.show(5)

+-------------+------------+-------------+------------+-------+--------------------+--------------+-------------+----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|            Features| rawPrediction|  probability|prediction|
+-------------+------------+-------------+------------+-------+--------------------+--------------+-------------+----------+
|          4.4|         3.0|          1.3|         0.2|    0.0|[4.40000009536743...|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.6|         3.2|          1.4|         0.2|    0.0|[4.59999990463256...|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.8|         3.0|          1.4|         0.3|    0.0|[4.80000019073486...|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.8|         3.1|          1.6|         0.2|    0.0|[4.80000019073486...|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.9|         3.0|          1.4|         0.2|    0.0|[4.90000009536743...|[37.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|


In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='Species',
                                              predictionCol='prediction',
                                              metricName='accuracy')

accuracy = evaluator.evaluate(predictions)

print('Test Accuracy = ', accuracy)

Test Accuracy =  0.9310344827586207


In [18]:
# 4 - Gradient-boosted tree classifier

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="Species", featuresCol="Features", maxIter=10)


In [19]:
modelGBT = gbt.fit(train_df)

23/06/07 10:57:55 ERROR Executor: Exception in task 0.0 in stage 26.0 (TID 26)
java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:177)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:174)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:16

Py4JJavaError: An error occurred while calling o368.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 26) (e8614ac1fc66 executor driver): java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:177)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:174)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1207)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2290)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2291)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1209)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1202)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:125)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:333)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:61)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$1(GBTClassifier.scala:210)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:171)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:59)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:177)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:174)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1207)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2290)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [None]:
# El error indica que "GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification"

# Es decir, que al no ser una clasificacion binaria no se puede utilizar. (ya que buscamos tres posibles salidas, los tres tipos de flores)

In [20]:
# 5 - Random Forest Classifier
from pyspark.ml.classification import RandomForestClassifier

In [21]:
rf = RandomForestClassifier(labelCol="Species", featuresCol="Features")

In [22]:
modelRF = rf.fit(train_df)

In [23]:
predictions = modelRF.transform(test_df)
predictions.show(5)

+-------------+------------+-------------+------------+-------+--------------------+--------------+-------------+----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species|            Features| rawPrediction|  probability|prediction|
+-------------+------------+-------------+------------+-------+--------------------+--------------+-------------+----------+
|          4.4|         3.0|          1.3|         0.2|    0.0|[4.40000009536743...|[20.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.6|         3.2|          1.4|         0.2|    0.0|[4.59999990463256...|[20.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.8|         3.0|          1.4|         0.3|    0.0|[4.80000019073486...|[20.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.8|         3.1|          1.6|         0.2|    0.0|[4.80000019073486...|[20.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|          4.9|         3.0|          1.4|         0.2|    0.0|[4.90000009536743...|[20.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|


In [24]:
evaluatorRF = MulticlassClassificationEvaluator(labelCol='Species',
                                              predictionCol='prediction',
                                              metricName='accuracy')

accuracyRF = evaluatorRF.evaluate(predictions)

print('Test Accuracy = ', accuracyRF)

Test Accuracy =  0.9655172413793104
