In [1]:
%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.7.0

In [2]:
%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.5

In [3]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.udf

import com.salesforce.op._
import com.salesforce.op.features._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.evaluators.Evaluators

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.udf
import com.salesforce.op._
import com.salesforce.op.features._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.evaluators.Evaluators


In [4]:
val conf = new SparkConf().setMaster("local[*]").setAppName("TitanicPrediction")
implicit val spark = SparkSession.builder.config(conf).getOrCreate()

org.apache.spark.sql.SparkSession@74f42135

In [5]:
case class Passenger(
  id: Int,
  survived: Int,
  pClass: Option[Int],
  name: Option[String],
  sex: Option[String],
  age: Option[Double],
  sibSp: Option[Int],
  parCh: Option[Int],
  ticket: Option[String],
  fare: Option[Double],
  cabin: Option[String],
  embarked: Option[String]
)

defined class Passenger


In [6]:
val survived = FeatureBuilder.RealNN[Passenger].extract(_.survived.toRealNN).asResponse
val pClass = FeatureBuilder.PickList[Passenger].extract(_.pClass.map(_.toString).toPickList).asPredictor
val name = FeatureBuilder.Text[Passenger].extract(_.name.toText).asPredictor
val sex = FeatureBuilder.PickList[Passenger].extract(_.sex.map(_.toString).toPickList).asPredictor
val age = FeatureBuilder.Real[Passenger].extract(_.age.toReal).asPredictor
val sibSp = FeatureBuilder.Integral[Passenger].extract(_.sibSp.toIntegral).asPredictor
val parCh = FeatureBuilder.Integral[Passenger].extract(_.parCh.toIntegral).asPredictor
val ticket = FeatureBuilder.PickList[Passenger].extract(_.ticket.map(_.toString).toPickList).asPredictor
val fare = FeatureBuilder.Real[Passenger].extract(_.fare.toReal).asPredictor
val cabin = FeatureBuilder.PickList[Passenger].extract(_.cabin.map(_.toString).toPickList).asPredictor
val embarked = FeatureBuilder.PickList[Passenger].extract(_.embarked.map(_.toString).toPickList).asPredictor

Feature(name = embarked, uid = PickList_00000000000b, isResponse = false, originStage = FeatureGeneratorStage_00000000000b, parents = [], distributions = [])

In [7]:
val familySize = sibSp + parCh + 1
val estimatedCostOfTickets = familySize * fare
val pivotedSex = sex.pivot()
val normedAge = age.fillMissingWithMean().zNormalize()
val ageGroup = age.map[PickList](_.value.map(v => if (v > 18) "adult" else "child").toPickList)


Feature(name = age_1-stagesApplied_PickList_000000000012, uid = PickList_000000000012, isResponse = false, originStage = UnaryLambdaTransformer_000000000012, parents = [Real_000000000005], distributions = [])

In [8]:
val passengerFeatures = Seq(
      pClass, name, age, sibSp, parCh, ticket,
      cabin, embarked, familySize, estimatedCostOfTickets,
      pivotedSex, ageGroup
    ).transmogrify()

Feature(name = age-cabin-embarked-fare-name-pClass-parCh-sex-sibSp-ticket_10-stagesApplied_OPVector_000000000017, uid = OPVector_000000000017, isResponse = false, originStage = VectorsCombiner_000000000017, parents = [OPVector_000000000013,OPVector_00000000000f,OPVector_000000000014,OPVector_000000000015,OPVector_000000000016], distributions = [])

In [9]:
val sanityCheck = true
val finalFeatures = if (sanityCheck) survived.sanityCheck(passengerFeatures) else passengerFeatures

Feature(name = age-cabin-embarked-fare-name-pClass-parCh-sex-sibSp-survived-ticket_11-stagesApplied_OPVector_000000000018, uid = OPVector_000000000018, isResponse = false, originStage = SanityChecker_000000000018, parents = [RealNN_000000000001,OPVector_000000000017], distributions = [])

In [10]:
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelsToTry._

val prediction =
      BinaryClassificationModelSelector.withTrainValidationSplit(
        modelTypesToUse = Seq(OpLogisticRegression)
      ).setInput(survived, finalFeatures).getOutput()


Feature(name = age-cabin-embarked-fare-name-pClass-parCh-sex-sibSp-survived-ticket_12-stagesApplied_Prediction_000000000023, uid = Prediction_000000000023, isResponse = true, originStage = ModelSelector_000000000023, parents = [RealNN_000000000001,OPVector_000000000018], distributions = [])

In [11]:
val evaluator = Evaluators.BinaryClassification().setLabelCol(survived).setPredictionCol(prediction)

OpBinaryClassificationEvaluator_000000000024

In [12]:
import spark.implicits._ // Needed for Encoders for the Passenger case class
import com.salesforce.op.readers.DataReaders

val trainFilePath = "/home/beakerx/helloworld/src/main/resources/TitanicDataset/TitanicPassengersTrainData.csv"
    // Define a way to read data into our Passenger class from our CSV file
val trainDataReader = DataReaders.Simple.csvCase[Passenger](
      path = Option(trainFilePath),
      key = _.id.toString
    )

org.apache.spark.sql.SparkSession$implicits$@791670c4

In [13]:
val workflow =
      new OpWorkflow()
        .setResultFeatures(survived, prediction)
        .setReader(trainDataReader)

com.salesforce.op.OpWorkflow@57f8a7f7

In [14]:
val fittedWorkflow = workflow.train()
println("Summary:\n" + fittedWorkflow.summaryPretty())

Summary:
Evaluated OpLogisticRegression model using Train Validation Split and area under precision-recall metric.
Evaluated 8 OpLogisticRegression models with area under precision-recall metric between [0.7246270149973595, 0.8099677675881571].
+--------------------------------------------------------+
|         Selected Model - OpLogisticRegression          |
+--------------------------------------------------------+
| Model Param      | Value                               |
+------------------+-------------------------------------+
| aggregationDepth | 2                                   |
| elasticNetParam  | 0.1                                 |
| family           | auto                                |
| fitIntercept     | true                                |
| maxIter          | 50                                  |
| modelType        | OpLogisticRegression                |
| name             | OpLogisticRegression_00000000001c_3 |
| regParam         | 0.2                       

null

In [15]:
println("Scoring the model:\n=================")
val (dataframe, metrics) = fittedWorkflow.scoreAndEvaluate(evaluator = evaluator)

println("Transformed dataframe columns:\n--------------------------")
dataframe.columns.foreach(println)

println("Metrics:\n------------")
println(metrics)

Scoring the model:
Transformed dataframe columns:
--------------------------
key
survived
age-cabin-embarked-fare-name-pClass-parCh-sex-sibSp-survived-ticket_12-stagesApplied_Prediction_000000000023
Metrics:
------------
{
  "Precision" : 0.826530612244898,
  "Recall" : 0.7105263157894737,
  "F1" : 0.7641509433962264,
  "AuROC" : 0.8860847473875945,
  "AuPR" : 0.8642975015676693,
  "Error" : 0.16835016835016836,
  "TP" : 243.0,
  "TN" : 498.0,
  "FP" : 51.0,
  "FN" : 99.0,
  "thresholds" : [ 0.8429051668236677, 0.8340050884125256, 0.8248385563692645, 0.8158377748929678, 0.811440517817274, 0.8063078095375442, 0.8012109822316678, 0.7938396389206815, 0.7914264524124233, 0.7885731053087678, 0.7852590636844079, 0.7804150024804707, 0.7748023382106847, 0.7658723601130933, 0.7531030924152463, 0.7441859014776429, 0.7333930943898694, 0.719494072372077, 0.7026698010014258, 0.697036002520517, 0.6871191380068455, 0.6677021670164942, 0.6602723428268436, 0.6541997986165858, 0.6479072714009527, 0.6416

null