### What's BigDL

In 2016, Intel released BigDL, its distributed deep learning project, to the open-source community. It integrates natively with the Spark framework and supports popular neural network topologies. BigDL also provides 100+ basic building blocks for neural networks, allowing users to create novel topologies to suit their applications. With BigDL, users can leverage their existing Spark infrastructure to run deep learning applications without having to invest in separate infrastructure for neural networks. Because BigDL is an integral part of the Spark framework, users do not need to manage distributed computations explicitly. Instead, BigDL and Spark handle this for them, while still allowing a sufficient level of optimization and customization of compute execution. See below for details and check out Intel’s BigDL portal.

In this notebook, we demonstrate how to train LeNet on MNIST with BigDL. For more detail, see https://github.com/intel-analytics/BigDL/wiki/Tutorials#training-lenet-on-mnist

### Set environment variables

BigDL leverages Intel MKL kernels. Therefore, a few special environment variables need to be set on the executors for optimal performance.

In [None]:
%%configure -f

{ 
    "jars":["wasb:///bigdl-0.1.0-SNAPSHOT-jar-with-dependencies2.jar"],
    
    "conf": {
        "spark.executorEnv.DL_ENGINE_TYPE": "mklblas",
        "spark.executorEnv.MKL_DISABLE_FAST_MM": "1",
        "spark.executorEnv.KMP_BLOCKTIME": "0",
        "spark.executorEnv.OMP_WAIT_POLICY": "passive",
        "spark.executorEnv.OMP_NUM_THREADS": "1",
        "spark.executorEnv.DL_CORE_NUMBER": "7",
        "spark.executorEnv.DL_NODE_NUMBER": "4",
        "spark.driver.extraJavaOptions": "-Dbigdl.check.singleton=false"
    }
}

### Set Cluster Dependent Parameters
Other cluster-dependent parameters (e.g. the number of executor cores) also need to be passed, and some Spark properties need to be set correctly. After setting coreNumber and nodeNumber, we need to initialize the Engine. The Engine will initialize the executor environment variables and Spark properties appropriately to get the best performance for your deep learning application on the Spark cluster. Note:
1.	The core number should be less than the number of physical cores available. If hyper-threading is turned on, one physical core maps to two OS cores. The higher the core number, the faster the training. You can find the core number in the YARN UI of your cluster (https://clusteraddress.azurehdinsight.net/yarnui). In this example, we use a cluster that has 4 nodes with 8 cores each, so coreNumber is set to 7.
2.	The total batch size (batchSize) must be a multiple of nodeNumber * coreNumber in a distributed environment.
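
The relationship between the batch size, nodeNumber, and coreNumber described in note 2 can be checked with a few lines of Scala. This is only an illustrative sketch: `BatchSizeSketch`, `alignedBatchSize`, and `perCoreBatch` are hypothetical helpers that are not part of BigDL, and the sketch assumes the mini-batch is split evenly across every core of every node.

```scala
// Hypothetical helpers (not part of BigDL) illustrating the batch size rule.
object BatchSizeSketch {

  // Rounds a desired batch size down to the nearest multiple of
  // nodeNumber * coreNumber.
  def alignedBatchSize(desired: Int, nodeNumber: Int, coreNumber: Int): Int = {
    require(nodeNumber > 0 && coreNumber > 0, "node and core counts must be positive")
    val totalCores = nodeNumber * coreNumber
    require(desired >= totalCores, s"batch size must be at least $totalCores")
    (desired / totalCores) * totalCores
  }

  // Number of samples each core receives per iteration, assuming an even split.
  def perCoreBatch(batchSize: Int, nodeNumber: Int, coreNumber: Int): Int = {
    val totalCores = nodeNumber * coreNumber
    require(batchSize % totalCores == 0,
      s"batch size $batchSize is not a multiple of $totalCores")
    batchSize / totalCores
  }
}
```

With the values used in this notebook (4 nodes, 7 cores, and a multiplier of 64), the total batch size is 4 * 7 * 64 = 1792, and `BatchSizeSketch.perCoreBatch(1792, 4, 7)` gives 64 samples per core.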

In [None]:
/*
 * --coreNumber: Core number used in the training per node. 
 * --nodeNumber: Executor number in cluster.
 * --batchsize: Mini batch size.
 */
import com.intel.analytics.bigdl.utils.Engine
val nodeNumber = 4
val coreNumber = 7
val mult = 64
val batchSize = nodeNumber * coreNumber * mult
Engine.init(nodeNumber, coreNumber, true /* env == "spark" */)

### Prepare dataset

We use MNIST as our dataset. It is a database of handwritten digits that have been size-normalized and centered in a fixed-size image. It is therefore a good database for people who want to try learning techniques and pattern recognition methods on real-world data while spending minimal effort on preprocessing and formatting. There are four files in the dataset: train-images-idx3-ubyte contains the training images, train-labels-idx1-ubyte contains the training labels, t10k-images-idx3-ubyte contains the validation images, and t10k-labels-idx1-ubyte contains the validation labels.

You should put all four MNIST files in the cluster's default storage account (the BLOB storage).

In [None]:
/* 
 * Load data from files
 */
import java.nio.ByteBuffer
import java.nio.file.Path
import com.intel.analytics.bigdl.dataset._
import com.intel.analytics.bigdl.dataset.image._
import com.intel.analytics.bigdl.models.lenet.Utils

def loadBinaryFile(filePath: Path): Array[Byte] = {
  val files = sc.binaryFiles(filePath.toString())
  val bytes = files.first()._2.toArray()
  bytes
}

def load(featureFile: Path, labelFile: Path): Array[ByteRecord] = {
  val labelBuffer = ByteBuffer.wrap(loadBinaryFile(labelFile))
  val featureBuffer = ByteBuffer.wrap(loadBinaryFile(featureFile))
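  // IDX label files start with the magic number 2049 (0x00000801)
  val labelMagicNumber = labelBuffer.getInt()
  require(labelMagicNumber == 2049)
  // IDX image files start with the magic number 2051 (0x00000803)
  val featureMagicNumber = featureBuffer.getInt()
  require(featureMagicNumber == 2051)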

  val labelCount = labelBuffer.getInt()
  val featureCount = featureBuffer.getInt()
  require(labelCount == featureCount)

  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()

  val result = new Array[ByteRecord](featureCount)
  var i = 0
  while (i < featureCount) {
    val img = new Array[Byte]((rowNum * colNum))
    var y = 0
    while (y < rowNum) {
      var x = 0
      while (x < colNum) {
        img(x + y * colNum) = featureBuffer.get()
        x += 1
      }
      y += 1
    }
    // Shift the digit labels from 0-9 to 1-10 (1-based class labels)
    result(i) = ByteRecord(img, labelBuffer.get().toFloat + 1.0f)
    i += 1
  }
  result
}
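
The loader above relies on the header layout of the MNIST IDX files: a big-endian 32-bit magic number, followed by the item count and, for image files, the row and column counts. The following self-contained sketch shows just that header handling on an in-memory byte array. `IdxHeaderSketch`, `fakeImageFile`, and `readImageHeader` are hypothetical names introduced for illustration, and the fake file contains only zero-valued pixels.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch of IDX image header parsing (not part of BigDL).
object IdxHeaderSketch {
  val LabelMagic = 2049 // 0x00000801: unsigned byte data, 1 dimension
  val ImageMagic = 2051 // 0x00000803: unsigned byte data, 3 dimensions

  // Builds a tiny in-memory IDX image file: a 16-byte header followed by
  // count * rows * cols pixel bytes (all zero here).
  def fakeImageFile(count: Int, rows: Int, cols: Int): Array[Byte] = {
    val buf = ByteBuffer.allocate(16 + count * rows * cols)
    buf.putInt(ImageMagic).putInt(count).putInt(rows).putInt(cols)
    buf.array()
  }

  // Reads the header fields; ByteBuffer is big-endian by default,
  // which matches the IDX format.
  def readImageHeader(bytes: Array[Byte]): (Int, Int, Int) = {
    val buf = ByteBuffer.wrap(bytes)
    require(buf.getInt() == ImageMagic, "not an IDX image file")
    val count = buf.getInt()
    val rows = buf.getInt()
    val cols = buf.getInt()
    (count, rows, cols)
  }
}
```

For example, `IdxHeaderSketch.readImageHeader(IdxHeaderSketch.fakeImageFile(2, 28, 28))` returns `(2, 28, 28)`.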

In [None]:
// Files used for training
val dir = "/mnistdataset"
val dbfsDir = dir

val trainDataFile = "train-images-idx3-ubyte"
val trainLabelFile = "train-labels-idx1-ubyte"
val validationDataFile = "t10k-images-idx3-ubyte"
val validationLabelFile = "t10k-labels-idx1-ubyte"

Here we load the MNIST data by creating a BigDL DataSet and then applying a series of Transformers to preprocess the data into mini-batches.

In [None]:
/* Preprocess data: convert byte records into grey images,
 * normalize them, and convert batches of labeled grey
 * images into mini-batches
 */
import java.nio.file.Paths
import com.intel.analytics.bigdl.DataSet
import com.intel.analytics.bigdl.dataset._
import com.intel.analytics.bigdl.dataset.image._
val trainData = Paths.get(dbfsDir, trainDataFile)
val trainLabel = Paths.get(dbfsDir, trainLabelFile)
val validationData = Paths.get(dbfsDir, validationDataFile)
val validationLabel = Paths.get(dbfsDir, validationLabelFile)

val trainMean = 0.13066047740239506
val trainStd = 0.3081078
val trainSet = DataSet.array(load(trainData, trainLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(batchSize)

val testMean = 0.13251460696903547
val testStd = 0.31048024
val validationSet = DataSet.array(load(validationData, validationLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToBatch(batchSize)
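
To make the normalization step more concrete, here is a minimal plain-Scala sketch of per-pixel standardization. It rests on two assumptions that are not stated in this notebook: that pixel bytes are scaled to the [0, 1] range before normalization, and that the normalizer computes (x - mean) / std. The mean and standard deviation values above are consistent with pixels in that range, but the BigDL transformers themselves may differ in detail. `NormalizeSketch` is a hypothetical name.

```scala
// Hypothetical sketch of per-pixel normalization (not the BigDL implementation).
object NormalizeSketch {

  // Treats the byte as unsigned (0..255), scales it to [0, 1],
  // then subtracts the mean and divides by the standard deviation.
  def normalize(pixel: Byte, mean: Double, std: Double): Float = {
    require(std > 0, "standard deviation must be positive")
    val scaled = (pixel & 0xff) / 255.0
    ((scaled - mean) / std).toFloat
  }
}
```

Under these assumptions, a black pixel (byte value 0) with the training statistics maps to roughly -0.424, since (0 - 0.1307) / 0.3081 is about -0.424.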


### Train model

Before training, we need to set the hyperparameters, which determine how the network is trained. For more details, see http://colinraffel.com/wiki/neural_network_hyperparameters

We use LeNet5 as our model. LeNet5 is a classic CNN used for handwritten digit classification. For detailed information, please refer to http://yann.lecun.com/exdb/lenet/

Then we create the Optimizer by specifying the DataSet, the Model and the Criterion (which, given input and target, computes gradient per given loss function) and begin training. The training step may take a while on a small number of nodes. Using a compute cluster will speed up training significantly.

In [None]:
import com.intel.analytics.bigdl.models.lenet._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils._

// Set hyperparameters: the learning rate and the maximum number of epochs
val state = T("learningRate" -> 0.05 / 4 * mult)

// Maximum number of epochs
val maxEpoch = 15

// Train Lenet model
val initialModel = LeNet5(10)  // 10 digit classes
val optimizer = Optimizer(
  model = initialModel,   // model to train
  dataset = trainSet,     // training dataset
  criterion = ClassNLLCriterion[Float]()) // loss function
val trainedModel = optimizer.setValidation(
    trigger = Trigger.everyEpoch,
    dataset = validationSet,
    vMethods = Array(new Top1Accuracy)).setState(state).setEndWhen(Trigger.maxEpoch(maxEpoch)).optimize()
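
As a rough illustration of what a negative log-likelihood criterion measures, the sketch below computes the loss in plain Scala. It assumes the network outputs log-probabilities for each class and that class labels are 1-based, matching the `+ 1.0f` shift applied when the labels were loaded. `NllSketch`, `nll`, and `meanNll` are hypothetical names, not BigDL APIs, and the actual ClassNLLCriterion may differ in details such as weighting or averaging.

```scala
// Hypothetical sketch of a negative log-likelihood loss (not the BigDL implementation).
object NllSketch {

  // logProbs: log-probability of each class; label: 1-based class index.
  def nll(logProbs: Array[Float], label: Int): Float = {
    require(label >= 1 && label <= logProbs.length, s"label $label out of range")
    -logProbs(label - 1)
  }

  // Average loss over a batch of (logProbs, label) pairs.
  def meanNll(batch: Seq[(Array[Float], Int)]): Float = {
    require(batch.nonEmpty, "batch must not be empty")
    batch.map { case (logProbs, label) => nll(logProbs, label) }.sum / batch.size
  }
}
```

The loss is small when the model assigns a high probability to the correct class and grows as that probability approaches zero.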

### Validate model

Now we can validate the trained model. BigDL provides a set of metrics to evaluate a model via the Validator and ValidationMethod classes. Here we use Top1Accuracy as the ValidationMethod; it computes the fraction of samples whose highest-scoring class matches the label.

In [None]:
// Validate the learned model
import com.intel.analytics.bigdl.optim.{LocalValidator, Top1Accuracy, Validator}
val validator = Validator(trainedModel, validationSet)
val result = validator.test(Array(new Top1Accuracy[Float]))
result.foreach(r => {
  println(s"${r._2} is ${r._1}")
})
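
For readers unfamiliar with the metric, top-1 accuracy can be sketched in plain Scala as follows. This is an illustration under stated assumptions, not BigDL's Top1Accuracy implementation: `Top1Sketch` is a hypothetical name, scores are given as one array per sample, and labels are 1-based as in the data loading code.

```scala
// Hypothetical sketch of top-1 accuracy (not the BigDL implementation).
object Top1Sketch {

  // scores: one array of class scores per sample; labels: 1-based class indices.
  def top1Accuracy(scores: Seq[Array[Float]], labels: Seq[Int]): Double = {
    require(scores.nonEmpty && scores.length == labels.length,
      "scores and labels must be non-empty and of equal length")
    val correct = scores.zip(labels).count { case (s, label) =>
      require(s.nonEmpty, "each sample needs at least one class score")
      // Index of the highest score (first one wins on ties)
      val best = s.indices.reduceLeft((b, i) => if (s(i) > s(b)) i else b)
      best + 1 == label
    }
    correct.toDouble / scores.length
  }
}
```

A sample counts as correct only when the single highest-scoring class equals its label, so a result of 0.98 means 98% of the validation images were classified correctly.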

### Looking at the result
You should see a model with top-1 accuracy above 98%.

### Conclusion
In this blog post, we have demonstrated how easy it is to set up a BigDL environment on HDInsight Spark. Using the BigDL Spark library, a user can easily write scalable distributed deep learning applications within the familiar Spark infrastructure, without intimate knowledge of the configuration of the underlying compute cluster. The BigDL and Azure HDInsight teams have been collaborating closely to enable BigDL in the Azure HDInsight environment.
If you have any feedback on HDInsight, feel free to drop an email to hdifeedback at microsoft dot com. If you have any questions about BigDL, you can ask them in the BigDL Google Group (https://groups.google.com/forum/#!forum/bigdl-user-group).


### Resources
[Learn more about Azure HDInsight](https://docs.microsoft.com/en-us/azure/hdinsight/)

[Artificial Intelligence Software and Hardware at Intel](https://software.intel.com/ai)

[BigDL introductory video](https://software.intel.com/en-us/videos/bigdl-distributed-deep-learning-on-apache-spark)
