# Iceberg Classification Step 3c: Distributed Model Training
This notebook demonstrates how to:
- get data from the ``feature store``
- train with ``TFRecord`` datasets
- run distributed training

This notebook was tested with the following ``configuration`` in Hopsworks.
<div>
<img src="fig/step3c_jupyter_config.png" width="900" align="center"/>
</div>

In [1]:
import os
import json
import tensorflow as tf
import hops
from hops import experiment
from hops import tensorboard
from hops import model as hopsworks_model

import hsfs

# SparkSession available as 'spark'
# Version banner: one line per print argument, joined by newlines.
# The trailing empty argument reproduces the blank line after the Spark entry.
print(
    "-----------------------------------------------",
    "This notebook is tested with:",
    f"  - TensorFlow {tf.__version__}.",
    f"  - Hopsworks {hops.__version__}.",
    f"  - Spark {spark.version}.",
    "",
    sep="\n",
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
246,application_1619040920875_0279,pyspark,idle,Link,Link


SparkSession available as 'spark'.
-----------------------------------------------
This notebook is tested with:
  - TensorFlow 2.4.1.
  - Hopsworks 2.1.0.1.
  - Spark 2.4.3.2.

In [2]:
def create_model(input_shape):
    """Build the (uncompiled) CNN used for image classification.

    The network is four Conv2D -> MaxPooling2D -> Dropout stages, a Flatten
    layer, two Dense -> ReLU -> Dropout stages and a single-unit sigmoid output.

    Parameters:
    - input_shape(tuple): input shape of the CNN model.

    Returns:
    - a TensorFlow keras Sequential model that is not compiled yet.

    """
    layers = tf.keras.layers
    dropout_rate = 0.2

    model = tf.keras.models.Sequential()

    # Convolutional stages; only the very first convolution declares the input shape
    for stage, filters in enumerate((64, 128, 128, 64)):
        conv_kwargs = {'kernel_size': (3, 3), 'activation': 'relu'}
        if stage == 0:
            conv_kwargs['input_shape'] = input_shape
        model.add(layers.Conv2D(filters, **conv_kwargs))
        model.add(layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
        model.add(layers.Dropout(dropout_rate))

    # Flatten the feature maps for the fully connected part
    model.add(layers.Flatten())

    # Fully connected stages: Dense -> ReLU -> Dropout
    for units in (512, 256):
        model.add(layers.Dense(units))
        model.add(layers.Activation('relu'))
        model.add(layers.Dropout(dropout_rate))

    # Single-unit sigmoid output
    model.add(layers.Dense(1))
    model.add(layers.Activation('sigmoid'))

    return model

In [3]:
def train_fn():
    """Wrapper function for the experiment.

    Configures the GPUs, builds and compiles the CNN under a
    MultiWorkerMirroredStrategy, trains and validates it on the TFRecord
    training datasets read from the feature store, saves it as a SavedModel
    in the working directory and exports it from the chief worker.

    Returns:
    - metrics: training summary; a dict with the last-epoch values of
      'train_loss', 'train_accuracy', 'val_loss' and 'val_accuracy'.

    """

    # ---------------- Initialization ----------------
    # Set up visible GPU.
    # Memory growth has to be configured BEFORE the distribution strategy is
    # created: creating the strategy initializes the devices, after which
    # set_memory_growth raises a RuntimeError and the setting is never applied.
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            # Currently, memory growth needs to be the same across GPUs
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            # Memory growth must be set before GPUs have been initialized
            print(e)

    # use this strategy to test your code before switching to other strategies which actually distributes to multiple devices/machines.
    # strategy = tf.distribute.OneDeviceStrategy(device='/gpu:0')
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

    if gpus:
        # Listing logical devices initializes the runtime, so it is done only
        # after the strategy (which must configure collective ops first) exists.
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")

    # Establish a connection with the Hopsworks feature store
    #     engine='training' is needed so that the executors in Spark can connect to feature store
    connection = hsfs.connection(engine='training')
    # Get the feature store handle for the project's feature store
    fs = connection.get_feature_store()

    # Clear session info
    tf.keras.backend.clear_session()
    # ---------------- Initialization ----------------



    # ---------------- Hyperparameters ----------------
    # Number of epochs to training
    EPOCHS = 15
    # Define per device batch size
    batch_size_per_replica = 32
    # Training batch size
    TRAIN_BATCH_SIZE = batch_size_per_replica * strategy.num_replicas_in_sync
    # Evaluation batch size
    EVAL_BATCH_SIZE = 1
    # Shuffle buffer size for TensorFlow dataset
    SHUFFLE_BUFFER_SIZE = 10000
    # Optimizer learning rate
    LEARNING_RATE = 0.001
    # input_shape of the model
    INPUT_SHAPE = (75, 75, 3)
    # Name of the training dataset in feature store
    TRAIN_FS_NAME = 'train_tfrecords_iceberg_classification_dataset'
    # Name of the test dataset in feature store
    TEST_FS_NAME = 'test_tfrecords_iceberg_classification_dataset'
    # ---------------- Hyperparameters ----------------

    # ---------------- Training Process ----------------
    # construct model under distribution strategy scope
    with strategy.scope():
        model = create_model(INPUT_SHAPE)
        model.compile(optimizer=tf.keras.optimizers.Adam(LEARNING_RATE), loss='binary_crossentropy',  metrics=['accuracy'])

    # Define the TensorBoard and ModelCheckpoint callbacks.
    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir=tensorboard.logdir(), histogram_freq=0, write_graph=True, write_images=True, profile_batch='5,10'),
        tf.keras.callbacks.ModelCheckpoint(tensorboard.logdir()),
    ]

    def decode(sample):
        """Decode each training sample.

        This function decodes each sample and returns it in a format that is ready for training.

        Parameters:
        - sample: raw features of a data sample stored in a dictionary-like object

        Returns:
        - x: 'band_1', 'band_2', and 'band_avg' will be reshaped and stacked
             and form the input of the model
        - y: 'is_iceberg' will be the output of the model.
        """

        name_list = ['band_1', 'band_2', 'band_avg', 'is_iceberg']
        x = tf.stack([sample[name_list[0]], sample[name_list[1]], sample[name_list[2]]], axis=1)
        x = tf.reshape(x, [75, 75, 3])
        y = [tf.cast(sample[name_list[3]], tf.float32)]
        return x, y

    def input_pipeline(dataset, batch_size):
        """Decode, cache, shuffle, repeat, batch and prefetch a raw TFRecord dataset."""
        # cache() comes right after decoding and before shuffle()/repeat():
        # caching a shuffled stream would replay one fixed order on every pass,
        # and caching after repeat() would keep EPOCHS copies of the data.
        # NOTE(review): repeat(EPOCHS) together with fit(epochs=EPOCHS) appears to
        # make every fit epoch iterate EPOCHS passes over the data; kept as in the
        # original, confirm whether this is intended.
        return (
            dataset.map(decode)
            .cache()
            .shuffle(SHUFFLE_BUFFER_SIZE)
            .repeat(EPOCHS)
            .batch(batch_size)
            .prefetch(tf.data.experimental.AUTOTUNE)
        )

    # Training dataset in TFRecord format
    train_ds = fs.get_training_dataset(name=TRAIN_FS_NAME).tf_data(target_name='is_iceberg')
    train_ds = train_ds.tf_record_dataset(process=False, batch_size=TRAIN_BATCH_SIZE, num_epochs=EPOCHS)
    train_ds_processed = input_pipeline(train_ds, TRAIN_BATCH_SIZE)

    # Evaluation dataset in TFRecord format
    eval_ds = fs.get_training_dataset(name=TEST_FS_NAME).tf_data(target_name='is_iceberg')
    eval_ds = eval_ds.tf_record_dataset(process=False, batch_size=EVAL_BATCH_SIZE, num_epochs=EPOCHS)
    eval_ds_processed = input_pipeline(eval_ds, EVAL_BATCH_SIZE)

    # Start training the model.
    history = model.fit(
        train_ds_processed,
        epochs=EPOCHS,
        verbose=1,
        validation_data=eval_ds_processed,
        callbacks=callbacks
    )

    # 'metrics' is the return value of this function;
    #     The values in 'metrics' will be printed to the notebook cell that launch the experiment

    metrics = {
        'train_loss': history.history['loss'][-1],
        'train_accuracy': history.history['accuracy'][-1],
        'val_loss': history.history['val_loss'][-1],
        'val_accuracy': history.history['val_accuracy'][-1],
    }

    # ---------------- Training Process ----------------

    # ---------------- Save and Export ----------------
    # Export model as savedModel
    # export_path = tensorboard.logdir() + '/SavedModel'
    export_path = os.path.join(os.getcwd(), 'SavedModel')

    tf.keras.models.save_model(
        model,
        export_path,
        overwrite=True,
        include_optimizer=True,
        save_format=None,
        signatures=None,
        options=None
    )

    # 'hopsworks_model' is the module provided by hopsworks for exporting models
    # 'hopsworks_model' is a different name of 'hops.model' to avoid name clashes

    # Only need to export the model on the chief.
    # TF_CONFIG may be absent (e.g. when testing with a single-device strategy);
    # in that case this is the only process, so it is treated as the chief.
    tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))
    task_type = tf_config.get('task', {}).get('type')
    if task_type in (None, 'chief'):
        hopsworks_model.export(export_path, 'ship_iceberg_classifier', metrics=metrics)

    return metrics

In [4]:
# Launch train_fn through hops' experiment.mirrored under the given experiment name.
# NOTE(review): presumably metric_key selects which entry of the dict returned by
# train_fn is reported as the experiment's metric - confirm against the hops docs.
experiment.mirrored(train_fn, name='Iceberg_Ship_Classification_with_distributed_training', metric_key='val_accuracy')

Finished Experiment 

('hdfs://rpc.namenode.service.consul:8020/Projects/ExtremeEarth/Experiments/application_1619040920875_0278_1', {'train_loss': 0.03183300048112869, 'train_accuracy': 0.9898654222488403, 'val_loss': 0.5522504448890686, 'val_accuracy': 0.8797653913497925, 'log': 'Experiments/application_1619040920875_0278_1/chief_0_output.log'})

## End of Step 3c