In [1]:
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{IndexToString, VectorIndexer}
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

In [2]:
val df2 = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("data/test.csv")

df2.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



df2 = [PassengerId: int, Pclass: int ... 9 more fields]


[PassengerId: int, Pclass: int ... 9 more fields]

In [3]:
val df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("data/train.csv")

df = [PassengerId: int, Survived: int ... 10 more fields]


[PassengerId: int, Survived: int ... 10 more fields]

In [4]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [25]:
import org.apache.spark.sql.functions._

val agemeanValue = df.agg(mean(df("Age"))).first.getDouble(0)

val faremeanValue = df.agg(mean(df("Fare"))).first.getDouble(0)

val fixedDf = df.na.fill(agemeanValue, Array("Age"))

val reFixed = fixedDf.na.fill(faremeanValue, Array("Fare"))


val new1 = df2.agg(mean(df2("Age"))).first.getDouble(0)

val new2 = df2.agg(mean(df2("Fare"))).first.getDouble(0)

val fixedDf2 = df2.na.fill(agemeanValue, Array("Age"))

val test = fixedDf2.na.fill(faremeanValue, Array("Fare"))


agemeanValue = 29.69911764705882
faremeanValue = 32.2042079685746
fixedDf = [PassengerId: int, Survived: int ... 10 more fields]
reFixed = [PassengerId: int, Survived: int ... 10 more fields]
new1 = 30.272590361445783
new2 = 35.6271884892086
fixedDf2 = [PassengerId: int, Pclass: int ... 9 more fields]
test = [PassengerId: int, Pclass: int ... 9 more fields]


[PassengerId: int, Pclass: int ... 9 more fields]

In [28]:
val Array(recvTrainData, recvTestData) = reFixed.randomSplit(Array(0.8, 0.2))

val trainingData = reFixed.withColumnRenamed("Survived", "label")

val testData = recvTestData.withColumnRenamed("Survived", "label")

recvTrainData = [PassengerId: int, Survived: int ... 10 more fields]
recvTestData = [PassengerId: int, Survived: int ... 10 more fields]
trainingData = [PassengerId: int, label: int ... 10 more fields]
testData = [PassengerId: int, label: int ... 10 more fields]


[PassengerId: int, label: int ... 10 more fields]

In [50]:
val cols = Array("Sex", "Pclass")
val stringIndexers = cols.map { eachCol =>
  new StringIndexer(eachCol)
    .setInputCol(eachCol)
    .setOutputCol(eachCol + "_Indexed")
    .fit(trainingData)
}

trainingData.show(5)

+-----------+-----+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|label|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+-----+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|    0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|    1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|    1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|    1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|    0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+-----+------+--------------------+------+----+-----+-----+----------------+-------+-----+--

cols = Array(Sex, Pclass)
stringIndexers = Array(Sex, Pclass)


Array(Sex, Pclass)

In [51]:
val cols = Array("Fare", "Age", "SibSp", "Parch", "Pclass_Indexed", "Sex_Indexed")
val assembler = new VectorAssembler()
  .setInputCols(cols)
  .setOutputCol("Features")

cols = Array(Fare, Age, SibSp, Parch, Pclass_Indexed, Sex_Indexed)
assembler = vecAssembler_37d0ab86385f


vecAssembler_37d0ab86385f

In [52]:
val randomForest = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("Features")

randomForest = rfc_c9a230751e1c


rfc_c9a230751e1c

In [53]:
val pipeline = new Pipeline().setStages((stringIndexers :+ assembler :+ randomForest ).toArray)

pipeline = pipeline_99ef5b0359b9


pipeline_99ef5b0359b9

In [54]:
val paramGrid = new ParamGridBuilder()
      .addGrid(randomForest.maxBins, Array(24, 29, 31))
      .addGrid(randomForest.maxDepth, Array(4, 6, 8))
      .addGrid(randomForest.impurity, Array("entropy", "gini"))
      .build()

val evaluator = new BinaryClassificationEvaluator()
        .setLabelCol("label")

val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(10)

    // train the model
val crossValidatorModel = cv.fit(trainingData)

paramGrid = 


Array({
	rfc_c9a230751e1c-impurity: entropy,
	rfc_c9a230751e1c-maxBins: 24,
	rfc_c9a230751e1c-maxDepth: 4
}, {
	rfc_c9a230751e1c-impurity: gini,
	rfc_c9a230751e1c-maxBins: 24,
	rfc_c9a230751e1c-maxDepth: 4
}, {
	rfc_c9a230751e1c-impurity: entropy,
	rfc_c9a230751e1c-maxBins: 29,
	rfc_c9a230751e1c-maxDepth: 4
}, {
	rfc_c9a230751e1c-impurity: gini,
	rfc_c9a230751e1c-maxBins: 29,
	rfc_c9a230751e1c-maxDepth: 4
}, {
	rfc_c9a230751e1c-impurity: entropy,
	rfc_c9a230751e1c-maxBins: 31,
	rfc_c9a230751e1c-maxDepth: 4
}, {
	rfc_c9a230751e1c-impurity: gini,
	rfc_c9a230751e1c-maxBins: 31,
	rfc_c9a230751e1c-maxDepth: 4
}, {
	rfc_c9a230751e1c-impurity: entropy,
	rfc_c9a230751e1c-maxBins: 24,
	rfc_c9a230751e1c-maxDepth: 6
}, {
	rfc_c9a230751e1c-impu...


In [55]:
val predictions = crossValidatorModel.transform(testData)

val accuracy = evaluator.evaluate(predictions)


predictions = [PassengerId: int, label: int ... 16 more fields]
accuracy = 0.9366197183098592


0.9366197183098592

In [56]:
case class Info(PassengerId: String, label: String)
 
predictions.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = false)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Sex_Indexed: double (nullable = false)
 |-- Pclass_Indexed: double (nullable = false)
 |-- Features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



defined class Info


In [57]:
val model = pipeline.fit(trainingData2)

val scoredDf = model.transform(test)

val outputDf = scoredDf.select("PassengerId", "prediction")

val finalDF = outputDf.withColumnRenamed("prediction", "Survived")

finalDF.printSchema()

Unknown Error: <console>:45: error: not found: value trainingData2
       val model = pipeline.fit(trainingData2)
                                ^


In [58]:
val castedDf = finalDF.select(finalDF("PassengerId"), finalDF("Survived"))   

val df3 = castedDf.withColumn("Survived", $"Survived" cast "Int" as "Survived")

Unknown Error: <console>:41: error: not found: value finalDF
       val castedDf = finalDF.select(finalDF("PassengerId"), finalDF("Survived"))
                      ^
<console>:41: error: not found: value finalDF
       val castedDf = finalDF.select(finalDF("PassengerId"), finalDF("Survived"))
                                     ^
<console>:41: error: not found: value finalDF
       val castedDf = finalDF.select(finalDF("PassengerId"), finalDF("Survived"))
                                                             ^


In [59]:
df3.count()

Unknown Error: <console>:42: error: not found: value df3
       df3.count()
       ^


In [60]:
df3.coalesce(1)
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("op")

Unknown Error: <console>:42: error: not found: value df3
       df3.coalesce(1)
       ^
