# Install Requirements

In [1]:
# %cd drive/My\ Drive/Building\ ML\ Pipelines/
# !pip install -r requirements.txt
# %cd ..
# %cd ..

# Chapter 11: Pipelines Part 1: Apache Beam and Apache Airflow

In this and the next chapter, we will put all the components together and show how to run the full pipeline with three orchestrators:
 - Apache Beam
 - Apache Airflow
 - Kubeflow

In [None]:
import tensorflow as tf

# Root of the project on the mounted Google Drive; every other path below is
# derived from it, so this is the only line to change when relocating the project.
base_dir = "drive/My Drive/Building ML Pipelines/"

# Chapter-specific working and output folders.
chap_dir = f"{base_dir}Chapter 11/"
out_dir = f"{chap_dir}Outputs/"

# Input data locations.
data_dir = f"{base_dir}Data/"
csv_data_dir = f"{base_dir}CSV Data/"
# NOTE: despite its name, csv_dir is the path of a single CSV *file*, not a folder.
csv_dir = f"{csv_data_dir}consumer_complaints_with_narrative.csv"

# Eval split written by the Chapter 6 CsvExampleGen run. Like csv_dir, this is
# a file path (a gzipped TFRecord shard) despite the "_dir" suffix.
eval_data_dir = (
    f"{base_dir}Chapter 6/Outputs/CsvExampleGen/examples/1/eval/"
    "data_tfrecord-00000-of-00001.gz"
)

## Which Orchestration Tool to Choose?

You only need to pick one tool to run the pipeline.

### Apache Beam

If you are looking for a minimal installation, reusing Beam to orchestrate is a logical choice. It is easy to set up and also allows you to use any existing distributed data processing infrastructure you might already be familiar with (e.g., Google Cloud Dataflow). It can also be used as an intermediate step to ensure the correctness of the pipeline.<br>
However, Apache Beam lacks tools for scheduling your model updates or monitoring the progress of a pipeline job; that's where the other two tools shine.

### Apache Airflow

Apache Airflow is often used in companies for data-loading tasks. If you use Apache Airflow in combination with a production-ready database like PostgreSQL, you can take advantage of executing partial pipelines.

### Kubeflow Pipelines

If you already have experience with Kubernetes and access to a Kubernetes cluster, it makes sense to consider Kubeflow Pipelines. While the setup is more complicated, it opens up a variety of new opportunities, including the ability to view TFDV and TFMA visualizations, model lineage and the artifact collections. It is further an excellent infrastructure platform to deploy machine learning models. Inference routing through the tool Istio is currently state of the art in the field of machine learning infrastructure.<br>
Setting up Kubernetes with a variety of cloud providers is also possible, which makes it very efficient and scalable.

### Kubeflow Pipelines on AI Platform

Kubeflow pipelines can run on Google's AI Platform, which is part of GCP. This takes care of much of the infrastructure and lets you load data more easily from Google Cloud Storage buckets.

## Converting Your Interactive TFX Pipeline to a Production Pipeline

In order to automate our pipelines, we will need to write a Python script that will run all these components without any input from us.<br>
Fortunately, we already have all the pieces of this script; here is a summary:
 - ExampleGen (Chapter 3)
 - StatisticsGen (Chapter 4)
 - SchemaGen (Chapter 4)
 - ExampleValidator (Chapter 4)
 - Transform (Chapter 5)
 - Trainer (Chapter 6)
 - Resolver (Chapter 7)
 - Evaluator (Chapter 7)
 - Pusher (Chapter 7)

Here is the full base pipeline:

In [None]:
import tensorflow_model_analysis as tfma

from tfx.components import CsvExampleGen, Evaluator, ExampleValidator, Pusher, ResolverNode, SchemaGen, StatisticsGen, Trainer, Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.utils.dsl_utils import external_input

In [None]:
def _build_eval_config():
    """Return the TFMA ``EvalConfig`` used by the Evaluator component.

    Metrics are computed over the whole dataset and sliced by ``product``.
    The AUC threshold requires a value of at least 0.65 and an absolute change
    of 0.01 (higher is better) relative to the baseline model.
    """
    return tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key="consumer_disputed")],
        slicing_specs=[
            # An empty SlicingSpec produces the overall (unsliced) metrics.
            tfma.SlicingSpec(),
            tfma.SlicingSpec(feature_keys=["product"]),
        ],
        metrics_specs=[
            tfma.MetricsSpec(
                metrics=[
                    tfma.MetricConfig(class_name="BinaryAccuracy"),
                    tfma.MetricConfig(class_name="ExampleCount"),
                    tfma.MetricConfig(class_name="AUC"),
                    tfma.MetricConfig(class_name="Precision"),
                    tfma.MetricConfig(class_name="Recall"),
                ],
                thresholds={
                    "AUC": tfma.config.MetricThreshold(
                        value_threshold=tfma.GenericValueThreshold(
                            lower_bound={"value": 0.65}
                        ),
                        change_threshold=tfma.GenericChangeThreshold(
                            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                            absolute={"value": 0.01},
                        ),
                    )
                },
            )
        ],
    )


def init_components(csv_data_dir, module_file, serving_model_dir,
                    training_steps=2000, eval_steps=200,
                    use_generic_executor=False):
    """Instantiate the TFX components of the consumer-complaints pipeline.

    Args:
        csv_data_dir: Directory containing the input CSV file(s) read by
            CsvExampleGen.
        module_file: Path to the Python module with the user code for the
            Transform and Trainer components.
        serving_model_dir: Directory the Pusher writes blessed models to.
        training_steps: Number of training steps passed to the Trainer.
        eval_steps: Number of evaluation steps passed to the Trainer.
        use_generic_executor: If True, run the Trainer with TFX's
            ``GenericExecutor`` so that the ``run_fn()`` in ``module_file`` is
            used. Defaults to False, which keeps the Trainer's default
            executor (the previous behaviour).

    Returns:
        A list of the components in dependency order, ready to be passed to
        ``pipeline.Pipeline``.
    """
    examples = external_input(csv_data_dir)

    example_gen = CsvExampleGen(input=examples)

    statistics_gen = StatisticsGen(
        examples=example_gen.outputs["examples"]
    )

    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs["statistics"],
        infer_feature_shape=True
    )

    # Validates the statistics against the inferred schema.
    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs["statistics"],
        schema=schema_gen.outputs["schema"]
    )

    transform = Transform(
        examples=example_gen.outputs["examples"],
        schema=schema_gen.outputs["schema"],
        module_file=module_file
    )

    trainer_kwargs = {}
    if use_generic_executor:
        # Override the executor so the Trainer loads the run_fn() function.
        trainer_kwargs["custom_executor_spec"] = executor_spec.ExecutorClassSpec(
            GenericExecutor
        )

    trainer = Trainer(
        module_file=module_file,
        examples=transform.outputs["transformed_examples"],
        schema=schema_gen.outputs["schema"],
        transform_graph=transform.outputs["transform_graph"],
        train_args=trainer_pb2.TrainArgs(num_steps=training_steps),
        eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps),
        **trainer_kwargs
    )

    # Supplies the latest blessed model as the Evaluator's baseline.
    model_resolver = ResolverNode(
        instance_name="latest_blessed_model_resolver",
        resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing)
    )

    evaluator = Evaluator(
        examples=example_gen.outputs["examples"],
        model=trainer.outputs["model"],
        baseline_model=model_resolver.outputs["model"],
        eval_config=_build_eval_config()
    )

    # Only pushes the model when the Evaluator's blessing allows it.
    pusher = Pusher(
        model=trainer.outputs["model"],
        model_blessing=evaluator.outputs["blessing"],
        push_destination=pusher_pb2.PushDestination(
            filesystem=pusher_pb2.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        )
    )

    components = [
        example_gen,
        statistics_gen,
        schema_gen,
        example_validator,
        transform,
        trainer,
        model_resolver,
        evaluator,
        pusher,
    ]

    return components

In the example project, we have split the component instantiation from the pipeline configuration to focus on the pipeline setup for the different orchestrators.<br>
The *init_components* function instantiates the components. In addition to the number of training steps and evaluation steps, it requires three inputs:
 - csv_data_dir
 - module_file
 - serving_model_dir

Apart from the minor tweaks to the Google Cloud setup that we will discuss in the next chapter, the component setup is identical for each orchestration platform.

## Simple Interactive Pipeline Conversion for Beam and Airflow

You can convert a notebook to a pipeline via the following steps.<br>
For any cells in your notebook that you do not want to export, put the `%%skip_for_export` Jupyter magic command at the start of the cell.<br>
1. Set the pipeline name and the orchestration tool.

In [None]:
# Orchestrator targeted by the exported script: "beam" (alternative: "airflow").
runner_type = "beam"
# Derived from runner_type so that switching to "airflow" does not leave a
# misleading "..._beam" pipeline name behind.
pipeline_name = f"consumer_complaints_{runner_type}"

2. Set all the relevant file paths:

In [None]:
import os

In [None]:
# This notebook, read by context.export_to_pipeline() during export.
notebook_file = chap_dir + "Chapter 11: Pipelines Part 1: Apache Beam and Apache Airflow.ipynb"

# Pipeline inputs (base_dir, chap_dir and out_dir are defined at the top of the notebook)
requirements_file = os.path.join(base_dir, "requirements.txt")
module_file = os.path.join(base_dir, "module.py")

# Pipeline outputs, all grouped under one folder per pipeline name.
output_base = os.path.join(out_dir, pipeline_name)
# Destination the Pusher writes blessed models to. Defined once here; a
# previous extra assignment to out_dir + "serving_model_dir/" was dead code.
serving_model_dir = os.path.join(output_base, pipeline_name)
pipeline_root = os.path.join(output_base, "pipeline_root")
# SQLite ML Metadata store, kept inside pipeline_root.
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")

3. List the components you wish to include in your pipeline

In [None]:
# The components are created inside init_components(), so they are not
# notebook globals; build the list by calling it. This also keeps
# model_resolver in the list, which the Evaluator's baseline_model input
# depends on.
components = init_components(csv_data_dir, module_file, serving_model_dir)

4. Pipeline export

In [None]:
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# export_to_pipeline() is a method of InteractiveContext. No context is created
# elsewhere in this notebook, so create one here; its own temporary
# pipeline_root keeps it separate from the Beam pipeline's metadata store.
context = InteractiveContext()

# Name the exported script after the pipeline, so it follows runner_type.
pipeline_export_file = f"{pipeline_name}_export.py"
context.export_to_pipeline(
    notebook_filepath=notebook_file,
    export_filepath=pipeline_export_file,
    runner_type=runner_type,
)

This export command will generate a script that can be run using Beam or Airflow, depending on the *runner_type* you choose.

## Introduction to Apache Beam

In this section, we will show how to orchestrate our example project using Beam.

### Orchestrating TFX Pipelines with Apache Beam

Besides the advantages mentioned at the beginning, Beam can also be used to debug your ML pipeline. By using Beam while debugging your pipeline and then moving to Airflow or Kubeflow Pipelines, you can rule out root causes of pipeline errors that come from the more complex Airflow or Kubeflow Pipelines setups.

In [None]:
import absl
from tfx.orchestration import metadata, pipeline

In [None]:
def init_beam_pipeline(components, pipeline_root, direct_num_workers):
    """Wrap ``components`` in a TFX pipeline configured for Apache Beam.

    Reads the notebook globals ``pipeline_name``, ``requirements_file`` and
    ``metadata_path``.

    Args:
        components: TFX components, e.g. the list returned by ``init_components``.
        pipeline_root: Directory under which component artifacts are written.
        direct_num_workers: Number of workers for Beam's DirectRunner.

    Returns:
        A ``pipeline.Pipeline`` that can be run with ``BeamDagRunner().run``.
    """
    absl.logging.info("Pipeline root set to: {}".format(pipeline_root))

    beam_arg = [
        # Beam lets you specify the number of workers. A sensible default is
        # half the number of available CPUs (at least one).
        "--direct_num_workers={}".format(direct_num_workers),
        "--requirements_file={}".format(requirements_file),
    ]

    # This is where you define your pipeline object with a configuration.
    p = pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        # Set to True to avoid rerunning components that have already finished.
        enable_cache=False,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path),
        # The keyword must be `beam_pipeline_args`: the previously used
        # `metadata_pipeline_args` is not a Pipeline parameter, so the Beam
        # options above never reached Beam.
        beam_pipeline_args=beam_arg,
    )

    return p

In [None]:
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

In [2]:
# Use half of the available CPUs, but at least one worker. os.cpu_count()
# returns None when the count cannot be determined, so fall back to 1.
direct_num_workers = max(1, (os.cpu_count() or 1) // 2)

components = init_components(csv_data_dir, module_file, serving_model_dir,
                             training_steps=1000, eval_steps=100)
pipe_line = init_beam_pipeline(components, pipeline_root, direct_num_workers)
BeamDagRunner().run(pipe_line)

This is a minimal setup that you can easily integrate with the rest of your infrastructure or schedule, for example with a cron job. You can also scale this pipeline using <a href="https://flink.apache.org/">Apache Flink</a>, as in this <a href="https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_portable_beam.py">TFX Example</a>, or <a href="https://spark.apache.org/">Spark</a>.

## Introduction to Apache Airflow

Airflow is Apache's project for workflow automation. Airflow lets you represent workflow tasks through DAGs (Directed Acyclic Graphs) defined in Python code. Airflow also lets you schedule and monitor workflows.

### Installation and Initial Setup

In [None]:
# !pip install apache-airflow

For a complete list of Airflow extensions and how to install them, look into the <a href="https://airflow.apache.org/docs/">Airflow documentation</a>.<br>
With Airflow now installed, you need to create an initial database where all the task status information will be stored:

In [None]:
# Create Airflow's metadata database. With the default configuration this is a
# SQLite file (see the "DB: sqlite:..." line in the output).
!airflow initdb

  """)
DB: sqlite:////root/airflow/airflow.db
[2020-10-09 10:14:46,564] {db.py:378} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [airflow.models.crypto] cryptography not found - values will not be stored encrypted.
Done.


If no configuration has been changed, Airflow will initialize a SQLite database. This setup works for demo purposes and for running smaller projects, but for larger projects look into the <a href="https://airflow.apache.org/docs/stable/howto/index.html">Apache Airflow documentation</a>.<br>
A minimal Airflow setup consists of the Airflow scheduler, which coordinates the tasks and task dependencies, as well as a web server, which provides a UI to start, stop and monitor the tasks.

In [None]:
!airflow scheduler

  """)
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-09 09:24:03,359] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-09 09:24:03,373] {scheduler_job.py:1367} INFO - Starting the scheduler
[2020-10-09 09:24:03,373] {scheduler_job.py:1375} INFO - Running execute loop for -1 seconds
[2020-10-09 09:24:03,373] {scheduler_job.py:1376} INFO - Processing each file at most -1 times
[2020-10-09 09:24:03,373] {scheduler_job.py:1379} INFO - Searching for files in /root/airflow/dags
[2020-10-09 09:24:03,376] {scheduler_job.py:1381} INFO - There are 25 files in /root/airflow/dags
[2020-10-09 09:24:03,377] {scheduler_job.py:1438} INFO - Resetting orphaned tasks for active dag runs
[2020-10-09 09:24:03,386] {dag_processing.py:562} INFO - Launched DagFileProcessorManager with pid: 2939
[

KeyboardInterrupt: ignored

In [None]:
# Start the Airflow web server
# The command argument -p sets the port where your web browser can access the Airflow interface.
!airflow webserver -p 8081

  """)
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-10-09 09:40:28,707] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-10-09 09:40:28,707] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8081
Timeout: 120
Logfiles: - -
[2020-10-09 09:40:29 +0000] [3750] [INFO] Starting gunicorn 20.0.4
[2020-10-09 09:40:29 +0000] [3750] [INFO] Listening at: http://0.0.0.0:8081 (3750)
[2020-10-09 09:40:29 +0000] [3750] [INFO] Using worker: sync
[2020-10-09 09:40:29 +0000] [3754] [INFO] Booting worker with pid: 3754
[2020-10-09 09:40:29 +0000] [3755] [INFO] Booting worker with pid: 3755
[2020-10-09 09:40:29 +0000] [3756] [INFO] Booting worker with pid: 3756
[2020-10-09 09:40:29 +0000] [3757] [INFO] Booting worke

Go to: http://127.0.0.1:8081 and you should see an interface.

**Airflow Configuration**
The default settings of Airflow can be overwritten by changing the relevant parameters in the Airflow configuration. The new settings have to be defined in ~/airflow/airflow.cfg.

### Basic Airflow Example

Here we will not include any TFX scripts. Workflow pipelines are defined as Python scripts and Airflow expects the DAG definitions to be located in ~/airflow/dags. A basic pipeline consists of:
 - Project-Specific Configurations
 - Task Definitions
 - Definition of the Task Dependencies

#### Project-Specific Configurations

In [None]:
from airflow import DAG
from datetime import datetime, timedelta

In [None]:
# Location to define the project configuration
project_cfg = {
    "owner": "airflow",
    "email": ["jankezmann@t-online.de"],
    "email_on_failure": True,
    "start_date": datetime(2019, 8, 1),
    "retries": 1,
    "retry_delay": timedelta(hours=1)
}

# The DAG object will be picked up by Airflow
dag = DAG(
    "basic_pipeline",
    default_args=project_cfg,
    schedule_interval=timedelta(days=1)
)

#### Task Definitions

In [None]:
from airflow.operators.python_operator import PythonOperator

In [None]:
def example_task(_id, **kwargs):
    """Print and return a status message for the task identified by ``_id``.

    Args:
        _id: Task identifier included in both messages.
        **kwargs: Extra keyword arguments (e.g. the Airflow context when the
            operator sets ``provide_context=True``); ignored.

    Returns:
        The string ``"completed task <_id>"``.
    """
    print("task {}".format(_id))
    # Format `_id`, not the built-in `id` (which would render as
    # "<built-in function id>").
    return "completed task {}".format(_id)

def _make_example_task(task_id, task_number):
    """Create a PythonOperator on `dag` that calls example_task(_id=task_number)."""
    return PythonOperator(
        task_id=task_id,
        provide_context=True,
        python_callable=example_task,
        op_kwargs={"_id": task_number},
        dag=dag,
    )


# Two identical demo tasks; the dependency between them is declared separately.
task_1 = _make_example_task("task_1", 1)
task_2 = _make_example_task("task_2", 2)

In a TFX pipeline, you do not need to define these tasks because the TFX library takes care of it for you.

#### Task dependencies

Let's assume task_2 depends on task_1.

In [None]:
# Declare that task_2 runs after task_1 (equivalent to task_1.set_downstream(task_2)).
task_2.set_upstream(task_1)

Airflow also offers a bit-shift operator to denote the task dependencies:

In [None]:
# Same dependency as in the previous cell, written with the bit-shift operator.
# `>>` returns its right-hand task, which is why task_2 is displayed below.
# NOTE(review): running both cells declares the same edge twice; presumably
# Airflow only warns about the duplicate — confirm for your Airflow version.
task_1 >> task_2

<Task(PythonOperator): task_2>

#### Putting it all together

In the DAG folder of your AIRFLOW_HOME path, usually ~/airflow/dags, create a new file basic_pipeline.py (see pages 332f. of the book for details).<br>
You can then test the pipeline setup by executing this command in your terminal

```
python ~/airflow/dags/basic_pipeline.py
```

The log file can be found at:
```
~/airflow/logs/NAME OF YOUR PIPELINE/TASK NAME/EXECUTION TIME/
```

If we want to inspect the result of the first task from our basic pipeline, we have to investigate the log file:
```
cat ../logs/basic_pipeline/task_1/.../1.log
```

To test whether Airflow recognized the new pipeline you can execute:
```
!airflow list_dags
```

## Orchestrating TFX Pipelines with Apache Airflow

### Pipeline Setup

Instead of importing the BeamDagRunner, we will use the AirflowDagRunner. Again, the files for an Airflow pipeline need to be located in the ~/airflow/dags folder.

In [None]:
# DAG settings handed to AirflowPipelineConfig when the pipeline is converted.
airflow_config = dict(
    schedule_interval=None,  # no schedule: runs only when triggered
    start_date=datetime(2020, 10, 9),
    pipeline_name="your_ml_pipeline",
)

In [None]:
from typing import Text

In [None]:
def init_pipeline(
    components, pipeline_root: Text, direct_num_workers: int
) -> pipeline.Pipeline:
    """Build the TFX pipeline object that AirflowDagRunner turns into a DAG.

    Uses the notebook globals ``pipeline_name`` and ``metadata_path``.

    Args:
        components: TFX components, e.g. the list from ``init_components``.
        pipeline_root: Directory under which component artifacts are stored.
        direct_num_workers: Value passed to Beam as ``--direct_num_workers``.

    Returns:
        A ``pipeline.Pipeline`` with caching enabled and a SQLite metadata store.
    """
    metadata_config = metadata.sqlite_metadata_connection_config(metadata_path)
    return pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata_config,
        beam_pipeline_args=[f"--direct_num_workers={direct_num_workers}"],
    )

In [None]:
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner, AirflowPipelineConfig
# from base_pipeline import init_components # needs to be defined see book repo

In [None]:
# CsvExampleGen needs the folder that holds the CSV file, i.e. csv_data_dir
# (the same input the Beam run uses), not data_dir.
components = init_components(
    csv_data_dir,
    module_file,
    serving_model_dir,
    training_steps=50000,
    eval_steps=10000,
)
# NOTE(review): 0 is assumed to let Beam's DirectRunner auto-detect the worker
# count — confirm for the installed Beam version.
pipe_line = init_pipeline(components, pipeline_root, 0)
# Airflow discovers DAG objects among the module's globals under any name.
# Do not name this `DAG`: that would shadow the `airflow.DAG` class imported earlier.
airflow_dag = AirflowDagRunner(AirflowPipelineConfig(airflow_config)).run(pipe_line)

[2020-10-09 10:35:18,980] {component.py:88} INFO - Excluding no splits because exclude_splits is not set.
[2020-10-09 10:35:18,989] {component.py:98} INFO - Excluding no splits because exclude_splits is not set.
[2020-10-09 10:35:18,997] {component.py:96} INFO - Excluding no splits because exclude_splits is not set.


# References and Additional Resources

 - <a href="https://flink.apache.org/">Apache Flink</a>
 - <a href="https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_portable_beam.py">TFX Example</a>
 - <a href="https://spark.apache.org/">Spark</a>
 - <a href="https://airflow.apache.org/">Apache Airflow</a>