
# 6-7 Calling a TensorFlow Model Using Spark-Scala

This section shows how to use a trained TensorFlow model for prediction in Spark.

It assumes basic knowledge of Spark and Scala.

Using PySpark is easier, since it only requires loading the model with Python on each executor and predicting there separately.

For performance reasons, however, the Scala version of Spark is the most popular.

This section shows how to use a pretrained TensorFlow model in Spark through TensorFlow for Java.

With Spark's parallel computing, prediction with the trained TensorFlow model can be distributed across hundreds or thousands of machines.

## 0. Using a TensorFlow Model in Spark-Scala

Predicting with a trained TensorFlow model in Spark (Scala) takes the following steps:
- Prepare the protobuf model file.
- Create a Spark (Scala) project and add the jar dependencies for TensorFlow for Java.
- Load the TensorFlow model on the driver side of the Spark (Scala) project and verify that it works.
- Load the TensorFlow model on the executors of the Spark (Scala) project through an RDD and verify that it works.
- Load the TensorFlow model on the executors of the Spark (Scala) project through a DataFrame and verify that it works.

## 1. Preparing the Protobuf Model File

Here we train a simple linear regression model with `tf.keras` and save it as a protobuf file (SavedModel format).

```python
import tensorflow as tf
from tensorflow.keras import models, layers, optimizers
```

```python
# Number of samples
n = 800
```

```python
# Generate a synthetic training dataset: Y = X @ w0 + b0 + Gaussian noise
X = tf.random.uniform([n, 2], minval=-10, maxval=10)
w0 = tf.constant([[2.0], [-1.0]])
b0 = tf.constant(3.0)

Y = X @ w0 + b0 + tf.random.normal([n, 1], mean=0.0, stddev=2.0)
```
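Because the data are generated as `Y = X @ w0 + b0 + noise`, a well-trained `Dense(1)` layer should end up close to `w0 = [2, -1]` and `b0 = 3`. As a rough, TensorFlow-free sanity check, the sketch below regenerates data of the same form with Python's `random` module (so the samples differ from the TensorFlow ones) and fits them by ordinary least squares through the normal equations. The helper names `make_data`, `solve3` and `fit_ols` are hypothetical, introduced only for this illustration.

```python
import random

def make_data(n, seed=0):
    """Generate (x1, x2, y) with y = 2*x1 - 1*x2 + 3 + Gaussian noise (std 2)."""
    rng = random.Random(seed)
    rows = []
    for _ in range(n):
        x1 = rng.uniform(-10, 10)
        x2 = rng.uniform(-10, 10)
        y = 2.0 * x1 - 1.0 * x2 + 3.0 + rng.gauss(0.0, 2.0)
        rows.append((x1, x2, y))
    return rows

def solve3(a, v):
    """Solve the 3x3 system a @ s = v by Gaussian elimination with partial pivoting."""
    m = [list(row) + [rhs] for row, rhs in zip(a, v)]
    for col in range(3):
        pivot = max(range(col, 3), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, 3):
            f = m[r][col] / m[col][col]
            for c in range(col, 4):
                m[r][c] -= f * m[col][c]
    s = [0.0, 0.0, 0.0]
    for r in (2, 1, 0):  # back-substitution
        s[r] = (m[r][3] - sum(m[r][c] * s[c] for c in range(r + 1, 3))) / m[r][r]
    return s

def fit_ols(rows):
    """Least-squares fit of y ~ w1*x1 + w2*x2 + b via (A^T A) s = A^T y."""
    ata = [[0.0] * 3 for _ in range(3)]
    aty = [0.0] * 3
    for x1, x2, y in rows:
        feats = (x1, x2, 1.0)  # the constant 1.0 column carries the bias b
        for i in range(3):
            aty[i] += feats[i] * y
            for j in range(3):
                ata[i][j] += feats[i] * feats[j]
    return solve3(ata, aty)  # [w1, w2, b]

w1, w2, b = fit_ols(make_data(800))
print(f"w1={w1:.3f}, w2={w2:.3f}, b={b:.3f}")
```

Training with `rmsprop` for 100 epochs will not land exactly on the least-squares solution, but if it has converged the learned kernel and bias should be in the same neighborhood.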

```python
# Modeling
tf.keras.backend.clear_session()

inputs = layers.Input(shape=(2,), name='inputs')
outputs = layers.Dense(1, name='outputs')(inputs)
model = models.Model(inputs=inputs, outputs=outputs)
model.summary()
```

```python
# Train with fit method
model.compile(optimizer='rmsprop', loss='mse', metrics=['mae'])
model.fit(X, Y, epochs=100, batch_size=8)
```

```
Epoch 1/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 1s 2ms/step - loss: 229.6244 - mae: 12.9121
Epoch 2/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 193.6105 - mae: 11.7040
Epoch 3/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 189.5473 - mae: 11.5524
Epoch 4/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 1s 5ms/step - loss: 179.9805 - mae: 11.4547
Epoch 5/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 166.9619 - mae: 11.0658
Epoch 6/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 1s 3ms/step - loss: 161.3964 - mae: 10.8517
Epoch 7/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 137.0596 - mae: 9.9848
Epoch 8/100
100/100 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 126.6253 - mae: 9.4383
Epoch 9/100
...
```

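```python
# These print the Keras variable objects; call .numpy() on them
# (e.g. model.layers[1].kernel.numpy()) to see the learned values
print(f'w: {model.layers[1].kernel}')
print(f'b: {model.layers[1].bias}')
```

```
w: <KerasVariable shape=(2, 1), dtype=float32, path=outputs/kernel>
b: <KerasVariable shape=(1,), dtype=float32, path=outputs/bias>
```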


```python
# Save the model
export_path = '/content/model/'
version = '1'
tf.saved_model.save(model, export_path + version)
```

```
!ls {export_path + version}
```

```
assets	fingerprint.pb	saved_model.pb	variables
```


```
!saved_model_cli show --dir {export_path + version} --all
```


```
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
```

## 2. Creating a Spark (Scala) Project and Adding the Jar Dependencies for TensorFlow for Java

If Maven is used to manage the project, add the following jar dependency:

```
<!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow -->
<dependency>
    <groupId>org.tensorflow</groupId>
    <artifactId>tensorflow</artifactId>
    <version>1.15.0</version>
</dependency>
```

Alternatively, download the jar package `org.tensorflow.tensorflow` together with its dependencies `org.tensorflow.libtensorflow` and `org.tensorflow.libtensorflow_jni` from the following link, and add all of them to the project.

https://mvnrepository.com/artifact/org.tensorflow/tensorflow/1.15.0

## 3. Loading the TensorFlow Model on the Driver Side of the Spark (Scala) Project and Verifying It

The following demonstration runs in Jupyter Notebook; Apache Toree must be installed to give it Spark (Scala) support.



```
import org.{tensorflow=>tf}

// Note: the second argument of load() is the tag set and should be "serve";
// it can be looked up from the model file (e.g. with saved_model_cli).
val bundle = tf.SavedModelBundle
   .load("/Users/liangyun/CodeFiles/eat_tensorflow2_in_30_days/data/linear_model/1","serve")

// Note: TensorFlow for Java uses a static graph as in TensorFlow 1.x: get a `Session`,
// explicitly specify the data to feed and the results to fetch, then run it.
// Note: when there are several inputs, call feed() several times in a row.
// Note: the input must be of type Float.

val sess = bundle.session()
val x = tf.Tensor.create(Array(Array(1.0f,2.0f),Array(2.0f,3.0f)))
val y = sess.runner().feed("serving_default_inputs:0", x)
         .fetch("StatefulPartitionedCall:0").run().get(0)

// Copy the output tensor (shape: rows x 1) into a Float array of the same shape
val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
y.copyTo(result)

// Release the native resources
if(x != null) x.close()
if(y != null) y.close()
if(sess != null) sess.close()
if(bundle != null) bundle.close()

result
```



With output:

```
Array(Array(3.019596), Array(3.9878292))
```


## 4. Loading the TensorFlow Model on the Executors of the Spark (Scala) Project through an RDD and Verifying It

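Here the TensorFlow model loaded on the driver is transferred to each executor by broadcasting, and prediction then runs in parallel on all the executors. Note that `SavedModelBundle` holds native TensorFlow resources, so broadcasting it may only work when the driver and executors share one JVM, as in local mode; on a real cluster, loading the model once on each executor is likely to be more robust.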
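The Scala code uses `mapPartitions` so that each partition turns all of its records into one batch and calls the model once, instead of once per record. The stdlib-only Python sketch below mimics that pattern outside Spark: `partition` splits records roughly the way an RDD might, and `predict_batch` is a hypothetical stand-in for the TensorFlow session call (it applies the fixed linear function `2*x1 - x2 + 3`, an assumption chosen to resemble the trained model). Empty partitions are skipped so that no model call is made for them.

```python
def partition(records, num_partitions):
    """Split records into contiguous chunks, roughly like RDD partitions (some may be empty)."""
    size, extra = divmod(len(records), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        parts.append(records[start:end])
        start = end
    return parts

CALLS = {"count": 0}  # counts model invocations, for illustration only

def predict_batch(batch):
    """Hypothetical stand-in for sess.runner().feed(...).fetch(...).run(): one call per batch."""
    CALLS["count"] += 1
    return [[2.0 * x1 - 1.0 * x2 + 3.0] for x1, x2 in batch]  # shape (batch, 1)

def map_partitions(parts, fn):
    """Apply fn to each partition's iterator and concatenate the results, like RDD.mapPartitions."""
    out = []
    for part in parts:
        out.extend(fn(iter(part)))
    return out

def predict_partition(it):
    batch = list(it)      # like iter.toArray in the Scala code
    if not batch:         # empty partition: nothing to predict, no model call
        return iter([])
    return iter(predict_batch(batch))

data = [(1.0, 2.0), (3.0, 5.0), (6.0, 7.0), (8.0, 3.0)]
preds = map_partitions(partition(data, 2), predict_partition)
print(preds, CALLS["count"])  # four predictions from two batched calls
```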



```
import org.apache.spark.sql.SparkSession
import org.{tensorflow=>tf}

val spark = SparkSession
    .builder()
    .appName("TfRDD")
    .enableHiveSupport()
    .getOrCreate()

val sc = spark.sparkContext

// Load the model on the driver
val bundle = tf.SavedModelBundle
   .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")

// Broadcast the model to all the executors
val broads = sc.broadcast(bundle)

// Create the dataset
val rdd_data = sc.makeRDD(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(6.0f,7.0f),Array(8.0f,3.0f)))

// Predict in batches with mapPartitions: one model call per partition
val rdd_result = rdd_data.mapPartitions(iter => {

    val arr = iter.toArray
    if (arr.isEmpty) {
        // Empty partition: nothing to predict, so don't call the model
        Iterator.empty
    } else {
        val model = broads.value
        val sess = model.session()
        val x = tf.Tensor.create(arr)
        val y = sess.runner().feed("serving_default_inputs:0", x)
                 .fetch("StatefulPartitionedCall:0").run().get(0)

        // Copy the prediction into a Float array of the same shape
        val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
        y.copyTo(result)

        // Free the tensors' native memory; the shared session stays open
        x.close()
        y.close()
        result.iterator
    }
})

// Collect the predictions before closing the model, then display them
val preds = rdd_result.take(5)
bundle.close()
preds
```



With output:

```
Array(Array(3.019596), Array(3.9264367), Array(7.8607616), Array(15.974984))
```
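Since the model is a single `Dense(1)` layer, its prediction is `w1*x1 + w2*x2 + b`, so three of the printed predictions are enough to recover the parameters the Scala side actually used. The sketch below solves that 3×3 system with Cramer's rule from the outputs above for `[1, 2]`, `[3, 5]` and `[6, 7]`, then checks the fourth output for `[8, 3]`. It is plain Python arithmetic on the printed numbers, not a call into TensorFlow; `det3` and `solve_cramer` are hypothetical helpers for this illustration.

```python
def det3(m):
    """Determinant of a 3x3 matrix given as a list of rows."""
    return (m[0][0] * (m[1][1] * m[2][2] - m[1][2] * m[2][1])
            - m[0][1] * (m[1][0] * m[2][2] - m[1][2] * m[2][0])
            + m[0][2] * (m[1][0] * m[2][1] - m[1][1] * m[2][0]))

def solve_cramer(a, v):
    """Solve a @ s = v for a 3x3 matrix a using Cramer's rule."""
    d = det3(a)
    sol = []
    for k in range(3):
        # Replace column k of a with v
        ak = [row[:k] + [v[i]] + row[k + 1:] for i, row in enumerate(a)]
        sol.append(det3(ak) / d)
    return sol

# (x1, x2) -> prediction, copied from the RDD output above
observed = {(1.0, 2.0): 3.019596, (3.0, 5.0): 3.9264367,
            (6.0, 7.0): 7.8607616, (8.0, 3.0): 15.974984}

fit_points = [(1.0, 2.0), (3.0, 5.0), (6.0, 7.0)]
a = [[x1, x2, 1.0] for x1, x2 in fit_points]  # last column multiplies the bias b
v = [observed[p] for p in fit_points]
w1, w2, b = solve_cramer(a, v)

check = w1 * 8.0 + w2 * 3.0 + b
print(f"w1={w1:.4f} w2={w2:.4f} b={b:.4f}; predicted [8, 3] -> {check:.5f}")
```

This gives roughly w1 ≈ 1.998, w2 ≈ -1.030 and b ≈ 3.081, close to the `w0 = [2, -1]` and `b0 = 3` used to generate the training data, and the same line reproduces the fourth output (about 15.97498 for `[8, 3]`). It also reproduces the driver-side output 3.9878292 for `[2, 3]` and the DataFrame output 8.828995 for `[7, 8]`, which suggests all the Scala runs used the same saved model.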

## 5. Loading the TensorFlow Model on the Executors of the Spark (Scala) Project through a DataFrame and Verifying It

Besides RDDs, distributed prediction with the TensorFlow model can also be applied to DataFrames.

This is done by registering the prediction function as a Spark SQL function (UDF).
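Unlike the `mapPartitions` version, a row-level UDF is invoked once per row: it wraps a single feature vector into a batch of shape (1, 2) with `Array(features.toArray)` and unpacks the (1, 1) result with `result(0)(0)`. The Python sketch below mirrors that wrapping and unwrapping, using a hypothetical linear stand-in `run_model` for the session call (again `2*x1 - x2 + 3`, an assumption), and counts the calls to make the per-row cost visible.

```python
calls = 0  # number of model invocations, for illustration only

def run_model(batch):
    """Hypothetical stand-in for the TensorFlow session: (n, 2) batch -> (n, 1) predictions."""
    global calls
    calls += 1
    return [[2.0 * x1 - 1.0 * x2 + 3.0] for x1, x2 in batch]

def tfpredict(features):
    """Row-level UDF: wrap one row as a batch of size 1, then pull out the single scalar."""
    result = run_model([list(features)])  # like Array(features.toArray): shape (1, 2)
    return result[0][0]                   # like result(0)(0)

rows = [[1.0, 2.0], [3.0, 5.0], [7.0, 8.0]]
y_preds = [tfpredict(f) for f in rows]
print(y_preds, calls)  # one model call per row
```

The UDF form is simpler to use from SQL, but for large tables it typically means one session run per row, whereas per-partition batching amortizes each call over many rows.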



```
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow=>tf}

object TfDataFrame extends Serializable{

    def main(args:Array[String]):Unit = {

        val spark = SparkSession
        .builder()
        .appName("TfDataFrame")
        .enableHiveSupport()
        .getOrCreate()
        val sc = spark.sparkContext

        import spark.implicits._

        val bundle = tf.SavedModelBundle
           .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")

        val broads = sc.broadcast(bundle)

        // Build the prediction function and register it as a Spark SQL UDF
        val tfpredict = (features:WrappedArray[Float]) => {
            val bund = broads.value
            val sess = bund.session()
            // Wrap the single row as a batch of shape (1, 2)
            val x = tf.Tensor.create(Array(features.toArray))
            val y = sess.runner().feed("serving_default_inputs:0", x)
                     .fetch("StatefulPartitionedCall:0").run().get(0)
            val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
            y.copyTo(result)
            // Release the tensors created for this row
            x.close()
            y.close()
            result(0)(0)
        }
        spark.udf.register("tfpredict",tfpredict)

        // Create a DataFrame with the features in a single column
        val dfdata = sc.parallelize(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(7.0f,8.0f))).toDF("features")
        dfdata.show

        // Call the Spark SQL prediction function and add the result as a new column y_preds
        val dfresult = dfdata.selectExpr("features","tfpredict(features) as y_preds")
        dfresult.show
        bundle.close()
    }
}
```





```
TfDataFrame.main(Array())
```



```
+----------+
|  features|
+----------+
|[1.0, 2.0]|
|[3.0, 5.0]|
|[7.0, 8.0]|
+----------+

+----------+---------+
|  features|  y_preds|
+----------+---------+
|[1.0, 2.0]| 3.019596|
|[3.0, 5.0]|3.9264367|
|[7.0, 8.0]| 8.828995|
+----------+---------+
```