In [1]:
#importing the classes and functions
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

In [2]:
#setting the data files for users and their friend list
training_data_path = 'data/poker-train.csv'
testing_data_path = 'data/poker-test.csv'
app_name = 'Poker Hand Prediction'
master = 'local'

In [3]:
#configuring the Spark and setting the master & app name
spark = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=spark)

In [4]:
poker_train_data = sc.textFile(training_data_path).map(lambda x :[int(x) for x in x.split(',')]).map(lambda x: LabeledPoint(x[10],x[0:10]))
poker_test_data = sc.textFile(testing_data_path).map(lambda x :[int(x) for x in x.split(',')]).map(lambda x: LabeledPoint(x[10],x[0:10]))

In [5]:
error = []
with open('Random Forest.txt', 'w+') as f:
    f.write('Depth\tImpurity\tNum of Trees\tFeature Subset\tError')
    for depth in range(4,10):
        for num_trees in range(4,10,2):
            for impurity in ['gini','entropy']:
                for feature in ['auto', 'all', 'sqrt', 'log2', 'onethird']:
                    model = RandomForest.trainClassifier(poker_train_data, \
                                                         numClasses=10, \
                                                         categoricalFeaturesInfo={}, \
                                                         numTrees=num_trees, \
                                                         featureSubsetStrategy=feature, \
                                                         impurity=impurity, \
                                                         maxDepth=depth, \
                                                         maxBins=32)
                    predictions = model.predict(poker_test_data.map(lambda x: x.features))
                    labelsAndPredictions = poker_test_data.map(lambda lp: lp.label).zip(predictions)
                    testErr = labelsAndPredictions.filter(lambda lp: lp[0] != lp[1]).count()/float(poker_test_data.count())
                    error.append(testErr)
                    f.write(str(depth) + '\t'+ impurity + '\t\t' + str(num_trees) \
                            + '\t\t' + feature + '\t\t' + str(testErr))
print(min(error))

0.421794


In [7]:
error = []
with open('Gradient Boosting.txt', 'w+') as f:
    f.write('Iterations\Rate\tDepth\tLoss\tError')
    for iters in range(4,10):
        for rate in [0.1,0.3,0.5,0.7,0.9]:
            for depth in range(4,10):
                for loss in ['logLoss', 'leastSquaresError', 'leastAbsoluteError']:
                    model = GradientBoostedTrees.trainClassifier(poker_train_data, \
                                                                 categoricalFeaturesInfo={}, \
                                                                 loss=loss, \
                                                                 numIterations=iters, \
                                                                 learningRate=rate, \
                                                                 maxDepth=depth)
                    predictions = model.predict(poker_test_data.map(lambda x: x.features))
                    labelsAndPredictions = poker_test_data.map(lambda lp: lp.label).zip(predictions)
                    testErr = labelsAndPredictions.filter(lambda lp: lp[0] != lp[1]).count()/float(poker_test_data.count())
                    error.append(testErr)
                    f.write(str(iters) + '\t'+ str(rate) + '\t\t' + str(depth) \
                            + '\t\t' + loss + '\t\t' + str(testErr))
print(min(error))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46487.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46487.0 (TID 46487, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 15210577 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
