# Image Classification on Spark with GPU

This demo shows how to leverage the GPU-sharing feature of Spark 3.0 on a standalone cluster. In our setup, we create a Spark cluster with 1 master and 1 worker. We strongly recommend running this notebook in an [Amazon SageMaker Notebook](https://docs.aws.amazon.com/sagemaker/latest/dg/nbi.html) instance, since the tutorial was set up and tested on that platform.

## Setup

You may need to switch the system default CUDA version by following these [instructions](https://docs.aws.amazon.com/dlami/latest/devguide/tutorial-base.html):

```bash
sudo rm /usr/local/cuda
sudo ln -s /usr/local/cuda-10.1 /usr/local/cuda
nvcc --version
```

Currently, DJL can work with CUDA 10.1 and 10.2.
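After switching, you can compare the release reported by `nvcc --version` against the versions listed above. The following is a minimal sketch in Scala, the language used in the rest of this notebook. The `CudaCheck` object is hypothetical. It assumes `nvcc` prints a line of the form `Cuda compilation tools, release 10.1, V10.1.243`.

```scala
// Hypothetical helper: checks whether the CUDA release reported by
// `nvcc --version` is one of the versions this tutorial lists as supported.
object CudaCheck {
  // Versions listed in this tutorial as working with DJL.
  val supported: Set[String] = Set("10.1", "10.2")

  // Matches "release X.Y" in a line such as
  // "Cuda compilation tools, release 10.1, V10.1.243".
  private val Release = """release (\d+\.\d+)""".r

  // Extracts the "X.Y" release string, if present.
  def releaseOf(nvccOutput: String): Option[String] =
    Release.findFirstMatchIn(nvccOutput).map(_.group(1))

  // True only if a release was found and it is in the supported set.
  def isSupported(nvccOutput: String): Boolean =
    releaseOf(nvccOutput).exists(supported.contains)
}
```

In a notebook cell, you could pass it the live output with `import scala.sys.process._` followed by `CudaCheck.isSupported("nvcc --version".!!)`.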

Let's start by spawning a terminal in JupyterLab. Please follow our [SageMaker Setup](https://github.com/aws-samples/djl-demo/tree/master/aws/sagemaker-notebook) guide to install the Scala kernel and Spark.

After following the instructions, you should have a Spark standalone cluster set up.

## Import dependencies



```scala
import $ivy.`org.apache.spark::spark-sql:3.0.1`
import $ivy.`org.apache.spark::spark-mllib:3.0.1`
import $ivy.`org.apache.hadoop:hadoop-hdfs:2.7.4`
import $ivy.`ai.djl:api:0.8.0`
import $ivy.`ai.djl.pytorch:pytorch-model-zoo:0.8.0`
import $ivy.`ai.djl.pytorch:pytorch-native-auto:1.6.0`
```

```scala
import java.net.URL
import java.nio.file.Files
import java.util
import ai.djl.{Device, Model}
import ai.djl.modality.Classifications
import ai.djl.modality.cv.transform.{Resize, ToTensor}
import ai.djl.ndarray.types.{DataType, Shape}
import ai.djl.ndarray.{NDList, NDManager}
import ai.djl.repository.zoo.{Criteria, ModelZoo, ZooModel}
import ai.djl.training.util.{DownloadUtils, ProgressBar}
import ai.djl.translate.{Batchifier, Pipeline, Translator, TranslatorContext}
import ai.djl.util.{Utils, ZipUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Encoders, Row, NotebookSparkSession}
```

```scala
// Download the sample images and extract them into a local temp directory
val url = "https://alpha-djl-demos.s3.amazonaws.com/spark-demo/images.zip"
val tempPath = Files.createTempDirectory("images")
DownloadUtils.download(new URL(url), tempPath.resolve("images.zip"), new ProgressBar())
ZipUtils.unzip(Files.newInputStream(tempPath.resolve("images.zip")), tempPath)
// Upload the extracted images to HDFS so every worker can read them
println("Upload images to HDFS...")
val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)
val srcPath = new Path(tempPath.toAbsolutePath.toString)
val outputPath = new Path("hdfs:///images")
hdfs.copyFromLocalFile(srcPath, outputPath)
val imagePath = outputPath.toString
```

## Create Translator

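In DJL, a Translator defines the preprocessing and postprocessing logic. The translator in this demo does the following:

- preprocess: convert a Spark DataFrame `Row` into a DJL `NDArray`, flip the channels from BGR to RGB, resize to 224x224, and convert the result to a tensor.
- postprocess: apply softmax to the model output and convert it into `Classifications`.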
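To make the preprocessing step concrete, here is a plain-Scala sketch of the arithmetic it performs, with no DJL dependency. It reads the bytes as unsigned values, reverses the channel order from BGR to RGB, and turns the HWC layout in the range [0, 255] into a CHW array of floats in [0, 1]. That last conversion is what DJL's `ToTensor` produces. The `PreprocessSketch` name is hypothetical, and the sketch omits the 224x224 resize.

```scala
// Plain-Scala sketch (no DJL) of the preprocessing arithmetic:
// unsigned bytes, BGR -> RGB, and HWC [0, 255] -> CHW floats in [0, 1].
object PreprocessSketch {
  def toChwRgb(data: Array[Byte], height: Int, width: Int, channels: Int): Array[Float] = {
    require(data.length == height * width * channels, "data size does not match shape")
    val out = new Array[Float](channels * height * width)
    for (y <- 0 until height; x <- 0 until width; c <- 0 until channels) {
      // Source index in the row-wise HWC layout.
      val src = (y * width + x) * channels + c
      // Reverse the channel order (BGR -> RGB), like image.flip(2) in the translator.
      val dstC = channels - 1 - c
      // Destination index in the CHW layout.
      val dst = (dstC * height + y) * width + x
      // `& 0xFF` reads the signed JVM byte as an unsigned value 0..255.
      out(dst) = (data(src) & 0xFF) / 255f
    }
    out
  }
}
```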

```scala
// Translator: defines the preprocessing and postprocessing around inference
class MyTranslator extends Translator[Row, Classifications] {

  private var classes: java.util.List[String] = new util.ArrayList[String]()
  private val pipeline: Pipeline = new Pipeline()
    .add(new Resize(224, 224))
    .add(new ToTensor())

  // Load the class labels shipped with the model artifact
  override def prepare(manager: NDManager, model: Model): Unit = {
    classes = Utils.readLines(model.getArtifact("synset.txt").openStream())
  }

  override def processInput(ctx: TranslatorContext, row: Row): NDList = {
    val height = ImageSchema.getHeight(row)
    val width = ImageSchema.getWidth(row)
    val channel = ImageSchema.getNChannels(row)
    // Spark stores image bytes row-wise (HWC); reinterpret them as unsigned 8-bit values
    var image = ctx.getNDManager.create(ImageSchema.getData(row), new Shape(height, width, channel)).toType(DataType.UINT8, true)
    // BGR to RGB
    image = image.flip(2)
    pipeline.transform(new NDList(image))
  }

  // Handle the output: the NDList holds the model output, usually one or more NDArrays
  override def processOutput(ctx: TranslatorContext, list: NDList): Classifications = {
    // Convert raw scores into probabilities
    val probabilitiesNd = list.singletonOrThrow.softmax(0)
    new Classifications(classes, probabilitiesNd)
  }

  override def getBatchifier: Batchifier = Batchifier.STACK
}
```
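The postprocessing step applies softmax to the raw scores before wrapping them in `Classifications`. The sketch below shows that computation plus a top-k ranking in plain Scala. `PostprocessSketch` and its methods are hypothetical and only approximate what DJL does internally.

```scala
// Plain-Scala sketch of postprocessing: softmax over the raw scores,
// then pair each probability with its label and keep the k most likely.
object PostprocessSketch {
  def softmax(logits: Array[Double]): Array[Double] = {
    // Subtracting the max avoids overflow in exp; the result is mathematically unchanged.
    val m = logits.max
    val exps = logits.map(x => math.exp(x - m))
    val sum = exps.sum
    exps.map(_ / sum)
  }

  def topK(labels: Seq[String], logits: Array[Double], k: Int): Seq[(String, Double)] = {
    require(labels.length == logits.length, "expected one label per score")
    labels.zip(softmax(logits).toSeq).sortBy(-_._2).take(k)
  }
}
```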

## Load the model

Now we just need to fetch the model from a URL. The URL can use the `hdfs://`, `file://`, or `http://`/`https://` scheme. We use `Criteria` as a container for the model and translator information, and then load the model from it.

Note: DJL `Criteria` and `Model` are not serializable. Instead of creating them on the driver, we wrap the loading logic in a function. Each Spark task calls this function on its executor and passes in the GPU it was assigned.

```scala
val modelUrl = "https://alpha-djl-demos.s3.amazonaws.com/model/djl-blockrunner/pytorch_resnet18.zip?model_name=traced_resnet18"

// Build the criteria and load the model on the given device.
// Called inside each task, so no non-serializable DJL object is shipped from the driver.
def loadModel(device: Device): ZooModel[Row, Classifications] = {
  val criteria = Criteria.builder
    .setTypes(classOf[Row], classOf[Classifications])
    .optModelUrls(modelUrl)
    .optTranslator(new MyTranslator())
    .optProgress(new ProgressBar)
    .optDevice(device)
    .build()
  ModelZoo.loadModel(criteria)
}
```
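The serialization constraint exists because Spark ships task closures to executors using Java serialization. If a closure captures an object that is not `Serializable`, Spark rejects the job with a `Task not serializable` error caused by a `NotSerializableException`. The sketch below reproduces this with plain JVM classes. It also shows one common workaround: a `@transient lazy val`, which is skipped during serialization and rebuilt on first use. `HeavyResource`, `ResourceHolder`, and `SerializationSketch` are hypothetical stand-ins, not DJL types.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource such as a loaded model.
class HeavyResource(val name: String)

// Serializable holder that only carries what is needed to rebuild the resource.
// The @transient lazy val is never written out; it is created on first access.
class ResourceHolder(name: String) extends Serializable {
  @transient lazy val resource: HeavyResource = new HeavyResource(name)
}

object SerializationSketch {
  // Returns true if Java serialization (Spark's default for closures) succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```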

## Start Spark application

We can create a `NotebookSparkSession` through the Almond Spark plugin. It internally distributes all the necessary jars to each worker node.

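```scala
// Create the Spark session
val spark = {
  NotebookSparkSession.builder()
    .master("spark://localhost:7077")
    // Each task requests a quarter of a GPU, so up to four tasks can share one GPU
    .config("spark.task.resource.gpu.amount", "0.25")
    // Each task also reserves two CPU cores
    .config("spark.task.cpus", "2")
    // Each executor is assigned one GPU
    .config("spark.executor.resource.gpu.amount", "1")
    .getOrCreate()
}
```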
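With these settings, both CPUs and GPUs cap the number of tasks an executor runs at once. The sketch below models that arithmetic under the assumption that the tighter of the two limits applies. `TaskSlots.perExecutor` is a hypothetical helper. The executor core count is an input because this notebook does not set `spark.executor.cores`; in standalone mode, an executor defaults to all cores available on the worker.

```scala
// Hypothetical helper: concurrent tasks per executor when each resource
// imposes its own cap and the smallest cap wins.
object TaskSlots {
  def perExecutor(executorCores: Int, taskCpus: Int,
                  executorGpus: Int, taskGpuAmount: Double): Int = {
    // Only fractional amounts up to one GPU per task are modeled here.
    require(taskCpus > 0 && taskGpuAmount > 0 && taskGpuAmount <= 1.0)
    val byCpu = executorCores / taskCpus
    // A fractional amount such as 0.25 lets floor(1 / 0.25) = 4 tasks share one GPU.
    val tasksPerGpu = math.floor(1.0 / taskGpuAmount).toInt
    val byGpu = executorGpus * tasksPerGpu
    math.min(byCpu, byGpu)
  }
}
```

For example, with 8 executor cores, `spark.task.cpus=2`, and a GPU amount of 0.25, both limits give 4 concurrent tasks sharing the single GPU. With only 4 cores, the CPU limit drops concurrency to 2 and part of the GPU share goes unused.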

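```scala
// Print the effective Spark configuration
spark.conf.getAll.foreach(pair => println(pair._1 + ":" + pair._2))
// Load the images with Spark's built-in image data source, skipping files that cannot be decoded
val df = spark.read.format("image").option("dropInvalid", true).load(imagePath)
```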
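The image data source exposes each image as a struct with `origin`, `height`, `width`, `nChannels`, `mode`, and `data` fields. The translator reshapes `data` to `(height, width, nChannels)`, and ResNet-18 expects 3-channel input, so grayscale or RGBA images would fail at inference time. The sketch below shows such a check on `ImageRecord`, a hypothetical case class standing in for a `Row`.

```scala
// Hypothetical stand-in for the fields the Spark "image" source exposes;
// in the notebook these values come from a Row.
final case class ImageRecord(origin: String, height: Int, width: Int,
                             nChannels: Int, data: Array[Byte])

object ImageCheck {
  // Keep only 3-channel images whose byte count matches height * width * nChannels,
  // which is what the reshape in the translator and the model's input layer require.
  def usable(img: ImageRecord): Boolean =
    img.nChannels == 3 && img.data.length == img.height * img.width * img.nChannels
}
```

In Spark, you could apply the same idea before inference with `df.filter(col("image.nChannels") === 3)`.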

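```scala
val result = df.select(col("image.*")).mapPartitions(partition => {
  // Look up the GPU that Spark assigned to this task
  val context = TaskContext.get()
  val gpu = context.resources()("gpu").addresses(0)
  // Load the model once per partition on that GPU
  val model = loadModel(Device.gpu(gpu.toInt))
  val predictor = model.newPredictor()
  // Run inference on every row (image data is stored in HWC format).
  // toList forces evaluation so the predictor and model can be closed afterwards.
  val predictions =
    try partition.map(row => predictor.predict(row).toString).toList
    finally {
      predictor.close()
      model.close()
    }
  predictions.iterator
})(Encoders.STRING)
println(result.collect().mkString("\n"))
```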
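Loading the model inside `mapPartitions` means it is created once per partition and reused for every row, rather than once per image. The sketch below isolates that pattern in plain Scala with a hypothetical `FakePredictor`. It also shows why the results are materialized before closing: `Iterator.map` is lazy, so closing the predictor first would release it before any row had been processed.

```scala
// Plain-Scala sketch of the per-partition pattern: create an expensive
// resource once per partition, reuse it for every row, then release it.
object PerPartitionSketch {
  // Hypothetical stand-in for a DJL predictor bound to one GPU.
  final class FakePredictor(val gpuId: Int) extends AutoCloseable {
    var closed = false
    def predict(row: String): String = s"gpu$gpuId:$row"
    override def close(): Unit = closed = true
  }

  def processPartition(gpuId: Int, rows: Iterator[String],
                       created: scala.collection.mutable.Buffer[FakePredictor]): Iterator[String] = {
    val predictor = new FakePredictor(gpuId)
    created += predictor // record each creation so callers can count them
    // Materialize before closing, because Iterator.map is evaluated lazily.
    val results = try rows.map(predictor.predict).toList finally predictor.close()
    results.iterator
  }
}
```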