**Plan**

**1. Data and Model Parallelism (Multi-GPU and distributed training)**


**2. Introduction to TensorFlow Extended (TFX)**



**<h2>Data and Model Parallelism (Multi-GPU and distributed training)</h2>**

Data and model parallelism are techniques used to speed up the training of deep learning models by distributing the computational load across multiple GPUs or even multiple machines. TensorFlow, a popular deep learning framework, provides robust support for both data and model parallelism. Here is a detailed explanation of both techniques and how to implement them using TensorFlow.



**<h3>A. Data Parallelism</h3>**

Data parallelism splits each training batch into multiple subsets and processes each subset on a different GPU. The model is replicated on every GPU, and each replica computes gradients on its own portion of the data. The gradients from all replicas are then aggregated (averaged), so every replica applies the same update and the copies stay in sync.
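The gradient-averaging step can be illustrated without any GPUs. The sketch below is a minimal NumPy simulation, assuming a linear model with a mean-squared-error loss and a hypothetical four-replica setup. Each "replica" computes a gradient on its shard, and when the shards are equal in size, the average of those gradients matches the gradient of the full batch. MirroredStrategy performs this aggregation with an all-reduce across devices.

```python
import numpy as np

def mse_grad(w, x, y):
    # Gradient of the mean squared error for a linear model y_hat = x @ w
    residual = x @ w - y
    return 2.0 * x.T @ residual / len(y)

rng = np.random.default_rng(0)
x = rng.normal(size=(64, 3))
y = rng.normal(size=64)
w = np.zeros(3)

num_replicas = 4  # hypothetical number of GPUs
# Each replica receives an equal slice of the global batch
x_shards = np.array_split(x, num_replicas)
y_shards = np.array_split(y, num_replicas)
replica_grads = [mse_grad(w, xs, ys) for xs, ys in zip(x_shards, y_shards)]

# All-reduce step: average the per-replica gradients
avg_grad = np.mean(replica_grads, axis=0)
full_grad = mse_grad(w, x, y)
print(np.allclose(avg_grad, full_grad))

# Every replica applies the same update, so all copies of w stay identical
w = w - 0.1 * avg_grad
```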

**1. Set up TensorFlow and import the necessary libraries**

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

**2. Define the strategy**<br>
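Use tf.distribute.MirroredStrategy for synchronous training on multiple GPUs within a single machine. It creates one model replica per visible GPU and falls back to the CPU if no GPU is found.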

In [None]:
strategy = tf.distribute.MirroredStrategy()

print("Number of devices: {}".format(strategy.num_replicas_in_sync))

**3. Define and compile your model within the strategy's scope**

In [None]:
with strategy.scope():
    model = Sequential([
        Dense(128, activation='relu', input_shape=(784,)),
        Dense(64, activation='relu'),
        Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

**4. Load and preprocess your data**

In [None]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)

**5. Train the model**

In [None]:
model.fit(x_train, y_train, epochs=10, batch_size=64, validation_data=(x_test, y_test))
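With MirroredStrategy, the batch size passed to Keras is the global batch size, which is divided evenly across replicas: batch_size=64 on four GPUs gives each GPU only 16 examples. A common pattern, sketched below assuming a per-replica batch of 64 and synthetic data in place of MNIST, is to scale the batch by strategy.num_replicas_in_sync and feed a tf.data pipeline.

```python
import numpy as np
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

# Keep 64 examples per replica (an assumed value); Keras splits the global batch
PER_REPLICA_BATCH = 64
global_batch_size = PER_REPLICA_BATCH * strategy.num_replicas_in_sync

# Synthetic stand-in for the flattened MNIST arrays
x_synth = np.random.rand(512, 784).astype("float32")
y_synth = np.random.randint(0, 10, size=(512,))

train_ds = (tf.data.Dataset.from_tensor_slices((x_synth, y_synth))
            .shuffle(512)
            .batch(global_batch_size)
            .prefetch(tf.data.AUTOTUNE))

with strategy.scope():
    dp_model = tf.keras.Sequential([
        tf.keras.Input(shape=(784,)),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dense(10, activation="softmax"),
    ])
    dp_model.compile(optimizer="adam",
                     loss="sparse_categorical_crossentropy",
                     metrics=["accuracy"])

history = dp_model.fit(train_ds, epochs=1, verbose=0)
```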

**<h3>B. Model Parallelism</h3>**

Model parallelism involves splitting a large model across multiple GPUs: each GPU holds and computes a different part of the model and passes its intermediate activations on to the next device. This is particularly useful for very large models that cannot fit into a single GPU's memory.

**1. Set up TensorFlow and import the necessary libraries**

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model

**2. Define the parts of the model on different GPUs**

In [None]:
with tf.device('/GPU:0'):
    inputs = Input(shape=(784,))
    x = Dense(512, activation='relu')(inputs)

with tf.device('/GPU:1'):
    x = Dense(256, activation='relu')(x)
    outputs = Dense(10, activation='softmax')(x)

model = Model(inputs, outputs)
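Depending on the TensorFlow and Keras versions, tf.device scopes that wrap functional-API layer construction, as in the cell above, may not be carried into the graph that model.fit executes. A more explicit alternative, sketched below assuming a subclassed model is acceptable, places the tf.device scopes inside call() so they apply when the training step is traced. TwoDeviceMLP, DEVICE_A and DEVICE_B are illustrative names, and the device choice falls back to the CPU when fewer than two GPUs are visible.

```python
import tensorflow as tf

# Use up to two GPUs if present; otherwise fall back to the CPU so the sketch still runs
gpus = [d.name for d in tf.config.list_logical_devices("GPU")]
DEVICE_A = gpus[0] if gpus else "/CPU:0"
DEVICE_B = gpus[1] if len(gpus) > 1 else DEVICE_A

class TwoDeviceMLP(tf.keras.Model):
    """MLP whose first layer runs on DEVICE_A and remaining layers on DEVICE_B."""

    def __init__(self):
        super().__init__()
        self.hidden1 = tf.keras.layers.Dense(512, activation="relu")
        self.hidden2 = tf.keras.layers.Dense(256, activation="relu")
        self.classifier = tf.keras.layers.Dense(10, activation="softmax")

    def call(self, inputs):
        # Device scopes here are applied whenever call() is traced, including inside fit()
        with tf.device(DEVICE_A):
            x = self.hidden1(inputs)
        # The activations are copied to DEVICE_B for the rest of the forward pass
        with tf.device(DEVICE_B):
            x = self.hidden2(x)
            return self.classifier(x)

mp_model = TwoDeviceMLP()
mp_model.compile(optimizer="adam",
                 loss="sparse_categorical_crossentropy",
                 metrics=["accuracy"])
```

The trade-off is that only one device is busy at a time for a given batch; pipelining micro-batches is the usual way to recover utilization, but that is beyond this sketch.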

**3. Compile the model**

In [None]:
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

**4. Load and preprocess your data**

In [None]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)

**5. Train the model**

In [None]:
model.fit(x_train, y_train, epochs=10, batch_size=64, validation_data=(x_test, y_test))

**<h3>C. Distributed Training</h3>**

For distributed training across multiple machines, TensorFlow provides the tf.distribute.MultiWorkerMirroredStrategy. This strategy uses synchronous training across multiple workers.

**1. Set up TensorFlow and import the necessary libraries**

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

**2. Configure the cluster**

Describe the cluster in the TF_CONFIG environment variable. It must be set before the strategy is created, because MultiWorkerMirroredStrategy reads it at construction time. Run the same script on every machine, changing only the task index.

In [None]:
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["host1:port", "host2:port"]
    },
    'task': {'type': 'worker', 'index': 0}  # or 1 for the second worker
})

**3. Define the strategy**

In [None]:
strategy = tf.distribute.MultiWorkerMirroredStrategy()
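Because every machine runs the same script and only the task index changes, TF_CONFIG can be generated from one shared worker list. The helper below is a hypothetical convenience function, and the addresses are placeholders. By convention, the worker with index 0 acts as the chief, which takes on extra duties such as saving checkpoints.

```python
import json

def make_tf_config(worker_addresses, index):
    """Build the TF_CONFIG JSON string for worker number `index` (hypothetical helper)."""
    if not 0 <= index < len(worker_addresses):
        raise ValueError(f"index {index} is out of range for {len(worker_addresses)} workers")
    return json.dumps({
        "cluster": {"worker": list(worker_addresses)},
        "task": {"type": "worker", "index": index},
    })

# Placeholder addresses: every machine uses the same list but its own index
WORKERS = ["10.0.0.1:12345", "10.0.0.2:12345"]
for i in range(len(WORKERS)):
    print(make_tf_config(WORKERS, i))
```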

**4. Define and compile your model within the strategy's scope**

In [None]:
with strategy.scope():
    model = Sequential([
        Dense(128, activation='relu', input_shape=(784,)),
        Dense(64, activation='relu'),
        Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

**5. Load and preprocess your data**

In [None]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)

**6. Train the model**

In [None]:
model.fit(x_train, y_train, epochs=10, batch_size=64, validation_data=(x_test, y_test))

**<h2>Introduction to TensorFlow Extended (TFX)</h2>**

TensorFlow Extended (TFX) is an end-to-end platform for building and deploying production machine learning (ML) pipelines, covering data ingestion, validation, feature engineering, model training, evaluation, and serving. It uses Apache Beam for scalable data processing, and its pipelines can be orchestrated with tools such as Apache Airflow or Kubeflow Pipelines on Kubernetes.

TFX and TensorFlow Serving differ in scope. TFX covers the entire ML pipeline, whereas TensorFlow Serving is a specialized component within the TFX ecosystem that serves trained models efficiently for inference in production.

**Step 1: Install Required Libraries**

First, install TFX and other necessary libraries.

In [None]:
!pip install tfx
!pip install tensorflow
!pip install apache-beam[gcp]


**Step 2: Define the Components of the TFX Pipeline**

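Define the pipeline components in the cells below. Each component can be run interactively with InteractiveContext, and all of them are finally assembled into a single pipeline.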

In [42]:

import os
import tensorflow as tf
import tensorflow_transform as tft
import tfx
import tensorflow_model_analysis as tfma
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Evaluator, Pusher
from tfx.proto import trainer_pb2, pusher_pb2
from tfx.orchestration import metadata
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.orchestration.pipeline import Pipeline



In [2]:
# Initialize the TFX pipeline context
context = InteractiveContext()
context




In [51]:
# Download a CSV test file
! wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv

--2024-07-23 19:00:58--  https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1922812 (1.8M) [text/plain]
Saving to: ‘data.csv’


2024-07-23 19:00:59 (27.0 MB/s) - ‘data.csv’ saved [1922812/1922812]



In [57]:
! mkdir -p data && mv data.csv data/
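The preprocessing_fn used later refers to a column named 'feature', which is a placeholder; Transform can only scale columns that actually exist in data.csv. Reading the header row is a quick way to see which names are available. A minimal standard-library sketch (csv_columns is a hypothetical helper):

```python
import csv
import os

def csv_columns(path):
    """Return the column names from the header row of a CSV file."""
    with open(path, newline="") as f:
        return next(csv.reader(f))

csv_path = os.path.join("data", "data.csv")  # location after the mkdir/mv cell
if os.path.exists(csv_path):
    print(csv_columns(csv_path))
```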

In [56]:
# Define paths for the pipeline
_pipeline_name = 'my_pipeline'
_data_root = 'data'
_output_dir = 'output'
_pipeline_root = os.path.join(_output_dir, 'pipelines', _pipeline_name)
_metadata_path = os.path.join(_output_dir, 'metadata.db')


In [58]:
# Define ExampleGen component to ingest CSV data
example_gen = CsvExampleGen(input_base=_data_root)
example_gen



In [64]:
# Generate statistics
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
statistics_gen



In [None]:
# Run ExampleGen with the InteractiveContext created earlier
context.run(example_gen)

# List artifacts produced by ExampleGen
example_gen_output = example_gen.outputs['examples'].get()[0]
print(f"ExampleGen output: {example_gen_output.uri}")

# List files in the output directory
for root, dirs, files in os.walk(example_gen_output.uri):
    for file in files:
        print(os.path.join(root, file))
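ExampleGen writes its splits as gzip-compressed TFRecord files of serialized tf.train.Example protos. A sketch for inspecting a few records, assuming the "Split-train" subdirectory naming used by recent TFX releases (read_examples is a hypothetical helper):

```python
import glob
import os
import tensorflow as tf

def read_examples(split_dir, limit=3):
    """Parse up to `limit` tf.train.Example records from an ExampleGen split directory."""
    files = sorted(glob.glob(os.path.join(split_dir, "*")))
    # ExampleGen output files are gzip-compressed TFRecords
    dataset = tf.data.TFRecordDataset(files, compression_type="GZIP")
    return [tf.train.Example.FromString(raw.numpy()) for raw in dataset.take(limit)]

# Uses the example_gen_output artifact from the previous cell when it exists
if "example_gen_output" in globals():
    for example in read_examples(os.path.join(example_gen_output.uri, "Split-train"), limit=1):
        print(example)
```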



In [15]:
# Infer schema
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
schema_gen



In [16]:
# Validate examples
example_validator = ExampleValidator(statistics=statistics_gen.outputs['statistics'], schema=schema_gen.outputs['schema'])
example_validator



In [30]:
# Transform data
def preprocessing_fn(inputs):
    outputs = inputs.copy()
    outputs['feature'] = tft.scale_to_z_score(inputs['feature'])
    return outputs

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    # preprocessing_fn is loaded from this file; model_trainer.py is written in a later cell
    # and must exist before the component runs
    module_file='model_trainer.py'
)
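tft.scale_to_z_score works in two phases: an analyzer makes a full pass over the dataset to compute the mean and variance, and those constants are baked into the transform graph, so training and serving apply exactly the same scaling. The NumPy sketch below mirrors that arithmetic on made-up values; it does not handle edge cases such as zero variance.

```python
import numpy as np

def z_score(values):
    # Analyze phase: one full pass computes the dataset mean and (population) std
    mean = values.mean()
    std = values.std()
    # Transform phase: the same constants are applied to every value
    return (values - mean) / std

trip_miles = np.array([0.5, 1.2, 3.0, 7.5, 2.3])  # made-up sample values
scaled = z_score(trip_miles)
print(scaled)
```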

In [26]:
# Define a simple model for training
def _build_keras_model(hidden_units):
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(hidden_units, activation='relu', input_shape=(1,)),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
    return model


In [31]:
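# Write preprocessing_fn and the Trainer entry point run_fn to one module file ('feature'/'label' are placeholder column names)
module_file = 'model_trainer.py'
with open(module_file, 'w') as f:
    f.write("""
import tensorflow as tf
import tensorflow_transform as tft
from tfx_bsl.public import tfxio

_FEATURE, _LABEL = 'feature', 'label'  # placeholders: use real columns from data.csv

def preprocessing_fn(inputs):
    outputs = inputs.copy()
    outputs[_FEATURE] = tft.scale_to_z_score(inputs[_FEATURE])
    return outputs

def _build_keras_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(10, activation='relu', input_shape=(1,)),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
    return model

def _input_fn(file_pattern, data_accessor, tf_transform_output, batch_size=32):
    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_LABEL),
        tf_transform_output.transformed_metadata.schema)
    return dataset.map(lambda features, label: (features[_FEATURE], label))  # single model input

def run_fn(fn_args):
    # Entry point called by the TFX Trainer (generic executor)
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    train_ds = _input_fn(fn_args.train_files, fn_args.data_accessor, tf_transform_output)
    eval_ds = _input_fn(fn_args.eval_files, fn_args.data_accessor, tf_transform_output)
    model = _build_keras_model()
    model.fit(train_ds, steps_per_epoch=fn_args.train_steps,
              validation_data=eval_ds, validation_steps=fn_args.eval_steps)
    model.save(fn_args.serving_model_dir, save_format='tf')  # Keras 2 SavedModel export
""")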
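Transform loads preprocessing_fn from its module_file, and the Trainer (with its default generic executor) looks for a function named run_fn in its module_file. A static check with the standard-library ast module can confirm those entry points exist without importing TensorFlow; top_level_functions is a hypothetical helper.

```python
import ast
import os

def top_level_functions(path):
    """Return names of functions defined at the top level of a Python file, without importing it."""
    with open(path) as f:
        tree = ast.parse(f.read(), filename=path)
    return {node.name for node in tree.body if isinstance(node, ast.FunctionDef)}

# Entry points looked up by default: Transform -> preprocessing_fn, generic Trainer -> run_fn
REQUIRED = {"preprocessing_fn", "run_fn"}

if os.path.exists("model_trainer.py"):
    missing = REQUIRED - top_level_functions("model_trainer.py")
    print("Missing entry points:", sorted(missing) if missing else "none")
```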

In [34]:
# Trainer component
trainer = Trainer(
    module_file=module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=100),
    eval_args=trainer_pb2.EvalArgs(num_steps=50)
)

In [35]:
# Model evaluation
eval_config = tfma.EvalConfig(
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[tfma.MetricConfig(class_name='ExampleCount')],
        )
    ]
)
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    eval_config=eval_config
)

In [39]:
# Push the model to a serving directory
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(base_directory=os.path.join(_output_dir, 'exported_models'))
    )
)
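Pusher copies each blessed model into a numbered subdirectory (a timestamp) under base_directory, and TensorFlow Serving by default serves the highest-numbered version it finds under model_base_path. A small helper to see which version that would be, assuming the output/exported_models location used in Step 3 (latest_version_dir is hypothetical):

```python
import os

def latest_version_dir(base_path):
    """Return the highest-numbered version subdirectory, or None if there is none."""
    if not os.path.isdir(base_path):
        return None
    versions = [int(name) for name in os.listdir(base_path)
                if name.isdigit() and os.path.isdir(os.path.join(base_path, name))]
    return os.path.join(base_path, str(max(versions))) if versions else None

print(latest_version_dir(os.path.join("output", "exported_models")))
```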

In [43]:
# Assemble the pipeline
pipeline = Pipeline(
    pipeline_name=_pipeline_name,
    pipeline_root=_pipeline_root,
    components=[
        example_gen,
        statistics_gen,
        schema_gen,
        example_validator,
        transform,
        trainer,
        evaluator,
        pusher
    ],
    metadata_connection_config=metadata.sqlite_metadata_connection_config(_metadata_path),
)

In [44]:
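# Run the whole pipeline locally. InteractiveContext.run() is meant for single components;
# LocalDagRunner executes every component in dependency order.
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

LocalDagRunner().run(pipeline)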



**Step 3: Deploying with TensorFlow Serving**

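Assuming the Pusher wrote the model to output/exported_models, start TensorFlow Serving. This requires the tensorflow_model_server binary, which is installed separately (for example from the tensorflow-model-server apt package). The server keeps running until it is stopped, so the cell below launches it in the background and writes its log to server.log:

In [None]:
! nohup tensorflow_model_server --rest_api_port=8501 --model_name=my_model --model_base_path="$(pwd)/output/exported_models" > server.log 2>&1 &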
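Because the server starts asynchronously, a client may want to wait until the model is loaded. TensorFlow Serving's REST API reports model status at GET /v1/models/<model_name>, returning a model_version_status list whose entries carry a state such as AVAILABLE. A polling sketch using only the standard library (wait_for_model is a hypothetical helper):

```python
import json
import time
import urllib.request

def wait_for_model(status_url, timeout=30.0, interval=1.0):
    """Poll a TF Serving model-status URL until some version is AVAILABLE."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(status_url, timeout=interval) as resp:
                status = json.load(resp)
            states = [v.get("state") for v in status.get("model_version_status", [])]
            if "AVAILABLE" in states:
                return True
        except (OSError, ValueError):
            pass  # server not up yet, model not found, or unparseable response
        time.sleep(interval)
    return False
```

For example, wait_for_model("http://localhost:8501/v1/models/my_model") returns True once my_model is being served, or False after the timeout.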

Use a simple Python script to query the served model.

In [None]:
import requests
import json

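# One instance with a single input value. For a single-input model the value can be sent unnamed;
# models with named inputs expect {"input_name": value} dicts whose keys match the serving signature.
data = {
    "instances": [[0.5]]
}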

# Send the request to the model server
response = requests.post('http://localhost:8501/v1/models/my_model:predict', json=data)

# Print the response
print(response.json())
