In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Challenge').getOrCreate()

In [3]:
data = spark.read.csv('train.csv', header=True, inferSchema=True)

data.show(4)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 4 rows



In [4]:
# """ Select the Columns to use """
selected_data = data[['Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Embarked', 'Survived']]


#""" Drop the na columns """
clean_data = selected_data.na.drop()


# """ Encode the String Columns """
from pyspark.ml.feature import StringIndexer
indexer1 = StringIndexer(inputCol="Sex", outputCol="idx_Sex")
indexer2 = StringIndexer(inputCol="Embarked", outputCol="idx_Embarked")


processed_data = indexer1.fit(clean_data).transform(clean_data)
processed_data = indexer2.fit(processed_data).transform(processed_data)

processed_data.show(5)

+------+------+----+-----+-----+--------+--------+-------+------------+
|Pclass|   Sex| Age|SibSp|Parch|Embarked|Survived|idx_Sex|idx_Embarked|
+------+------+----+-----+-----+--------+--------+-------+------------+
|     3|  male|22.0|    1|    0|       S|       0|    0.0|         0.0|
|     1|female|38.0|    1|    0|       C|       1|    1.0|         1.0|
|     3|female|26.0|    0|    0|       S|       1|    1.0|         0.0|
|     1|female|35.0|    1|    0|       S|       1|    1.0|         0.0|
|     3|  male|35.0|    0|    0|       S|       0|    0.0|         0.0|
+------+------+----+-----+-----+--------+--------+-------+------------+
only showing top 5 rows



In [5]:
processed_data.printSchema()

root
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- idx_Sex: double (nullable = false)
 |-- idx_Embarked: double (nullable = false)



In [6]:
processed_features = ['Pclass', 'Age', 'SibSp', 'Parch', 'idx_Sex', 'idx_Embarked']
processed_data2 = processed_data[processed_features]
processed_data2.show(10)
# processed_data2.printSchema()

+------+----+-----+-----+-------+------------+
|Pclass| Age|SibSp|Parch|idx_Sex|idx_Embarked|
+------+----+-----+-----+-------+------------+
|     3|22.0|    1|    0|    0.0|         0.0|
|     1|38.0|    1|    0|    1.0|         1.0|
|     3|26.0|    0|    0|    1.0|         0.0|
|     1|35.0|    1|    0|    1.0|         0.0|
|     3|35.0|    0|    0|    0.0|         0.0|
|     1|54.0|    0|    0|    0.0|         0.0|
|     3| 2.0|    3|    1|    0.0|         0.0|
|     3|27.0|    0|    2|    1.0|         0.0|
|     2|14.0|    1|    0|    1.0|         1.0|
|     3| 4.0|    1|    1|    1.0|         0.0|
+------+----+-----+-----+-------+------------+
only showing top 10 rows



In [33]:
""" Split it into features and target """
processed_features = ['Pclass', 'Age', 'SibSp', 'Parch', 'idx_Sex', 'idx_Embarked']
target_column   = ['Survived']


from pyspark.ml.feature import VectorAssembler 

vectorizer = VectorAssembler(inputCols=processed_features, outputCol='features')
processed_data3 = vectorizer.transform(processed_data)
processed_data3.show(10)

<class 'list'>
<class 'pyspark.ml.feature.VectorAssembler'>
+------+------+----+-----+-----+--------+--------+-------+------------+--------------------+
|Pclass|   Sex| Age|SibSp|Parch|Embarked|Survived|idx_Sex|idx_Embarked|            features|
+------+------+----+-----+-----+--------+--------+-------+------------+--------------------+
|     3|  male|22.0|    1|    0|       S|       0|    0.0|         0.0|[3.0,22.0,1.0,0.0...|
|     1|female|38.0|    1|    0|       C|       1|    1.0|         1.0|[1.0,38.0,1.0,0.0...|
|     3|female|26.0|    0|    0|       S|       1|    1.0|         0.0|[3.0,26.0,0.0,0.0...|
|     1|female|35.0|    1|    0|       S|       1|    1.0|         0.0|[1.0,35.0,1.0,0.0...|
|     3|  male|35.0|    0|    0|       S|       0|    0.0|         0.0|(6,[0,1],[3.0,35.0])|
|     1|  male|54.0|    0|    0|       S|       0|    0.0|         0.0|(6,[0,1],[1.0,54.0])|
|     3|  male| 2.0|    3|    1|       S|       0|    0.0|         0.0|[3.0,2.0,3.0,1.0,...|
|     3|fe

In [8]:
from pyspark.ml.classification import RandomForestClassifier

random_forest_clf = RandomForestClassifier(featuresCol='features', labelCol='Survived')



In [9]:
df_train, df_test = processed_data3.randomSplit([0.8, 0.2], seed=0)
df_test.show(5)

+------+------+----+-----+-----+--------+--------+-------+------------+--------------------+
|Pclass|   Sex| Age|SibSp|Parch|Embarked|Survived|idx_Sex|idx_Embarked|            features|
+------+------+----+-----+-----+--------+--------+-------+------------+--------------------+
|     1|female|23.0|    1|    0|       C|       1|    1.0|         1.0|[1.0,23.0,1.0,0.0...|
|     1|female|23.0|    3|    2|       S|       1|    1.0|         0.0|[1.0,23.0,3.0,2.0...|
|     1|female|26.0|    0|    0|       S|       1|    1.0|         0.0|[1.0,26.0,0.0,0.0...|
|     1|female|29.0|    0|    0|       S|       1|    1.0|         0.0|[1.0,29.0,0.0,0.0...|
|     1|female|30.0|    0|    0|       C|       1|    1.0|         1.0|[1.0,30.0,0.0,0.0...|
+------+------+----+-----+-----+--------+--------+-------+------------+--------------------+
only showing top 5 rows



In [52]:
random_forest_training = random_forest_clf.fit(df_train)
type(random_forest_training)

pyspark.ml.classification.RandomForestClassificationModel

In [50]:
# random_forest_training.save(".trained_model")

In [51]:
random_forest_test = random_forest_training.transform(df_test)
random_forest_test.show(5)

+------+------+----+-----+-----+--------+--------+-------+------------+--------------------+--------------------+--------------------+----------+
|Pclass|   Sex| Age|SibSp|Parch|Embarked|Survived|idx_Sex|idx_Embarked|            features|       rawPrediction|         probability|prediction|
+------+------+----+-----+-----+--------+--------+-------+------------+--------------------+--------------------+--------------------+----------+
|     1|female|23.0|    1|    0|       C|       1|    1.0|         1.0|[1.0,23.0,1.0,0.0...|[0.68874833043499...|[0.03443741652174...|       1.0|
|     1|female|23.0|    3|    2|       S|       1|    1.0|         0.0|[1.0,23.0,3.0,2.0...|[2.95433229187442...|[0.14771661459372...|       1.0|
|     1|female|26.0|    0|    0|       S|       1|    1.0|         0.0|[1.0,26.0,0.0,0.0...|[3.02090570558017...|[0.15104528527900...|       1.0|
|     1|female|29.0|    0|    0|       S|       1|    1.0|         0.0|[1.0,29.0,0.0,0.0...|[1.24146581486979...|[0.06207329

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

In [16]:
criterion = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction')

In [17]:
accuracy = criterion.evaluate(random_forest_test)
accuracy

0.7707104285012975

In [None]:
['Pclass', 'Age', 'SibSp', 'Parch', 'idx_Sex', 'idx_Embarked']



In [48]:
# import numpy as np 

# def preprocess(Pclass=3, Sex=1, Age=22, SibSp=1, Parch=0, Embarked=1):

#     from pyspark.ml.feature import VectorAssembler 
    
#     features = [[Pclass, Sex, Age, SibSp, Parch, Embarked]]
#     df_sp = spark.createDataFrame(features)
#     df_sp.show()

#     vectorizer = VectorAssembler(inputCols=df_sp, outputCol='features')
#     processed_data3 = vectorizer.transform(processed_data)
#     processed_data3.show()


#     return


# preprocess()

Py4JJavaError: An error occurred while calling o564.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 1 times, most recent failure: Lost task 0.0 in stage 37.0 (TID 34) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:705)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:749)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:673)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:615)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:572)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:530)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:705)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:749)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:673)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:615)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:572)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:530)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more
