# TFX Pipeline API Demonstration

This notebook demonstrates the **native TFX API** and our **custom wrapper layer** for house price prediction.

**Note:** "API" refers to the TFX tool's internal programming interface, not an external data provider.

## Part 1: Native TFX API

This part builds the pipeline from TFX components directly.

In [1]:
# Setup Python path
import sys
import os

# Add project root to path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

# Import TFX components
from tfx.components import CsvExampleGen, SchemaGen, Transform, Trainer, Evaluator, Pusher
from tfx.orchestration import pipeline
from tfx.orchestration.local.local_dag_runner import LocalDagRunner
from tfx.proto import trainer_pb2, pusher_pb2

print("Native TFX components imported successfully")

Native TFX components imported successfully


### Creating TFX Components

Each component has a specific interface:

In [2]:
# 1. ExampleGen - Data ingestion
example_gen = CsvExampleGen(input_base='../data')
print(f"ExampleGen created: {type(example_gen).__name__}")

# 2. StatisticsGen - Generate statistics from examples
from tfx.components import StatisticsGen
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
print(f"StatisticsGen created: {type(statistics_gen).__name__}")

# 3. SchemaGen - Infer a schema from the statistics
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True
)
print(f"SchemaGen created: {type(schema_gen).__name__}")

# 4. Transform - Feature engineering
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file='../utils/feature_engineering.py'
)
print(f"Transform created: {type(transform).__name__}")

print("\nAll TFX components created successfully!")

ExampleGen created: CsvExampleGen
StatisticsGen created: StatisticsGen
SchemaGen created: SchemaGen
Transform created: Transform

All TFX components created successfully!
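
The wiring in the cell above implies an execution order, because each component consumes outputs of an earlier one. As a rough illustration (not a description of how TFX resolves its DAG internally), the dependencies can be written as a plain dictionary and ordered with the standard library's `graphlib`. The dictionary is transcribed from the constructor arguments above; the variable names are illustrative.

```python
from graphlib import TopologicalSorter

# Map each component to the upstream components whose outputs it consumes,
# transcribed from the constructor arguments in the cell above.
dependencies = {
    "CsvExampleGen": set(),
    "StatisticsGen": {"CsvExampleGen"},
    "SchemaGen": {"StatisticsGen"},
    "Transform": {"CsvExampleGen", "SchemaGen"},
}

# static_order() yields every node only after all of its predecessors.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because the four components form a single chain, only one valid order exists; with independent branches, several orders could be valid.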


### Running Native TFX Pipeline

In [3]:
# Create pipeline using native API
from pipelines.house_price_pipeline import create_pipeline
from utils import config

tfx_pipeline = create_pipeline(
    pipeline_name='house_price_prediction',
    pipeline_root='../pipeline_outputs',
    data_path='../data',  # Note: data_path not data_root
    transform_module_file='../utils/feature_engineering.py',
    trainer_module_file='../utils/sklearn_trainer.py',
    metadata_path=config.METADATA_PATH,
    serving_model_dir=str(config.SERVING_MODEL_DIR)
)

print(f"Pipeline created with {len(tfx_pipeline.components)} components")
print("Component names:", [c.id for c in tfx_pipeline.components])

Pipeline created with 7 components
Component names: ['CsvExampleGen', 'StatisticsGen', 'SchemaGen', 'Transform', 'Trainer', 'Evaluator', 'Pusher']


## Part 2: Wrapper Layer

Our simplified wrapper provides a cleaner interface.

In [4]:
# Import wrapper classes (project root already in path from earlier cell)
from utils.tfx_pipeline_utils import (
    TFXPipelineWrapper,
    ModelComparisonWrapper,
    DataPipelineWrapper
)

print("Wrapper classes imported successfully")

Wrapper classes imported successfully


### 1. TFXPipelineWrapper - Simplified Pipeline Operations

In [5]:
# Initialize wrapper
wrapper = TFXPipelineWrapper(
    pipeline_name='house_price_prediction',
    pipeline_root='./pipeline_outputs',
    model_dir='./models'
)

print("TFXPipelineWrapper initialized")
print(f"Pipeline name: {wrapper.pipeline_name}")
print(f"Pipeline root: {wrapper.pipeline_root}")
print(f"Model directory: {wrapper.model_dir}")

TFXPipelineWrapper initialized
Pipeline name: house_price_prediction
Pipeline root: /app/notebooks/pipeline_outputs
Model directory: /app/notebooks/models
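
The absolute paths printed above suggest that the wrapper resolves relative paths against the notebook's working directory. If that is the case, `'./pipeline_outputs'` here and `'../pipeline_outputs'` in Part 1 would name different directories. The sketch below checks this under the assumption that the working directory is `/app/notebooks` (inferred from the output); `resolve` is a hypothetical helper, not part of the wrapper, and `posixpath` is used so the result does not depend on the machine running it.

```python
import posixpath

# Working directory inferred from the absolute paths printed above (an assumption).
notebook_dir = "/app/notebooks"

def resolve(relative_path: str, base: str = notebook_dir) -> str:
    """Resolve a relative path against a base directory and normalize it."""
    return posixpath.normpath(posixpath.join(base, relative_path))

wrapper_root = resolve("./pipeline_outputs")   # value passed to TFXPipelineWrapper
native_root = resolve("../pipeline_outputs")   # value passed to create_pipeline in Part 1

print(wrapper_root)
print(native_root)
print("Same directory:", wrapper_root == native_root)
```

If both parts are meant to share artifacts, passing the same path to both would avoid the mismatch.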


In [6]:
# Get latest deployed model
model_path = wrapper.get_latest_model_path()
if model_path:
    print(f"Latest model found at: {model_path}")
    
    # Load the model
    model = wrapper.load_model(model_path)
    print(f"Model type: {type(model).__name__}")
else:
    print("No model found yet. Run pipeline first.")

No model found yet. Run pipeline first.
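
The implementation of `get_latest_model_path` is not shown in this notebook. One plausible approach, sketched here under the assumption that each pushed model lives in a subdirectory with an all-digit version name, is to pick the numerically largest one. `find_latest_version_dir` and the directory names are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Optional

def find_latest_version_dir(serving_dir: Path) -> Optional[Path]:
    """Return the subdirectory with the largest all-digit name, or None if there is none."""
    if not serving_dir.is_dir():
        return None
    versions = [p for p in serving_dir.iterdir() if p.is_dir() and p.name.isdigit()]
    return max(versions, key=lambda p: int(p.name), default=None)

with tempfile.TemporaryDirectory() as tmp:
    serving_dir = Path(tmp)
    empty_result = find_latest_version_dir(serving_dir)  # nothing pushed yet
    for version in ("1700000000", "1700000500", "not_a_version"):
        (serving_dir / version).mkdir()
    latest_name = find_latest_version_dir(serving_dir).name

print(empty_result)
print(latest_name)
```

Returning `None` for an empty directory matches the `if model_path:` check in the cell above.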


### 2. ModelComparisonWrapper - Simplified Model Comparison

In [7]:
# Initialize comparator
comparator = ModelComparisonWrapper(output_dir='./models/comparison')

# Load existing comparison results
try:
    results = comparator.load_results()
    
    print("Model Comparison Results:\n")
    print(f"{'Model':<20} {'CV RMSE':<12} {'Training Time':<15}")
    print("="*50)
    
    for model_name, metrics in results['comparison']['models'].items():
        print(f"{model_name:<20} {metrics['cv_mean_rmse']:<12.4f} {metrics['training_time']:.2f}s")
    
    best_model = comparator.get_best_model_name()
    print(f"\nBest Model: {best_model}")
    
except FileNotFoundError:
    print("No comparison results found. Run model comparison first.")

No comparison results found. Run model comparison first.
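
Since no results file exists yet, the selection step can be illustrated on placeholder data. The sketch below assumes the best model is the one with the lowest `cv_mean_rmse`, and reuses the `results['comparison']['models']` layout read in the cell above; the model names and numbers are invented, and `best_model_name` is a hypothetical stand-in for `get_best_model_name`.

```python
# Placeholder results in the shape the cell above reads; all values are invented.
results = {
    "comparison": {
        "models": {
            "model_a": {"cv_mean_rmse": 0.15, "training_time": 1.2},
            "model_b": {"cv_mean_rmse": 0.13, "training_time": 8.4},
            "model_c": {"cv_mean_rmse": 0.14, "training_time": 3.1},
        }
    }
}

def best_model_name(results: dict) -> str:
    """Return the name of the model with the lowest cross-validated RMSE."""
    models = results["comparison"]["models"]
    return min(models, key=lambda name: models[name]["cv_mean_rmse"])

print(best_model_name(results))
```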


### 3. DataPipelineWrapper - Simplified Data Loading

In [8]:
# Initialize data loader
data_loader = DataPipelineWrapper(data_root='../data')

# Load data
train_df = data_loader.load_training_data()
test_df = data_loader.load_test_data()

print(f"Training data shape: {train_df.shape}")
print(f"Test data shape: {test_df.shape}")
print(f"\nFirst few columns: {list(train_df.columns[:5])}")
print("Target column: SalePrice")
print(f"\nSalePrice statistics:\n{train_df['SalePrice'].describe()}")

Training data shape: (1460, 81)
Test data shape: (1459, 80)

First few columns: ['Id', 'MSSubClass', 'MSZoning', 'LotFrontage', 'LotArea']
Target column: SalePrice

SalePrice statistics:
count      1460.000000
mean     180921.195890
std       79442.502883
min       34900.000000
25%      129975.000000
50%      163000.000000
75%      214000.000000
max      755000.000000
Name: SalePrice, dtype: float64
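
The shapes above differ by one column, which is consistent with the test set lacking the `SalePrice` target. A check along the following lines could confirm that. It uses only the standard library and tiny in-memory stand-ins for the real files; the column subset is taken from the output above, while the row values are invented.

```python
import csv
import io

TARGET = "SalePrice"

# Tiny in-memory stand-ins for the real CSV files; the row values are invented.
train_csv = "Id,MSSubClass,LotArea,SalePrice\n1,60,8450,200000\n"
test_csv = "Id,MSSubClass,LotArea\n1461,20,11622\n"

def header(csv_text: str) -> list:
    """Return the column names from the first row of a CSV string."""
    return next(csv.reader(io.StringIO(csv_text)))

train_cols, test_cols = header(train_csv), header(test_csv)

# Columns present in the training data but absent from the test data.
missing_from_test = [c for c in train_cols if c not in test_cols]
only_target_missing = missing_from_test == [TARGET]

print(missing_from_test)
print("Only the target is missing:", only_target_missing)
```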


## Comparison: Native vs Wrapper

### Native TFX API (Complex)

```python
# Many imports
from tfx.components import (
    CsvExampleGen, StatisticsGen, SchemaGen, Transform, Trainer, Evaluator, Pusher
)
from tfx.orchestration import pipeline
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

# Create components
example_gen = CsvExampleGen(input_base=data_root)
statistics_gen = StatisticsGen(...)
schema_gen = SchemaGen(...)
transform = Transform(...)
trainer = Trainer(...)
evaluator = Evaluator(...)
pusher = Pusher(...)

# Create pipeline
tfx_pipeline = pipeline.Pipeline(
    pipeline_name='...',
    components=[example_gen, statistics_gen, schema_gen, ...]
)

# Run pipeline
LocalDagRunner().run(tfx_pipeline)

# Load model (complex)
import tensorflow as tf
model = tf.saved_model.load(path)
```

### Wrapper Layer (Simple)

```python
# Single import
from utils.tfx_pipeline_utils import TFXPipelineWrapper

# Initialize
wrapper = TFXPipelineWrapper()

# Run pipeline
wrapper.run_pipeline()

# Load model
model = wrapper.load_model()
```

**Result:** In this side-by-side sketch, the wrapper needs four statements where the native API needs more than a dozen.

## Conclusion

This notebook demonstrated:

1. **Native TFX API** - Powerful but complex
   - 7 components to create and wire together
   - Multiple imports required
   - Verbose configuration

2. **Wrapper Layer** - Simple and intuitive
   - 3 main classes: TFXPipelineWrapper, ModelComparisonWrapper, DataPipelineWrapper
   - Clean, minimal interface
   - Same functionality, less code

**Best Practice:** Use the wrapper layer for rapid development, but understand the native TFX API for advanced customization.