**Check that youre using the right TensorFlow image**

In [1]:
pip list | grep tensorflow 

tensorflow               2.1.0
tensorflow-estimator     2.1.0
Note: you may need to restart the kernel to use updated packages.


**To package the trainer in a container image, we need a file on the cluster that contains the code as well as a file with the resource definition of the job for the Kubernetes cluster:**

In [2]:
TRAINER_FILE = "tfjob-hotel-demand.py"
KUBERNETES_FILE = "tfjob-hotel-demand.yaml"

**We also want to capture output from a cell with %%capture that usually looks like some-resource created. To that end, let's define a helper function:**

In [3]:
import re

from IPython.utils.capture import CapturedIO


def get_resource(captured_io: CapturedIO) -> str:
    """
    Gets a resource name from `kubectl apply -f <configuration.yaml>`.

    :param str captured_io: Output captured by using `%%capture` cell magic
    :return: Name of the Kubernetes resource
    :rtype: str
    :raises Exception: if the resource could not be created
    """
    out = captured_io.stdout
    matches = re.search(r"^(.+)\s+created", out)
    if matches is not None:
        return matches.group(1)
    else:
        raise Exception(f"Cannot get resource as its creation failed: {out}. It may already exist.")

In [4]:
import pandas as  pd
df = pd.read_csv("https://raw.githubusercontent.com/charlesa101/KubeflowUseCases/draft/Hotel%20bookings%20demand/hotel_bookings.csv?token=AQEY3DFJCQCART4U4QXWS6TA6HBYM")
df.head()

Unnamed: 0,hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,stays_in_week_nights,adults,...,deposit_type,agent,company,days_in_waiting_list,customer_type,adr,required_car_parking_spaces,total_of_special_requests,reservation_status,reservation_status_date
0,Resort Hotel,0,342,2015,July,27,1,0,0,2,...,No Deposit,,,0,Transient,0.0,0,0,Check-Out,2015-07-01
1,Resort Hotel,0,737,2015,July,27,1,0,0,2,...,No Deposit,,,0,Transient,0.0,0,0,Check-Out,2015-07-01
2,Resort Hotel,0,7,2015,July,27,1,0,1,1,...,No Deposit,,,0,Transient,75.0,0,0,Check-Out,2015-07-02
3,Resort Hotel,0,13,2015,July,27,1,0,1,1,...,No Deposit,304.0,,0,Transient,75.0,0,0,Check-Out,2015-07-02
4,Resort Hotel,0,14,2015,July,27,1,0,2,2,...,No Deposit,240.0,,0,Transient,98.0,0,1,Check-Out,2015-07-03


In [5]:
%%writefile $TRAINER_FILE
import argparse
import logging
import json
import os
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

import numpy as np
import pandas as pd
import datetime

from sklearn.model_selection import train_test_split as tts
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder

from numpy.random import seed

import tensorflow as tf
tf.random.set_seed(221)
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import SGD, Adam, RMSprop

logging.getLogger().setLevel(logging.INFO)

def make_datasets_unbatched():
  df = pd.read_csv("https://raw.githubusercontent.com/charlesa101/KubeflowUseCases/draft/Hotel%20bookings%20demand/hotel_bookings.csv?token=AQEY3DFJCQCART4U4QXWS6TA6HBYM")
  df.head()

  # Examine the columns with missing values
  df_null = df.isnull().sum()
  df_null[df_null.values > 0].sort_values(ascending=False)

  # drop missing values
  df = df.drop(['company', 'agent'], axis=1)
  df = df.dropna(subset=['country', 'children'], axis=0)
  df = df.reset_index(drop=True)

  # Converting wrong datatype columns to correct type (object to datetime)
  df['reservation_status_date'] = pd.to_datetime(df['reservation_status_date'])

  # Converting string month to numerical one (Dec = 12, Jan = 1, etc.)
  datetime_object = df['arrival_date_month'].str[0:3]
  month_number = np.zeros(len(datetime_object))

  # Creating a new column based on numerical representation of the months
  for i in range(0, len(datetime_object)):
    datetime_object[i] = datetime.datetime.strptime(datetime_object[i], "%b")
    month_number[i] = datetime_object[i].month

  # Float to integer conversion
  month_number = pd.DataFrame(month_number).astype(int)

  # 3 columns merged into one
  df['arrival_date'] = df['arrival_date_year'].map(str) + '-' + month_number[0].map(str) + '-' \
                       + df['arrival_date_day_of_month'].map(str)
  # Dropping already used columns
  df = df.drop(['arrival_date_year', 'arrival_date_month', 'arrival_date_day_of_month',
                  'arrival_date_week_number'], axis=1)
  # convert the newly created arrival_date feature to datetime type
  df['arrival_date'] = pd.to_datetime(df['arrival_date'])

  # Calculating total guests by combining adults, children and babies columns
  df['total guests'] = df['adults'] + df['children'] + df['babies']

  # drop data points that include zero Total Guests
  df = df[df['total guests'] != 0]

  # Total Number of Days Stayed
  df['total stays'] = df['stays_in_weekend_nights'] + df['stays_in_week_nights']

  dataNoCancel = df[df['is_canceled'] == 0]
  dataNoCancel = dataNoCancel.reset_index(drop=True)

  df = df.drop(['adults', 'children', 'babies', 'stays_in_weekend_nights', 'stays_in_week_nights', 'arrival_date', 'reservation_status_date'], axis=1)

  # Categorical variables preprocessing with label encoding
  list_1 = list(df.columns)
  cate_list=[]
  for i in list_1:
    if df[i].dtype=='object':
      cate_list.append(i)
  # transform the categorical variables with label encoder
  le = LabelEncoder()
  for i in cate_list:
    df[i] = le.fit_transform(df[i])
    
  # split the data into dependent variables and independent variable
  X = df.drop(['hotel'],axis=1)
  y = df.hotel

  # split the data into training and test set
  X_train,X_test,y_train,y_test = tts(X,y,random_state=36,test_size=0.3)

  # scale the data
  scaler = StandardScaler()
  X_train = scaler.fit_transform(X_train)
  X_test = scaler.fit_transform(X_test)

  train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
  test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
  train = train_dataset.cache().shuffle(2000).repeat()
  return train, test_dataset

def model(args):
  seed(1)
  model = Sequential()
  model.add(Dense(10, activation='relu', input_dim=21))
  #model.add(BatchNormalization())
  model.add(Dense(10, activation='relu'))
  #model.add(Dropout(0.2))
  model.add(Dense(1, activation='sigmoid'))
    
  model.summary()
  opt = args.optimizer
  model.compile(optimizer=opt,
                loss='binary_crossentropy',
                metrics=['accuracy'])
  tf.keras.backend.set_value(model.optimizer.learning_rate, args.learning_rate)
  return model

def main(args):
  # MultiWorkerMirroredStrategy creates copies of all variables in the model's
  # layers on each device across all workers

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
  communication=tf.distribute.experimental.CollectiveCommunication.AUTO)
  logging.debug(f"num_replicas_in_sync: {strategy.num_replicas_in_sync}")
  BATCH_SIZE_PER_REPLICA = args.batch_size
  BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
    
  # Datasets need to be created after instantiation of `MultiWorkerMirroredStrategy`
  train_dataset, test_dataset = make_datasets_unbatched()
  train_dataset = train_dataset.batch(batch_size=BATCH_SIZE)
  test_dataset = test_dataset.batch(batch_size=BATCH_SIZE)

  # See: https://www.tensorflow.org/api_docs/python/tf/data/experimental/DistributeOptions
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = \
  tf.data.experimental.AutoShardPolicy.DATA
    
  train_datasets_sharded = train_dataset.with_options(options)
  test_dataset_sharded = test_dataset.with_options(options)

  with strategy.scope():
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = model(args)

    # Keras' `model.fit()` trains the model with specified number of epochs and
    # number of steps per epoch. 
    multi_worker_model.fit(train_datasets_sharded,
                         epochs=100,
                         steps_per_epoch=30)

    eval_loss, eval_acc = multi_worker_model.evaluate(test_dataset_sharded, 
                                                    verbose=0, steps=10)
    # Log metrics for Katib
    logging.info("loss={:.4f}".format(eval_loss))
    logging.info("accuracy={:.4f}".format(eval_acc))

if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument("--batch_size",
                      type=int,
                      default=32,
                      metavar="N",
                      help="Batch size for training (default: 128)")
  parser.add_argument("--learning_rate", 
                      type=float,  
                      default=0.1,
                      metavar="N",
                      help='Initial learning rate')
  parser.add_argument("--optimizer", 
                      type=str, 
                      default='adam',
                      metavar="N",
                      help='optimizer')
  parsed_args, _ = parser.parse_known_args()
  main(parsed_args)

Overwriting tfjob-hotel-demand.py


In [6]:
%run $TRAINER_FILE --optimizer 'adam'

INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CollectiveCommunication.AUTO
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 10)                220       
_________________________________________________________________
dense_1 (Dense)              (None, 10)                110       
_________________________________________________________________
dense_2 (Dense)              (None, 1)                 11        
Total params: 341
Trainable params: 341
Non-trainable params: 0
_________________________________________________________________
Train for 30 steps
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 1

INFO:root:loss=0.4025
INFO:root:accuracy=0.8156


**Create a distributed TFJob**

For large training jobs, we wish to run our trainer in a distributed mode. Once the notebook server cluster can access the Docker image from the registry, we can launch a distributed TF job.

The specification for a distributed TFJob is defined using YAML:

In [7]:
%%writefile $KUBERNETES_FILE
apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "hotel-booking"
  namespace: tolu
spec:
  cleanPodPolicy: None
  tfReplicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
          - name: tensorflow
            # modify this property if you would like to use a custom image
            image: mavencodev/tf_hotel:v.0.1 # put the correct image
            command:
                - "python"
                - "/tfjob-hotel-demand.py"
                - "--batch_size=64"
                - "--learning_rate=0.1"
                - "--optimizer=adam"

Writing tfjob-hotel-demand.yaml


**Deploy the distributed training job**

In [8]:
%%capture tf_output --no-stderr
! kubectl create -f $KUBERNETES_FILE

In [9]:
TF_JOB = get_resource(tf_output)

**Check the job status using the following command**

In [10]:
! kubectl describe $TF_JOB

Name:         hotel-booking
Namespace:    tolu
Labels:       <none>
Annotations:  <none>
API Version:  kubeflow.org/v1
Kind:         TFJob
Metadata:
  Creation Timestamp:  2021-07-13T13:46:29Z
  Generation:          1
  Managed Fields:
    API Version:  kubeflow.org/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        .:
        f:cleanPodPolicy:
        f:tfReplicaSpecs:
          .:
          f:Worker:
            .:
            f:replicas:
            f:restartPolicy:
            f:template:
              .:
              f:metadata:
                .:
                f:annotations:
                  .:
                  f:sidecar.istio.io/inject:
              f:spec:
    Manager:      kubectl
    Operation:    Update
    Time:         2021-07-13T13:46:29Z
    API Version:  kubeflow.org/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:tfReplicaSpecs:
          f:Worker:
            f:template:
              f:m

In [12]:
!kubectl get pods -l job-name=hotel-booking

NAME                     READY   STATUS              RESTARTS   AGE
hotel-booking-worker-0   0/1     ContainerCreating   0          61s
hotel-booking-worker-1   0/1     ContainerCreating   0          60s


In [13]:
! kubectl get events --sort-by='.lastTimestamp' | tail

73s         Normal   SuccessfulCreatePod       tfjob/hotel-booking          Created pod: hotel-booking-worker-1
73s         Normal   Scheduled                 pod/hotel-booking-worker-1   Successfully assigned tolu/hotel-booking-worker-1 to instance-1
73s         Normal   SuccessfulCreateService   tfjob/hotel-booking          Created service: hotel-booking-worker-1
72s         Normal   Pulling                   pod/hotel-booking-worker-1   Pulling image "mavencodev/tf_hotel:v.0.1"
6s          Normal   Pulled                    pod/hotel-booking-worker-0   Successfully pulled image "mavencodev/tf_hotel:v.0.1"
5s          Normal   Created                   pod/hotel-booking-worker-1   Created container tensorflow
5s          Normal   Started                   pod/hotel-booking-worker-1   Started container tensorflow
5s          Normal   Started                   pod/hotel-booking-worker-0   Started container tensorflow
5s          Normal   Created                   pod/hotel-book

In [14]:
! kubectl logs -f hotel-booking-worker-0

2021-07-13 13:47:40.646571: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-07-13 13:47:40.646622: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
2021-07-13 13:47:42.618392: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-07-13 13:47:42.618748: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2021-07-13 13:47:42.618778: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2021-07-13 13:47:42.618828: I tensorflow/strea

**Run the following command to delete the job**

In [15]:
#! kubectl delete $TF_JOB

tfjob.kubeflow.org "hotel-booking" deleted


**Check to see if the pod is still up and running**

In [16]:
#! kubectl -n tolu logs -f hotel-booking

Error from server (NotFound): pods "hotel-booking" not found
