**PART 3**

**ADVANCED DEEP NETWORKS FOR COMPLEX PROBLEMS**

---

**CHAPTER 15 - TFX: MLOps and deploying models with TensorFlow**

---

### **15.1 Writing a data pipeline with TFX**

**MLOps** (Machine Learning Operations) combines ML and DevOps to automate the lifecycle of machine learning applications, from data collection to model delivery. **TFX (TensorFlow Extended)** is a powerful platform for building and managing these end-to-end machine learning pipelines.

In this chapter, we develop a pipeline to predict the severity of forest fires based on weather conditions using the **Forest Fires** dataset. The pipeline involves several stages: data ingestion, validation, feature transformation, model training, and deployment.

#### **15.1.1 Loading data from CSV files**

The first step in any pipeline is data ingestion. TFX provides the `CsvExampleGen` component specifically for reading CSV files. This component splits the data into training and evaluation sets (using hashing) and converts them into **TFRecord** format (serialized byte streams), which is the standard efficient data format for TensorFlow.

![Figure 15.1 The directory/file structure after running the CsvExampleGen](./15.Chapter-15/Figure15-1.jpg)

After running, the component generates an output artifact containing the split data files (e.g., `train` and `eval` splits).
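
The idea behind a hash-based split can be sketched in plain Python. This is only an illustration of the principle, not the code `CsvExampleGen` runs: the `assign_split` helper, the MD5 hash, the 2:1 bucket ratio, and the sample rows are all assumptions made for this example.

```python
import hashlib

def assign_split(record: str, train_buckets: int = 2, eval_buckets: int = 1) -> str:
    """Deterministically assign a record to 'train' or 'eval' by hashing it."""
    total = train_buckets + eval_buckets
    digest = hashlib.md5(record.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % total  # bucket index in [0, total)
    return "train" if bucket < train_buckets else "eval"

# Made-up CSV rows; real rows would come from the Forest Fires files
rows = [f"{x},{y},aug,fri" for x in range(1, 10) for y in range(1, 10)]

splits = {"train": [], "eval": []}
for row in rows:
    splits[assign_split(row)].append(row)

print({name: len(members) for name, members in splits.items()})
```

Because the assignment depends only on the record's content, rerunning the split sends every row to the same partition each time.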

![Figure 15.2 Output HTML table generated by running the CsvExampleGen component](./15.Chapter-15/Figure15-2.jpg)

In [None]:
from tfx.components import CsvExampleGen
import os

# Define the component to read data from the directory containing CSVs
example_gen = CsvExampleGen(input_base=os.path.join('data', 'csv', 'train'))

# Run the component within the interactive context
context.run(example_gen)

#### **15.1.2 Generating basic statistics from the data**

Understanding the data through **Exploratory Data Analysis (EDA)** is critical. The `StatisticsGen` component iterates over the data produced by `CsvExampleGen` and computes descriptive statistics (e.g., mean, standard deviation, missing value counts, min/max) for both training and evaluation splits.

![Figure 15.3 The output provided by the StatisticsGen component](./15.Chapter-15/Figure15-3.jpg)
![Figure 15.4 The directory/file structure after running StatisticsGen](./15.Chapter-15/Figure15-4.jpg)

Visualizing these statistics allows us to detect data skewness, missing values, or anomalies. For example, we might observe the distribution of the `FFMC` feature or categorical counts for `day` and `month`.
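
To make the kind of summary concrete, here is a minimal pure-Python sketch of per-feature statistics. It does not use `StatisticsGen`; the `summarize_numeric` helper and the sample values are hypothetical, and a population standard deviation is assumed.

```python
import statistics
from collections import Counter

def summarize_numeric(values):
    """Summarize one numeric column; None marks a missing value."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "missing": len(values) - len(present),
        "mean": statistics.fmean(present),
        "std_dev": statistics.pstdev(present),
        "min": min(present),
        "max": max(present),
    }

# Made-up sample values for illustration only
ffmc = [86.2, 90.6, None, 91.7, 89.3]
day = ["fri", "sat", "fri", "sun", "sat", "fri"]

summary = summarize_numeric(ffmc)
day_counts = Counter(day)  # frequency of each category

print(summary)
print(day_counts.most_common())
```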

![Figure 15.5 The summary statistics graphs generated for the data by the StatisticsGen component](./15.Chapter-15/Figure15-5.jpg)

In [None]:
from tfx.components import StatisticsGen

# Generate statistics using the examples produced by CsvExampleGen
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples']
)
context.run(statistics_gen)

# Visualize the statistics in the notebook
context.show(statistics_gen.outputs['statistics'])

#### **15.1.3 Inferring the schema from data**

The `SchemaGen` component automatically infers a data schema based on the statistics generated in the previous step. This schema acts as a blueprint for the data, defining:
* **Data Types**: e.g., FLOAT, INT, STRING.
* **Presence**: Whether a feature is required or optional.
* **Domains**: The set of allowed values for categorical features or ranges for numerical ones.

We set `infer_feature_shape=False` to provide flexibility for downstream feature engineering steps, which means the data will be represented as sparse tensors during the transformation phase.
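
As a rough illustration of what "inferring a schema" means, the sketch below derives type, presence, and string domain from a few rows. It is a simplified stand-in and not how `SchemaGen` is implemented; `infer_schema`, its output layout, and the sample rows are assumptions for this example.

```python
def infer_schema(rows):
    """Infer type, presence, and string domain for each column of dict rows."""
    schema = {}
    columns = {key for row in rows for key in row}
    for column in sorted(columns):
        values = [row.get(column) for row in rows]
        present = [v for v in values if v is not None]
        if all(isinstance(v, str) for v in present):
            dtype = "STRING"
        elif all(isinstance(v, int) for v in present):
            dtype = "INT"
        else:
            dtype = "FLOAT"  # anything else is treated as FLOAT in this sketch
        schema[column] = {
            "type": dtype,
            # A column with no missing values is considered required
            "presence": "required" if len(present) == len(values) else "optional",
            # Only string columns get a domain of allowed values here
            "domain": sorted(set(present)) if dtype == "STRING" else None,
        }
    return schema

# Made-up rows for illustration only
rows = [
    {"month": "aug", "temp": 22.1, "X": 7},
    {"month": "sep", "temp": None, "X": 4},
    {"month": "aug", "temp": 18.0, "X": 6},
]
schema = infer_schema(rows)
for name, spec in schema.items():
    print(name, spec)
```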

In [None]:
from tfx.components import SchemaGen

# Infer schema from statistics
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False
)
context.run(schema_gen)
context.show(schema_gen.outputs['schema'])

#### **15.1.4 Converting data to features**

The `Transform` component is responsible for feature engineering. It takes the raw data and the schema, and applies a user-defined preprocessing function (`preprocessing_fn`). This function is typically defined in a separate Python module file (e.g., `forest_fires_transform.py`).

Common transformations include:
* **Z-score Normalization**: For dense floating-point features (scaling to mean 0, variance 1).
* **Vocabulary Generation**: Mapping strings to integer IDs for categorical features.
* **Bucketization**: Grouping continuous numerical values into discrete bins (e.g., bucketizing Relative Humidity).

The `preprocessing_fn` leverages the `tensorflow_transform` (tft) library to perform these operations efficiently and consistently during both training and serving.
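
The three transformations can be sketched without TensorFlow to show what each one computes. This is a plain-Python approximation, not the `tensorflow_transform` implementation: the helper names, the frequency-then-alphabetical vocabulary ordering, the single out-of-vocabulary ID, and the sample values are assumptions.

```python
import bisect
import statistics
from collections import Counter

def z_scores(values):
    """Scale values to mean 0 and variance 1 (population statistics)."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]

def build_vocabulary(values):
    """Map each distinct string to an integer ID, most frequent first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda token: (-counts[token], token))
    return {token: index for index, token in enumerate(ordered)}

def apply_vocabulary(values, vocab):
    """Look up IDs; unseen tokens share one out-of-vocabulary ID."""
    oov_id = len(vocab)
    return [vocab.get(v, oov_id) for v in values]

def bucketize(values, boundaries):
    """Return the bucket index of each value; a boundary value joins the upper bucket."""
    return [bisect.bisect_right(boundaries, v) for v in values]

# Made-up sample values for illustration only
scaled = z_scores([10.0, 20.0, 30.0])
vocab = build_vocabulary(["aug", "sep", "aug", "mar"])
month_ids = apply_vocabulary(["sep", "dec"], vocab)       # "dec" was never seen
rh_buckets = bucketize([20, 33, 50, 66, 90], [33, 66])

print(scaled, vocab, month_ids, rh_buckets)
```

In a real pipeline the mean, standard deviation, and vocabulary are computed once over the training data and then reused unchanged at serving time, which is the consistency the `Transform` component provides.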

In [None]:
%%writefile forest_fires_transform.py
import tensorflow as tf
import tensorflow_transform as tft
import forest_fires_constants

# ... (Import constants like feature keys) ...

def preprocessing_fn(inputs):
    outputs = {}
    # Scale dense features using Z-score
    for key in _DENSE_FLOAT_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.scale_to_z_score(
            _sparse_to_dense(inputs[key])
        )
    # Apply vocabulary for categorical features
    for key in _VOCAB_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
            _sparse_to_dense(inputs[key]), 
            num_oov_buckets=1
        )
    # Bucketize numerical features based on defined boundaries
    for key, boundary in zip(_BUCKET_FEATURE_KEYS, _BUCKET_FEATURE_BOUNDARIES):
        outputs[_transformed_name(key)] = tft.apply_buckets(
            _sparse_to_dense(inputs[key]), bucket_boundaries=[boundary]
        )
    # Pass the label through unchanged
    outputs[_transformed_name(_LABEL_KEY)] = _sparse_to_dense(inputs[_LABEL_KEY])
    return outputs

In [None]:
from tfx.components import Transform

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath('forest_fires_transform.py')
)
context.run(transform)

### **15.2 Training a simple regression neural network: TFX Trainer API**

The `Trainer` component is the core engine for model training in TFX. It consumes the transformed data and the transform graph produced by the previous step. We define a `run_fn` function (in a separate module) to orchestrate data loading, model building, and the training loop.

![Figure 15.6 How the model interacts with the API, the TensorFlow server, and the client](./15.Chapter-15/Figure15-6.jpg)

#### **15.2.1 Defining a Keras model**
To handle the various feature types (dense, categorical, bucketized), we utilize `tf.feature_column` definitions. These columns are then fed into a `DenseFeatures` layer, which serves as the entry point to our neural network. The model architecture is a deep regressor consisting of several hidden layers and a single output node for the regression target.

![Figure 15.7 Overview of the functionality of the DenseFeatures layer](./15.Chapter-15/Figure15-7.jpg)
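
Conceptually, such a layer turns each feature into a dense vector and concatenates the results into one input row. The sketch below imitates that behavior in plain Python under simplifying assumptions: numeric features pass through unchanged, integer-coded features are one-hot encoded, and the feature names (`temp_xf`, `month_xf`, and so on) are hypothetical. It is not the Keras implementation.

```python
def one_hot(index, depth):
    """Return a one-hot vector of length `depth` with a 1.0 at `index`."""
    return [1.0 if i == index else 0.0 for i in range(depth)]

def dense_features(example, numeric_keys, categorical_depths):
    """Concatenate numeric values and one-hot encoded categories into one vector."""
    vector = []
    for key in numeric_keys:
        vector.append(float(example[key]))
    for key, depth in categorical_depths.items():
        vector.extend(one_hot(example[key], depth))
    return vector

# One made-up, already transformed example
example = {"temp_xf": 0.5, "wind_xf": -1.2, "month_xf": 2, "RH_xf": 1}
vector = dense_features(
    example,
    numeric_keys=["temp_xf", "wind_xf"],
    categorical_depths={"month_xf": 4, "RH_xf": 3},
)
print(vector)
```

The resulting flat vector is what the first hidden `Dense` layer would receive.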

In [None]:
def _build_keras_model(columns, dnn_hidden_units):
    # ... (Define Input layers for all feature columns) ...
    
    # Use DenseFeatures layer to process inputs based on feature columns
    output = tf.keras.layers.DenseFeatures(columns)(input_layers)
    
    # Add hidden layers
    for numnodes in dnn_hidden_units:
        output = tf.keras.layers.Dense(numnodes, activation='tanh')(output)
    
    # Output layer for regression (1 unit)
    output = tf.keras.layers.Dense(1)(output)
    
    model = tf.keras.Model(input_layers, output)
    model.compile(loss='mean_squared_error',
                  optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))
    return model

#### **15.2.2 Defining the model training**
The `run_fn` function ties everything together. It:
1.  Loads the transformation graph.
2.  Creates training and evaluation datasets using a custom `_input_fn`.
3.  Builds the Keras model.
4.  Trains the model using `model.fit()`.
5.  Saves the trained model along with specific **signatures** that define how it should be served.

In [None]:
def run_fn(fn_args: tfx.components.FnArgs):
    # Load the transform output
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
    
    # Create datasets
    train_dataset = _input_fn(fn_args.train_files, ...)
    eval_dataset = _input_fn(fn_args.eval_files, ...)
    
    # Build the model
    model = _build_keras_model(...)
    
    # Train the model
    model.fit(train_dataset, ...)
    
    # Define signatures for serving
    signatures = {
        'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(...)
    }
    # Save the model
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

#### **15.2.3 Signature Defs: Defining how models are used outside TensorFlow**
Signatures are crucial for deployment. They act as the API contract for the model. We define a serving function `serve_tf_examples_fn` which:
1.  Accepts serialized examples (raw byte data).
2.  Parses them.
3.  Applies the transformation logic (embedding the Transform layer directly into the serving graph).
4.  Generates predictions.

This approach ensures that the deployed model can handle raw data directly without needing a separate preprocessing service.

#### **15.2.4 Training the Keras model with TFX Trainer**
We execute the training by instantiating the `Trainer` component. We pass it the module file containing our training logic, the transformed data, the schema, and the transform graph. The Trainer outputs the saved model artifacts to the pipeline root.

![Figure 15.8 The complete directory/file structure after running the Trainer](./15.Chapter-15/Figure15-8.jpg)

In [None]:
from tfx.components import Trainer
from tfx.proto import trainer_pb2

trainer = Trainer(
    module_file=os.path.abspath("forest_fires_trainer.py"),
    transformed_examples=transform.outputs['transformed_examples'],
    schema=schema_gen.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=trainer_pb2.TrainArgs(num_steps=n_train_steps),
    eval_args=trainer_pb2.EvalArgs(num_steps=n_eval_steps)
)
context.run(trainer)

### **15.3 Setting up Docker to serve a trained model**

To create a portable and isolated environment for our model, we use **Docker**. We pull the official `tensorflow/serving` Docker image, which packages TensorFlow Serving and exposes models over both gRPC and a REST API.

We start a container mapping the model's directory from our host machine to a directory inside the container, and we expose port **8501** for REST API communication. The command typically looks like:
`docker run --rm -p 8501:8501 ... tensorflow/serving:2.6.3-gpu`
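
One possible way to assemble such a command is sketched below. The host path is a placeholder, the `build_serving_command` helper is hypothetical, and the volume mount and `MODEL_NAME` variable follow the layout the `tensorflow/serving` image conventionally uses (`/models/<model name>`); none of this is taken from the elided command above. The sketch only builds the argument list and does not start a container.

```python
import shlex

def build_serving_command(model_dir, model_name,
                          image="tensorflow/serving", rest_port=8501):
    """Build a `docker run` argument list for serving one model over REST."""
    return [
        "docker", "run", "--rm",
        "-p", f"{rest_port}:8501",                  # host port -> container REST port
        "-v", f"{model_dir}:/models/{model_name}",  # mount the exported model directory
        "-e", f"MODEL_NAME={model_name}",           # tells the server which model to load
        image,
    ]

# Placeholder path; replace it with the directory holding the exported model
command = build_serving_command("/absolute/path/to/serving_model", "forest_fires_model")
print(shlex.join(command))
```

The list can be passed to `subprocess.run(command)` once the path points at a real export. TensorFlow Serving expects the mounted directory to contain numbered version subdirectories.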

### **15.4 Deploying the model and serving it through an API**

Before a model goes live, it must pass validation checks. TFX provides specialized components for this:

#### **15.4.1 Validating the infrastructure**
The `InfraValidator` component verifies that the model is mechanically servable. It spins up a test Docker container, loads the model, and sends test requests. If the model responds correctly, it is marked as "BLESSED" (infrastructure-wise).

![Figure 15.9 The directory/file structure after running the InfraValidator](./15.Chapter-15/Figure15-9.jpg)

#### **15.4.2 Resolving the correct model**
The `Resolver` component manages model versions. With a strategy such as `LatestBlessedModelStrategy`, it looks up the most recent model that passed evaluation (was blessed) and supplies it as the baseline against which the new model is compared.

#### **15.4.3 Evaluating the model**
The `Evaluator` component performs deep analysis of model performance. It computes metrics (e.g., Mean Squared Error) on the evaluation set and checks them against defined thresholds. It also compares the current model to the baseline. Only if the new model meets the thresholds and outperforms the baseline is it "blessed" for quality.

#### **15.4.4 Pushing the final model**
The `Pusher` component is the final gatekeeper. It pushes the model to the production destination (e.g., a serving directory) only if the model has been blessed by both the `InfraValidator` and the `Evaluator`.
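
The gating logic described in this section can be summarized in a few lines of plain Python. This is a conceptual sketch only: the function names, the use of mean squared error as the single metric, and the threshold values are assumptions, and the real components are configured declaratively rather than called like this.

```python
def evaluator_blesses(candidate_mse, max_mse, baseline_mse=None):
    """Bless the candidate if it meets the threshold and beats the baseline, if any."""
    if candidate_mse > max_mse:
        return False  # fails the absolute threshold
    if baseline_mse is not None and candidate_mse >= baseline_mse:
        return False  # no improvement over the previously blessed model
    return True

def should_push(infra_blessed, evaluator_blessed):
    """Push only when both validation stages have blessed the model."""
    return infra_blessed and evaluator_blessed

# Made-up metric values for illustration only
first_run = evaluator_blesses(candidate_mse=0.8, max_mse=1.0)                   # no baseline yet
worse_run = evaluator_blesses(candidate_mse=0.8, max_mse=1.0, baseline_mse=0.7)
better_run = evaluator_blesses(candidate_mse=0.6, max_mse=1.0, baseline_mse=0.7)

print(should_push(True, first_run), should_push(True, worse_run), should_push(True, better_run))
```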

#### **15.4.5 Predicting with the TensorFlow serving API**
Once the model is served (e.g., via Docker), clients can send HTTP POST requests to the API endpoint (e.g., `http://localhost:8501/v1/models/forest_fires_model:predict`). The request body is JSON; inputs in a binary format, such as serialized examples, are base64-encoded for transmission.

![Figure 15.10 How the model interacts with the API, the TensorFlow server, and the client](./15.Chapter-15/Figure15-10.jpg)

In [None]:
import requests
import json
import base64

# Construct the request body
# Note: Input data is often base64 encoded for transmission
req_body = {
    "signature_name": "serving_default",
    "instances": [
        # b64encode returns bytes; decode to text, since str() would yield "b'...'"
        base64.b64encode(b"{\"X\": 7, ...}").decode("utf-8")  # Example input data
    ]
}

# Send HTTP POST request to the running TensorFlow Serving container
json_response = requests.post(
    'http://localhost:8501/v1/models/forest_fires_model:predict',
    data=json.dumps(req_body),
    headers={"content-type": "application/json"}
)

# Parse and print predictions
predictions = json.loads(json_response.text)
print(predictions)
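
For signatures that take a single string input holding binary data, the TensorFlow Serving REST API documents a `{"b64": ...}` wrapper that the server decodes back to bytes. The sketch below builds such a request body and reads a row-format response. The helper names and the placeholder payload are assumptions, and whether this wrapper fits depends on how the model's serving signature was defined.

```python
import base64
import json

def build_predict_request(serialized_examples, signature_name="serving_default"):
    """Wrap each binary input as {"b64": ...} and return the JSON request body."""
    instances = [
        {"b64": base64.b64encode(raw).decode("utf-8")}
        for raw in serialized_examples
    ]
    return json.dumps({"signature_name": signature_name, "instances": instances})

def parse_predictions(response_text):
    """Extract the predictions list from a row-format JSON response."""
    return json.loads(response_text)["predictions"]

# Placeholder bytes; a real request would carry a serialized example
body = build_predict_request([b"placeholder-bytes"])
print(body)

# A made-up response of the shape the server returns for row-format requests
print(parse_predictions('{"predictions": [[0.5]]}'))
```

The resulting `body` string can be sent with `requests.post(url, data=body, headers={"content-type": "application/json"})`.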