Using XG Boosting and PySpark 

In [56]:
from pyspark.sql import SparkSession
import pandas as pd

In [57]:
spark = SparkSession.builder.appName('boosting').getOrCreate()

In [58]:
training = spark.read.csv ('poker_train.csv')
testing = spark.read.csv ('poker_test.csv')
training.show()

+---+---+---+---+---+---+---+---+---+---+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|
+---+---+---+---+---+---+---+---+---+---+----+
|  1| 10|  1| 11|  1| 13|  1| 12|  1|  1|   9|
|  2| 11|  2| 13|  2| 10|  2| 12|  2|  1|   9|
|  3| 12|  3| 11|  3| 13|  3| 10|  3|  1|   9|
|  4| 10|  4| 11|  4|  1|  4| 13|  4| 12|   9|
|  4|  1|  4| 13|  4| 12|  4| 11|  4| 10|   9|
|  1|  2|  1|  4|  1|  5|  1|  3|  1|  6|   8|
|  1|  9|  1| 12|  1| 10|  1| 11|  1| 13|   8|
|  2|  1|  2|  2|  2|  3|  2|  4|  2|  5|   8|
|  3|  5|  3|  6|  3|  9|  3|  7|  3|  8|   8|
|  4|  1|  4|  4|  4|  2|  4|  3|  4|  5|   8|
|  1|  1|  2|  1|  3|  9|  1|  5|  2|  3|   1|
|  2|  6|  2|  1|  4| 13|  2|  4|  4|  9|   0|
|  1| 10|  4|  6|  1|  2|  1|  1|  3|  8|   0|
|  2| 13|  2|  1|  4|  4|  1|  5|  2| 11|   0|
|  3|  8|  4| 12|  3|  9|  4|  2|  3|  2|   1|
|  1|  3|  4|  7|  1|  5|  2|  4|  4| 13|   0|
|  1|  4|  1|  1|  1|  3|  3|  5|  3|  2|   4|
|  3|  8|  3| 12|  2|  7|  2|  6|  1|  2|   0|
|  4|  8|  1|

In [59]:
input = training.drop('_c10')
print (training.printSchema())
print (training.columns)
print (input.columns)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)

None
['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10']
['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9']


In [37]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=input.columns, outputCol='_c11')

In [38]:
from pyspark.sql.types import IntegerType

for col_name in training.columns:
    training = training.withColumn(col_name, training[col_name].cast(IntegerType()))

In [39]:
output = assembler.transform(training)
output.show(truncate=False)

+---+---+---+---+---+---+---+---+---+---+----+---------------------------------------------+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11                                         |
+---+---+---+---+---+---+---+---+---+---+----+---------------------------------------------+
|1  |10 |1  |11 |1  |13 |1  |12 |1  |1  |9   |[1.0,10.0,1.0,11.0,1.0,13.0,1.0,12.0,1.0,1.0]|
|2  |11 |2  |13 |2  |10 |2  |12 |2  |1  |9   |[2.0,11.0,2.0,13.0,2.0,10.0,2.0,12.0,2.0,1.0]|
|3  |12 |3  |11 |3  |13 |3  |10 |3  |1  |9   |[3.0,12.0,3.0,11.0,3.0,13.0,3.0,10.0,3.0,1.0]|
|4  |10 |4  |11 |4  |1  |4  |13 |4  |12 |9   |[4.0,10.0,4.0,11.0,4.0,1.0,4.0,13.0,4.0,12.0]|
|4  |1  |4  |13 |4  |12 |4  |11 |4  |10 |9   |[4.0,1.0,4.0,13.0,4.0,12.0,4.0,11.0,4.0,10.0]|
|1  |2  |1  |4  |1  |5  |1  |3  |1  |6  |8   |[1.0,2.0,1.0,4.0,1.0,5.0,1.0,3.0,1.0,6.0]    |
|1  |9  |1  |12 |1  |10 |1  |11 |1  |13 |8   |[1.0,9.0,1.0,12.0,1.0,10.0,1.0,11.0,1.0,13.0]|
|2  |1  |2  |2  |2  |3  |2  |4  |2  |5  |8   |[2.0,1.0,2.0,2.0,2.0,3.0

In [40]:
final = output.select('_c11', '_c10')
input = final.select ('_c11')
output = final.select ('_c10')
print (input.columns)
print (output.columns)
final.show(truncate=False)

['_c11']
['_c10']
+---------------------------------------------+----+
|_c11                                         |_c10|
+---------------------------------------------+----+
|[1.0,10.0,1.0,11.0,1.0,13.0,1.0,12.0,1.0,1.0]|9   |
|[2.0,11.0,2.0,13.0,2.0,10.0,2.0,12.0,2.0,1.0]|9   |
|[3.0,12.0,3.0,11.0,3.0,13.0,3.0,10.0,3.0,1.0]|9   |
|[4.0,10.0,4.0,11.0,4.0,1.0,4.0,13.0,4.0,12.0]|9   |
|[4.0,1.0,4.0,13.0,4.0,12.0,4.0,11.0,4.0,10.0]|9   |
|[1.0,2.0,1.0,4.0,1.0,5.0,1.0,3.0,1.0,6.0]    |8   |
|[1.0,9.0,1.0,12.0,1.0,10.0,1.0,11.0,1.0,13.0]|8   |
|[2.0,1.0,2.0,2.0,2.0,3.0,2.0,4.0,2.0,5.0]    |8   |
|[3.0,5.0,3.0,6.0,3.0,9.0,3.0,7.0,3.0,8.0]    |8   |
|[4.0,1.0,4.0,4.0,4.0,2.0,4.0,3.0,4.0,5.0]    |8   |
|[1.0,1.0,2.0,1.0,3.0,9.0,1.0,5.0,2.0,3.0]    |1   |
|[2.0,6.0,2.0,1.0,4.0,13.0,2.0,4.0,4.0,9.0]   |0   |
|[1.0,10.0,4.0,6.0,1.0,2.0,1.0,1.0,3.0,8.0]   |0   |
|[2.0,13.0,2.0,1.0,4.0,4.0,1.0,5.0,2.0,11.0]  |0   |
|[3.0,8.0,4.0,12.0,3.0,9.0,4.0,2.0,3.0,2.0]   |1   |
|[1.0,3.0,4.0,7.0,1.0,5.0,2.

In [30]:
from xgboost.spark import SparkXGBClassifier

xgb = SparkXGBClassifier (features_col='_c11', label_col='_c10')

In [32]:
model = xgb.fit (final)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(54, 0) finished unsuccessfully.
java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:313)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:340)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:789)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1025)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:255)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:289)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:276)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:734)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2158)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2978)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)


In [51]:
from catboost import CatBoostClassifier

catboost_model = CatBoostClassifier(iterations=10, learning_rate=0.1)

In [55]:
catboost_model.fit(input, output)

CatBoostError: Invalid data type={}: data must be list(), np.ndarray(), DataFrame(), Series(), FeaturesData  scipy.sparse matrix or filename str() or pathlib.Path().