In [1]:
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel,
                                           RandomForestClassifier}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder, CrossValidatorModel}
import org.apache.spark.ml.feature.{VectorAssembler, Imputer, OneHotEncoderEstimator, 
                                    StringIndexer, StandardScaler}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.functions.{sum, col, round, regexp_extract, lit, _}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.SaveMode

## Считывание данных

In [2]:
val trainDf = spark
    .read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("../data/train.csv")
val testDf = spark
    .read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("../data/test.csv")

trainDf = [PassengerId: int, Survived: int ... 10 more fields]
testDf = [PassengerId: int, Pclass: int ... 9 more fields]


[PassengerId: int, Pclass: int ... 9 more fields]

## Обзор данных

In [3]:
def viewMisData(df:DataFrame) {
    println("Missing data:")
    df.select(df.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*).show()
}

def viewDf(df:DataFrame) {
    println("Size = " + df.count)
    df.printSchema()
    df.show()
    viewMisData(df)
}

viewMisData: (df: org.apache.spark.sql.DataFrame)Unit
viewDf: (df: org.apache.spark.sql.DataFrame)Unit


Объединим два _dataframe_ для дальнейшей обработки данных

In [4]:
val df = trainDf.union(testDf.withColumn("Survived", lit(null: String))
                     .select(trainDf.columns.head, trainDf.columns.tail: _*))
viewDf(df)

Size = 1309
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|   

df = [PassengerId: int, Survived: int ... 10 more fields]


[PassengerId: int, Survived: int ... 10 more fields]

Видно, что в выборке имеются пропущенные данные, причем признака _Cabin_ существенно не хватает, поэтому в дальнейшем иключим его из выборок. Признак _Fare_ являются числовым, и пропущенных значений в этом столбце немного, поэтому их можно заменить, например, на среднее значение. Признак _Embarked_ строковый, и поскольку пропущенных значений немного, то заполним их часто встречающимся значением. Признак _Age_ является числовым, но замена пропущенных значений на среднее значение здесь не подойдет, его обработку рассмотрим в дальнейшем.

## Обработка данных

#### Заполенение пропущенных значений

In [5]:
var processedDf = df.drop("Cabin")

processedDf = [PassengerId: int, Survived: int ... 9 more fields]


[PassengerId: int, Survived: int ... 9 more fields]

In [6]:
val mostEmbarked = df.groupBy("Embarked")
                     .count()
                     .orderBy(desc("count"))
                     .first()
                     .getAs[String](0)
processedDf = processedDf.na.fill(mostEmbarked, Seq("Embarked"))

mostEmbarked = S
processedDf = [PassengerId: int, Survived: int ... 9 more fields]


[PassengerId: int, Survived: int ... 9 more fields]

In [7]:
val fareImputer = new Imputer()
  .setInputCols(Array("Fare"))
  .setOutputCols(Array("Fare"))
  .setStrategy("mean")
processedDf = fareImputer.fit(processedDf).transform(processedDf)
processedDf = processedDf.withColumn("Fare", round(col("Fare"), 4))

fareImputer = imputer_227fa927b11f
processedDf = [PassengerId: int, Survived: int ... 9 more fields]
processedDf = [PassengerId: int, Survived: int ... 9 more fields]


[PassengerId: int, Survived: int ... 9 more fields]

In [8]:
viewMisData(processedDf)

Missing data:
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+
|          0|     418|     0|   0|  0|263|    0|    0|     0|   0|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+



Можно заметить, что в поле имя присустсвует аббревиатура (_Mr_ , _Miss_ и т.д.), которая может помочь заполнить пропущенные значения в столбце _Age_. Но данные значения нужно нормализовать

In [9]:
processedDf = processedDf.withColumn("Title",regexp_extract($"Name","([A-Za-z]+)\\.",1))
processedDf.select("Title").distinct().show()

+--------+
|   Title|
+--------+
|    Dona|
|     Don|
|    Miss|
|Countess|
|     Col|
|     Rev|
|    Lady|
|  Master|
|     Mme|
|    Capt|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



processedDf = [PassengerId: int, Survived: int ... 10 more fields]


[PassengerId: int, Survived: int ... 10 more fields]

In [10]:
val mapTitle = Map("Dona" -> "Royalty", "Don" -> "Royalty", 
                   "Miss" -> "Miss", "Countess" -> "Royalty", 
                   "Col" -> "Officer", "Rev" -> "Officer", 
                   "Lady"-> "Royalty", "Master" -> "Master", 
                   "Mme" -> "Mrs", "Capt" -> "Officer", 
                   "Mr" -> "Mr", "Dr" -> "Officer", 
                   "Mrs" -> "Mrs", "Sir" -> "Royalty", 
                   "Jonkheer" -> "Royalty", "Mlle" -> "Miss", 
                   "Major" -> "Officer", "Ms" -> "Mrs")
val modifyTitle: (String => String) = (oldTitle: String) => mapTitle(oldTitle)
val modifyTitleUDF = udf(modifyTitle)

processedDf = processedDf.withColumn("Title", modifyTitleUDF($"Title"))
processedDf.select("Title").distinct().show()

+-------+
|  Title|
+-------+
|   Miss|
|Officer|
|Royalty|
| Master|
|     Mr|
|    Mrs|
+-------+



mapTitle = Map(Master -> Master, Countess -> Royalty, Capt -> Officer, Mr -> Mr, Dr -> Officer, Don -> Royalty, Rev -> Officer, Lady -> Royalty, Mrs -> Mrs, Miss -> Miss, Mlle -> Miss, Major -> Officer, Col -> Officer, Dona -> Royalty, Mme -> Mrs, Sir -> Royalty, Jonkheer -> Royalty, Ms -> Mrs)
modifyTitle = > String = <function1>
modifyTitleUDF = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
processedDf = [PassengerId: int, Survived: int ... 10 more fields]


[PassengerId: int, Survived: int ... 10 more fields]

In [11]:
val titleToAgeMap = scala.collection.mutable.Map[String, Double]()
val titleToAgeArr = processedDf.groupBy("Title").avg("Age").collect()
for (row <- titleToAgeArr) {
    titleToAgeMap += 
    (row.getAs[String](0) -> 
     BigDecimal(row.getAs[Double](1)).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble)
}
println(titleToAgeMap)

Map(Officer -> 46.0, Royalty -> 41.0, Mr -> 32.0, Mrs -> 37.0, Miss -> 22.0, Master -> 5.0)


titleToAgeMap = Map(Officer -> 46.0, Royalty -> 41.0, Mr -> 32.0, Mrs -> 37.0, Miss -> 22.0, Master -> 5.0)
titleToAgeArr = Array([Miss,21.795235849056603], [Officer,46.27272727272727], [Royalty,41.166666666666664], [Master,5.482641509433963], [Mr,32.25215146299484], [Mrs,36.866279069767444])


Array([Miss,21.795235849056603], [Officer,46.27272727272727], [Royalty,41.166666666666664], [Master,5.482641509433963], [Mr,32.25215146299484], [Mrs,36.866279069767444])

In [12]:
val fillAge = (title: String) => titleToAgeMap(title)
val fillAgeUDF = udf(fillAge)
processedDf = processedDf.withColumn("Age", when($"Age".isNull, fillAgeUDF($"Title")).otherwise($"Age"))
viewMisData(processedDf)

Missing data:
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+-----+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Embarked|Title|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+-----+
|          0|     418|     0|   0|  0|  0|    0|    0|     0|   0|       0|    0|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+-----+



fillAge = > Double = <function1>
fillAgeUDF = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))
processedDf = [PassengerId: int, Survived: int ... 10 more fields]


[PassengerId: int, Survived: int ... 10 more fields]

Возможно на выживаемость влиял и размер семьи у пассажира, поэтому создадим новый признак _FamilySize_

In [13]:
processedDf = processedDf.withColumn("FamilySize", $"SibSp" + $"Parch" + 1)
processedDf.select("PassengerId", "FamilySize").show()

+-----------+----------+
|PassengerId|FamilySize|
+-----------+----------+
|          1|         2|
|          2|         2|
|          3|         1|
|          4|         2|
|          5|         1|
|          6|         1|
|          7|         1|
|          8|         5|
|          9|         3|
|         10|         2|
|         11|         3|
|         12|         1|
|         13|         1|
|         14|         7|
|         15|         1|
|         16|         1|
|         17|         6|
|         18|         1|
|         19|         2|
|         20|         1|
+-----------+----------+
only showing top 20 rows



processedDf = [PassengerId: int, Survived: int ... 11 more fields]


[PassengerId: int, Survived: int ... 11 more fields]

Делим _dataframe_ на два исходных

In [14]:
val processedTrainDf = processedDf.filter($"Survived".isNotNull)
val processedTestDf = processedDf.filter($"Survived".isNull).drop("Survived")

processedTrainDf = [PassengerId: int, Survived: int ... 11 more fields]
processedTestDf = [PassengerId: int, Pclass: int ... 10 more fields]


[PassengerId: int, Pclass: int ... 10 more fields]

## Создание модели

In [15]:
val Array(train, test) = processedTrainDf.randomSplit(Array(0.7, 0.3))

train = [PassengerId: int, Survived: int ... 11 more fields]
test = [PassengerId: int, Survived: int ... 11 more fields]


[PassengerId: int, Survived: int ... 11 more fields]

In [16]:
val assemblerForScaling = new VectorAssembler()
    .setInputCols(Array("Age", "Fare", "Parch", "SibSp", "FamilySize"))
    .setOutputCol("NeedScaling")

val scaler = new StandardScaler()
    .setInputCol("NeedScaling")
    .setOutputCol("ScaledFeatures")
    .setWithStd(true)
    .setWithMean(false)

val sexIndexer = new StringIndexer()
    .setInputCol("Sex")
    .setOutputCol("SexIndex")

val embarkedIndexer = new StringIndexer()
    .setInputCol("Embarked")
    .setOutputCol("EmbarkedIndex")

val titleIndexer = new StringIndexer()
    .setInputCol("Title")
    .setOutputCol("TitleIndex")

val encoder = new OneHotEncoderEstimator()
    .setInputCols(Array("Pclass", "SexIndex", "EmbarkedIndex", "TitleIndex"))
    .setOutputCols(Array("PclassOHE", "SexOHE", "EmbarkedOHE", "TitleIndexOHE"))
    .setHandleInvalid("keep")

val finalAssembler = new VectorAssembler()
    .setInputCols(Array("ScaledFeatures", "PclassOHE", "SexOHE", "EmbarkedOHE", "TitleIndexOHE"))
    .setOutputCol("features")

val eval = new BinaryClassificationEvaluator()
    .setLabelCol("Survived")

val preProcessStages = Array(assemblerForScaling, scaler, sexIndexer, 
                             embarkedIndexer, titleIndexer ,encoder, finalAssembler)

assemblerForScaling = vecAssembler_25944073dd21
scaler = stdScal_c517ae49c4be
sexIndexer = strIdx_998b80382d0b
embarkedIndexer = strIdx_ec72a2570305
titleIndexer = strIdx_f0827ae1cd0d
encoder = oneHotEncoder_ed127930034f
finalAssembler = vecAssembler_968c12202e0e
eval = binEval_e76f79e3ffc0


preProcessStages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: or...


binEval_e76f79e3ffc0

#### Logistic Regression

In [17]:
val lr = new LogisticRegression()
    .setFeaturesCol("features")
    .setLabelCol("Survived")

val lrParamGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(1e-2, 5e-3, 1e-3, 5e-4, 1e-4))
    .build()

val lrCV = new CrossValidator()
    .setEstimator(new Pipeline().setStages(preProcessStages ++ Array(lr)))
    .setEvaluator(eval)
    .setEstimatorParamMaps(lrParamGrid)
    .setNumFolds(5)

val lrModelCV = lrCV.fit(train)

lr = logreg_db629369bdd2
lrParamGrid = 
lrCV = cv_70bee67a6938
lrModelCV = cv_70bee67a6938


Array({
	logreg_db629369bdd2-regParam: 0.01
}, {
	logreg_db629369bdd2-regParam: 0.005
}, {
	logreg_db629369bdd2-regParam: 0.001
}, {
	logreg_db629369bdd2-regParam: 5.0E-4
}, {
	logreg_db629369bdd2-regParam: 1.0E-4
})


cv_70bee67a6938

#### Random Forest

In [18]:
val rf = new RandomForestClassifier()
    .setFeaturesCol("features")
    .setLabelCol("Survived")

val rfParamGrid = new ParamGridBuilder()
    .addGrid(rf.impurity, Array("gini", "entropy"))
    .addGrid(rf.maxDepth, Array(1, 2, 5, 10, 15))
    .addGrid(rf.minInstancesPerNode, Array(1, 2, 4, 5, 10))
    .build()

val rfCV = new CrossValidator()
    .setEstimator(new Pipeline().setStages(preProcessStages ++ Array(rf)))
    .setEvaluator(eval)
    .setEstimatorParamMaps(rfParamGrid)
    .setNumFolds(5)

val rfModelCV = rfCV.fit(train)

rf = rfc_9e3a9759082c
rfParamGrid = 


Array({
	rfc_9e3a9759082c-impurity: gini,
	rfc_9e3a9759082c-maxDepth: 1,
	rfc_9e3a9759082c-minInstancesPerNode: 1
}, {
	rfc_9e3a9759082c-impurity: gini,
	rfc_9e3a9759082c-maxDepth: 2,
	rfc_9e3a9759082c-minInstancesPerNode: 1
}, {
	rfc_9e3a9759082c-impurity: gini,
	rfc_9e3a9759082c-maxDepth: 5,
	rfc_9e3a9759082c-minInstancesPerNode: 1
}, {
	rfc_9e3a9759082c-impurity: gini,
	rfc_9e3a9759082c-maxDepth: 10,
	rfc_9e3a9759082c-minInstancesPerNode: 1
}, {
	rfc_9e3a9759082c-impurity: gini,
	rfc_9e3a9759082c-maxDepth: 15,
	rfc_9e3a9759082c-minInstancesPerNode: 1
}, {
	rfc_9e3a9759082c-impurity: entropy,
	rfc_9e3a9759082c-maxDepth: 1,
	rfc_9e3a9759082c-minInsta...


## Результаты модели

In [19]:
def modelResults(model: CrossValidatorModel) {
    println("cross-validated areaUnderROC: " + model.avgMetrics.max)
    println("test areaUnderROC: " + eval.evaluate(model.transform(test)))
}

modelResults: (model: org.apache.spark.ml.tuning.CrossValidatorModel)Unit


In [20]:
println("Logistic Regression: ")
modelResults(lrModelCV)
println
println("Random Forest: ")
modelResults(rfModelCV)

Logistic Regression: 
cross-validated areaUnderROC: 0.857472760941634
test areaUnderROC: 0.8839886845827439

Random Forest: 
cross-validated areaUnderROC: 0.8727047995451697
test areaUnderROC: 0.8789250353606783


## Сохранение результатов

In [21]:
def saveResultForSubmit(model: CrossValidatorModel, df: DataFrame, fileName: String) {
    val scoredDf = model.transform(processedTestDf)
    val outputDf = scoredDf.select("PassengerId", "prediction")
    val castedDf = outputDf.select(outputDf("PassengerId"), outputDf("prediction").cast(IntegerType).as("Survived"))                                      
    castedDf.write.format("csv").option("header", "true").save(f"../data/$fileName%s.csv")
}

saveResultForSubmit: (model: org.apache.spark.ml.tuning.CrossValidatorModel, df: org.apache.spark.sql.DataFrame, fileName: String)Unit


In [22]:
saveResultForSubmit(lrModelCV, processedTestDf, "lr")
saveResultForSubmit(rfModelCV, processedTestDf, "rf")