# Distributed training with TensorFlow

**Note that this training doesn't work well with codespaces and virtualized environments as actual CPU/GPU's are not readily available.**

Overview

tf.distribute.Strategy is a TensorFlow API to distribute training across multiple GPUs, multiple machines or TPUs. Using this API, you can distribute your existing models and training code with minimal code changes.

tf.distribute.Strategy has been designed with these key goals in mind:

    Easy to use and support multiple user segments, including researchers, ML engineers, etc.
    Provide good performance out of the box.
    Easy switching between strategies.

tf.distribute.Strategy can be used with a high-level API like Keras, and can also be used to distribute custom training loops (and, in general, any computation using TensorFlow).

In TensorFlow 2.0, you can execute your programs eagerly, or in a graph using tf.function. tf.distribute.Strategy intends to support both these modes of execution. Although we discuss training most of the time in this guide, this API can also be used for distributing evaluation and prediction on different platforms.

Reference: 
+ https://www.tensorflow.org/guide/distributed_training

In [1]:
# Python 3.7.3
############################################
# INCLUDES
############################################
#libraries specific to this example
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.backend import clear_session
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
# seed the pseudorandom number generator
from random import seed
from random import random
from random import randint

#a set of libraries that perhaps should always be in Python source
import os 
import datetime
import sys
import gc
import getopt
import inspect
import math
import warnings
import types

#Data Science Libraries
import numpy as np
import pandas as pd
import scipy as sp
import scipy.ndimage

#Plotting libraries
import matplotlib as mpl
import matplotlib.pyplot as plt

#a darn useful library for creating paths and one I recommend you load to your environment
from pathlib import Path

# can type in the python console `help(name of function)` to get the documentation
from pydoc import help                          

#Import a custom library, in this case a fairly useful logging framework
debug_lib_location = Path("./")
sys.path.append(str(debug_lib_location))
import debug

warnings.filterwarnings('ignore')               # don't print out warnings


root_location=".." + os.sep + "data";

2022-12-16 20:57:24.196473: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.1


In [2]:
#Turn on Eager Execution
#tf.enable_eager_execution()

In [3]:
############################################
#JUPYTER NOTEBOOK OUTPUT CONTROL / FORMATTING
############################################
#set floating point to 4 places to things don't run loose
pd.options.display.float_format = '{:,.4f}'.format
np.set_printoptions(precision=4)

# Variable declaration

In [4]:
############################################
# GLOBAL VARIABLES
############################################
DEBUG = 1                            #General ledger output so you know what's happening.
DEBUG_DATA = 1                       #Extremely verbose output, change to zero (0) to supress the volume of output.

# CODE CONSTRAINTS
VERSION_NAME    = "TensorFlowDistributed"
VERSION_ACRONYM = "ML-TFDIST"
VERSION_MAJOR   = 0
VERSION_MINOR   = 0
VERSION_RELEASE = "0b"
VERSION_TITLE   = VERSION_NAME + " (" + VERSION_ACRONYM + ") " + str(VERSION_MAJOR) + "." + str(VERSION_MINOR) + "." + str(VERSION_RELEASE) + " generated SEED."

ENCODING  ="utf-8"
############################################
# GLOBAL CONSTANTS
############################################
mpl.rcParams['figure.figsize'] = (8, 6)
mpl.rcParams['axes.grid'] = False

############################################
# APPLICATION VARIABLES
############################################

############################################
# GLOBAL CONFIGURATION
############################################
os.environ['PYTHONIOENCODING']=ENCODING


## General Function Declaration

In [5]:
############################################
# WARNING / ERROR Management
############################################
def fxn():
    warnings.warn("deprecated", DeprecationWarning)

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    fxn()


############################################
# FUNCTIONS
############################################

def prototype(incMonth):

    debug.msg_info("Entering {}.{}".format(__name__, inspect.stack()[0][3]))
    debug.msg_info("The month you passed in was " + str(incMonth))
    debug.msg_info("Exiting {}.{}".format(__name__, inspect.stack()[0][3]))
    return 1

def lib_diagnostics():
    debug.msg_debug("System version    #:{:>12}".format(sys.version))
    netcdf4_version_info = nc.getlibversion().split(" ")
    debug.msg_debug("netCDF4 version   #:{:>12}".format(netcdf4_version_info[0]))
    debug.msg_debug("Matplotlib version#:{:>12}".format(matplt.__version__))
    debug.msg_debug("Numpy version     #:{:>12}".format(np.__version__))
    debug.msg_debug("Pandas version    #:{:>12}".format(pd.__version__))
    debug.msg_debug("SciPy version     #:{:>12}".format(sp.__version__))

    return


def get_full_version():

    resultant = str(VERSION_NAME) + "  v" + str(VERSION_MAJOR) + "." + str(VERSION_MINOR) + "." + str(VERSION_RELEASE)
    return resultant

def get_version():

    resultant = str(VERSION_MAJOR) + "." + str(VERSION_MINOR) + "." + str(VERSION_RELEASE)
    return resultant

def printversion():

    print(get_full_version())

def printusage():

    print("")
    printversion()
    print("  -v, --version    prints the version of this software package.")
    print("")
    print("  * - indicates required argument.")

######################################################################
#Support routines to see columns in DataFrames
######################################################################
def show_columns_plain(inc_ary):
    new_ary = []
    for col in inc_ary:
        new_ary.append(np.char.lower(col))
    new_ary.sort
    myOutputString = " "
    for col in new_ary:
        myOutputString = myOutputString + " " + str(col)
    return myOutputString

def show_columns_true(inc_ary):
    new_ary = []
    for col in inc_ary:
        new_ary.append(col)
    new_ary.sort
    myOutputString = " "
    for col in new_ary:
        myOutputString = myOutputString + " " + str(col)
    return myOutputString

######################################################################
#Input Validation
######################################################################
# valid string:
#  We don't want the following:
#   - at the start of the file name (might be construed as a switch)
#  $, &, |, ;, <, >, `, !, *, ", \ (to start with)
###
def validstring(testsubject):

    if testsubject[0] == "-":
        return 0
    elif "$" in testsubject or "&" in testsubject or "|" in testsubject:
        return 0
    elif ";" in testsubject or "`" in testsubject or "!" in testsubject:
        return 0
    elif "*" in testsubject or '"' in testsubject or "\\" in testsubject:
        return 0
    else:
        return 1
        

In [6]:
# %load ./libs.py
#Title:     Displays the libraries in current use.
#Objective: Invocation is intended as function calls within another program.
#Assumptions:
#           1. Should be stored in standardized location such as:
#                      /p/home/{user_name}/usr/PYTHONLIB
#           2. Developer loads the module (Jupyter Lab).
#Pre-Requisites:
#           1. Python v3.*
#           2. Jupyter Lab / Notebook (%load libs.py)
#Usage:
#       %load libs.py
#       find_loaded_modules().HTML
#
#Version History:
# ------------------------------------------------------------------------
# Version   Date       Modification                              Author
# ------------------------------------------------------------------------
# 1.0       2020/04/29 Inception                                 Radiance
# ------------------------------------------------------------------------
# ------------------------------------------------------------------------


#######################################################################
#LIBRARIES
#######################################################################
import os
import types

def module_version(mod):
    '''Return version string for module *mod*, or nothing if
    it doesn't have a "version" or "__version__" attribute.'''
    version = []
    if hasattr(mod, '__dict__'):
        keys = []
        for key in mod.__dict__.keys():
            if key.lower() == 'version' or key.lower() == '__version__':
                v = mod.__dict__[key]
                if (str):
                    if isinstance(v, str):
                        version.append(v)
                else:
                    version.append("No version")
        if keys:
            print (mod, keys)
    if version:
        return ', '.join(version)
    else:
        return ''

def find_loaded_modules(only_versioned_modules=True):

    def list_of_lists_to_HTML(lists, header_row=None):
        '''Convert a list of a list of strings to a HTML table.'''
        s = '<table>'
        if header_row:
            s += '\n\t<tr>\n\t\t'
            s += ''.join(['<th>%s</th>' % item for item in header_row])
            s += '\n\t</tr>'
        for inner_list in lists:
            s += '\n\t<tr>\n\t\t'
            s += ''.join(['<td>%s</td>' % item for item in inner_list])
            s += '\n\t</tr>'
        s += '\n</table>'
        return s
    
    class LoadedModules(list):
        '''Very simple wrapper for a list of lists of strings, with an attribute
        for display in IPython Notebooks.'''
        def __init__(self, *args, **kwargs):
            list.__init__(self, *args, **kwargs)
            
        @property
        def HTML(self):
            from IPython.display import HTML
            return HTML(
                    list_of_lists_to_HTML(
                            self, header_row=['Name', 'Version']))
                    
    objs = LoadedModules()
    for i, mod in enumerate(globals().values()):
        if isinstance(mod, types.ModuleType):
            if hasattr(mod, '__name__'):
                name = mod.__name__
            else:
                name = ''
            
            version = module_version(mod)
            
            objs.append([mod.__name__, version])
    objs.sort(key=lambda r: r[0])
    return objs


In [7]:
find_loaded_modules().HTML

Name,Version
builtins,
builtins,
datetime,
debug,
gc,
getopt,
inspect,
math,
matplotlib,
matplotlib.pyplot,


## Types of strategies

tf.distribute.Strategy intends to cover a number of use cases along different axes. Some of these combinations are currently supported and others will be added in the future. Some of these axes are:

+ Synchronous vs asynchronous training: These are two common ways of distributing training with data parallelism. In sync training, all workers train over different slices of input data in sync, and aggregating gradients at each step. In async training, all workers are independently training over the input data and updating variables asynchronously. Typically sync training is supported via all-reduce and async through parameter server architecture.
+ Hardware platform: You may want to scale your training onto multiple GPUs on one machine, or multiple machines in a network (with 0 or more GPUs each), or on Cloud TPUs.

See the distributed training reference at the top for details.


## GPU's

References
+ https://www.howtogeek.com/414201/how-to-check-what-graphics-card-gpu-is-in-your-pc/
+ https://www.howtogeek.com/508993/how-to-check-which-gpu-is-installed-on-linux/

## MirroredStrategy

tf.distribute.MirroredStrategy supports synchronous distributed training on multiple GPUs on one machine. It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. Together, these variables form a single conceptual variable called MirroredVariable. These variables are kept in sync with each other by applying identical updates.

Efficient all-reduce algorithms are used to communicate the variable updates across the devices. All-reduce aggregates tensors across all the devices by adding them up, and makes them available on each device. It’s a fused algorithm that is very efficient and can reduce the overhead of synchronization significantly. There are many all-reduce algorithms and implementations available, depending on the type of communication available between devices. By default, it uses NVIDIA NCCL as the all-reduce implementation. You can choose from a few other options we provide, or write your own.

In [8]:
####################################################
#Mirrored Strategy
####################################################
startegy = tf.distribute.MirroredStrategy()

####################################################
#This will create a MirroredStrategy instance which will use all 
#the GPUs that are visible to TensorFlow, and use NCCL as the cross device communication.
#If you wish to use only some of the GPUs on your machine, you can do so like this:
#   mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


2022-12-16 20:57:35.387565: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib:/usr/lib64::
2022-12-16 20:57:35.387596: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2022-12-16 20:57:35.387614: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (codespaces-c62148): /proc/driver/nvidia/version does not exist
2022-12-16 20:57:35.387833: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-12-16 20:57:35.395281: I tensorflo

## CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy does synchronous training as well. Variables are not mirrored, instead they are placed on the CPU and operations are replicated across all local GPUs. If there is only one GPU, all variables and operations will be placed on that GPU.

This will create a CentralStorageStrategy instance which will use all visible GPUs and CPU. Update to variables on replicas will be aggregated before being applied to variables.

Create an instance of CentralStorageStrategy by:

In [9]:
####################################################
#Central Storage Strategy
#Reference: https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/CentralStorageStrategy
####################################################
strategy = tf.distribute.experimental.CentralStorageStrategy()

INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:CPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:CPU:0'


In [10]:
dataset = tf.data.Dataset.range(10).batch(2)
dist_dataset = strategy.experimental_distribute_dataset(dataset)
for x in dist_dataset:
  print(x)  # Prints PerReplica values [0, 1], [2, 3],...

tf.Tensor([0 1], shape=(2,), dtype=int64)
tf.Tensor([2 3], shape=(2,), dtype=int64)
tf.Tensor([4 5], shape=(2,), dtype=int64)
tf.Tensor([6 7], shape=(2,), dtype=int64)
tf.Tensor([8 9], shape=(2,), dtype=int64)


2022-12-16 20:57:51.105341: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}

2022-12-16 20:57:51.128178: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)


## MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy is very similar to MirroredStrategy. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs. Similar to MirroredStrategy, it creates copies of all variables in the model on each device across all workers.

It uses CollectiveOps as the multi-worker all-reduce communication method used to keep variables in sync. A collective op is a single op in the TensorFlow graph which can automatically choose an all-reduce algorithm in the TensorFlow runtime according to hardware, network topology and tensor sizes.

It also implements additional performance optimizations. For example, it includes a static optimization that converts multiple all-reductions on small tensors into fewer all-reductions on larger tensors. In addition, we are designing it to have a plugin architecture - so that in the future, you will be able to plugin algorithms that are better tuned for your hardware. Note that collective ops also implement other collective operations such as broadcast and all-gather.

Here is the simplest way of creating MultiWorkerMirroredStrategy:

In [11]:
####################################################
#Multi-Worker Mirrored Strategy
####################################################
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


MultiWorkerMirroredStrategy currently allows you to choose between two different implementations of collective ops. CollectiveCommunication.RING implements ring-based collectives using gRPC as the communication layer. CollectiveCommunication.NCCL uses Nvidia's NCCL to implement collectives. CollectiveCommunication.AUTO defers the choice to the runtime. The best choice of collective implementation depends upon the number and kind of GPUs, and the network interconnect in the cluster. You can specify them in the following way:

In [12]:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)


INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.NCCL


One of the key differences to get multi worker training going, as compared to multi-GPU training, is the multi-worker setup. The TF_CONFIG environment variable is the standard way in TensorFlow to specify the cluster configuration to each worker that is part of the cluster. Learn more about setting up TF_CONFIG.
Note: This strategy is experimental as we are currently improving it and making it work for more scenarios. As part of this, please expect the APIs to change in the future.

## TPUStrategy

tf.distribute.experimental.TPUStrategy lets you run your TensorFlow training on Tensor Processing Units (TPUs). TPUs are Google's specialized ASICs designed to dramatically accelerate machine learning workloads. They are available on Google Colab, the TensorFlow Research Cloud and Cloud TPU.

In terms of distributed training architecture, TPUStrategy is the same MirroredStrategy - it implements synchronous distributed training. TPUs provide their own implementation of efficient all-reduce and other collective operations across multiple TPU cores, which are used in TPUStrategy.

*Note that this notebook does not devle into Collab.*

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy supports parameter servers training on multiple machines. In this setup, some machines are designated as workers and some as parameter servers. Each variable of the model is placed on one parameter server. Computation is replicated across all GPUs of all the workers.

For multi worker training, TF_CONFIG needs to specify the configuration of parameter servers and workers in your cluster.

In terms of code, it looks similar to other strategies:

In [13]:
####################################################
#Parameter Server
#Does not execute without configuration setup
####################################################
strategy = tf.distribute.experimental.ParameterServerStrategy()

TypeError: __init__() missing 1 required positional argument: 'cluster_resolver'

## Setting up TF_CONFIG environment variable

For multi-worker training, as mentioned before, you need to set TF_CONFIG environment variable for each binary running in your cluster. The TF_CONFIG environment variable is a JSON string which specifies what tasks constitute a cluster, their addresses and each task's role in the cluster. We provide a Kubernetes template in the tensorflow/ecosystem repo which sets TF_CONFIG for your training tasks.

One example of TF_CONFIG is:

    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": {
            "worker": ["host1:port", "host2:port", "host3:port"],
            "ps": ["host4:port", "host5:port"]
        },
       "task": {"type": "worker", "index": 1}
    })

This TF_CONFIG specifies that there are three workers and two ps tasks in the cluster along with their hosts and ports. The "task" part specifies that the role of the current task in the cluster, worker 1 (the second worker). Valid roles in a cluster is "chief", "worker", "ps" and "evaluator". There should be no "ps" job except when using tf.distribute.experimental.ParameterServerStrategy.

OneDeviceStrategy

tf.distribute.OneDeviceStrategy runs on a single device. This strategy will place any variables created in its scope on the specified device. Input distributed through this strategy will be prefetched to the specified device. Moreover, any functions called via strategy.run will also be placed on the specified device.

You can use this strategy to test your code before switching to other strategies which actually distributes to multiple devices/machines.

So far we've talked about what are the different strategies available and how you can instantiate them. In the next few sections, we will talk about the different ways in which you can use them to distribute your training. We will show short code snippets in this guide and link off to full tutorials which you can run end to end.

In [14]:
strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

## Using tf.distribute.Strategy with Keras

We've integrated tf.distribute.Strategy into tf.keras which is TensorFlow's implementation of the Keras API specification. tf.keras is a high-level API to build and train models. By integrating into tf.keras backend, we've made it seamless for you to distribute your training written in the Keras training framework.

Here's what you need to change in your code:

    Create an instance of the appropriate tf.distribute.Strategy
    Move the creation and compiling of Keras model inside strategy.scope.

We support all types of Keras models - sequential, functional and subclassed.

Here is a snippet of code to do this for a very simple Keras model with one dense layer:

In [15]:
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  model.compile(loss='mse', optimizer='sgd')

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In this example we used MirroredStrategy so we can run this on a machine with multiple GPUs. strategy.scope() indicated which parts of the code to run distributed. Creating a model inside this scope allows us to create mirrored variables instead of regular variables. Compiling under the scope allows us to know that the user intends to train this model using this strategy. Once this is set up, you can fit your model like you would normally. MirroredStrategy takes care of replicating the model's training on the available GPUs, aggregating gradients, and more.

In [16]:
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)

2022-12-16 20:58:50.107445: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}



Epoch 1/2
Epoch 2/2


2022-12-16 20:58:51.575217: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}





0.05456290394067764

## What's supported now?

In TF 2.0 release, MirroredStrategy, TPUStrategy, CentralStorageStrategy and MultiWorkerMirroredStrategy are supported in Keras. Except **MirroredStrategy**, others are currently experimental and are subject to change. Support for other strategies will be coming soon. The API and how to use will be exactly the same as above.

# Example Distributed training with Keras

## Overview

The tf.distribute.Strategy API provides an abstraction for distributing your training across multiple processing units. The goal is to allow users to enable distributed training using existing models and training code, with minimal changes.

This tutorial uses the tf.distribute.MirroredStrategy, which does in-graph replication with synchronous training on many GPUs on one machine. Essentially, it copies all of the model's variables to each processor. Then, it uses all-reduce to combine the gradients from all processors and applies the combined value to all copies of the model.

**MirroredStrategy** is one of several distribution strategy available in TensorFlow core. You can read about more strategies at distribution strategy guide.

Reference:
+ https://www.tensorflow.org/tutorials/distribute/keras

***Important:***

    conda install -c conda-forge tensorflow_datasets

In [17]:
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os

ModuleNotFoundError: No module named 'tensorflow_datasets'

Download the MNIST dataset and load it from TensorFlow Datasets. This returns a dataset in tf.data format.

Setting with_info to True includes the metadata for the entire dataset, which is being saved here to info. Among other things, this metadata object includes the number of train and test examples. 

In [20]:
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)

mnist_train, mnist_test = datasets['train'], datasets['test']


Downloading and preparing dataset mnist (11.06 MiB) to C:\Users\ChristopherWood2\tensorflow_datasets\mnist\1.0.0...
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Dataset mnist downloaded and prepared to C:\Users\ChristopherWood2\tensorflow_datasets\mnist\1.0.0. Subsequent calls will reuse this data.


## Define distribution strategy

Create a MirroredStrategy object. This will handle distribution, and provides a context manager (tf.distribute.MirroredStrategy.scope) to build your model inside.

In [21]:
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))





INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


Number of devices: 1


## Setup input pipeline

When training a model with multiple GPUs, you can use the extra computing power effectively by increasing the batch size. In general, use the largest batch size that fits the GPU memory, and tune the learning rate accordingly.

In [22]:
# You can also do info.splits.total_num_examples to get the total
# number of examples in the dataset.

num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

Pixel values, which are 0-255, have to be normalized to the 0-1 range. Define this scale in a function.

In [23]:
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255

  return image, label

Apply this function to the training and test data, shuffle the training data, and batch it for training. Notice we are also keeping an in-memory cache of the training data to improve performance.

In [24]:
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

## Create the model
Create and compile the Keras model in the context of strategy.scope.

In [25]:
with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(),
                metrics=['accuracy'])

## Define the callbacks

The callbacks used here are:

   + TensorBoard: This callback writes a log for TensorBoard which allows you to visualize the graphs.
   + Model Checkpoint: This callback saves the model after every epoch.
   + Learning Rate Scheduler: Using this callback, you can schedule the learning rate to change after every epoch/batch.

For illustrative purposes, add a print callback to display the learning rate in the notebook.

In [24]:
# Define the checkpoint directory to store the checkpoints

checkpoint_dir = '.'+os.sep+'tmp'+os.sep+'training_checkpoints'
!mkdir {checkpoint_dir}


# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}.h5")

In [25]:
# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
                                                      model.optimizer.lr.numpy()))


In [26]:
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='.'+os.sep+'logs',
                                   histogram_freq=0,  
                                   embeddings_freq=0, 
                                   update_freq='epoch',
                                   profile_batch=0),  #added this (which doesn't profile) to get the example to to work
    
    #When saving a model's weights, tf.keras defaults to the checkpoint format. 
    #Pass save_format='h5' to use HDF5 (or pass a filename that ends in .h5).
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       #save_weights_only=True,
                                       verbose=1),
    PrintLR()
]


## Train and evaluate

Now, train the model in the usual way, calling fit on the model and passing in the dataset created at the beginning of the tutorial. This step is the same whether you are distributing the training or not.

In [27]:
model.fit(train_dataset, epochs=12, callbacks=callbacks)

Epoch 1/12
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


    938/Unknown - 8s 9ms/step - loss: 0.1965 - accuracy: 0.9440
Epoch 00001: saving model to .\tmp\training_checkpoints\ckpt_1.h5

Learning rate for epoch 1 is 0.0010000000474974513
Epoch 2/12
Epoch 00002: saving model to .\tmp\training_checkpoints\ckpt_2.h5

Learning rate for epoch 2 is 0.0010000000474974513
Epoch 3/12
Epoch 00003: saving model to .\tmp\training_checkpoints\ckpt_3.h5

Learning rate for epoch 3 is 0.0010000000474974513
Epoch 4/12
Epoch 00004: saving model to .\tmp\training_checkpoints\ckpt_4.h5

Learning rate for epoch 4 is 0.0010000000474974513
Epoch 5/12
Epoch 00005: saving model to .\tmp\training_checkpoints\ckpt_5.h5

Learning rate for epoch 5 is 0.0010000000474974513
Epoch 6/12
Epoch 00006: saving model to .\tmp\training_checkpoints\ckpt_6.h5

Learning rate for epoch 6 is 0.0010000000474974513
Epoch 7/12
Epoch 00007: saving model to .\tmp\training_checkpoints\ckpt_7.h5

Learning rate for epoch 7 is 0.0010000000474974513
Epoch 8/12
Epoch 00008: saving model to .\tm

<tensorflow.python.keras.callbacks.History at 0x267544f60c8>

In [28]:
#See if the checkpoints are saved.
from sys import platform
if platform == "linux" or platform == "linux2":
    !ls {checkpoint_dir}
elif platform == "darwin":
    !ls {checkpoint_dir}
elif platform == "win32":
    !dir {checkpoint_dir}


 Volume in drive E is Data
 Volume Serial Number is C0D8-A109

 Directory of E:\Documents\CWOOD\projects\my_jbooks\tmp\training_checkpoints

09/12/2022  04:21 PM    <DIR>          .
09/12/2022  04:21 PM    <DIR>          ..
09/12/2022  04:20 PM         4,200,528 ckpt_1.h5
09/12/2022  04:21 PM         4,200,528 ckpt_10.h5
09/12/2022  04:21 PM         4,200,528 ckpt_11.h5
09/12/2022  04:21 PM         4,200,528 ckpt_12.h5
09/12/2022  04:21 PM         4,200,528 ckpt_2.h5
09/12/2022  04:21 PM         4,200,528 ckpt_3.h5
09/12/2022  04:21 PM         4,200,528 ckpt_4.h5
09/12/2022  04:21 PM         4,200,528 ckpt_5.h5
09/12/2022  04:21 PM         4,200,528 ckpt_6.h5
09/12/2022  04:21 PM         4,200,528 ckpt_7.h5
09/12/2022  04:21 PM         4,200,528 ckpt_8.h5
09/12/2022  04:21 PM         4,200,528 ckpt_9.h5
              12 File(s)     50,406,336 bytes
               2 Dir(s)  304,131,563,520 bytes free


To see how the model perform, load the latest checkpoint and call evaluate on the test data.

Call evaluate as before using appropriate datasets.

In [30]:
#tutorials indicates to save weights only but I found this to be a problem / concern between 
#tensorflow and keras calls, so save the whole model (who cares anyway)
#model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))

#load the specific model name
model=tf.keras.models.load_model(checkpoint_dir+os.sep+'ckpt_12.h5')


eval_loss, eval_acc = model.evaluate(eval_dataset)

print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))


    157/Unknown - 1s 6ms/step - loss: 0.0623 - accuracy: 0.9842Eval loss: 0.062301204898187953, Eval Accuracy: 0.9842000007629395
