In [1]:
%%configure -f
{"jars":["/user/livy/repl_jars/dl4j-assembly-0.6.0.jar"],"driverMemory":"3g","executorMemory":"2g","conf":{"spark.driver.extraClassPath":"/home/livy/dl4j-assembly-0.6.0.jar","spark.serializer":"org.apache.spark.serializer.KryoSerializer","spark.kryo.registrator":"org.nd4j.Nd4jRegistrator"}}

In [None]:
// Load the tab-separated input file as an RDD holding one String per line.
val rawData: org.apache.spark.rdd.RDD[String] = sc.textFile("data/mls/ch05/train.tsv")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1477665064581_0002,spark,idle,Link,Link,✔


SparkContext available as 'sc'.
SqlContext available as 'sqlContext'.


In [None]:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Remove every line containing the column name "hasDomainLink" (i.e. the header row).
val rawDataNoHeader = rawData.filter(!_.contains("hasDomainLink"))

// Split each remaining line into its tab-separated fields.
val records = rawDataNoHeader.map(_.split("\t"))

// Build one LabeledPoint per record: the last column is the label, and the
// columns from index 4 up to (but excluding) the last one are the features.
// A feature value of "?" is replaced by 0.0.
val data = records.map { fields =>
  val lastIdx = fields.size - 1
  // Strip all double-quote characters from every field before parsing.
  val unquoted = fields.map(_.replaceAll("\"", ""))
  val label = unquoted(lastIdx).toInt
  val features = unquoted.slice(4, lastIdx).map {
    case "?" => 0.0
    case value => value.toDouble
  }
  LabeledPoint(label.toDouble, Vectors.dense(features))
}

// Mark the parsed data for caching, then force evaluation by counting it.
data.cache()
println(data.count())

In [None]:
// Assign an integer index to every distinct value found in column 3.
// The lookup key is the raw field value (quotes are not stripped here).
val categories = {
  val distinctValues = records.map(fields => fields(3)).distinct().collect()
  distinctValues.zipWithIndex.toMap
}
val numCategories = categories.size
println(categories)
println(numCategories)

// Same parsing as for `data`, but the feature vector is prefixed with a
// 1-of-k encoding of column 3 (k = numCategories).
val dataCategories = records.map { fields =>
  val lastIdx = fields.size - 1
  val unquoted = fields.map(_.replaceAll("\"", ""))
  val label = unquoted(lastIdx).toInt
  // Look the category up by the raw field value, matching how `categories` was built.
  val hotIdx = categories(fields(3))
  val oneHot = Array.tabulate(numCategories)(i => if (i == hotIdx) 1.0 else 0.0)
  val numeric = unquoted.slice(4, lastIdx).map {
    case "?" => 0.0
    case value => value.toDouble
  }
  LabeledPoint(label, Vectors.dense(oneHot ++ numeric))
}

import org.apache.spark.mllib.feature.StandardScaler
// Fit a StandardScaler (mean and std both enabled) on the feature vectors
// of dataCategories.
val scalerCats = {
  val featureVectors = dataCategories.map(_.features)
  new StandardScaler(withMean = true, withStd = true).fit(featureVectors)
}
// Apply the fitted scaler to every point, keeping the labels unchanged.
val scaledDataCats = dataCategories.map { point =>
  LabeledPoint(point.label, scalerCats.transform(point.features))
}
// Data set whose features are only the 1-of-k encoding of column 3
// (no other feature columns are included).
val dataNB = records.map { fields =>
  val unquoted = fields.map(_.replaceAll("\"", ""))
  val label = unquoted(fields.size - 1).toInt
  // Look the category up by the raw field value, matching how `categories` was built.
  val hotIdx = categories(fields(3))
  val oneHot = Array.tabulate(numCategories)(i => if (i == hotIdx) 1.0 else 0.0)
  LabeledPoint(label, Vectors.dense(oneHot))
}

In [None]:
// Split the scaled data 60/40 into training and test sets using a fixed seed (123).
val trainTestSplit = scaledDataCats.randomSplit(Array(0.6, 0.4), 123)
// Persist the training split: the training loop passes it to fitLabeledPoint
// once per epoch, and without caching each pass would recompute the whole
// lineage (file read, parsing, one-hot encoding and scaling).
val train = trainTestSplit(0).cache()
val test = trainTestSplit(1)

In [None]:
// Settings consumed by the network configuration, training master and training loop.
val iterations: Int = 10          // passed to the network builder's iterations(...)
val seed: Int = 123               // passed to the network builder's seed(...)
val learningRate: Double = 0.005  // passed to the network builder's learningRate(...)
val nEpochs: Int = 20             // number of fitLabeledPoint passes in the training loop
// nIn of the first layer. NOTE(review): presumably must equal the length of the
// feature vectors in scaledDataCats (numCategories + numeric columns) — confirm.
val numInputs: Int = 36
val numOutputs: Int = 2           // nOut of the output layer
val numHiddenNodes: Int = 30      // nOut of the first layer / nIn of the output layer
val batchSizePerWorker: Int = 16  // used by the training master and by evaluation batching

In [None]:
import org.deeplearning4j.datasets.iterator.impl.MnistDataSetIterator
import org.deeplearning4j.nn.api.OptimizationAlgorithm
import org.deeplearning4j.nn.conf.layers.{ DenseLayer, OutputLayer }
import org.deeplearning4j.nn.conf.{ NeuralNetConfiguration, Updater }
import org.deeplearning4j.nn.weights.WeightInit
import org.deeplearning4j.spark.api.{ Repartition, RepartitionStrategy }
import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer
import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster
import org.deeplearning4j.spark.stats.StatsUtils
import org.nd4j.linalg.dataset.DataSet
import org.nd4j.linalg.lossfunctions.LossFunctions._

// Network configuration with two layers: a dense "relu" layer followed by a
// "softmax" output layer using the NEGATIVELOGLIKELIHOOD loss function.
// Trailing-dot chaining is kept so the expression continues across lines in a REPL.
val nnconf = {
  // Layer 0: numInputs -> numHiddenNodes.
  val hiddenLayer = new DenseLayer.Builder().
    nIn(numInputs).
    nOut(numHiddenNodes).
    weightInit(WeightInit.XAVIER).
    activation("relu").
    build()
  // Layer 1: numHiddenNodes -> numOutputs.
  val outputLayer = new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD).
    weightInit(WeightInit.XAVIER).
    activation("softmax").
    nIn(numHiddenNodes).nOut(numOutputs).
    build()
  new NeuralNetConfiguration.Builder().
    seed(seed).
    iterations(iterations).
    optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT).
    learningRate(learningRate).
    updater(Updater.NESTEROVS).momentum(0.9).
    list().
    layer(0, hiddenLayer).
    layer(1, outputLayer).
    pretrain(false).backprop(true).build()
}


In [None]:
// Training master for parameter-averaging training, configured with
// batchSizePerWorker both as the Builder constructor argument and as the
// per-worker batch size.
val tm = {
  val tmBuilder = new ParameterAveragingTrainingMaster.Builder(batchSizePerWorker)
  // NOTE: `repartionData` (sic) is the method name as called by this notebook.
  tmBuilder.
    averagingFrequency(10).
    saveUpdater(true).
    workerPrefetchNumBatches(2).
    batchSizePerWorker(batchSizePerWorker).
    repartionData(Repartition.Always).
    repartitionStrategy(RepartitionStrategy.SparkDefault).
    build()
}

In [None]:
// Wrap the network configuration and training master in a Spark-backed network.
val sparkNet: SparkDl4jMultiLayer = new SparkDl4jMultiLayer(sc, nnconf, tm)

In [None]:
// Fit the network on the training split nEpochs times; the value returned
// by each fitLabeledPoint call is discarded.
(0 until nEpochs).foreach { _ =>
    sparkNet.fitLabeledPoint(train)
}

In [None]:
import org.deeplearning4j.spark.util._

// Convert the test split into DataSet batches and evaluate the trained network.
// The number of possible labels is taken from numOutputs (instead of a literal 2)
// so that evaluation stays consistent with the network's output layer size.
val testDataSets = MLLibUtil.fromLabeledPoint(test, numOutputs, batchSizePerWorker)
val evaluation = sparkNet.evaluate(testDataSets)
println(evaluation.stats())