# Training on the Titanic Dataset Using the TensorFlow Operator

## Prerequisites
Before we proceed, let's make sure that [TensorFlow](https://www.tensorflow.org/api_docs/) and the other libraries we need are available in the notebook image:

In [None]:
#! pip3 list | grep tensorflow
!python -m pip install --user --upgrade pip
!pip3 install --user tensorflow==2.4.0
!pip3 install --user ipywidgets nbconvert
!pip3 install --user pandas scikit-learn keras tensorflow-datasets

To package the trainer in a container image, we need a file (on our cluster) that contains the training code, as well as a file with the resource definition of the job for the Kubernetes cluster:

In [1]:
TRAINER_FILE = "tfjobtitan.py"
KUBERNETES_FILE = "tfjob-titan.yaml"

We also want to capture the output of a cell with [`%%capture`](https://ipython.readthedocs.io/en/stable/interactive/magics.html#cellmagic-capture); for `kubectl create`, that output usually looks like `some-resource created`.
To that end, let's define a helper function that extracts the resource name:

In [2]:
import re

from IPython.utils.capture import CapturedIO


def get_resource(captured_io: CapturedIO) -> str:
    """
    Gets a resource name from the output of `kubectl create -f <configuration.yaml>`
    (or `kubectl apply -f <configuration.yaml>`).

    :param CapturedIO captured_io: Output captured by using `%%capture` cell magic
    :return: Name of the Kubernetes resource
    :rtype: str
    :raises Exception: if the resource could not be created
    """
    out = captured_io.stdout
    matches = re.search(r"^(.+)\s+created", out)
    if matches is not None:
        return matches.group(1)
    else:
        raise Exception(f"Cannot get resource as its creation failed: {out}. It may already exist.")
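Before talking to the cluster, you can sanity-check the regular expression on a string shaped like `kubectl create` output. The sketch below mirrors the regex in `get_resource` with a hypothetical `parse_created_resource` helper that takes plain text instead of a `CapturedIO` object, so it runs without IPython; the sample line is an assumption about what `kubectl` prints for this job.

```python
import re


def parse_created_resource(stdout: str) -> str:
    """Hypothetical stand-in for get_resource() that works on plain text."""
    # Same pattern as get_resource(): everything before whitespace + "created".
    matches = re.search(r"^(.+)\s+created", stdout)
    if matches is None:
        raise ValueError(f"Cannot get resource as its creation failed: {stdout}")
    return matches.group(1)


# Assumed shape of the `kubectl create -f tfjob-titan.yaml` output.
print(parse_created_resource("tfjob.kubeflow.org/churn created\n"))
```

Without `re.MULTILINE`, `^` only matches at the very start of the captured text, so the `... created` message has to be at the beginning of the output; an `AlreadyExists` error from `kubectl` does not match and raises instead.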

## How to Load and Inspect the Data

In [3]:
import pandas as pd

data = pd.read_csv("https://raw.githubusercontent.com/MavenCode/KubeflowTraining/master/Day2/KubeflowComponentsAndPipeline/Labs/6_minio/titanic/datasets/train.csv")
data.head()

|   | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.25 | NaN | S |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C |
| 2 | 3 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | STON/O2. 3101282 | 7.925 | NaN | S |
| 3 | 4 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 113803 | 53.1 | C123 | S |
| 4 | 5 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 373450 | 8.05 | NaN | S |


## How to Train the Model in the Notebook
Because we want to train the model in a distributed fashion, we put all the code in a single cell.
That way we can save it to a file and include it in a container image:

In [4]:
%%writefile $TRAINER_FILE
import argparse
import logging
import re
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

import pandas as pd
# splitting the data
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras.layers import Dense

logging.getLogger().setLevel(logging.INFO)




def make_datasets_unbatched():
  data = pd.read_csv("https://raw.githubusercontent.com/MavenCode/KubeflowTraining/master/Day2/KubeflowComponentsAndPipeline/Labs/6_minio/titanic/datasets/train.csv")

  # feature engineering: number of relatives on board and a flag for travelling alone
  data['relatives'] = data['SibSp'] + data['Parch']
  data.loc[data['relatives'] > 0, 'not_alone'] = 0
  data.loc[data['relatives'] == 0, 'not_alone'] = 1
  data['not_alone'] = data['not_alone'].astype(int)

  # drop identifier-like columns with high cardinality
  data = data.drop(['PassengerId', 'Name', 'Ticket'], axis=1)

  # deal with missing data in the Cabin feature: map the deck letter to a number,
  # using "U" (unknown) for missing cabins and 0 for letters not in the mapping
  deck = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7, "U": 8}

  data['Cabin'] = data['Cabin'].fillna("U0")
  data['Deck'] = data['Cabin'].map(lambda x: re.compile("([a-zA-Z]+)").search(x).group())
  data['Deck'] = data['Deck'].map(deck)
  data['Deck'] = data['Deck'].fillna(0)
  data['Deck'] = data['Deck'].astype(int)
  # we can now drop the Cabin feature
  data = data.drop(['Cabin'], axis=1)

  # deal with missing data in the Age feature
  data["Age"] = data["Age"].fillna(data["Age"].mean())

  # deal with missing data in the Embarked feature: fill with the most common value
  common_value = 'S'
  data['Embarked'] = data['Embarked'].fillna(common_value)

  # one-hot encode the categorical variables (Sex, Embarked)
  data = pd.get_dummies(data)

  # cast the features to float32 so tf.data receives a single numeric dtype
  X = data.drop("Survived", axis=1).astype("float32")
  y = data["Survived"]

  # split the data
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=10)
  train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
  test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
  train = train_dataset.cache().shuffle(100).repeat()
  return train, test_dataset


def model(args):
  model = models.Sequential()
  # 13 input features remain after preprocessing and one-hot encoding
  model.add(Dense(units=20, activation='relu', input_dim=13))
  model.add(Dense(units =1, activation='sigmoid'))

  model.summary()
  opt = args.optimizer
  model.compile(optimizer=opt,
                loss='binary_crossentropy',
                metrics=['accuracy'])
  tf.keras.backend.set_value(model.optimizer.learning_rate, args.learning_rate)
  return model


def main(args):
  # MultiWorkerMirroredStrategy creates copies of all variables in the model's
  # layers on each device across all workers
  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
      communication=tf.distribute.experimental.CollectiveCommunication.AUTO)
  logging.debug(f"num_replicas_in_sync: {strategy.num_replicas_in_sync}")
  BATCH_SIZE_PER_REPLICA = args.batch_size
  BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

  # Datasets need to be created after instantiation of `MultiWorkerMirroredStrategy`
  train_dataset, test_dataset = make_datasets_unbatched()
  train_dataset = train_dataset.batch(batch_size=BATCH_SIZE)
  test_dataset = test_dataset.batch(batch_size=BATCH_SIZE)

  # See: https://www.tensorflow.org/api_docs/python/tf/data/experimental/DistributeOptions
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = \
        tf.data.experimental.AutoShardPolicy.DATA

  train_datasets_sharded  = train_dataset.with_options(options)
  test_dataset_sharded = test_dataset.with_options(options)

  with strategy.scope():
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = model(args)

  # Keras' `model.fit()` trains the model for the specified number of epochs and
  # steps per epoch; the training data repeats, so `steps_per_epoch` is required.
  multi_worker_model.fit(train_datasets_sharded,
                         epochs=12,
                         steps_per_epoch=5)
  
  eval_loss, eval_acc = multi_worker_model.evaluate(test_dataset_sharded, 
                                                    verbose=0, steps=10)

  # Log metrics for Katib
  logging.info("loss={:.4f}".format(eval_loss))
  logging.info("accuracy={:.4f}".format(eval_acc))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument("--batch_size",
                      type=int,
                      default=12,
                      metavar="N",
                      help="Batch size per replica for training (default: 12)")
  parser.add_argument("--learning_rate",
                      type=float,
                      default=0.001,
                      metavar="LR",
                      help="Initial learning rate (default: 0.001)")
  parser.add_argument("--optimizer",
                      type=str,
                      default="adam",
                      metavar="OPTIMIZER",
                      help="Keras optimizer name, e.g. 'adam' or 'sgd' (default: adam)")

  parsed_args, _ = parser.parse_known_args()
  main(parsed_args)

Overwriting tfjobtitan.py
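The hard-coded `input_dim=13` in `model()` only holds if one-hot encoding produces the same columns every time. `pd.get_dummies` creates one column per category *present in the data*, so a subset without, say, any `Q` embarkation yields fewer features. The sketch below reruns the script's feature engineering on the five rows shown by `data.head()` and then, as an alternative the script does not use, pins the category sets with `pd.Categorical` so the width stays at 13. The helper name `engineer` is hypothetical, and the category lists are an assumption about the full dataset.

```python
import re

import pandas as pd

# The five rows displayed by data.head() above. Survived (the label) is left out,
# and so are PassengerId, Name and Ticket, which the script drops.
rows = pd.DataFrame({
    "Pclass": [3, 1, 3, 1, 3],
    "Sex": ["male", "female", "female", "female", "male"],
    "Age": [22.0, 38.0, 26.0, 35.0, 35.0],
    "SibSp": [1, 1, 0, 1, 0],
    "Parch": [0, 0, 0, 0, 0],
    "Fare": [7.25, 71.2833, 7.925, 53.1, 8.05],
    "Cabin": [None, "C85", None, "C123", None],
    "Embarked": ["S", "C", "S", "S", "S"],
})


def engineer(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: same feature engineering as make_datasets_unbatched()."""
    df = df.copy()
    df["relatives"] = df["SibSp"] + df["Parch"]
    df["not_alone"] = (df["relatives"] == 0).astype(int)  # 1 if travelling alone
    deck = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7, "U": 8}
    letters = df["Cabin"].fillna("U0").map(lambda c: re.search("[a-zA-Z]+", c).group())
    df["Deck"] = letters.map(deck).fillna(0).astype(int)
    df = df.drop(columns=["Cabin"])
    df["Age"] = df["Age"].fillna(df["Age"].mean())
    df["Embarked"] = df["Embarked"].fillna("S")
    return df


features = engineer(rows)
naive = pd.get_dummies(features)  # what the script does today

# Assumption: these are the complete category sets in the Titanic data.
pinned_input = features.copy()
pinned_input["Sex"] = pd.Categorical(pinned_input["Sex"], categories=["female", "male"])
pinned_input["Embarked"] = pd.Categorical(pinned_input["Embarked"], categories=["C", "Q", "S"])
pinned = pd.get_dummies(pinned_input)  # one column per declared category

print(naive.shape[1], pinned.shape[1])  # 12 13
```

On the full training file all three ports appear, which is why the script gets 13 columns without pinning; pinning only matters if the trainer is ever fed a filtered or streamed subset.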


That saves the file as defined by `TRAINER_FILE` but it does not run it.

Let's see if our code is correct by running it from within our notebook:

In [5]:
%run $TRAINER_FILE --optimizer 'sgd'

Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


INFO:numexpr.utils:NumExpr defaulting to 2 threads.


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 20)                280       
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 21        
Total params: 301
Trainable params: 301
Non-trainable params: 0
_________________________________________________________________
Epoch 1/12
Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
Epoch 7/12
Epoch 8/12
Epoch 9/12
Epoch 10/12
Epoch 11/12
Epoch 12/12


INFO:root:loss=0.7736
INFO:root:accuracy=0.6583
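The two `INFO:root:` lines are what the script logs for Katib, in the `name=value` format that Katib's standard-output metrics collector looks for by default. If you want to pull those numbers out of notebook output or `kubectl logs` yourself, a small regex is enough. `parse_metrics` is a hypothetical helper, and the sample text is copied from the run above.

```python
import re

# name=value pairs as logged by the trainer, e.g. "INFO:root:loss=0.7736".
METRIC_PATTERN = re.compile(r"\b(loss|accuracy)=([0-9]*\.?[0-9]+)")


def parse_metrics(log_text: str) -> dict:
    """Return the last logged value of each metric found in log_text."""
    metrics = {}
    for name, value in METRIC_PATTERN.findall(log_text):
        metrics[name] = float(value)  # later lines overwrite earlier ones
    return metrics


sample = "INFO:root:loss=0.7736\nINFO:root:accuracy=0.6583\n"
print(parse_metrics(sample))  # {'loss': 0.7736, 'accuracy': 0.6583}
```

In the notebook, IPython's `logs = !kubectl logs churn-worker-0` captures the pod output as a list of lines, which you could pass as `parse_metrics("\n".join(logs))`.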


## How to Create a Docker Image Manually


The Dockerfile looks as follows:

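```dockerfile
FROM tensorflow/tensorflow:2.4.0
RUN pip install tensorflow_datasets pandas scikit-learn keras
# The notebook writes the trainer as tfjobtitan.py (TRAINER_FILE); the TFJob spec runs /tfjobchurn.py
COPY tfjobtitan.py /tfjobchurn.py
ENTRYPOINT ["python", "/tfjobchurn.py", "--batch_size", "10", "--learning_rate", "0.001", "--optimizer", "sgd"]
```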


Then build the image and push it to your container registry:

```bash
docker build -t <docker_image_name_with_tag> .
docker push <docker_image_name_with_tag>
```

A prebuilt image is available as `mavencodev/tf_jobtitanic:1.0` in case you want to skip this step for now.

## How to Create a Distributed `TFJob`
For large training jobs, we want to run our trainer in distributed mode.
Once the cluster that hosts the notebook server can access the Docker image from the registry, we can launch a distributed TensorFlow job.

The specification for a distributed `TFJob` is defined using YAML:

In [6]:
%%writefile $KUBERNETES_FILE
apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "churn"
  namespace: demo01 # your-user-namespace
spec:
  cleanPodPolicy: None
  tfReplicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
          - name: tensorflow
            # modify this property if you would like to use a custom image
            image: mavencodev/tf_jobtitanic:1.0
            command:
                - "python"
                - "/tfjobchurn.py"
                - "--batch_size=10"
                - "--learning_rate=0.001"
                - "--optimizer=sgd"

Overwriting tfjob-titan.yaml
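In `main()`, the global batch size is the per-replica batch size times `strategy.num_replicas_in_sync`. Assuming each of the two workers in this spec contributes a single CPU replica (as in the local run, which reported one `/device:CPU:0`), the numbers for `--batch_size=10` work out as below. `training_budget` is a hypothetical helper; the epoch and step counts are the values hard-coded in the `fit()` call.

```python
def training_budget(batch_per_replica: int, replicas: int,
                    epochs: int, steps_per_epoch: int) -> dict:
    """Hypothetical helper: how many examples one fit() call consumes."""
    global_batch = batch_per_replica * replicas  # BATCH_SIZE in main()
    per_epoch = global_batch * steps_per_epoch   # examples drawn per epoch
    return {
        "global_batch": global_batch,
        "examples_per_epoch": per_epoch,
        "examples_total": per_epoch * epochs,
    }


# Two workers x one CPU replica each, --batch_size=10, epochs=12, steps_per_epoch=5.
print(training_budget(10, 2, 12, 5))
# {'global_batch': 20, 'examples_per_epoch': 100, 'examples_total': 1200}
```

Because the training dataset is repeated, `steps_per_epoch` rather than the dataset size defines an epoch here, so the same rows may be revisited several times during one run.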


Let's deploy the distributed training job:

In [None]:
%%capture tf_output --no-stderr
! kubectl create -f $KUBERNETES_FILE

In [None]:
TF_JOB = get_resource(tf_output)

To see the job status, use the following command:

In [None]:
! kubectl describe $TF_JOB

Name:         churn
Namespace:    demo01
Labels:       <none>
Annotations:  <none>
API Version:  kubeflow.org/v1
Kind:         TFJob
Metadata:
  Creation Timestamp:  2021-03-23T03:01:50Z
  Generation:          1
  Managed Fields:
    API Version:  kubeflow.org/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        .:
        f:cleanPodPolicy:
        f:tfReplicaSpecs:
          .:
          f:Worker:
            .:
            f:replicas:
            f:restartPolicy:
            f:template:
              .:
              f:metadata:
                .:
                f:annotations:
                  .:
                  f:sidecar.istio.io/inject:
              f:spec:
    Manager:      kubectl
    Operation:    Update
    Time:         2021-03-23T03:01:50Z
    API Version:  kubeflow.org/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:successPolicy:
        f:tfReplicaSpecs:
          f:Worker:
            f:template
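`kubectl describe` is handy for reading, but to check the job state from code you can parse the status conditions from `kubectl get <job> -o json`. The sketch assumes that the TFJob `status.conditions` list carries entries with a `type` (for example `Created`, `Running`, `Succeeded`, `Failed`) and a `status` of `"True"` or `"False"`, as in the Kubeflow training operator's v1 API. `latest_condition` and `fetch_tfjob` are hypothetical helpers, and the sample dictionary is made up for illustration.

```python
import json
import subprocess
from typing import Optional


def latest_condition(tfjob: dict) -> Optional[str]:
    """Return the type of the last condition whose status is "True", if any."""
    conditions = tfjob.get("status", {}).get("conditions", [])
    active = [c["type"] for c in conditions if c.get("status") == "True"]
    return active[-1] if active else None


def fetch_tfjob(resource: str) -> dict:
    """Run `kubectl get <resource> -o json` and parse it (needs cluster access)."""
    out = subprocess.run(["kubectl", "get", resource, "-o", "json"],
                         check=True, capture_output=True, text=True).stdout
    return json.loads(out)


# Made-up example of the relevant part of a finished TFJob.
sample = {"status": {"conditions": [
    {"type": "Created", "status": "True"},
    {"type": "Running", "status": "False"},
    {"type": "Succeeded", "status": "True"},
]}}
print(latest_condition(sample))  # Succeeded
```

With cluster access, `latest_condition(fetch_tfjob(TF_JOB))` would give the current state of the job created above.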

You should now be able to see the created pods matching the specified number of workers.

In [None]:
! kubectl get pods -l job-name=churn

NAME             READY   STATUS      RESTARTS   AGE
churn-worker-0   0/1     Completed   0          60s
churn-worker-1   0/1     Completed   0          60s


In case of issues, it may be helpful to see the last ten events within the cluster:

```bash
kubectl get events --sort-by='.lastTimestamp' | tail
```

To stream logs from the worker-0 pod to check the training progress, run the following command:

In [None]:
! kubectl logs -f churn-worker-0

2021-03-23 03:02:26.525110: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-03-23 03:02:26.525145: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
2021-03-23 03:02:28.311728: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-03-23 03:02:28.311957: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2021-03-23 03:02:28.311979: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2021-03-23 03:02:28.312006: I tensorflow/strea

To delete the job, run the following command:

In [None]:
! kubectl delete $TF_JOB

tfjob.kubeflow.org "churn" deleted


To check whether any of the job's pods are still running after the deletion, list them again:

In [None]:
! kubectl -n demo01 get pods -l job-name=churn