In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("Aula Interativa 2 - ML")
    .getOrCreate()
)

# Last expression of the cell: show which Spark version is running.
spark.version

24/06/19 19:22:07 WARN Utils: Your hostname, MacBook-Air-de-Daniel-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.2 instead (on interface en0)
24/06/19 19:22:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/19 19:22:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.5.1'

In [5]:
# Load the Titanic passenger data. The first row is treated as the header and
# column types are inferred from the data instead of defaulting to strings.
# The options are passed as real booleans rather than the strings 'True'.
titanic_df = spark.read.csv('./titanic.csv', header=True, inferSchema=True)

# Print the inferred schema as a quick check of column names and types.
titanic_df.printSchema()



root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [21]:
# Demonstrating the sex_indexer
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Indexer that writes a numeric code for each 'Sex' value into 'SexIndexer'.
sex_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndexer')

# Fit on the full frame, apply it, and preview the original value next to its code.
(
    sex_indexer
    .fit(titanic_df)
    .transform(titanic_df)
    .select("PassengerId", "Sex", "SexIndexer")
    .show(10)
)

+-----------+------+----------+
|PassengerId|   Sex|SexIndexer|
+-----------+------+----------+
|          1|  male|       0.0|
|          2|female|       1.0|
|          3|female|       1.0|
|          4|female|       1.0|
|          5|  male|       0.0|
|          6|  male|       0.0|
|          7|  male|       0.0|
|          8|  male|       0.0|
|          9|female|       1.0|
|         10|female|       1.0|
+-----------+------+----------+
only showing top 10 rows



In [22]:
# Demonstrating the sex_encoder

# Encoder that turns the numeric 'SexIndexer' column into the 'SexVector' column.
sex_encoder = OneHotEncoder(inputCol='SexIndexer', outputCol='SexVector')

# Despite the name, this is the DataFrame produced by fitting and applying the
# indexer (the original rows plus 'SexIndexer'); the encoder needs it as input.
sex_indexer_model = (
    sex_indexer
    .fit(titanic_df)
    .transform(titanic_df)
)

# How to read the sparse-vector notation shown in the 'SexVector' column, see
# https://stackoverflow.com/questions/42295001/how-to-interpret-results-of-spark-onehotencoder
#   first value:  size of the vector
#   second value: indices of the non-zero entries
#   third value:  the non-zero entries themselves
(
    sex_encoder
    .fit(sex_indexer_model)
    .transform(sex_indexer_model)
    .select("PassengerId", "Sex", "SexVector")
    .show(10)
)

+-----------+------+-------------+
|PassengerId|   Sex|    SexVector|
+-----------+------+-------------+
|          1|  male|(1,[0],[1.0])|
|          2|female|    (1,[],[])|
|          3|female|    (1,[],[])|
|          4|female|    (1,[],[])|
|          5|  male|(1,[0],[1.0])|
|          6|  male|(1,[0],[1.0])|
|          7|  male|(1,[0],[1.0])|
|          8|  male|(1,[0],[1.0])|
|          9|female|    (1,[],[])|
|         10|female|    (1,[],[])|
+-----------+------+-------------+
only showing top 10 rows



In [53]:
from pyspark.ml.feature import VectorAssembler

# Feature sets tried earlier, kept for reference:
#   ['SexVector', 'Age', 'Pclass', 'Fare']
#   ['Age', 'Pclass', 'Fare', 'SexVector']
# Current choice: age plus the encoded sex column, merged into a single
# 'features' vector column for the classifier.
assembler = VectorAssembler(
    inputCols=['Age', 'SexVector'],
    outputCol='features',
)






In [56]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree that predicts 'Survived' from the assembled 'features' column.
classifier = DecisionTreeClassifier(
    labelCol='Survived',
    featuresCol='features',
)

In [55]:
from pyspark.ml import Pipeline

# Stages run in order: index Sex -> encode it -> assemble features -> classify.
# (Earlier variant without the Sex encoding steps: stages=[assembler, classifier])
pipeline = Pipeline(
    stages=[
        sex_indexer,
        sex_encoder,
        assembler,
        classifier,
    ]
)


In [57]:
# 70/30 train/test split. The seed is fixed so that a fresh "Restart & Run All"
# produces the same partition (and therefore comparable metrics) every time.
train_data, test_data = titanic_df.randomSplit([0.7, 0.3], seed=42)

In [58]:
# First fit, before any imputation has happened in notebook order.
# 'Age' is nullable and the VectorAssembler stage rejects null inputs under its
# default handleInvalid="error", so rows without an age are dropped for this
# fit. On data whose ages are already filled in, the drop is a no-op.
predictSurvivedModel = pipeline.fit(train_data.dropna(subset=['Age']))

In [59]:
# Mean of the 'Age' column, used below to fill in missing ages.
# NOTE(review): this is computed over the full dataset, before the train/test
# split, so test rows contribute to the imputation value.
age_stats = titanic_df.agg({'Age': 'mean'}).collect()
mean_age = age_stats[0][0]  # single row, single column
mean_age

29.699117647058763

In [60]:
# Replace missing ages with the dataset mean; only the 'Age' column is touched.
# Note that this rebinds titanic_df, so later cells see the imputed frame.
titanic_df = titanic_df.fillna(
    value=mean_age,
    subset=['Age'],
)

In [61]:
# Re-split the imputed data 70/30 and refit the whole pipeline on the training
# part. The seed is fixed so that the partition, and the metrics computed from
# it further down, are reproducible across runs.
train_data, test_data = titanic_df.randomSplit([0.7, 0.3], seed=42)
predictSurvivedModel = pipeline.fit(train_data)

In [62]:
# Score the held-out rows with the fitted pipeline.
predictions = predictSurvivedModel.transform(test_data)

# Preview the inputs next to the model output. The lower-case names resolve
# against the capitalised schema columns, as the printed table shows.
(
    predictions
    .select('passengerId', 'fare', 'pclass', 'age', 'sex', 'rawPrediction', 'prediction')
    .show(50)
)



+-----------+-------+------+-----------------+------+-------------+----------+
|passengerId|   fare|pclass|              age|   sex|rawPrediction|prediction|
+-----------+-------+------+-----------------+------+-------------+----------+
|          2|71.2833|     1|             38.0|female| [45.0,138.0]|       1.0|
|          4|   53.1|     1|             35.0|female| [45.0,138.0]|       1.0|
|          7|51.8625|     1|             54.0|  male| [322.0,56.0]|       0.0|
|          8| 21.075|     3|              2.0|  male|   [8.0,11.0]|       1.0|
|         13|   8.05|     3|             20.0|  male| [322.0,56.0]|       0.0|
|         19|   18.0|     3|             31.0|female| [45.0,138.0]|       1.0|
|         27|  7.225|     3|29.69911764705882|  male| [322.0,56.0]|       0.0|
|         31|27.7208|     1|             40.0|  male| [322.0,56.0]|       0.0|
|         39|   18.0|     3|             18.0|female| [45.0,138.0]|       1.0|
|         41|  9.475|     3|             40.0|female

In [63]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compare the 'prediction' column against the 'Survived' label using accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol='Survived',
    predictionCol='prediction',
    metricName='accuracy',
)
accuracy = evaluator.evaluate(predictions)

# Last expression of the cell: display the score.
accuracy



0.7482758620689656

In [64]:
# The classifier is the last stage of the pipeline, so its fitted model is the
# last entry of the fitted pipeline's stages.
fitted_stages = predictSurvivedModel.stages
decisionTreeModel = fitted_stages[-1]

In [65]:
# Display the depth of the fitted decision tree.
decisionTreeModel.depth

3

In [66]:
# Display the text dump of the fitted tree: its summary line plus the learned
# If/Else rules, which refer to features by position ("feature 0", "feature 1").
decisionTreeModel.toDebugString

'DecisionTreeClassificationModel: uid=DecisionTreeClassifier_6cefbdf254fa, depth=3, numNodes=9, numClasses=2, numFeatures=2\n  If (feature 1 in {1.0})\n   If (feature 0 <= 7.5)\n    Predict: 1.0\n   Else (feature 0 > 7.5)\n    Predict: 0.0\n  Else (feature 1 not in {1.0})\n   If (feature 0 <= 48.5)\n    If (feature 0 <= 44.5)\n     Predict: 1.0\n    Else (feature 0 > 44.5)\n     Predict: 0.0\n   Else (feature 0 > 48.5)\n    Predict: 1.0\n'

In [67]:
# Display the assembler's input columns, to help read the positional feature
# indices used in the tree dump.
assembler.getInputCols()


['Age', 'SexVector']

In [68]:
# Display the fitted tree's importance score for each feature position.
decisionTreeModel.featureImportances

SparseVector(2, {0: 0.089, 1: 0.911})

In [75]:

# Pair each assembler input column with an importance score, by position.
# The pairing lines up here because the model reports numFeatures=2 for the two
# input columns; an input column that expands to several vector slots would
# break this one-to-one correspondence.
[
    (column_name, importance)
    for column_name, importance in zip(
        assembler.getInputCols(),
        decisionTreeModel.featureImportances,
    )
]

[('Age', 0.08904209757149073), ('SexVector', 0.9109579024285093)]

In [71]:
# Display the assembler's input columns again, for reference next to the
# importance scores.
assembler.getInputCols()

['Age', 'SexVector']

In [76]:
# Persist the fitted decision tree under a dedicated output directory.
# The base path used to be "./titanic.csv" -- the input data file itself -- so
# the save target was nested beneath the CSV path instead of a model directory.
basePath = "./models"

# overwrite() replaces whatever an earlier run saved at this location.
# NOTE(review): only the final tree stage is saved here; scoring raw rows later
# would also need the indexer/encoder/assembler stages, i.e. saving
# predictSurvivedModel instead -- confirm which is intended.
decisionTreeModel.write().overwrite().save(basePath + "/model2")

                                                                                