In [33]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from pyspark.sql.functions import when
import pandas as pd 

In [34]:
config = SparkConf()
spark = SparkSession.builder.master("local").appName("test").config(conf=config).getOrCreate()

In [35]:
# Preparing the train
train = spark.read.option("header", "true").csv(r"data\train.csv")
# filtre et supprime les donn√©es "NA"
train = train.filter(train.Age != "NA")
train = train.withColumn("Gender", when(train.Sex == "male","1").when(train.Sex == "female","2"))
# Supprime les colonnes inutiles
cols = ('SibSp', 'Parch', 'Fare', 'Ticket' ,'Cabin', 'Embarked', 'Name', 'Sex')
train = train.drop(*cols)

train.show(10)

+-----------+--------+------+---+------+
|PassengerId|Survived|Pclass|Age|Gender|
+-----------+--------+------+---+------+
|          1|       0|     3| 22|     1|
|          2|       1|     1| 38|     2|
|          3|       1|     3| 26|     2|
|          4|       1|     1| 35|     2|
|          5|       0|     3| 35|     1|
|          7|       0|     1| 54|     1|
|          8|       0|     3|  2|     1|
|          9|       1|     3| 27|     2|
|         10|       1|     2| 14|     2|
|         11|       1|     3|  4|     2|
+-----------+--------+------+---+------+
only showing top 10 rows



In [36]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

train = train.withColumn("PassengerId", train["PassengerId"].cast('float'))
train = train.withColumn("Survived" ,train["Survived"].cast('float'))
train = train.withColumn("Pclass" ,train["Pclass"].cast('float'))
train = train.withColumn("Age" ,train["Age"].cast(('float')))
train = train.withColumn("Gender" ,train["Gender"].cast(('float')))
train.printSchema()
features = ['Pclass','Age', 'Gender']
va = VectorAssembler(inputCols = features, outputCol='features')
va_df = va.transform(train)
va_df.show(3)

root
 |-- PassengerId: float (nullable = true)
 |-- Survived: float (nullable = true)
 |-- Pclass: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Gender: float (nullable = true)

+-----------+--------+------+----+------+--------------+
|PassengerId|Survived|Pclass| Age|Gender|      features|
+-----------+--------+------+----+------+--------------+
|        1.0|     0.0|   3.0|22.0|   1.0|[3.0,22.0,1.0]|
|        2.0|     1.0|   1.0|38.0|   2.0|[1.0,38.0,2.0]|
|        3.0|     1.0|   3.0|26.0|   2.0|[3.0,26.0,2.0]|
+-----------+--------+------+----+------+--------------+
only showing top 3 rows



In [37]:
(train_, test) = va_df.randomSplit([0.8, 0.2])

In [38]:
from pyspark.ml.feature import StringIndexer

dtc = DecisionTreeClassifier(featuresCol="features", labelCol="Survived")

#indexer = StringIndexer().setInputCol("Survived").setOutputCol("label_idx").fit(train_)

prediction = dtc.fit(train_) #dtc.setLabelCol("label_idx").fit(dtc)

prediction

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_a9841c9469ba, depth=5, numNodes=37, numClasses=2, numFeatures=3

In [39]:
pred = prediction.transform(test)
pred = pred.withColumnRenamed('Survived', 'label')
pred.show(3)

+-----------+-----+------+----+------+--------------+-------------+--------------------+----------+
|PassengerId|label|Pclass| Age|Gender|      features|rawPrediction|         probability|prediction|
+-----------+-----+------+----+------+--------------+-------------+--------------------+----------+
|        2.0|  1.0|   1.0|38.0|   2.0|[1.0,38.0,2.0]|   [3.0,70.0]|[0.04109589041095...|       1.0|
|        9.0|  1.0|   3.0|27.0|   2.0|[3.0,27.0,2.0]|  [29.0,22.0]|[0.56862745098039...|       0.0|
|       14.0|  0.0|   3.0|39.0|   1.0|[3.0,39.0,1.0]| [233.0,31.0]|[0.88257575757575...|       0.0|
+-----------+-----+------+----+------+--------------+-------------+--------------------+----------+
only showing top 3 rows



In [40]:
evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")
acc = evaluator.evaluate(pred)

print("Prediction Accuracy: ", acc*100)

y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

confusion_M = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(confusion_M)

Prediction Accuracy:  77.35042735042735
Confusion Matrix:
[[85  8]
 [26 37]]


In [41]:
spark.stop()