# Chapter 15 — TFX: MLOps and deploying models with TensorFlow

This notebook reproduces the workflow from **TensorFlow in Action (Thushan Ganegedara)** Chapter 15.  
The focus is not only on training a model but on organizing the whole “model lifecycle” into a repeatable pipeline.

---

## Summary

In earlier chapters, it was possible to train a model and evaluate it in a single notebook session. In production, that approach breaks down quickly because we also need to manage:

- Data ingestion: where the data comes from, how it is split, and whether it keeps the same schema over time  
- Data validation: detecting anomalies (unexpected values, missing columns, distribution shifts) before training  
- Feature transformation: consistent preprocessing that is applied during both training and serving  
- Training and artifact tracking: saving models, metadata, and intermediate artifacts in a way that can be reproduced  
- Evaluation and validation: ensuring a newly trained model is “good enough” and does not regress compared to a baseline  
- Infrastructure checks + deployment: validating that a model can actually be served, then pushing it to a serving directory

This chapter uses **TFX (TensorFlow Extended)** to connect those pieces. TFX is an ecosystem of standard pipeline components (ExampleGen, StatisticsGen, SchemaGen, Transform, Trainer, Evaluator, InfraValidator, Pusher, …) that communicate by passing **artifacts** (data, schemas, models, evaluation results) while tracking everything in **ML Metadata (MLMD)**.

The dataset used here is the **Forest Fires dataset** from UCI. The target variable is `area` (burned area), and the goal is to build a **regression model** and then walk through a simplified end-to-end MLOps pipeline.

---

## Notes about running in Colab

- TFX is a large dependency. After installing it, Colab often requires a runtime restart.
- Some later steps (InfraValidator and Docker-based TensorFlow Serving) need a working Docker daemon, which hosted notebook environments such as Colab often do not provide.  
  If Docker commands fail, run those parts on a local machine with Docker installed.
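A quick way to check this up front is sketched below. The helper name `docker_available` is ours, not part of TFX. It only confirms that a `docker` executable is on `PATH` and that `docker info` exits successfully, which is necessary but not sufficient for the InfraValidator and TF Serving steps to work.

```python
import shutil
import subprocess


def docker_available(timeout: float = 10.0) -> bool:
    """Return True if a `docker` CLI is on PATH and can reach a daemon."""
    if shutil.which("docker") is None:
        return False  # no docker executable at all
    try:
        # `docker info` exits non-zero when the daemon is unreachable.
        result = subprocess.run(
            ["docker", "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0


print("Docker available:", docker_available())
```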

In [1]:
# (Colab) Install TFX.
# TFX pins specific TensorFlow and Apache Beam versions and supports only certain
# Python versions. If pip fails while generating package metadata, check that the
# runtime's Python version is supported by the TFX release being installed.
# After a successful install, restart the runtime before importing tfx.
!pip -q install -U tfx


In [9]:
import os
import json
import base64
import pathlib
import numpy as np
import pandas as pd
import tensorflow as tf

from absl import logging
logging.set_verbosity(logging.INFO)

from tfx import v1 as tfx
print("TensorFlow version:", tf.__version__)
print("TFX version       :", tfx.__version__)


## 1) Download and inspect the dataset

The Forest Fires dataset is a small tabular dataset.  
It has both continuous features (e.g., `temp`, `wind`, `rain`) and categorical-like features (`month`, `day`).

We will:
1. download the CSV
2. load into a pandas DataFrame
3. check shape, columns, and a few sample rows

In [None]:
import requests

DATA_DIR = pathlib.Path("data")
RAW_DIR = DATA_DIR / "raw"
CSV_DIR = DATA_DIR / "csv"
TRAIN_DIR = CSV_DIR / "train"
TEST_DIR = CSV_DIR / "test"

for d in [RAW_DIR, TRAIN_DIR, TEST_DIR]:
    d.mkdir(parents=True, exist_ok=True)

csv_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/forest-fires/forestfires.csv"
names_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/forest-fires/forestfires.names"

csv_path = RAW_DIR / "forestfires.csv"
names_path = RAW_DIR / "forestfires.names"

if not csv_path.exists():
    r = requests.get(csv_url, timeout=60)
    r.raise_for_status()
    csv_path.write_bytes(r.content)

if not names_path.exists():
    r = requests.get(names_url, timeout=60)
    r.raise_for_status()
    names_path.write_bytes(r.content)

print("Saved:", csv_path)
print("Saved:", names_path)

In [None]:
df = pd.read_csv(csv_path)
display(df.head())
print("\nShape:", df.shape)
print("Columns:", list(df.columns))

### Quick checks

Before building any pipeline, it is useful to do small sanity checks:

- Are there missing values?
- What is the target distribution?
- Are the categorical columns formatted consistently?

In [None]:
print("Missing values per column:")
display(df.isna().sum())

print("\nTarget (area) summary:")
display(df["area"].describe())

print("\nUnique values (month):", sorted(df["month"].unique()))
print("Unique values (day)  :", sorted(df["day"].unique()))

## 2) Create a train/test split and store as CSV

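The chapter writes the train and test rows to separate CSV files, mainly to make the pipeline input explicit: `CsvExampleGen` reads a folder of CSV files.

Only the `train` folder is fed to the pipeline. `CsvExampleGen` then splits those rows again into its own train and eval splits (2:1 by default), which the Trainer uses internally. The held-out test rows are used at the end to build a sample serving request.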
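Hash-based splitting is what makes such a split reproducible: each record's hash, not a random draw, decides where it goes. Below is a plain-Python sketch with a 2:1 train/eval ratio. It hashes each CSV line with MD5, whereas TFX fingerprints examples its own way, so the actual assignments will differ; the rows are synthetic.

```python
import hashlib


def assign_split(record: str, buckets=(("train", 2), ("eval", 1))) -> str:
    """Map a record to a split name using a stable hash and bucket counts."""
    total = sum(count for _, count in buckets)
    bucket = int(hashlib.md5(record.encode("utf-8")).hexdigest(), 16) % total
    for name, count in buckets:
        if bucket < count:
            return name
        bucket -= count
    raise AssertionError("unreachable: bucket is always < total")


# Synthetic CSV lines in the Forest Fires column layout (values are made up).
rows = [f"{i},5,mar,fri,86.2,26.2,94.3,5.1,8.2,51,6.7,0.0,0.0" for i in range(300)]
splits = [assign_split(r) for r in rows]
print("train:", splits.count("train"), "eval:", splits.count("eval"))
```

Because the hash depends only on the record's content, rerunning the pipeline on the same file reproduces the same split.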

In [None]:
# 95% train, 5% test
train_df = df.sample(frac=0.95, random_state=42)
test_df = df.drop(train_df.index)

train_csv_path = TRAIN_DIR / "forestfires.csv"
test_csv_path = TEST_DIR / "forestfires.csv"

train_df.to_csv(train_csv_path, index=False)
test_df.to_csv(test_csv_path, index=False)

print("Train rows:", len(train_df), "->", train_csv_path)
print("Test  rows:", len(test_df),  "->", test_csv_path)

## 3) Create an Interactive TFX context

For learning and experimentation, **InteractiveContext** is convenient because we can run components step-by-step.

Two key paths:

- PIPELINE_ROOT: where artifacts are stored
- METADATA_PATH: a local ML Metadata (MLMD) SQLite database
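The sketch below is a deliberately tiny stand-in for the lineage idea behind MLMD, built on Python's `sqlite3`. The three tables and the helpers `run_component` and `upstream_types` are hypothetical and far simpler than MLMD's real schema. The point is only that recording which execution read and wrote which artifact lets you trace a model back to the data and schema it came from.

```python
import sqlite3

# In-memory toy store; MLMD itself would live in the SQLite file at METADATA_PATH.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE artifact (id INTEGER PRIMARY KEY, type TEXT, uri TEXT);
    CREATE TABLE execution (id INTEGER PRIMARY KEY, component TEXT);
    CREATE TABLE event (execution_id INTEGER, artifact_id INTEGER, kind TEXT);
    """
)


def run_component(component, inputs, outputs):
    """Record one execution: which artifacts it read and which it produced."""
    exec_id = conn.execute(
        "INSERT INTO execution (component) VALUES (?)", (component,)
    ).lastrowid
    for artifact_id in inputs:
        conn.execute("INSERT INTO event VALUES (?, ?, 'INPUT')", (exec_id, artifact_id))
    produced = []
    for type_, uri in outputs:
        artifact_id = conn.execute(
            "INSERT INTO artifact (type, uri) VALUES (?, ?)", (type_, uri)
        ).lastrowid
        conn.execute("INSERT INTO event VALUES (?, ?, 'OUTPUT')", (exec_id, artifact_id))
        produced.append(artifact_id)
    return produced


def upstream_types(artifact_id):
    """Walk OUTPUT -> execution -> INPUT events backwards, collecting artifact types."""
    parents = conn.execute(
        """
        SELECT a.id, a.type
        FROM event AS o
        JOIN event AS i ON i.execution_id = o.execution_id AND i.kind = 'INPUT'
        JOIN artifact AS a ON a.id = i.artifact_id
        WHERE o.artifact_id = ? AND o.kind = 'OUTPUT'
        """,
        (artifact_id,),
    ).fetchall()
    found = set()
    for parent_id, parent_type in parents:
        found.add(parent_type)
        found |= upstream_types(parent_id)
    return found


# Made-up URIs; only the lineage structure matters here.
(ex_id,) = run_component("CsvExampleGen", [], [("Examples", "uri/examples")])
(stats_id,) = run_component("StatisticsGen", [ex_id], [("ExampleStatistics", "uri/stats")])
(schema_id,) = run_component("SchemaGen", [stats_id], [("Schema", "uri/schema")])
(model_id,) = run_component("Trainer", [ex_id, schema_id], [("Model", "uri/model")])
print(sorted(upstream_types(model_id)))
```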

In [None]:
PIPELINE_NAME = "forest-fires-tfx"
PIPELINE_ROOT = os.path.join("pipelines", PIPELINE_NAME)
METADATA_PATH = os.path.join("metadata", PIPELINE_NAME, "metadata.db")
SERVING_MODEL_DIR = os.path.join("serving_model", PIPELINE_NAME)

os.makedirs(PIPELINE_ROOT, exist_ok=True)
os.makedirs(os.path.dirname(METADATA_PATH), exist_ok=True)
os.makedirs(SERVING_MODEL_DIR, exist_ok=True)

context = tfx.orchestration.experimental.interactive.InteractiveContext(
    pipeline_root=PIPELINE_ROOT,
    metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH),
)

print("PIPELINE_ROOT     :", PIPELINE_ROOT)
print("METADATA_PATH     :", METADATA_PATH)
print("SERVING_MODEL_DIR :", SERVING_MODEL_DIR)

## 4) ExampleGen — ingest CSV and create Examples

`CsvExampleGen` reads input data and outputs an Examples artifact in TFRecord format.

Downstream components mainly work with Examples artifacts, not raw CSV files.
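TFRecord itself is a simple length-prefixed container. As a rough illustration, the sketch below writes and reads records with only the standard library, following the documented framing: a little-endian uint64 length, a masked CRC-32C of that length, the payload bytes, then a masked CRC-32C of the payload. In the Examples artifact each payload would be a serialized `tf.Example` and the files are typically gzip-compressed; this sketch uses plain byte strings and no compression.

```python
import io
import struct


def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (Castagnoli polynomial); slow but dependency-free."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


def masked_crc(data: bytes) -> int:
    """TFRecord stores a rotated and offset ("masked") CRC-32C."""
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF


def write_record(stream, payload: bytes) -> None:
    header = struct.pack("<Q", len(payload))  # little-endian uint64 length
    stream.write(header)
    stream.write(struct.pack("<I", masked_crc(header)))
    stream.write(payload)
    stream.write(struct.pack("<I", masked_crc(payload)))


def read_records(stream):
    """Yield payloads, verifying both checksums of every record."""
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of stream
        (length,) = struct.unpack("<Q", header)
        (length_crc,) = struct.unpack("<I", stream.read(4))
        if length_crc != masked_crc(header):
            raise ValueError("corrupted length header")
        payload = stream.read(length)
        (payload_crc,) = struct.unpack("<I", stream.read(4))
        if payload_crc != masked_crc(payload):
            raise ValueError("corrupted payload")
        yield payload


buf = io.BytesIO()
for payload in [b"first record", b"second record"]:
    write_record(buf, payload)
buf.seek(0)
records = list(read_records(buf))
print(records)
```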

In [None]:
example_gen = tfx.components.CsvExampleGen(input_base=str(TRAIN_DIR))
context.run(example_gen)

In [None]:
context.show(example_gen.outputs["examples"])

## 5) StatisticsGen — compute dataset statistics

This step calculates summary statistics (counts, mean/std, histograms, etc.).
These statistics can be visualized and used for anomaly detection.
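As a rough picture of what these statistics are, the plain-Python sketch below computes a few per-feature summaries from CSV text: counts, missing values, mean/std/min/max for numeric columns, and the most frequent values for string columns. StatisticsGen (built on TensorFlow Data Validation) computes far more; the type rule used here (INT if every value parses as an integer, else FLOAT if every value parses as a float, else STRING) is our simplification, and the rows are made up.

```python
import csv
import io
import statistics
from collections import Counter


def _parse_all(values, cast):
    """Return all values converted with `cast`, or None if any value fails."""
    try:
        return [cast(v) for v in values]
    except ValueError:
        return None


def feature_stats(csv_text: str) -> dict:
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    stats = {}
    for col in reader.fieldnames:
        values = [r[col] for r in rows if r[col] != ""]
        entry = {"count": len(values), "missing": len(rows) - len(values)}
        ints = _parse_all(values, int)
        floats = _parse_all(values, float)
        if values and (ints is not None or floats is not None):
            nums = ints if ints is not None else floats
            entry.update(
                type="INT" if ints is not None else "FLOAT",
                mean=statistics.fmean(nums),
                std=statistics.pstdev(nums),
                min=min(nums),
                max=max(nums),
            )
        else:
            entry.update(type="STRING", top=Counter(values).most_common(3))
        stats[col] = entry
    return stats


# A few made-up rows using some of the Forest Fires column names.
sample_csv = "month,temp,RH,area\nmar,8.2,51,0.0\naug,22.1,40,5.3\naug,,35,0.0\n"
for name, s in feature_stats(sample_csv).items():
    print(name, s)
```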

In [None]:
statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs["examples"])
context.run(statistics_gen)

In [None]:
context.show(statistics_gen.outputs["statistics"])

## 6) SchemaGen — infer a data schema

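SchemaGen uses the computed statistics to infer feature types and constraints. The schema becomes a contract between the training data and any data that arrives later.

Setting `infer_feature_shape=False` leaves feature shapes unspecified, so downstream components parse the features as variable-length `tf.SparseTensor`s; this is why the Transform module later converts them to dense tensors.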
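The idea of turning statistics into constraints can be sketched with a toy, hand-written statistics dictionary: each feature gets a type, a required flag (never missing in training), and, for strings, a domain of allowed values. Both the input and output formats are made up for illustration; SchemaGen actually writes a `schema.proto` with many more fields.

```python
def infer_schema(stats: dict) -> dict:
    """Turn per-feature statistics into simple constraints (toy format)."""
    schema = {}
    for name, s in stats.items():
        feature = {"type": s["type"], "required": s["missing"] == 0}
        if s["type"] == "STRING":
            # Every string value seen in training becomes the allowed domain.
            feature["domain"] = sorted(s["values"])
        schema[name] = feature
    return schema


# Hand-written statistics in a made-up format.
toy_stats = {
    "month": {"type": "STRING", "missing": 0, "values": {"mar", "aug", "sep"}},
    "temp": {"type": "FLOAT", "missing": 0},
    "RH": {"type": "INT", "missing": 2},
}
toy_schema = infer_schema(toy_stats)
for name, feature in toy_schema.items():
    print(name, feature)
```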

In [None]:
schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs["statistics"],
    infer_feature_shape=False,
)
context.run(schema_gen)

In [None]:
context.show(schema_gen.outputs["schema"])

## 7) ExampleValidator — detect anomalies against the schema

ExampleValidator compares dataset statistics against the schema and reports anomalies.
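At its core this is a diff between what the schema promises and what the data contains. The sketch below checks CSV text against a hand-written toy schema (a hypothetical format, not TFDV's) and reports missing required columns, values that do not parse as the expected type, and strings outside the known domain. It emits per-row messages for readability, whereas ExampleValidator reports aggregated anomalies per feature.

```python
import csv
import io

# Hypothetical toy schema: type, required flag, and an allowed domain for strings.
TOY_SCHEMA = {
    "month": {"type": "STRING", "required": True, "domain": ["aug", "mar", "sep"]},
    "temp": {"type": "FLOAT", "required": True},
    "RH": {"type": "INT", "required": True},
}

_CASTS = {"INT": int, "FLOAT": float}


def find_anomalies(csv_text: str, schema: dict) -> list:
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    anomalies = []
    for name, spec in schema.items():
        if name not in (reader.fieldnames or []):
            if spec["required"]:
                anomalies.append(f"{name}: column missing")
            continue
        for i, row in enumerate(rows):
            value = row[name]
            if value == "":
                if spec["required"]:
                    anomalies.append(f"{name}: empty value in row {i}")
            elif spec["type"] in _CASTS:
                try:
                    _CASTS[spec["type"]](value)
                except ValueError:
                    anomalies.append(f"{name}: {value!r} is not {spec['type']}")
            elif value not in spec.get("domain", [value]):
                anomalies.append(f"{name}: unexpected value {value!r}")
    return anomalies


new_data = "month,temp\nmar,12.5\ndec,abc\n"
for message in find_anomalies(new_data, TOY_SCHEMA):
    print(message)
```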

In [None]:
example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs["statistics"],
    schema=schema_gen.outputs["schema"],
)
context.run(example_validator)

In [None]:
context.show(example_validator.outputs["anomalies"])

## 8) Transform — define feature engineering with tf.Transform

Transform ensures preprocessing is applied consistently in training and serving.

We will create:
- `forest_fires_constants.py`
- `forest_fires_transform.py` containing `preprocessing_fn(inputs)`
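The key idea is the split between an *analyze* phase, which makes full passes over the training data to compute constants such as means, standard deviations, and vocabularies, and a *transform* phase, which applies those frozen constants to one example at a time during both training and serving. The plain-Python sketch below mirrors the intent of `tft.scale_to_z_score`, `tft.compute_and_apply_vocabulary` with one OOV bucket, and fixed-boundary bucketing, but it is not their implementation. The population standard deviation, the alphabetical tie-breaking in the vocabulary, and the rule that a value equal to a boundary falls into the upper bucket are our assumptions.

```python
import bisect
import statistics
from collections import Counter


def analyze_rows(rows):
    """Analyze phase: full passes over training rows -> frozen constants."""
    temps = [r["temp"] for r in rows]
    month_counts = Counter(r["month"] for r in rows)
    return {
        "temp_mean": statistics.fmean(temps),
        "temp_std": statistics.pstdev(temps),  # population std (assumption)
        # Most frequent first; ties broken alphabetically (assumption).
        "month_vocab": sorted(month_counts, key=lambda m: (-month_counts[m], m)),
        "rh_boundaries": [33.0, 66.0],  # fixed, like BUCKET_FEATURE_BOUNDARIES
    }


def transform_row(row, consts):
    """Transform phase: a pure per-example function reused at serving time."""
    std = consts["temp_std"] or 1.0  # constant feature: only subtract the mean
    vocab = consts["month_vocab"]
    month = row["month"]
    return {
        "temp_xf": (row["temp"] - consts["temp_mean"]) / std,
        # Unseen values go to the single OOV bucket at index len(vocab).
        "month_xf": vocab.index(month) if month in vocab else len(vocab),
        # A value equal to a boundary falls into the upper bucket (assumption).
        "RH_xf": bisect.bisect_right(consts["rh_boundaries"], row["RH"]),
    }


# Made-up training rows; only the analyze phase ever sees the full dataset.
train_rows = [
    {"temp": 10.0, "month": "aug", "RH": 20},
    {"temp": 20.0, "month": "aug", "RH": 50},
    {"temp": 30.0, "month": "sep", "RH": 70},
]
consts = analyze_rows(train_rows)
print(consts["month_vocab"])
print(transform_row({"temp": 20.0, "month": "dec", "RH": 66}, consts))
```

Because serving reuses `transform_row` with the same `consts`, a serving request cannot drift from training-time preprocessing; TFX achieves the same by exporting the transform graph.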

In [None]:
from pathlib import Path

constants_code = """
VOCAB_FEATURE_KEYS = ["day", "month"]
MAX_CATEGORICAL_FEATURE_VALUES = [7, 12]

DENSE_FLOAT_FEATURE_KEYS = ["DC", "DMC", "FFMC", "ISI", "rain", "temp", "wind", "X", "Y"]

BUCKET_FEATURE_KEYS = ["RH"]
BUCKET_FEATURE_BOUNDARIES = [[33.0, 66.0]]

LABEL_KEY = "area"

def transformed_name(key: str) -> str:
    return key + "_xf"
""".strip() + "\n"

Path("forest_fires_constants.py").write_text(constants_code)
print("Wrote forest_fires_constants.py")

In [None]:
transform_code = """
import tensorflow as tf
import tensorflow_transform as tft
import forest_fires_constants as const

_DENSE_FLOAT_FEATURE_KEYS = const.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = const.VOCAB_FEATURE_KEYS
_BUCKET_FEATURE_KEYS = const.BUCKET_FEATURE_KEYS
_BUCKET_FEATURE_BOUNDARIES = const.BUCKET_FEATURE_BOUNDARIES
_LABEL_KEY = const.LABEL_KEY

def _transformed_name(key: str) -> str:
    return const.transformed_name(key)

def _sparse_to_dense(x):
    if isinstance(x, tf.SparseTensor):
        x = tf.sparse.to_dense(x)
    return tf.squeeze(x, axis=1)

def preprocessing_fn(inputs):
    outputs = {}

    for key in _DENSE_FLOAT_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.scale_to_z_score(_sparse_to_dense(inputs[key]))

    for key in _VOCAB_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
            _sparse_to_dense(inputs[key]),
            num_oov_buckets=1,
            vocab_filename=key,
        )

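    for key, boundaries in zip(_BUCKET_FEATURE_KEYS, _BUCKET_FEATURE_BOUNDARIES):
        # Fixed boundaries need tft.apply_buckets: tft.bucketize computes quantile
        # boundaries from the data and has no `boundaries` argument.
        # apply_buckets expects a rank-2 boundaries tensor, hence [boundaries].
        outputs[_transformed_name(key)] = tft.apply_buckets(
            tf.cast(_sparse_to_dense(inputs[key]), tf.float32),
            bucket_boundaries=[boundaries],
        )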

    outputs[_transformed_name(_LABEL_KEY)] = tf.cast(_sparse_to_dense(inputs[_LABEL_KEY]), tf.float32)

    return outputs
""".strip() + "\n"

Path("forest_fires_transform.py").write_text(transform_code)
print("Wrote forest_fires_transform.py")

In [None]:
transform = tfx.components.Transform(
    examples=example_gen.outputs["examples"],
    schema=schema_gen.outputs["schema"],
    module_file=os.path.abspath("forest_fires_transform.py"),
)
context.run(transform)

In [None]:
context.show(transform.outputs["transform_graph"])
context.show(transform.outputs["transformed_examples"])

## 9) Trainer — train a regression model using transformed features

The Trainer component runs model-building code from a module file.
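The contract between Trainer and the module file is small: TFX imports the file and calls its `run_fn` with an `FnArgs` object carrying paths and settings such as `train_files`, `eval_files`, `transform_output`, `serving_model_dir`, and `train_steps`. The sketch below imitates only the loading step with `importlib`; `FakeFnArgs` and `load_and_run` are hypothetical stand-ins, and the real loader and `FnArgs` do considerably more.

```python
import importlib.util
import os
import tempfile
from collections import namedtuple

# Hypothetical stand-in for TFX's FnArgs with just a few of its fields.
FakeFnArgs = namedtuple(
    "FakeFnArgs", ["train_files", "eval_files", "serving_model_dir", "train_steps"]
)

# Source of a toy module file; a real one would build, train, and save a model.
MODULE_SOURCE = '''
def run_fn(fn_args):
    return f"train {fn_args.train_steps} steps, save to {fn_args.serving_model_dir}"
'''


def load_and_run(module_path, fn_args):
    """Import a Python file by path and call its run_fn."""
    spec = importlib.util.spec_from_file_location("user_module", module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    return module.run_fn(fn_args)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_trainer.py")
    with open(path, "w") as f:
        f.write(MODULE_SOURCE)
    result = load_and_run(path, FakeFnArgs(["train-*"], ["eval-*"], "serving_dir", 5))

print(result)
```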

In [None]:
trainer_code = """
import os
from typing import List, Text

import tensorflow as tf
import tensorflow_transform as tft
from tfx_bsl.public import tfxio
from absl import logging

import forest_fires_constants as const

_DENSE_FLOAT_FEATURE_KEYS = const.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = const.VOCAB_FEATURE_KEYS
_MAX_CATEGORICAL_FEATURE_VALUES = const.MAX_CATEGORICAL_FEATURE_VALUES
_BUCKET_FEATURE_KEYS = const.BUCKET_FEATURE_KEYS
_BUCKET_FEATURE_BOUNDARIES = const.BUCKET_FEATURE_BOUNDARIES
_LABEL_KEY = const.LABEL_KEY

def _transformed_name(key: str) -> str:
    return const.transformed_name(key)

def _transformed_names(keys):
    return [_transformed_name(k) for k in keys]

def _input_fn(file_pattern: List[Text], data_accessor, tf_transform_output: tft.TFTransformOutput, batch_size: int = 64):
    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_transformed_name(_LABEL_KEY)),
        tf_transform_output.transformed_metadata.schema,
    )

    def _cast(features, label):
        for k in list(features.keys()):
            if features[k].dtype == tf.int64:
                features[k] = tf.cast(features[k], tf.int32)
        return features, label

    return dataset.map(_cast)

def _build_keras_model():
    numeric_cols = [tf.feature_column.numeric_column(k) for k in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)]

    bucket_cols = []
    for key, boundaries in zip(_BUCKET_FEATURE_KEYS, _BUCKET_FEATURE_BOUNDARIES):
        num_buckets = len(boundaries) + 1
        cat_col = tf.feature_column.categorical_column_with_identity(_transformed_name(key), num_buckets=num_buckets)
        bucket_cols.append(tf.feature_column.indicator_column(cat_col))

    vocab_cols = []
    for key, max_val in zip(_VOCAB_FEATURE_KEYS, _MAX_CATEGORICAL_FEATURE_VALUES):
        cat_col = tf.feature_column.categorical_column_with_identity(_transformed_name(key), num_buckets=max_val + 1)
        vocab_cols.append(tf.feature_column.indicator_column(cat_col))

    feature_columns = numeric_cols + bucket_cols + vocab_cols

    inputs = {}
    for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS):
        inputs[key] = tf.keras.Input(shape=(1,), name=key, dtype=tf.float32)
    for key in _transformed_names(_VOCAB_FEATURE_KEYS + _BUCKET_FEATURE_KEYS):
        inputs[key] = tf.keras.Input(shape=(1,), name=key, dtype=tf.int32)

    x = tf.keras.layers.DenseFeatures(feature_columns)(inputs)
    x = tf.keras.layers.Dense(64, activation="relu")(x)
    x = tf.keras.layers.Dense(32, activation="relu")(x)
    outputs = tf.keras.layers.Dense(1, activation="linear")(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        loss="mse",
        metrics=[tf.keras.metrics.MeanSquaredError(name="mse")],
    )
    return model

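def _get_serve_tf_examples_fn(model, tf_transform_output):
    transform_layer = tf_transform_output.transform_features_layer()
    # Attach the layer to the model so the SavedModel tracks its resources
    # (e.g. vocabulary lookup tables); otherwise export can fail with an
    # "untracked resource" error.
    model.tft_layer = transform_layer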

    raw_feature_spec = tf_transform_output.raw_feature_spec()
    raw_feature_spec.pop(_LABEL_KEY)

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        parsed_features = tf.io.parse_example(serialized_tf_examples, raw_feature_spec)
        transformed_features = transform_layer(parsed_features)

        model_inputs = {}
        for k, v in transformed_features.items():
            if k.endswith("_xf") and k != _transformed_name(_LABEL_KEY):
                if v.dtype == tf.int64:
                    v = tf.cast(v, tf.int32)
                model_inputs[k] = tf.expand_dims(v, -1) if len(v.shape) == 1 else v

        return {"predictions": model(model_inputs)}

    return serve_tf_examples_fn

def run_fn(fn_args):
    logging.info("Trainer run_fn started")

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, tf_transform_output, batch_size=64)
    eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor, tf_transform_output, batch_size=64)

    model = _build_keras_model()
    model.summary(print_fn=logging.info)

    log_dir = os.path.join(fn_args.model_run_dir, "logs")
    os.makedirs(log_dir, exist_ok=True)
    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir=log_dir),
        tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True),
    ]

    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=20,
        callbacks=callbacks,
        verbose=1,
    )

    serving_fn = _get_serve_tf_examples_fn(model, tf_transform_output)
    signatures = {
        "serving_default": serving_fn.get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")
        )
    }

    model.save(fn_args.serving_model_dir, save_format="tf", signatures=signatures)
    logging.info("Saved model to %s", fn_args.serving_model_dir)
""".strip() + "\n"

Path("forest_fires_trainer.py").write_text(trainer_code)
print("Wrote forest_fires_trainer.py")

### Train/Eval steps

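TFX uses train/eval splits internally: by default, `CsvExampleGen` splits the training CSV 2:1 into train and eval. The step counts below approximate one pass over each split (two thirds of the rows divided by the batch size for training, one third for evaluation). Because `run_fn` uses `train_steps` as `steps_per_epoch` with `epochs=20`, training can run for up to 20 such passes, fewer if early stopping triggers.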

In [None]:
from tfx.proto import trainer_pb2

BATCH_SIZE = 64
n_rows = len(train_df)

train_steps = max(1, int((2.0/3.0) * n_rows / BATCH_SIZE))
eval_steps  = max(1, int((1.0/3.0) * n_rows / BATCH_SIZE))

print("Rows (train CSV):", n_rows)
print("Train steps     :", train_steps)
print("Eval steps      :", eval_steps)

In [None]:
trainer = tfx.components.Trainer(
    module_file=os.path.abspath("forest_fires_trainer.py"),
    examples=transform.outputs["transformed_examples"],
    transform_graph=transform.outputs["transform_graph"],
    schema=schema_gen.outputs["schema"],
    train_args=trainer_pb2.TrainArgs(num_steps=train_steps),
    eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps),
)
context.run(trainer)

In [None]:
context.show(trainer.outputs["model"])

## 10) Resolver + Evaluator — compare candidate model against a baseline

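Evaluator uses TensorFlow Model Analysis (TFMA) to compute metrics and, optionally, to validate the candidate model against thresholds. The Resolver looks up the latest previously blessed model in ML Metadata to act as the baseline for change thresholds; on the very first run no blessed model exists yet, so there is nothing to compare against.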
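The blessing decision configured below can be sketched in plain Python. `latest_blessed` imitates the Resolver's choice of baseline (the most recent blessed model), and `bless` applies a value threshold (MSE must stay under an upper bound) and an absolute change threshold for a lower-is-better metric (the candidate must not be worse than the baseline). The model records and MSE numbers are made up, and skipping the change check when no baseline exists is our assumption about first-run behavior, not TFMA's code.

```python
from typing import List, Optional


def latest_blessed(models: List[dict]) -> Optional[dict]:
    """Resolver idea: the newest model whose blessing is positive, or None."""
    blessed = [m for m in models if m["blessed"]]
    return max(blessed, key=lambda m: m["id"]) if blessed else None


def bless(candidate_mse: float, baseline_mse: Optional[float],
          upper_bound: float = 300.0, max_increase: float = 0.0) -> bool:
    """Value threshold plus absolute change threshold for a lower-is-better metric."""
    if candidate_mse > upper_bound:
        return False  # fails the value threshold
    if baseline_mse is not None and candidate_mse - baseline_mse > max_increase:
        return False  # worse than the baseline by more than allowed
    return True


# Made-up history: model 1 was blessed, model 2 was not.
history = [
    {"id": 1, "mse": 280.0, "blessed": True},
    {"id": 2, "mse": 350.0, "blessed": False},
]
baseline = latest_blessed(history)
baseline_mse = baseline["mse"] if baseline else None
print(bless(270.0, baseline_mse))  # under 300 and better than 280 -> True
print(bless(290.0, baseline_mse))  # under 300 but worse than 280 -> False
print(bless(250.0, None))          # no baseline: only the value threshold -> True
```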

In [None]:
import tensorflow_model_analysis as tfma
from google.protobuf import text_format

eval_config = text_format.Parse(
    r"""
    model_specs { label_key: "area" }
    slicing_specs {}
    slicing_specs { feature_keys: ["month"] }
    metrics_specs {
      metrics { class_name: "ExampleCount" }
      metrics {
        class_name: "MeanSquaredError"
        threshold {
          value_threshold { upper_bound { value: 300.0 } }
          change_threshold { direction: LOWER_IS_BETTER absolute { value: 0.0 } }
        }
      }
    }
    """,
    tfma.EvalConfig(),
)

eval_config

In [None]:
model_resolver = tfx.dsl.Resolver(
    strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
    model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
    model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
).with_id("latest_blessed_model_resolver")

context.run(model_resolver)

In [None]:
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs["examples"],
    model=trainer.outputs["model"],
    baseline_model=model_resolver.outputs["model"],
    eval_config=eval_config,
)

context.run(evaluator)

In [None]:
context.show(evaluator.outputs["evaluation"])
context.show(evaluator.outputs.get("blessing"))

In [None]:
eval_result = tfma.load_eval_result(evaluator.outputs["evaluation"].get()[0].uri)
tfma.view.render_slicing_metrics(eval_result)

## 11) InfraValidator — check serving infrastructure (Docker-based TF Serving)

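InfraValidator launches a serving environment and checks that the model can actually be loaded and queried. With the configuration below, that means starting a TensorFlow Serving Docker container (image tag `latest`), loading the model, and sending a request built from one example, so this step only works where Docker is available.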
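The `ValidationSpec` below bounds how long the model may take to load and how many attempts are made. As a rough mental model only (this is not InfraValidator's implementation, and how it counts tries is an assumption), the logic resembles a retry loop with a per-try deadline. `probe` is a hypothetical callable standing in for "start TF Serving and send a request".

```python
import time
from typing import Callable


def validate_with_retries(probe: Callable[[], bool], max_loading_time_seconds: float,
                          num_tries: int, poll_interval: float = 0.01) -> bool:
    """Return True (blessed) if `probe` succeeds before the deadline on any try."""
    for _ in range(num_tries):
        deadline = time.monotonic() + max_loading_time_seconds
        while time.monotonic() < deadline:
            if probe():
                return True
            time.sleep(poll_interval)
        # In a real setting the server would presumably be restarted here.
    return False


# Hypothetical probe that fails twice (server still loading), then succeeds.
calls = {"n": 0}


def flaky_probe() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


ok = validate_with_retries(flaky_probe, max_loading_time_seconds=1.0, num_tries=3)
print("blessed:", ok, "after", calls["n"], "probes")
```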

In [None]:
from tfx.proto import infra_validator_pb2

infra_validator = tfx.components.InfraValidator(
    model=trainer.outputs["model"],
    examples=example_gen.outputs["examples"],
    serving_spec=infra_validator_pb2.ServingSpec(
        tensorflow_serving=infra_validator_pb2.TensorFlowServing(tags=["latest"]),
        local_docker=infra_validator_pb2.LocalDockerConfig(),
    ),
    validation_spec=infra_validator_pb2.ValidationSpec(
        max_loading_time_seconds=60,
        num_tries=3,
    ),
    request_spec=infra_validator_pb2.RequestSpec(
        tensorflow_serving=infra_validator_pb2.TensorFlowServingRequestSpec(),
        num_examples=1,
    ),
)

context.run(infra_validator)

In [None]:
context.show(infra_validator.outputs["blessing"])

## 12) Pusher — push the validated model to a serving directory

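Pusher copies the model to the push destination only if it is blessed. Here both blessings are required: the model blessing from Evaluator and the infra blessing from InfraValidator.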
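Conceptually, Pusher is a guarded copy into a numbered version directory under `base_directory`, which is the layout TensorFlow Serving watches (by default it serves the highest version number). A minimal standard-library sketch follows; `push_if_blessed` is a hypothetical name, and using the current Unix time in seconds as the version is our assumption about how versions are chosen.

```python
import os
import shutil
import tempfile
import time


def push_if_blessed(model_dir, base_directory, blessings):
    """Copy model_dir to base_directory/<version>/ only if every blessing is True."""
    if not all(blessings.values()):
        return None  # at least one validation failed: nothing is pushed
    version = str(int(time.time()))  # assumed versioning scheme
    destination = os.path.join(base_directory, version)
    shutil.copytree(model_dir, destination)
    return destination


with tempfile.TemporaryDirectory() as tmp:
    # Fake SavedModel layout: saved_model.pb plus a variables/ directory.
    model_dir = os.path.join(tmp, "trainer_output")
    os.makedirs(os.path.join(model_dir, "variables"))
    open(os.path.join(model_dir, "saved_model.pb"), "wb").close()

    serving_root = os.path.join(tmp, "serving_model")
    skipped = push_if_blessed(model_dir, serving_root, {"evaluator": True, "infra": False})
    pushed = push_if_blessed(model_dir, serving_root, {"evaluator": True, "infra": True})
    pushed_files = sorted(os.listdir(pushed))
    print("skipped:", skipped)
    print("pushed to:", os.path.relpath(pushed, tmp), pushed_files)
```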

In [None]:
from tfx.proto import pusher_pb2

pusher = tfx.components.Pusher(
    model=trainer.outputs["model"],
    model_blessing=evaluator.outputs.get("blessing"),
    infra_blessing=infra_validator.outputs.get("blessing"),
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(base_directory=SERVING_MODEL_DIR)
    ),
)

context.run(pusher)

In [None]:
context.show(pusher.outputs["pushed_model"])
print("Serving model directory:", SERVING_MODEL_DIR)

## 13) Serving the pushed model with TensorFlow Serving (Docker)

A typical TensorFlow Serving command (run where Docker is available):

```bash
docker run -p 8501:8501 \
  --mount type=bind,source=$(pwd)/serving_model/forest-fires-tfx,target=/models/forest-fires-tfx \
  -e MODEL_NAME=forest-fires-tfx \
  tensorflow/serving:latest
```

The model exported by this pipeline accepts serialized `tf.Example` records.  
The next cells build a sample `tf.Example` request and show the REST payload format.

In [None]:
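def _bytes_feature(v: str):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[v.encode("utf-8")]))

def _float_feature(v: float):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[float(v)]))

def _int64_feature(v: int):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[int(v)]))

sample = test_df.iloc[0]

# Use the same types CsvExampleGen inferred from the CSV: integer columns
# (X, Y, RH) are INT features and must be sent as int64, not float.
features = {}
for col in test_df.columns:
    if col == "area":
        continue  # the label is not a serving input
    if pd.api.types.is_integer_dtype(test_df[col]):
        features[col] = _int64_feature(sample[col])
    elif pd.api.types.is_float_dtype(test_df[col]):
        features[col] = _float_feature(sample[col])
    else:
        features[col] = _bytes_feature(str(sample[col]))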

example_proto = tf.train.Example(features=tf.train.Features(feature=features))
serialized = example_proto.SerializeToString()

b64 = base64.b64encode(serialized).decode("utf-8")
print("Base64 example length:", len(b64))

In [None]:
import requests

TF_SERVING_URL = "http://localhost:8501/v1/models/forest-fires-tfx:predict"

payload = {
    "signature_name": "serving_default",
    "instances": [{"b64": b64}],
}

print("POST ->", TF_SERVING_URL)
print("Payload keys:", list(payload.keys()))

# Uncomment if TF Serving is running
# r = requests.post(TF_SERVING_URL, json=payload, timeout=10)
# print("Status:", r.status_code)
# print("Response:", r.text[:500])

## Closing notes

This chapter shifts from “training in isolation” to a pipeline mindset:

- data checks and schema become part of the workflow
- preprocessing is exported and reused consistently
- evaluation can compare models over time using ML Metadata
- deployment is conditional on passing validations