# Chapter 15: TFX: MLOps and Deploying Models with TensorFlow

## 1️⃣ Chapter Overview

In the previous chapters, we focused on model architecture and training. However, in a real-world production environment, training a model is just one small part of the lifecycle. **MLOps** (Machine Learning Operations) focuses on the automation, scalability, and reliability of ML pipelines.

This chapter introduces **TFX (TensorFlow Extended)**, an end-to-end platform for deploying production ML pipelines. We will build a complete pipeline to predict forest fire severity, covering data ingestion, validation, transformation, training, evaluation, and finally, serving the model using **Docker** and **TensorFlow Serving**.

### Key Machine Learning Concepts:
* **MLOps:** DevOps principles applied to Machine Learning: continuous integration, delivery, and training (CI/CD/CT).
* **TFX Components:** ExampleGen, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Evaluator, Pusher.
* **Data Drift & Validation:** Automatically detecting anomalies in incoming data.
* **Model Serving:** Exposing a trained model via a REST API using Docker containers.

### Practical Skills:
* Building a TFX pipeline using `InteractiveContext`.
* Preprocessing data using `tensorflow_transform` (tft).
* Validating models with `tensorflow_model_analysis` (TFMA) before deployment.
* Containerizing a model server using Docker.

## 2️⃣ Theoretical Explanation

### 2.1 What is TFX?
**TensorFlow Extended (TFX)** is a production-scale machine learning platform based on TensorFlow. Unlike typical research code, where data loading and preprocessing are often ad hoc, TFX standardizes these steps into reusable **Components**.

### 2.2 The TFX Pipeline Components
A standard TFX pipeline consists of a sequence of components that pass **Artifacts** (data, models, statistics) to each other:

1.  **ExampleGen:** Ingests data (CSV, TFRecord) and splits it into training/eval sets.
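2.  **StatisticsGen:** Computes per-feature statistics (e.g., mean, standard deviation, value distributions, fraction of zeros and missing values). Useful for visualization and validation.
3.  **SchemaGen:** Infers the data schema (feature types, whether each feature is required, categorical value domains). Acts as a contract for data quality and can be hand-edited, e.g. to add expected value ranges.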
4.  **ExampleValidator:** Detects anomalies (e.g., missing values in a required column, data drift) based on the schema.
5.  **Transform:** Performs feature engineering (normalization, vocabularies, bucketing). Crucially, it outputs a **Transform Graph** so the *exact same* transformations are applied during serving to prevent training-serving skew.
6.  **Trainer:** Trains the model using TensorFlow/Keras. It uses the transformed data and the transform graph.
7.  **Evaluator:** Performs deep analysis of model performance. It computes metrics overall and on slices of data (e.g., "How does the model perform on Mondays?") and "blesses" the model only if it meets the configured thresholds, which can include not being worse than a baseline model.
8.  **Pusher:** Pushes the blessed model to a serving infrastructure (e.g., filesystem, cloud bucket).
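
Components 2 to 4 form the data-validation half of the pipeline. As a rough, dependency-free sketch of what they do (this is not TFX code; `compute_statistics`, `infer_schema`, and `validate` are hypothetical helpers), consider a few rows of forest-fire data. The numeric range check is an illustrative extra, not something SchemaGen infers by default.

```python
from statistics import mean


def compute_statistics(rows):
    """StatisticsGen-like summary: per-column missing count plus numeric or categorical stats."""
    stats = {}
    for col in rows[0]:
        values = [row.get(col) for row in rows]
        present = [v for v in values if v is not None]
        numeric = all(isinstance(v, (int, float)) for v in present)
        col_stats = {"missing": len(values) - len(present), "numeric": numeric}
        if numeric:
            col_stats.update(min=min(present), max=max(present), mean=mean(present))
        else:
            col_stats["domain"] = set(present)
        stats[col] = col_stats
    return stats


def infer_schema(stats):
    """SchemaGen-like: turn statistics into a contract for future data."""
    schema = {}
    for col, s in stats.items():
        entry = {"numeric": s["numeric"], "required": s["missing"] == 0}
        if s["numeric"]:
            entry["range"] = (s["min"], s["max"])  # illustrative extra (see lead-in)
        else:
            entry["domain"] = s["domain"]
        schema[col] = entry
    return schema


def validate(row, schema):
    """ExampleValidator-like: return a list of anomalies for one incoming row."""
    anomalies = []
    for col, spec in schema.items():
        value = row.get(col)
        if value is None:
            if spec["required"]:
                anomalies.append(f"{col}: missing required value")
        elif spec["numeric"]:
            low, high = spec["range"]
            if not isinstance(value, (int, float)):
                anomalies.append(f"{col}: expected a number, got {value!r}")
            elif not low <= value <= high:
                anomalies.append(f"{col}: {value} outside [{low}, {high}]")
        elif value not in spec["domain"]:
            anomalies.append(f"{col}: unexpected value {value!r}")
    return anomalies


train_rows = [
    {"month": "mar", "temp": 8.2, "RH": 51},
    {"month": "aug", "temp": 22.8, "RH": 27},
    {"month": "sep", "temp": 18.0, "RH": 42},
]
schema = infer_schema(compute_statistics(train_rows))
print(validate({"month": "mar", "temp": 30.0, "RH": 40}, schema))
# → ['temp: 30.0 outside [8.2, 22.8]']
print(validate({"month": "jun", "RH": 40}, schema))
```

The second call reports both an unseen month and a missing required `temp`, the kind of drift and completeness problems ExampleValidator is meant to surface before training.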

### 2.3 Model Serving with Docker
**Docker** packages software into containers that run reliably in any environment. **TensorFlow Serving** is a flexible, high-performance serving system for ML models. By combining them, we can spin up a lightweight server that exposes our model via HTTP/REST or gRPC endpoints.

## 3️⃣ Setup and Data Preparation

**Note:** TFX is a heavy library with complex dependencies, and each TFX release is built against specific TensorFlow versions. This notebook assumes a compatible environment (Linux, e.g. Ubuntu, is recommended). The code below reproduces the pipeline construction logic.

We will use the **Forest Fires** dataset to predict the burned area (`area`, in hectares) from location, date, and weather features. This is a regression task.

In [None]:
import os
import requests
import pandas as pd
import tensorflow as tf
import numpy as np
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
import absl.logging

# Set logging level
absl.logging.set_verbosity(absl.logging.INFO)

# 1. Download Data
os.makedirs('data/csv', exist_ok=True)

url = "http://archive.ics.uci.edu/ml/machine-learning-databases/forest-fires/forestfires.csv"
csv_path = os.path.join('data', 'csv', 'forestfires.csv')

if not os.path.exists(csv_path):
    r = requests.get(url, timeout=60)
    r.raise_for_status()  # Fail loudly instead of saving an HTML error page
    with open(csv_path, 'wb') as f:
        f.write(r.content)
    print("Downloaded Forest Fires dataset.")

# 2. Prepare Train/Test Split
# Hold out 5% of the rows as "future" data that the pipeline never sees;
# only the 95% training portion is written where ExampleGen will read it.
df = pd.read_csv(csv_path)
train_df = df.sample(frac=0.95, random_state=42)
test_df = df.drop(train_df.index)

# Save split files
os.makedirs('data/csv/train', exist_ok=True)
train_df.to_csv('data/csv/train/forestfires.csv', index=False)
print(f"Training data shape: {train_df.shape}")

# Initialize TFX Interactive Context
# This allows running components interactively in a notebook
_pipeline_root = './pipeline/'
context = InteractiveContext(pipeline_root=_pipeline_root)

## 4️⃣ Data Ingestion and Validation

### 4.1 CsvExampleGen
Reads CSV files, converts each row into a `tf.Example`, writes the results as (GZIP-compressed) `TFRecord` files, and splits them into training and evaluation sets.

In [None]:
from tfx.components import CsvExampleGen

# Input: Directory containing the training CSV
example_gen = CsvExampleGen(input_base='data/csv/train')
context.run(example_gen)
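
By default, CsvExampleGen splits the examples 2:1 into `train` and `eval` by hashing each record into buckets, so the split is deterministic across runs; the split can be changed through `output_config` with `example_gen_pb2.SplitConfig`. The sketch below imitates the idea in plain Python. `assign_split` is a hypothetical helper, and `hashlib.md5` only stands in for the fingerprint function TFX actually uses.

```python
import hashlib
from collections import Counter

# (split name, number of hash buckets); 2:1 mirrors CsvExampleGen's default
SPLITS = (("train", 2), ("eval", 1))


def assign_split(record: str, splits=SPLITS) -> str:
    """Deterministically map a record to a split by hashing it into buckets."""
    total = sum(n for _, n in splits)
    # md5 is a stand-in for the fingerprint function TFX uses internally
    bucket = int(hashlib.md5(record.encode("utf-8")).hexdigest(), 16) % total
    for name, n in splits:
        if bucket < n:
            return name
        bucket -= n
    raise AssertionError("unreachable: bucket is always < total")


rows = [f"7,5,mar,fri,row-{i}" for i in range(3000)]
counts = Counter(assign_split(r) for r in rows)
print(counts)  # roughly 2000 train / 1000 eval
```

Because the split depends only on record content, re-running the pipeline reproduces the same split, and exact duplicate rows always land in the same split.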

### 4.2 StatisticsGen & SchemaGen
Calculates statistics and infers the data schema (types, domains).

In [None]:
from tfx.components import StatisticsGen, SchemaGen

# 1. Generate Statistics
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
context.run(statistics_gen)

# Visualize statistics (Optional, works in Notebooks)
# context.show(statistics_gen.outputs['statistics'])

# 2. Infer Schema
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False  # Features stay VarLen, so Transform receives them as SparseTensors
)
context.run(schema_gen)

# context.show(schema_gen.outputs['schema'])
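
With `infer_feature_shape=False`, the schema records no fixed shape for any feature, so Transform hands each one to `preprocessing_fn` as a `SparseTensor` (indices, values, dense shape). A Keras model with fixed `(batch, 1)` inputs needs dense tensors, so each column has to be densified, with rows that have no value filled by a default. The plain-Python sketch below (a hypothetical `densify` helper, not a TensorFlow API) shows what that step does to one column:

```python
def densify(indices, values, batch_size, default):
    """Turn a [batch_size, 1] sparse column into a dense list.

    indices: (row, col) pairs, as stored in a SparseTensor; values: matching values.
    Rows that have no entry receive `default` (0 for numbers, '' for strings).
    """
    dense = [default] * batch_size
    for (row, _col), value in zip(indices, values):
        dense[row] = value
    return dense


# Batch of 3 examples where the second one has no 'rain' value
print(densify([(0, 0), (2, 0)], [0.0, 1.4], batch_size=3, default=0.0))
# → [0.0, 0.0, 1.4]
print(densify([(0, 0), (1, 0)], ["fri", "mon"], batch_size=3, default=""))
# → ['fri', 'mon', '']
```

Note the trade-off: after this step a missing value and a genuine zero look identical. If a feature can really be missing, an explicit "was missing" indicator feature keeps that information.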

## 5️⃣ Data Transformation (Feature Engineering)

The Transform component loads its logic from a separate Python module file (passed as `module_file`), so the same code can run wherever the pipeline is executed.

We will define:
1.  **Constants:** Feature names and types.
2.  **Transform Module:** The `preprocessing_fn` that uses `tensorflow_transform` (`tft`) to scale and bucketize numeric features and to map categorical strings to integer IDs through a vocabulary.

In [None]:
%%writefile forest_fires_constants.py

# Feature Keys
DENSE_FLOAT_FEATURE_KEYS = ['DC', 'DMC', 'FFMC', 'ISI', 'rain', 'temp', 'wind', 'X', 'Y']
VOCAB_FEATURE_KEYS = ['day', 'month']
BUCKET_FEATURE_KEYS = ['RH']
BUCKET_FEATURE_BOUNDARIES = [(33, 66)] # Low, Mid, High humidity
LABEL_KEY = 'area'

def transformed_name(key):
    return key + '_xf'
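
`BUCKET_FEATURE_BOUNDARIES = [(33, 66)]` gives two boundaries for `RH` and therefore three buckets, which is why the trainer uses `num_buckets = len(boundaries) + 1` one-hot slots. A plain-Python sketch of the mapping (hypothetical helpers; `bisect_right` puts a value equal to a boundary into the upper bucket, which we assume but have not verified matches the tie rule of `tft.apply_buckets`):

```python
from bisect import bisect_right

RH_BOUNDARIES = (33, 66)              # same values as BUCKET_FEATURE_BOUNDARIES[0]
NUM_BUCKETS = len(RH_BOUNDARIES) + 1  # 3: low, mid, high humidity


def bucketize(value, boundaries=RH_BOUNDARIES):
    """Return a bucket id in [0, len(boundaries)]."""
    return bisect_right(boundaries, value)


def one_hot(bucket_id, num_buckets=NUM_BUCKETS):
    """Indicator vector, the shape of output an indicator_column produces."""
    return [1 if i == bucket_id else 0 for i in range(num_buckets)]


for rh in (25, 51, 80):
    print(rh, bucketize(rh), one_hot(bucketize(rh)))
# 25 0 [1, 0, 0]
# 51 1 [0, 1, 0]
# 80 2 [0, 0, 1]
```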

In [None]:
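%%writefile forest_fires_transform.py

import tensorflow as tf
import tensorflow_transform as tft
import forest_fires_constants

_DENSE_FLOAT_FEATURE_KEYS = forest_fires_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = forest_fires_constants.VOCAB_FEATURE_KEYS
_BUCKET_FEATURE_KEYS = forest_fires_constants.BUCKET_FEATURE_KEYS
_BUCKET_FEATURE_BOUNDARIES = forest_fires_constants.BUCKET_FEATURE_BOUNDARIES
_LABEL_KEY = forest_fires_constants.LABEL_KEY
_transformed_name = forest_fires_constants.transformed_name

def _fill_in_missing(x):
    """Convert a VarLen feature (a [batch, 1] SparseTensor) into a dense [batch] Tensor.

    Rows without a value get 0 (numeric features) or '' (string features).
    """
    if not isinstance(x, tf.sparse.SparseTensor):
        return x
    default_value = '' if x.dtype == tf.string else 0
    return tf.squeeze(
        tf.sparse.to_dense(
            tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
            default_value),
        axis=1)

def preprocessing_fn(inputs):
    outputs = {}

    # Scale dense features to Z-score (mean 0, std 1); X and Y are integers, so cast first
    for key in _DENSE_FLOAT_FEATURE_KEYS:
        value = tf.cast(_fill_in_missing(inputs[key]), tf.float32)
        outputs[_transformed_name(key)] = tft.scale_to_z_score(value)

    # Convert categorical strings to integer IDs. Naming the vocabulary file lets
    # the trainer look up its size with vocabulary_size_by_name()
    for key in _VOCAB_FEATURE_KEYS:
        outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
            _fill_in_missing(inputs[key]),
            num_oov_buckets=1,
            vocab_filename=_transformed_name(key))

    # Bucketize numerical features (RH into low / mid / high humidity)
    for key, boundaries in zip(_BUCKET_FEATURE_KEYS, _BUCKET_FEATURE_BOUNDARIES):
        value = tf.cast(_fill_in_missing(inputs[key]), tf.float32)
        outputs[_transformed_name(key)] = tft.apply_buckets(
            value, bucket_boundaries=[[float(b) for b in boundaries]])

    # Keep the label as is (regression target), only densified
    outputs[_transformed_name(_LABEL_KEY)] = tf.cast(
        _fill_in_missing(inputs[_LABEL_KEY]), tf.float32)

    return outputs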

In [None]:
from tfx.components import Transform

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file='forest_fires_transform.py'
)

context.run(transform)
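
What makes the transform graph skew-proof is the split between *full-pass* steps (computing the mean, standard deviation, or vocabulary over the whole training set) and *per-instance* steps that only apply those frozen constants. Below is a dependency-free sketch of the idea with hypothetical `analyze`/`transform` helpers: a frequency-ordered toy vocabulary plus one out-of-vocabulary (OOV) id, similar in spirit to `num_oov_buckets=1`.

```python
from collections import Counter
from statistics import mean, pstdev


def analyze(train_values, train_categories):
    """Full pass over the training data; the result is frozen, like the transform graph."""
    counts = Counter(train_categories)
    # Most frequent first, ties broken alphabetically
    vocab = sorted(counts, key=lambda c: (-counts[c], c))
    return {"mean": mean(train_values), "std": pstdev(train_values), "vocab": vocab}


def transform(value, category, constants):
    """Per-instance step: uses only the frozen constants, so it works on a single request."""
    z_score = (value - constants["mean"]) / constants["std"]
    vocab = constants["vocab"]
    # Unseen categories share the single OOV id, len(vocab)
    category_id = vocab.index(category) if category in vocab else len(vocab)
    return z_score, category_id


constants = analyze(
    [2, 4, 4, 4, 5, 5, 7, 9],
    ["mar", "aug", "aug", "sep", "aug", "sep", "mar", "aug"],
)
print(constants["vocab"])              # → ['aug', 'mar', 'sep']
print(transform(9, "sep", constants))  # → (2.0, 2)
print(transform(5, "dec", constants))  # → (0.0, 3)
```

A single serving request could never compute a meaningful mean or vocabulary of its own; reusing the training-time constants is exactly what prevents training-serving skew.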

## 6️⃣ Model Training

The **Trainer** component requires a module file that defines:
1.  `run_fn`: The entry point for TFX to start training.
2.  The model architecture (using Keras).
3.  Signatures for serving (how the model accepts requests).

In [None]:
%%writefile forest_fires_trainer.py

import tensorflow as tf
import tensorflow_transform as tft
from tensorflow.keras import layers, models
from tfx.components.trainer.fn_args_utils import FnArgs
import forest_fires_constants

# Import constants
_DENSE_FLOAT_FEATURE_KEYS = forest_fires_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = forest_fires_constants.VOCAB_FEATURE_KEYS
_BUCKET_FEATURE_KEYS = forest_fires_constants.BUCKET_FEATURE_KEYS
_BUCKET_FEATURE_BOUNDARIES = forest_fires_constants.BUCKET_FEATURE_BOUNDARIES
_LABEL_KEY = forest_fires_constants.LABEL_KEY
_transformed_name = forest_fires_constants.transformed_name

def _build_keras_model(tf_transform_output):
    # Feature columns describe how each transformed feature enters the network
    feature_columns = []

    # Numeric columns (already z-scored by Transform)
    for key in _DENSE_FLOAT_FEATURE_KEYS:
        feature_columns.append(
            tf.feature_column.numeric_column(_transformed_name(key)))

    # Categorical columns (indicator / one-hot)
    for key in _VOCAB_FEATURE_KEYS:
        # Vocabulary size recorded by Transform; +1 for the OOV bucket
        vocab_size = tf_transform_output.vocabulary_size_by_name(_transformed_name(key))
        categorical_col = tf.feature_column.categorical_column_with_identity(
            _transformed_name(key), num_buckets=vocab_size + 1)
        feature_columns.append(tf.feature_column.indicator_column(categorical_col))

    # Bucketized columns: N boundaries produce N + 1 buckets
    for key, boundaries in zip(_BUCKET_FEATURE_KEYS, _BUCKET_FEATURE_BOUNDARIES):
        num_buckets = len(boundaries) + 1
        categorical_col = tf.feature_column.categorical_column_with_identity(
            _transformed_name(key), num_buckets=num_buckets)
        feature_columns.append(tf.feature_column.indicator_column(categorical_col))

    # DenseFeatures consumes the feature dictionary and applies the feature_columns logic
    feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

    # One named Keras Input per transformed feature; names must match the dataset keys
    inputs = {}
    for key in _DENSE_FLOAT_FEATURE_KEYS:
        inputs[_transformed_name(key)] = tf.keras.Input(shape=(1,), name=_transformed_name(key))

    for key in _VOCAB_FEATURE_KEYS + _BUCKET_FEATURE_KEYS:
        inputs[_transformed_name(key)] = tf.keras.Input(shape=(1,), name=_transformed_name(key), dtype=tf.int64)

    x = feature_layer(inputs)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dense(32, activation='relu')(x)
    output = layers.Dense(1)(x)

    model = models.Model(inputs=inputs, outputs=output)
    model.compile(optimizer='adam', loss='mse', metrics=['mse'])
    return model

def _gzip_reader_fn(filenames):
    # TFX writes its TFRecord artifacts GZIP-compressed
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

def _input_fn(file_pattern, tf_transform_output, batch_size=200):
    # Batched (features, label) dataset read from the *transformed* examples
    transformed_feature_spec = tf_transform_output.transformed_feature_spec().copy()
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transformed_feature_spec,
        reader=_gzip_reader_fn,
        label_key=_transformed_name(_LABEL_KEY)
    )
    return dataset

def _get_serve_raw_fn(model, tf_transform_output):
    """Serving function that accepts raw features and applies the transform graph."""
    # Attaching the layer to the model puts its assets (e.g. vocabularies) in the SavedModel
    model.tft_layer = tf_transform_output.transform_features_layer()

    raw_feature_spec = tf_transform_output.raw_feature_spec()
    raw_feature_spec.pop(_LABEL_KEY)  # The label is not sent at serving time
    input_signature = {
        key: tf.TensorSpec(shape=[None], dtype=spec.dtype, name=key)
        for key, spec in raw_feature_spec.items()
    }

    @tf.function(input_signature=[input_signature])
    def serve_raw_fn(raw_features):
        # The raw schema declares VarLen features, so wrap each scalar-per-instance
        # request field as a [batch, 1] SparseTensor before applying the transform graph
        sparse_features = {}
        for key, value in raw_features.items():
            batch_size = tf.shape(value, out_type=tf.int64)[0]
            indices = tf.stack(
                [tf.range(batch_size), tf.zeros([batch_size], dtype=tf.int64)], axis=1)
            sparse_features[key] = tf.SparseTensor(indices, value, [batch_size, 1])
        transformed_features = model.tft_layer(sparse_features)
        return {'outputs': model(transformed_features)}

    return serve_raw_fn

def run_fn(fn_args: FnArgs):
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
    eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

    model = _build_keras_model(tf_transform_output)

    # The datasets repeat indefinitely, so steps_per_epoch is what ends each epoch
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=5
    )

    # Export with a signature that accepts raw features and applies the transform
    # graph, so serving uses exactly the training-time preprocessing
    signatures = {'serving_default': _get_serve_raw_fn(model, tf_transform_output)}
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

In [None]:
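from tfx.components import Trainer
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.components.base import executor_spec
from tfx.proto import trainer_pb2

trainer = Trainer(
    module_file='forest_fires_trainer.py',
    # Use the run_fn-based (Keras) executor
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    # Train on the Transform output: the trainer's input_fn parses transformed features
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=100),
    eval_args=trainer_pb2.EvalArgs(num_steps=50)
)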

context.run(trainer)
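
Because `make_batched_features_dataset` repeats the data indefinitely by default, `steps_per_epoch=fn_args.train_steps` is what ends each epoch, and `run_fn` calls `fit` with `epochs=5`. The actual budget is therefore `train_steps × epochs` batches, not `TrainArgs(num_steps=100)` alone. A quick check of the arithmetic, assuming roughly 327 training examples (about two thirds of the 491 rows written to `data/csv/train`; the hash split is not exact) and a hypothetical `training_budget` helper:

```python
def training_budget(train_steps, epochs, batch_size, num_train_examples):
    """Return (total optimizer steps, examples drawn, approximate passes over the data)."""
    total_steps = train_steps * epochs
    examples_drawn = total_steps * batch_size
    passes = examples_drawn / num_train_examples
    return total_steps, examples_drawn, passes


steps, drawn, passes = training_budget(train_steps=100, epochs=5, batch_size=40,
                                       num_train_examples=327)
print(steps, drawn, round(passes, 1))  # → 500 20000 61.2
```

Roughly 61 passes over a few hundred rows is a lot; it is worth keeping in mind if the validation loss starts rising.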

## 7️⃣ Evaluation and Deployment

### 7.1 Evaluator
The evaluator checks if the model meets performance thresholds using **TFMA** (TensorFlow Model Analysis).
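
Two ideas drive the Evaluator: metrics computed overall and per slice, and a blessing decision based on thresholds. In TFMA these are configured with `tfma.SlicingSpec` and `tfma.MetricThreshold` (an absolute `GenericValueThreshold` and/or a `GenericChangeThreshold` against a baseline model). The plain-Python sketch below only mimics that decision logic for mean squared error; `sliced_mse` and `is_blessed` are hypothetical names, not TFMA functions.

```python
from collections import defaultdict


def sliced_mse(labels, predictions, slice_values):
    """Mean squared error overall and for each slice value (e.g. month)."""
    squared_errors = defaultdict(list)
    for y, y_hat, slice_value in zip(labels, predictions, slice_values):
        err = (y - y_hat) ** 2
        squared_errors["<overall>"].append(err)
        squared_errors[slice_value].append(err)
    return {key: sum(errs) / len(errs) for key, errs in squared_errors.items()}


def is_blessed(candidate_mse, upper_bound, baseline_mse=None, max_increase=0.0):
    """Lower MSE is better: require an absolute bound and, if a baseline exists,
    be no more than `max_increase` worse than the baseline."""
    if candidate_mse >= upper_bound:
        return False
    if baseline_mse is not None and candidate_mse - baseline_mse > max_increase:
        return False
    return True


metrics = sliced_mse(
    labels=[0.0, 2.0, 4.0, 1.0],
    predictions=[1.0, 2.0, 2.0, 1.0],
    slice_values=["mar", "mar", "aug", "aug"],
)
print(metrics)  # → {'<overall>': 1.25, 'mar': 0.5, 'aug': 2.0}
print(is_blessed(metrics["<overall>"], upper_bound=2.0, baseline_mse=1.3))  # → True
print(is_blessed(metrics["<overall>"], upper_bound=2.0, baseline_mse=1.0))  # → False
```

The per-slice numbers show why slicing matters: the overall MSE of 1.25 hides that the `aug` slice is four times worse than `mar`.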

### 7.2 Pusher
The pusher deploys the model to a serving location *only if* the evaluator blesses it.

In [None]:
from tfx.components import Pusher
from tfx.proto import pusher_pb2

pusher = Pusher(
    model=trainer.outputs['model'],
    # model_blessing=evaluator.outputs['blessing'],  # Evaluator omitted for brevity; in production, push only blessed models
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory='serving_model_dir'
        )
    )
)

context.run(pusher)

## 8️⃣ Serving with Docker

Once the model is exported to `serving_model_dir`, we can serve it using the official TensorFlow Serving Docker image.
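
TensorFlow Serving expects the mounted directory to contain numbered version subdirectories (for example `serving_model_dir/1/saved_model.pb`) and, by default, serves the highest version it finds. Pusher writes each pushed model into such a numbered subdirectory (typically named after a timestamp), which is why the Docker command mounts `serving_model_dir` itself rather than a version folder. The helper below is a hypothetical sketch of that "latest version" rule, handy for checking which model the server will pick up:

```python
import os


def latest_version_dir(base_dir):
    """Return the highest-numbered version subdirectory of base_dir, or None.

    Mirrors TF Serving's default policy of serving the latest version;
    non-numeric entries and plain files are ignored.
    """
    versions = [
        name for name in os.listdir(base_dir)
        if name.isdigit() and os.path.isdir(os.path.join(base_dir, name))
    ]
    if not versions:
        return None
    # Compare numerically so that "10" beats "9"
    return os.path.join(base_dir, max(versions, key=int))


if os.path.isdir("serving_model_dir"):
    print(latest_version_dir("serving_model_dir"))
```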

### Step 1: Install Docker
Ensure Docker is installed on your machine.

### Step 2: Download the TF Serving Image
```bash
docker pull tensorflow/serving
```

### Step 3: Run the Container
We mount the local model directory into the container and map port 8501, TensorFlow Serving's REST port (gRPC uses 8500).

```bash
# Assuming your model is in /absolute/path/to/serving_model_dir
docker run -p 8501:8501 \
  --mount type=bind,source=/absolute/path/to/serving_model_dir,target=/models/forest_fires_model \
  -e MODEL_NAME=forest_fires_model -t tensorflow/serving
```
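
Besides `:predict`, the TF Serving REST API exposes a model-status endpoint (`GET /v1/models/<name>`) and a metadata endpoint (`GET /v1/models/<name>/metadata`) that lists the signatures and their input names, which is a quick way to check which inputs `serving_default` expects. A small, hypothetical URL builder for these endpoints, assuming the container started above on `localhost:8501`:

```python
def serving_url(model_name, verb=None, version=None, host="localhost", port=8501):
    """Build a TF Serving REST URL.

    verb=None        -> model status   (GET)
    verb="metadata"  -> signature info (GET)
    verb="predict"   -> predictions    (POST)
    """
    url = f"http://{host}:{port}/v1/models/{model_name}"
    if version is not None:
        url += f"/versions/{version}"
    if verb is None:
        return url
    if verb == "metadata":
        return url + "/metadata"
    return f"{url}:{verb}"


print(serving_url("forest_fires_model", "predict"))
# → http://localhost:8501/v1/models/forest_fires_model:predict
print(serving_url("forest_fires_model", "metadata", version=1))
# → http://localhost:8501/v1/models/forest_fires_model/versions/1/metadata
```

Once loading has finished, a GET on `serving_url("forest_fires_model")` should report the model version with state `AVAILABLE`.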

### Step 4: Make a Prediction Request
You can use Python `requests` to query the running container.

In [None]:
import json
import requests

# Example input (Must match the raw input schema, not the transformed schema)
data = {
    "signature_name": "serving_default",
    "instances": [
        {
            "X": 7, "Y": 5, "month": "mar", "day": "fri",
            "FFMC": 86.2, "DMC": 26.2, "DC": 94.3, "ISI": 5.1,
            "temp": 8.2, "RH": 51, "wind": 6.7, "rain": 0.0
        }
    ]
}

# Note: This will only work if the Docker container is actually running.
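try:
    response = requests.post(
        'http://localhost:8501/v1/models/forest_fires_model:predict',
        data=json.dumps(data),
        timeout=10,
    )
    # A successful call returns {"predictions": [[<predicted area>]]};
    # otherwise show the error message from the server (e.g. an input name mismatch)
    print(response.json() if response.ok else (response.status_code, response.text))
except requests.exceptions.ConnectionError: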
    print("Server not reachable (Did you start Docker?)")
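
The request above uses the REST API's *row* format (`"instances"`, one object per example). TF Serving also accepts a *columnar* format (`"inputs"`, one list per input name) and answers it with `"outputs"` instead of `"predictions"`. A small sketch converting between the two (the helper names are hypothetical):

```python
import json


def rows_to_columns(instances):
    """Convert row-format instances into the columnar 'inputs' mapping."""
    keys = list(instances[0])
    for instance in instances:
        if set(instance) != set(keys):
            raise ValueError("all instances must have the same input names")
    return {key: [instance[key] for instance in instances] for key in keys}


def build_predict_body(instances, columnar=False, signature_name="serving_default"):
    """Serialize a :predict request body in row or columnar format."""
    body = {"signature_name": signature_name}
    if columnar:
        body["inputs"] = rows_to_columns(instances)
    else:
        body["instances"] = instances
    return json.dumps(body)


rows = [
    {"month": "mar", "day": "fri", "temp": 8.2, "RH": 51},
    {"month": "aug", "day": "sun", "temp": 22.8, "RH": 27},
]
print(build_predict_body(rows, columnar=True))
# → {"signature_name": "serving_default", "inputs": {"month": ["mar", "aug"], "day": ["fri", "sun"], "temp": [8.2, 22.8], "RH": [51, 27]}}
```

Real requests must include every raw feature the signature expects; the rows here are truncated for brevity.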

## 9️⃣ Chapter Summary

* **TFX** enables robust, production-ready ML pipelines.
* **Data Validation:** `StatisticsGen` and `SchemaGen`, together with `ExampleValidator`, help prevent "garbage in, garbage out" by checking incoming data against a schema.
* **Transform:** The `Transform` component ensures that feature engineering logic is consistent between training and serving, preventing training-serving skew.
* **Trainer:** We trained a model using the `GenericExecutor` and Keras, utilizing Feature Columns to handle heterogeneous data types (dense, categorical, bucketized).
* **Serving:** We demonstrated how to deploy the saved model using **Docker** and **TensorFlow Serving**, exposing it as a REST API for real-time predictions.