#### Before we proceed, let's check that we're using the right image, that is, TensorFlow is available:

In [1]:
#! pip3 list | grep tensorflow 
#! pip3 install --user tensorflow==2.4.0
#! pip3 install --user ipywidgets nbconvert
#!python -m pip install --user --upgrade pip
#!pip3 install pandas scikit-learn keras tensorflow-datasets --user

#### To package the trainer in a container image, we shall need a file (on our cluster) that contains the code as well as a file with the resource definitition of the job for the Kubernetes cluster:

In [38]:
TRAINER_FILE = "tfjobairline.py"
KUBERNETES_FILE = "tfjob-airline.yaml"

#### We also want to capture output from a cell with %%capture that usually looks like some-resource created. To that end, let's define a helper function:

In [3]:
import re

from IPython.utils.capture import CapturedIO


def get_resource(captured_io: CapturedIO) -> str:
    """
    Gets a resource name from `kubectl apply -f <configuration.yaml>`.

    :param str captured_io: Output captured by using `%%capture` cell magic
    :return: Name of the Kubernetes resource
    :rtype: str
    :raises Exception: if the resource could not be created
    """
    out = captured_io.stdout
    matches = re.search(r"^(.+)\s+created", out)
    if matches is not None:
        return matches.group(1)
    else:
        raise Exception(f"Cannot get resource as its creation failed: {out}. It may already exist.")

#### Load and Inspect the Data¶

In [4]:
import pandas as  pd
data = pd.read_csv("https://raw.githubusercontent.com/Soot3/testing/master/Invistico_Airline.csv")
data.head()

Unnamed: 0,satisfaction,Gender,Customer Type,Age,Type of Travel,Class,Flight Distance,Seat comfort,Departure/Arrival time convenient,Food and drink,...,Online support,Ease of Online booking,On-board service,Leg room service,Baggage handling,Checkin service,Cleanliness,Online boarding,Departure Delay in Minutes,Arrival Delay in Minutes
0,satisfied,Female,Loyal Customer,65,Personal Travel,Eco,265,0,0,0,...,2,3,3,0,3,5,3,2,0,0.0
1,satisfied,Male,Loyal Customer,47,Personal Travel,Business,2464,0,0,0,...,2,3,4,4,4,2,3,2,310,305.0
2,satisfied,Female,Loyal Customer,15,Personal Travel,Eco,2138,0,0,0,...,2,2,3,3,4,4,4,2,0,0.0
3,satisfied,Female,Loyal Customer,60,Personal Travel,Eco,623,0,0,0,...,3,1,1,0,1,4,1,3,0,0.0
4,satisfied,Female,Loyal Customer,70,Personal Travel,Eco,354,0,0,0,...,4,2,2,0,2,4,2,5,0,0.0


#### Train the Model in the Notebook

We want to train the model in a distributed fashion, we put all the code in a single cell. That way we can save the file and include it in a container image:

In [5]:
import argparse
import logging
import json
import os
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from numpy.random import seed

import tensorflow as tf
tf.random.set_seed(221)
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import SGD, Adam, RMSprop

logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
    level=logging.INFO)

# Import dependencies
import datetime
from tensorflow.keras import callbacks

# Create new callback
class MyHistory(callbacks.Callback):
    """Adapted from https://github.com/keras-team/keras/blob/master/keras/callbacks/callbacks.py#L614"""

    def on_train_begin(self, logs=None):
        self.epoch = []
        self.timestamps = []
        self.history = {}

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        self.epoch.append(epoch)
        self.timestamps.append(datetime.datetime.now())
        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)


def make_datasets_unbatched():
    df = pd.read_csv("https://raw.githubusercontent.com/Soot3/testing/master/Invistico_Airline.csv")
    #drop rows with missing values
    df.dropna(inplace=True)
    #new column total delay
    df['total_delay'] = df['Departure Delay in Minutes'] + df['Arrival Delay in Minutes']
    
    #drop 'Departure Delay in Minutes',and 'Arrival Delay in Minutes'
    df.drop(columns=['Departure Delay in Minutes','Arrival Delay in Minutes'], inplace=True)
    
        #satisfied and dissatisfied in number 
    satisfaction_map = {"satisfied": 1,"dissatisfied": 0 }
    df['satisfaction']  = df['satisfaction'].map(satisfaction_map)

    #Male and Female in number 
    Gender_map = {"Male": 1,"Female": 2 }
    df['Gender']  = df['Gender'].map(Gender_map)

    #Loyal and disloyal in number 
    Customer_Type_map = {"Loyal Customer": 1,"disloyal Customer": 0 }
    df['Customer Type']  = df['Customer Type'].map(Customer_Type_map)

    #Business travel and Business travel in number 
    Type_of_Travel_map = {"Business travel": 1,"Personal Travel": 2 }
    df['Type of Travel']  = df['Type of Travel'].map(Type_of_Travel_map)

    #Business and Eco and Eco plus in number 
    Class_map = {"Business": 1,"Eco": 3, "Eco Plus": 2 }
    df['Class']  = df['Class'].map(Class_map)

    cols = ['Flight Distance', 'total_delay', 'Checkin service', 'On-board service']

    Q1 = df[cols].quantile(0.25)
    Q3 = df[cols].quantile(0.75)
    IQR = Q3 - Q1

    df = df[~((df[cols] < (Q1 - 1.5 * IQR)) |(df[cols] > (Q3 + 1.5 * IQR))).any(axis=1)]   
    
    #Split dataset

    X = df.drop('satisfaction',axis=1)
    y = df['satisfaction'] 
    X_train,X_test, y_train, y_test = train_test_split(X,y, test_size = 0.3, random_state = 111)
    
    
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
    train = train_dataset.cache().shuffle(2000).repeat()
    return train, test_dataset

def model(args):
    seed(1)
    model = Sequential()
    model.add(Dense(100, activation='relu', input_dim=21))
    model.add(BatchNormalization())
    model.add(Dense(40, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    
    model.summary()
    opt = args.optimizer
    model.compile(optimizer=opt,
                loss='binary_crossentropy',
                metrics=['accuracy'])
    tf.keras.backend.set_value(model.optimizer.learning_rate, args.learning_rate)
    return model


def main(args):
    # MultiWorkerMirroredStrategy creates copies of all variables in the model's
    # layers on each device across all workers
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        communication=tf.distribute.experimental.CollectiveCommunication.AUTO)
    logging.debug(f"num_replicas_in_sync: {strategy.num_replicas_in_sync}")
    BATCH_SIZE_PER_REPLICA = args.batch_size
    BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
    
    # Datasets need to be created after instantiation of `MultiWorkerMirroredStrategy`
    train_dataset, test_dataset = make_datasets_unbatched()
    train_dataset = train_dataset.batch(batch_size=BATCH_SIZE)
    test_dataset = test_dataset.batch(batch_size=BATCH_SIZE)

    # See: https://www.tensorflow.org/api_docs/python/tf/data/experimental/DistributeOptions
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = \
    tf.data.experimental.AutoShardPolicy.DATA
    
    train_datasets_sharded  = train_dataset.with_options(options)
    test_dataset_sharded = test_dataset.with_options(options)
    
    # Instantiate callback
    myHistory = MyHistory()
    
    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = model(args)
        # Keras' `model.fit()` trains the model with specified number of epochs and
        # number of steps per epoch. 
        multi_worker_model.fit(train_datasets_sharded,
                         epochs=100,
                         steps_per_epoch=100, callbacks = [myHistory])

        eval_loss, eval_acc = multi_worker_model.evaluate(test_dataset_sharded, 
                                                    verbose=0, steps=70)
        # Log metrics for Katib
        logging.info("loss={:.4f}".format(eval_loss))
        for i in myHistory.history['accuracy']:
          logging.info('accuracy = {:.4f}'.format(i))

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch_size",
                      type=int,
                      default=79,
                      metavar="N",
                      help="Batch size for training (default: 79)")
    parser.add_argument("--learning_rate", 
                      type=float,  
                      default=0.01,
                      metavar="N",
                      help='Initial learning rate')
    parser.add_argument("--optimizer", 
                      type=str, 
                      default='adam',
                      metavar="N",
                      help='optimizer')
    parsed_args, _ = parser.parse_known_args()
    main(parsed_args)m',
                      metavar="N",
                      help='optimizer')
    parsed_args, _ = parser.parse_known_args()
    main(parsed_args)

Overwriting tfjobairline.py


#### That saves the file as defined by TRAINER_FILE but it does not run it.

Let's see if our code is correct by running it from within our notebook:

In [6]:
%run $TRAINER_FILE --optimizer 'adam'

Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead


Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead


INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)


2021-07-19T14:43:27Z INFO     Using MirroredStrategy with devices ('/device:CPU:0',)


INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


2021-07-19T14:43:27Z INFO     Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2021-07-19T14:43:28Z INFO     NumExpr defaulting to 2 threads.


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 100)               2200      
_________________________________________________________________
batch_normalization (BatchNo (None, 100)               400       
_________________________________________________________________
dense_1 (Dense)              (None, 40)                4040      
_________________________________________________________________
dropout (Dropout)            (None, 40)                0         
_________________________________________________________________
dense_2 (Dense)              (None, 1)                 41        
Total params: 6,681
Trainable params: 6,481
Non-trainable params: 200
_________________________________________________________________
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/1

2021-07-19T14:44:48Z INFO     loss=0.8553
2021-07-19T14:44:48Z INFO     accuracy=0.7682


#### Create a Distributed TFJob


For large training jobs, we wish to run our trainer in a distributed mode. Once the notebook server cluster can access the Docker image from the registry, we can launch a distributed TF job.

The specification for a distributed TFJob is defined using YAML:

In [7]:
%%writefile $KUBERNETES_FILE
apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
    name: "air4line"
    namespace: sooter # your-user-namespace
spec:
    cleanPodPolicy: None
    tfReplicaSpecs:
        Worker:
            replicas: 2
            restartPolicy: OnFailure
            template:
                metadata:
                    annotations:
                        sidecar.istio.io/inject: "false"
                spec:
                    containers:
                    - name: tensorflow
                      # modify this property if you would like to use a custom image
                     image: mavencodev7/tfjob_airline:v.0.3
                      command:
                          - "python"
                          - "/tfjobairline.py"
                          - "--batch_size=79"
                          - "--learning_rate=0.01"
                          - "--optimizer=adam"

Writing tfjob-airline.yaml


#### Let's deploy the distributed training job:

In [49]:
%%capture tf_output --no-stderr
! kubectl create -f $KUBERNETES_FILE

In [50]:
TF_JOB = get_resource(tf_output)

##### To see the job status, use the following command:

In [51]:
! kubectl describe $TF_JOB

Name:         airlineearly1
Namespace:    sooter
Labels:       <none>
Annotations:  <none>
API Version:  kubeflow.org/v1
Kind:         TFJob
Metadata:
  Creation Timestamp:  2021-07-19T17:59:49Z
  Generation:          1
  Managed Fields:
    API Version:  kubeflow.org/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        .:
        f:tfReplicaSpecs:
          .:
          f:Worker:
            .:
            f:replicas:
            f:restartPolicy:
            f:template:
              .:
              f:metadata:
                .:
                f:annotations:
                  .:
                  f:sidecar.istio.io/inject:
              f:spec:
    Manager:      kubectl-create
    Operation:    Update
    Time:         2021-07-19T17:59:49Z
    API Version:  kubeflow.org/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:runPolicy:
          .:
          f:cleanPodPolicy:
        f:successPolicy:
        f:tfReplicaSpecs:
          f:Worker:
           

In [53]:
! kubectl get pods -l job-name=airlineearly1

NAME                     READY   STATUS              RESTARTS   AGE
airlineearly1-worker-0   1/1     Running             0          16s
airlineearly1-worker-1   0/1     ContainerCreating   0          15s


In [54]:
! kubectl get events --sort-by='.lastTimestamp' | tail

20s         Normal    SuccessfulCreateService   tfjob/airlineearly1                                                   Created service: airlineearly1-worker-0
20s         Normal    SuccessfulCreatePod       tfjob/airlineearly1                                                   Created pod: airlineearly1-worker-1
18s         Normal    Pulling                   pod/airlineearly1-worker-1                                            Pulling image "mavencodevv/tfjob_airline:v.0.9"
9s          Normal    Started                   pod/airlineearly1-worker-0                                            Started container tensorflow
9s          Normal    Created                   pod/airlineearly1-worker-0                                            Created container tensorflow
9s          Normal    Pulled                    pod/airlineearly1-worker-0                                            Successfully pulled image "mavencodevv/tfjob_airline:v.0.9" in 10.399618359s
5s          Normal    Pulled     

To stream logs from the worker-0 pod to check the training progress, run the following command:

In [55]:
! kubectl logs -f airlineearly1-worker-0

2021-07-19 18:00:07.321864: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-07-19 18:00:07.322475: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
2021-07-19 18:00:09.077932: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-07-19 18:00:09.078558: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2021-07-19 18:00:09.078598: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuI

#### To delete the job, run the following command:

In [47]:
! kubectl delete $TF_JOB

tfjob.kubeflow.org "airlineearly" deleted


#### Check to see if the check to see if the pod is still up and running

In [48]:
#! kubectl -n sooter logs -f airline