In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('titanic').getOrCreate()

In [2]:
train = spark.read.csv('train.csv', header=True, inferSchema=True)

In [4]:
train.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [5]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
test= spark.read.csv('test.csv',header=True, inferSchema=True)

In [7]:
test.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



los datos tienen nulls, pasamos de momento.  
Seleccionar columnas interesantes.

In [8]:
df=train.drop('Name')

In [9]:
df.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [12]:
mycols=train.select(['Survived','Pclass','Sex','Age','SibSp', 'Parch', 'Fare', 'Embarked'])
mycols.count()

891

In [13]:
final_data=mycols.na.drop() #desechar nulls
final_data.count()

712

In [15]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, VectorIndexer, StringIndexer
#el encoder produce un conjunto de números a partir de string que se tienen que indexar
#Col1 A B C
#A   [1,0,0] string de bits, one hot encoder traduce sex en dos bits

In [17]:
gender_indexer=StringIndexer(inputCol='Sex', outputCol='SexIndex')

In [18]:
gender_encoder=OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

In [19]:
embarked_indexer=StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embarked_encoder=OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')

In [22]:
assembler=VectorAssembler(inputCols=['Pclass','SexVec','Age','SibSp', 'Parch', 'Fare', 'EmbarkedVec'],
                         outputCol='features')

In [24]:
#todo a la vez con pipeline
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features',labelCol='Survived')

In [26]:
from pyspark.ml import Pipeline
pipeline= Pipeline(stages=[gender_indexer,embarked_indexer,gender_encoder,embarked_encoder,assembler,lr])

In [29]:
model = pipeline.fit(final_data)

In [40]:
results = model.transform(test.na.drop())

In [45]:
results.select('prediction').show()

+----------+
|prediction|
+----------+
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       0.0|
|       1.0|
|       1.0|
|       0.0|
|       1.0|
|       1.0|
|       1.0|
|       0.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
+----------+
only showing top 20 rows



In [47]:
train,tes=final_data.randomSplit([0.7,0.3])

In [48]:
results = model.transform(tes)

In [49]:
results.select('prediction','Survived').show()

+----------+--------+
|prediction|Survived|
+----------+--------+
|       1.0|       0|
|       1.0|       0|
|       0.0|       0|
|       1.0|       0|
|       1.0|       0|
|       1.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       1.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
+----------+--------+
only showing top 20 rows



In [50]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [53]:
evaluator= BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Survived',metricName='areaUnderROC')

In [54]:
evalResult=evaluator.evaluate(results.select('Survived','prediction'))

In [55]:
evalResult

0.7474335351408266