In [1]:
import findspark, pyspark

from pyspark.sql               import SparkSession
from pyspark.sql.functions     import isnan, when, count, col
from pyspark.ml                import Pipeline
from pyspark.ml.feature        import RFormula, VectorAssembler, Binarizer, Normalizer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation     import BinaryClassificationEvaluator
from pyspark.ml.evaluation     import MulticlassClassificationEvaluator
from pyspark.ml.feature        import StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

findspark.init()
spark = SparkSession.builder.appName("pipeline").getOrCreate()

In [7]:
cancerDF = spark.read.csv("Breast_cancer_data.csv", header=True)
cancerDF.show(2,truncate=False)

+-----------+------------+--------------+---------+---------------+---------+
|mean_radius|mean_texture|mean_perimeter|mean_area|mean_smoothness|diagnosis|
+-----------+------------+--------------+---------+---------------+---------+
|17.99      |10.38       |122.8         |1001.0   |0.1184         |0        |
|20.57      |17.77       |132.9         |1326.0   |0.08474        |0        |
+-----------+------------+--------------+---------+---------------+---------+
only showing top 2 rows



In [8]:
print(cancerDF.count())

569


In [9]:
cancerDF.select([count(when(isnan(c), c)).alias(c) for c in cancerDF.columns]).show()

+-----------+------------+--------------+---------+---------------+---------+
|mean_radius|mean_texture|mean_perimeter|mean_area|mean_smoothness|diagnosis|
+-----------+------------+--------------+---------+---------------+---------+
|          0|           0|             0|        0|              0|        0|
+-----------+------------+--------------+---------+---------------+---------+



In [10]:
Rformula = RFormula(formula="diagnosis ~ mean_radius + mean_texture + mean_perimeter + mean_area + mean_smoothness", 
                    featuresCol="independente", labelCol="dependente")

cancerDF = Rformula.fit(cancerDF).transform(cancerDF)
cancerDF.show()

+-----------+------------+--------------+---------+---------------+---------+--------------------+----------+
|mean_radius|mean_texture|mean_perimeter|mean_area|mean_smoothness|diagnosis|        independente|dependente|
+-----------+------------+--------------+---------+---------------+---------+--------------------+----------+
|      17.99|       10.38|         122.8|   1001.0|         0.1184|        0|(2465,[87,536,104...|       1.0|
|      20.57|       17.77|         132.9|   1326.0|        0.08474|        0|(2465,[383,690,95...|       1.0|
|      19.69|       21.25|         130.0|   1203.0|         0.1096|        0|(2465,[364,517,94...|       1.0|
|      11.42|       20.38|         77.58|    386.1|         0.1425|        0|(2465,[135,774,12...|       1.0|
|      20.29|       14.34|         135.1|   1297.0|         0.1003|        0|(2465,[375,592,10...|       1.0|
|      12.45|        15.7|         82.57|    477.1|         0.1278|        0|(2465,[46,456,131...|       1.0|
|      18.

In [12]:
cancerTreino, cancerTeste = cancerDF.randomSplit([0.7,0.3])
print(cancerTreino.count())
print(cancerTeste.count())

385
184


In [None]:
# Regressão Logistica

In [13]:
logistic = LogisticRegression(featuresCol="independente", labelCol="dependente", maxIter=1000, regParam=0.08)
modelo = logistic.fit(cancerTreino)

In [14]:
predicao = modelo.transform(cancerTeste)

In [15]:
performance = MulticlassClassificationEvaluator(labelCol="dependente", predictionCol="prediction", metricName="accuracy")
acuracia = performance.evaluate(predicao)
print(acuracia)

0.6304347826086957


In [None]:
# Randon Forest

In [16]:
randomforest = RandomForestClassifier(labelCol="dependente", featuresCol="independente", numTrees=40)
modelo = randomforest.fit(cancerTreino)

In [17]:
predicao = modelo.transform(cancerTeste)

In [18]:
performance = MulticlassClassificationEvaluator(labelCol="dependente", predictionCol="prediction", metricName="accuracy")
acuracia = performance.evaluate(predicao)
print(acuracia)

0.6358695652173914


In [None]:
# Gradient-Boosted Trees (GBTs)

In [20]:
gbt = GBTClassifier(labelCol="dependente", featuresCol="independente")
modelo = gbt.fit(cancerTreino)

In [21]:
predicao = modelo.transform(cancerTeste)

In [22]:
performance = MulticlassClassificationEvaluator(labelCol="dependente", predictionCol="prediction", metricName="accuracy")
acuracia = performance.evaluate(predicao)
print(acuracia)

0.6467391304347826


In [None]:
# MultiLayer Perceptron

In [27]:
print(len(cancerDF.columns))

8


In [28]:
mlp = MultilayerPerceptronClassifier(maxIter=1000, layers=[8,5,2], featuresCol="independente", labelCol="dependente")
modelo = mlp.fit(cancerTreino)

In [29]:
predicao = modelo.transform(cancerTeste)

In [30]:
performance = MulticlassClassificationEvaluator(labelCol="dependente", predictionCol="prediction", metricName="accuracy")
acuracia = performance.evaluate(predicao)
print(acuracia)

Py4JJavaError: An error occurred while calling o892.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 465.0 failed 1 times, most recent failure: Lost task 0.0 in stage 465.0 (TID 453) (DESKTOP-KETEVB0.mshome.net executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (ProbabilisticClassificationModel$$Lambda$4298/0x00000001019ec040: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:42)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:508)
	at org.apache.spark.ml.ann.FeedForwardModel.predictRaw(Layer.scala:561)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:337)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:279)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel.$anonfun$transform$2(ProbabilisticClassifier.scala:121)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:78)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:76)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy$lzycompute(MulticlassMetrics.scala:188)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy(MulticlassMetrics.scala:188)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:153)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (ProbabilisticClassificationModel$$Lambda$4298/0x00000001019ec040: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:42)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:508)
	at org.apache.spark.ml.ann.FeedForwardModel.predictRaw(Layer.scala:561)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:337)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:279)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel.$anonfun$transform$2(ProbabilisticClassifier.scala:121)
	... 20 more
