## MLlib usando Titanic dataset ##

In [55]:
from pyspark.sql import SparkSession
from pyspark .sql.types import StructType
spark = SparkSession.builder.appName("iris").getOrCreate()


In [56]:

df = spark.read.csv("iris.csv").cache()


In [57]:
df.show(10)

+---+---+---+---+------+
|_c0|_c1|_c2|_c3|   _c4|
+---+---+---+---+------+
|5.1|3.5|1.4|0.2|setosa|
|4.9|3.0|1.4|0.2|setosa|
|4.7|3.2|1.3|0.2|setosa|
|4.6|3.1|1.5|0.2|setosa|
|5.0|3.6|1.4|0.2|setosa|
|5.4|3.9|1.7|0.4|setosa|
|4.6|3.4|1.4|0.3|setosa|
|5.0|3.4|1.5|0.2|setosa|
|4.4|2.9|1.4|0.2|setosa|
|4.9|3.1|1.5|0.1|setosa|
+---+---+---+---+------+
only showing top 10 rows



In [58]:
df.toPandas()


Unnamed: 0,_c0,_c1,_c2,_c3,_c4
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa
...,...,...,...,...,...
146,6.3,2.5,5.0,1.9,virginica
147,6.5,3.0,5.2,2.0,virginica
148,6.2,3.4,5.4,2.3,virginica
149,5.9,3.0,5.1,1.8,virginica


In [59]:
df.count()


151

In [60]:

df.columns

['_c0', '_c1', '_c2', '_c3', '_c4']

In [61]:

df.dtypes

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string')]

In [62]:
df.describe().toPandas()


Unnamed: 0,summary,_c0,_c1,_c2,_c3,_c4
0,count,151.0,151.0,151.0,151.0,151
1,mean,5.83973509933775,3.06225165562914,3.747682119205301,1.1940397350993384,
2,stddev,0.8264848835236843,0.4438831260372194,1.7637019565707457,0.7627347103748434,
3,min,4.3,2.0,1.0,0.1,setosa
4,max,7.9,4.4,6.9,2.5,virginica


In [63]:

# Realizamos la transformación de las columnas de string a numeros:
from pyspark.sql.functions import col
dataset = df.select(col("_c0").cast("float"),
 col("_c1").cast("float"),
 col("_c2").cast("float"),
 col("_c3").cast("float"),
 col("_c4"))
dataset.show()

+---+---+---+---+------+
|_c0|_c1|_c2|_c3|   _c4|
+---+---+---+---+------+
|5.1|3.5|1.4|0.2|setosa|
|4.9|3.0|1.4|0.2|setosa|
|4.7|3.2|1.3|0.2|setosa|
|4.6|3.1|1.5|0.2|setosa|
|5.0|3.6|1.4|0.2|setosa|
|5.4|3.9|1.7|0.4|setosa|
|4.6|3.4|1.4|0.3|setosa|
|5.0|3.4|1.5|0.2|setosa|
|4.4|2.9|1.4|0.2|setosa|
|4.9|3.1|1.5|0.1|setosa|
|5.4|3.7|1.5|0.2|setosa|
|4.8|3.4|1.6|0.2|setosa|
|4.8|3.0|1.4|0.1|setosa|
|4.3|3.0|1.1|0.1|setosa|
|5.8|4.0|1.2|0.2|setosa|
|5.7|4.4|1.5|0.4|setosa|
|5.4|3.9|1.3|0.4|setosa|
|5.1|3.5|1.4|0.3|setosa|
|5.7|3.8|1.7|0.3|setosa|
|5.1|3.8|1.5|0.3|setosa|
+---+---+---+---+------+
only showing top 20 rows



In [64]:
from pyspark.sql.functions import isnull, when, count


In [65]:

dataset = dataset.replace("?", None).dropna(how="any")



In [66]:
dataset.dtypes

[('_c0', 'float'),
 ('_c1', 'float'),
 ('_c2', 'float'),
 ('_c3', 'float'),
 ('_c4', 'string')]

In [67]:
# Modificación de las columnas en formato string a número.
from pyspark.ml.feature import StringIndexer
dataset = StringIndexer(inputCol="_c4",
 outputCol="tipo",
 handleInvalid="keep")\
 .fit(dataset).transform(dataset)

In [68]:
dataset.show()

+---+---+---+---+------+----+
|_c0|_c1|_c2|_c3|   _c4|tipo|
+---+---+---+---+------+----+
|5.1|3.5|1.4|0.2|setosa| 0.0|
|4.9|3.0|1.4|0.2|setosa| 0.0|
|4.7|3.2|1.3|0.2|setosa| 0.0|
|4.6|3.1|1.5|0.2|setosa| 0.0|
|5.0|3.6|1.4|0.2|setosa| 0.0|
|5.4|3.9|1.7|0.4|setosa| 0.0|
|4.6|3.4|1.4|0.3|setosa| 0.0|
|5.0|3.4|1.5|0.2|setosa| 0.0|
|4.4|2.9|1.4|0.2|setosa| 0.0|
|4.9|3.1|1.5|0.1|setosa| 0.0|
|5.4|3.7|1.5|0.2|setosa| 0.0|
|4.8|3.4|1.6|0.2|setosa| 0.0|
|4.8|3.0|1.4|0.1|setosa| 0.0|
|4.3|3.0|1.1|0.1|setosa| 0.0|
|5.8|4.0|1.2|0.2|setosa| 0.0|
|5.7|4.4|1.5|0.4|setosa| 0.0|
|5.4|3.9|1.3|0.4|setosa| 0.0|
|5.1|3.5|1.4|0.3|setosa| 0.0|
|5.7|3.8|1.7|0.3|setosa| 0.0|
|5.1|3.8|1.5|0.3|setosa| 0.0|
+---+---+---+---+------+----+
only showing top 20 rows



In [69]:
dataset.describe().toPandas()

Unnamed: 0,summary,_c0,_c1,_c2,_c3,_c4,tipo
0,count,151.0,151.0,151.0,151.0,151,151.0
1,mean,5.839735094285169,3.0622516594185734,3.7476821072054234,1.194039726592847,,0.9933774834437086
2,stddev,0.8264848676137385,0.4438831292262711,1.7637019516072574,0.7627347005839645,,0.8205420057638477
3,min,4.3,2.0,1.0,0.1,setosa,0.0
4,max,7.9,4.4,6.9,2.5,virginica,2.0


In [70]:
dataset = dataset.drop("_c4")
dataset.show()

+---+---+---+---+----+
|_c0|_c1|_c2|_c3|tipo|
+---+---+---+---+----+
|5.1|3.5|1.4|0.2| 0.0|
|4.9|3.0|1.4|0.2| 0.0|
|4.7|3.2|1.3|0.2| 0.0|
|4.6|3.1|1.5|0.2| 0.0|
|5.0|3.6|1.4|0.2| 0.0|
|5.4|3.9|1.7|0.4| 0.0|
|4.6|3.4|1.4|0.3| 0.0|
|5.0|3.4|1.5|0.2| 0.0|
|4.4|2.9|1.4|0.2| 0.0|
|4.9|3.1|1.5|0.1| 0.0|
|5.4|3.7|1.5|0.2| 0.0|
|4.8|3.4|1.6|0.2| 0.0|
|4.8|3.0|1.4|0.1| 0.0|
|4.3|3.0|1.1|0.1| 0.0|
|5.8|4.0|1.2|0.2| 0.0|
|5.7|4.4|1.5|0.4| 0.0|
|5.4|3.9|1.3|0.4| 0.0|
|5.1|3.5|1.4|0.3| 0.0|
|5.7|3.8|1.7|0.3| 0.0|
|5.1|3.8|1.5|0.3| 0.0|
+---+---+---+---+----+
only showing top 20 rows



## Realizamos un vetor con la información correspondiente a la X: ##

In [71]:
required_features = ['_c0', '_c1',
 '_c2', '_c3']


In [72]:
from pyspark.ml.feature import VectorAssembler

In [73]:
assembler = VectorAssembler(inputCols=required_features,
 outputCol="feature")
transformed_data = assembler.transform(dataset)


In [75]:
transformed_data.show(100)

+---+---+---+---+----+--------------------+
|_c0|_c1|_c2|_c3|tipo|             feature|
+---+---+---+---+----+--------------------+
|5.1|3.5|1.4|0.2| 0.0|[5.09999990463256...|
|4.9|3.0|1.4|0.2| 0.0|[4.90000009536743...|
|4.7|3.2|1.3|0.2| 0.0|[4.69999980926513...|
|4.6|3.1|1.5|0.2| 0.0|[4.59999990463256...|
|5.0|3.6|1.4|0.2| 0.0|[5.0,3.5999999046...|
|5.4|3.9|1.7|0.4| 0.0|[5.40000009536743...|
|4.6|3.4|1.4|0.3| 0.0|[4.59999990463256...|
|5.0|3.4|1.5|0.2| 0.0|[5.0,3.4000000953...|
|4.4|2.9|1.4|0.2| 0.0|[4.40000009536743...|
|4.9|3.1|1.5|0.1| 0.0|[4.90000009536743...|
|5.4|3.7|1.5|0.2| 0.0|[5.40000009536743...|
|4.8|3.4|1.6|0.2| 0.0|[4.80000019073486...|
|4.8|3.0|1.4|0.1| 0.0|[4.80000019073486...|
|4.3|3.0|1.1|0.1| 0.0|[4.30000019073486...|
|5.8|4.0|1.2|0.2| 0.0|[5.80000019073486...|
|5.7|4.4|1.5|0.4| 0.0|[5.69999980926513...|
|5.4|3.9|1.3|0.4| 0.0|[5.40000009536743...|
|5.1|3.5|1.4|0.3| 0.0|[5.09999990463256...|
|5.7|3.8|1.7|0.3| 0.0|[5.69999980926513...|
|5.1|3.8|1.5|0.3| 0.0|[5.0999999

## Realizamos los modelos... ## 

**Algoritmo de Decission Tree Classifier**

In [77]:
# Crearemos los dataset de entrenamiento y test:
training_data, test_data = transformed_data.randomSplit([0.8, 0.2])


In [78]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="tipo",
 featuresCol="feature",
 maxDepth=5)

In [79]:
model = dt.fit(training_data)


In [80]:
predictions = model.transform(test_data)

In [81]:
predictions.show()

+---+---+---+---+----+--------------------+------------------+-----------------+----------+
|_c0|_c1|_c2|_c3|tipo|             feature|     rawPrediction|      probability|prediction|
+---+---+---+---+----+--------------------+------------------+-----------------+----------+
|4.4|3.0|1.3|0.2| 0.0|[4.40000009536743...|[28.0,0.0,0.0,0.0]|[1.0,0.0,0.0,0.0]|       0.0|
|4.6|3.2|1.4|0.2| 0.0|[4.59999990463256...|[28.0,0.0,0.0,0.0]|[1.0,0.0,0.0,0.0]|       0.0|
|4.7|3.2|1.3|0.2| 0.0|[4.69999980926513...|[28.0,0.0,0.0,0.0]|[1.0,0.0,0.0,0.0]|       0.0|
|4.8|3.0|1.4|0.1| 0.0|[4.80000019073486...|[28.0,0.0,0.0,0.0]|[1.0,0.0,0.0,0.0]|       0.0|
|4.8|3.0|1.4|0.3| 0.0|[4.80000019073486...|[28.0,0.0,0.0,0.0]|[1.0,0.0,0.0,0.0]|       0.0|
|4.8|3.1|1.6|0.2| 0.0|[4.80000019073486...|[28.0,0.0,0.0,0.0]|[1.0,0.0,0.0,0.0]|       0.0|
|4.8|3.4|1.9|0.2| 0.0|[4.80000019073486...|[28.0,0.0,0.0,0.0]|[1.0,0.0,0.0,0.0]|       0.0|
|4.9|2.5|4.5|1.7| 2.0|[4.90000009536743...|[0.0,29.0,0.0,0.0]|[0.0,1.0,0.0,0.0]|

In [82]:
# Evaluamos el modelo:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="tipo",
 predictionCol="prediction",
 metricName="accuracy")

In [83]:
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy: ", accuracy*100)



Test Accuracy:  87.6923076923077


**Algortimo de Gradient-boosted tree Classifier**


In [89]:
training_data, test_data = transformed_data.randomSplit([0.8, 0.2])

In [90]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="tipo",
 featuresCol="feature",
 maxIter=5)

In [91]:
model = gbt.fit(training_data)
##Error porque es binario

Py4JJavaError: An error occurred while calling o1079.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 110.0 failed 1 times, most recent failure: Lost task 0.0 in stage 110.0 (TID 100) (61f4a26e2037 executor driver): java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:176)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:173)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1198)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1200)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1193)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:125)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:333)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:61)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$1(GBTClassifier.scala:209)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:170)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:58)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:176)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:173)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1198)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [None]:

predictions = model.transform(test_data)


In [None]:

accuracy = evaluator.evaluate(predictions)
print("Test Accuracy: ", accuracy*100)

Test Accuracy:  96.29629629629629
