<a href="https://colab.research.google.com/github/hadi-ansari/TFX/blob/main/Cat_%26_Dog_recognition_StatisticGen_%26_SchemaGen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple TFX Pipeline Tutorial using the cats & dogs dataset

***A pipeline to train a model for recognizing cats and dogs.***

In this notebook-based tutorial, we will create and run a TFX pipeline
that prepares the data for a simple cat-vs-dog classification model.
The pipeline consists of three TFX components: ImportExampleGen,
StatisticsGen and SchemaGen. It covers the first steps of an ML workflow:
importing the data, computing statistics over it and inferring a schema
from those statistics.

Please see
[Understanding TFX Pipelines](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines)
to learn more about various concepts in TFX.

## Set Up
We first need to install the TFX Python package and download
the dataset which we will use for our model.

### Upgrade Pip

In [1]:
!pip install --upgrade pip

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pip
  Downloading pip-23.1-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m30.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.0.1
    Uninstalling pip-23.0.1:
      Successfully uninstalled pip-23.0.1
Successfully installed pip-23.1


In [2]:
!pip install -U tfx

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tfx
  Downloading tfx-1.12.0-py3-none-any.whl (2.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m30.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ml-pipelines-sdk==1.12.0 (from tfx)
  Downloading ml_pipelines_sdk-1.12.0-py3-none-any.whl (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m38.0 MB/s[0m eta [36m0:00:00[0m
Collecting ml-metadata<1.13.0,>=1.12.0 (from tfx)
  Downloading ml_metadata-1.12.0-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (6.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.7/6.7 MB[0m [31m71.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting packaging<21,>=20 (from tfx)
  Downloading packaging-20.9-py2.py3-none-any.whl (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.9/40.9 kB[0m [31m5.1 MB/s[0m eta [36m

### Uninstall shapely

TODO(b/263441833) This is a temporary solution to avoid an
ImportError. Ultimately, it should be handled by supporting a
recent version of BigQuery, instead of uninstalling other extra
dependencies.

In [3]:
!pip uninstall shapely -y

Found existing installation: shapely 2.0.1
Uninstalling shapely-2.0.1:
  Successfully uninstalled shapely-2.0.1

### Did you restart the runtime?

If you are using Google Colab, the first time that you run
the cell above, you must restart the runtime by clicking
the "RESTART RUNTIME" button above or by using the "Runtime > Restart
runtime ..." menu. This is because of the way that Colab
loads packages.

## Necessary imports

We also check the TensorFlow and TFX versions.

In [1]:
import numpy as np
import os
import PIL
import PIL.Image
import tensorflow_datasets as tfds
import tensorflow as tf

print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.11.1
TFX version: 1.12.0


In [2]:
from google.colab import files

# Upload the Kaggle API token (kaggle.json). Assigning the return value keeps
# the file contents, including the API key, out of the notebook output.
uploaded = files.upload()

Saving kaggle.json to kaggle.json

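Next, we install the Kaggle CLI, set up the uploaded API token, and download and
unzip the dogs-vs-cats dataset. Because TFX ExampleGen reads its inputs from a
directory, the images are later converted to TFRecord files and moved into a
dedicated `dataset` directory.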

In [3]:
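!pip install -q kaggle
# The Kaggle CLI reads its API token from ~/.kaggle/kaggle.json and expects the
# file to be readable only by its owner.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d salader/dogs-vs-cats
!unzip dogs-vs-cats.zip
# The archive also extracts top-level train/ and test/ folders; only
# dogs_vs_cats/ is used below, so remove them together with the zip file.
!rm -rf test train dogs-vs-cats.zip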

Streaming output truncated to the last 5000 lines.
  inflating: train/dogs/dog.4419.jpg  
  inflating: train/dogs/dog.442.jpg  
  inflating: train/dogs/dog.4420.jpg  
  inflating: train/dogs/dog.4421.jpg  
  inflating: train/dogs/dog.4422.jpg  
  inflating: train/dogs/dog.4424.jpg  
  inflating: train/dogs/dog.4425.jpg  
  inflating: train/dogs/dog.4426.jpg  
  inflating: train/dogs/dog.4427.jpg  
  inflating: train/dogs/dog.4431.jpg  
  inflating: train/dogs/dog.4433.jpg  
  inflating: train/dogs/dog.4436.jpg  
  inflating: train/dogs/dog.4438.jpg  
  inflating: train/dogs/dog.4439.jpg  
  inflating: train/dogs/dog.444.jpg  
  inflating: train/dogs/dog.4440.jpg  
  inflating: train/dogs/dog.4441.jpg  
  inflating: train/dogs/dog.4442.jpg  
  inflating: train/dogs/dog.4443.jpg  
  inflating: train/dogs/dog.4444.jpg  
  inflating: train/dogs/dog.4445.jpg  
  inflating: train/dogs/dog.4446.jpg  
  inflating: train/dogs/dog.445.jpg  
  inflating: train/dogs/dog.4450.jpg  
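

The credential setup in the cell above (the `mkdir`, `cp` and `chmod 600` steps) can also be sketched in plain Python. This is a minimal sketch, not part of the original notebook. The helper name `install_kaggle_token` is hypothetical, and the sketch assumes only what the cell already relies on: the Kaggle CLI looks for `~/.kaggle/kaggle.json`, and it warns when that file is readable by other users.

```python
import os
import shutil
import stat


def install_kaggle_token(token_path: str, home_dir: str) -> str:
    """Copy a kaggle.json token into <home_dir>/.kaggle, readable by the owner only.

    Hypothetical helper mirroring `mkdir -p ~/.kaggle`,
    `cp kaggle.json ~/.kaggle/` and `chmod 600 ~/.kaggle/kaggle.json`.
    """
    kaggle_dir = os.path.join(home_dir, '.kaggle')
    os.makedirs(kaggle_dir, exist_ok=True)        # mkdir -p ~/.kaggle
    dest = os.path.join(kaggle_dir, 'kaggle.json')
    shutil.copyfile(token_path, dest)             # cp kaggle.json ~/.kaggle/
    os.chmod(dest, stat.S_IRUSR | stat.S_IWUSR)   # chmod 600: owner read/write only
    return dest


# Usage in Colab, after files.upload():
# install_kaggle_token('kaggle.json', os.path.expanduser('~'))
```

Using `os.chmod` with `stat` constants avoids shelling out. It also makes the intended permission (owner read/write, nothing for group or others) explicit.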
  

## Reduce the size of the dataset (optional)
To make the process faster, we reduce the size of the dataset by removing images from both the train and test folders. The training set shrinks from 10000 to 1000 images per class, and the test set shrinks from 2500 to 250 images per class (note: this is for testing only).

In [4]:
!ls -1 dogs_vs_cats/train/dogs/* | tail -n +1001 | xargs rm 
!ls -1 dogs_vs_cats/train/cats/* | tail -n +1001 | xargs rm

!ls -1 dogs_vs_cats/test/dogs/* | tail -n +251 | xargs rm 
!ls -1 dogs_vs_cats/test/cats/* | tail -n +251 | xargs rm 
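

The `ls | tail | xargs rm` commands above keep the first 1000 files of each training class and the first 250 files of each test class. A minimal pure-Python sketch of the same idea follows. `keep_first_n` is a hypothetical helper. It sorts names by code point, so it may keep a slightly different subset than `ls`, whose ordering depends on the shell's locale.

```python
import os


def keep_first_n(directory: str, n: int) -> int:
    """Delete every regular file in `directory` except the first `n` by sorted name.

    Returns the number of files removed. Hypothetical helper, similar in spirit
    to `ls -1 directory/* | tail -n +(n+1) | xargs rm`.
    """
    names = sorted(
        name for name in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, name))
    )
    for name in names[n:]:
        os.remove(os.path.join(directory, name))
    return max(len(names) - n, 0)


# Usage, matching the sizes used above:
# for split, n in (('train', 1000), ('test', 250)):
#     for cls in ('dogs', 'cats'):
#         keep_first_n(os.path.join('dogs_vs_cats', split, cls), n)
```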

### Set up variables

The following variables locate the dataset and define the pipeline: its name,
its root directory and the path of its ML Metadata database. You can customize
these variables as you want. By default, all output from the pipeline is
generated under the current directory.

In [5]:

data_root = 'dogs_vs_cats'
train_dir = os.path.join(data_root, 'train')
validation_dir = os.path.join(data_root, 'test')

train_cats_dir = os.path.join(train_dir, 'cats')  # directory with our training cat pictures
train_dogs_dir = os.path.join(train_dir, 'dogs')  # directory with our training dog pictures
validation_cats_dir = os.path.join(validation_dir, 'cats')  # directory with our validation cat pictures
validation_dogs_dir = os.path.join(validation_dir, 'dogs')  # directory with our validation dog pictures

num_cats_tr = len(os.listdir(train_cats_dir))
num_dogs_tr = len(os.listdir(train_dogs_dir))

num_cats_val = len(os.listdir(validation_cats_dir))
num_dogs_val = len(os.listdir(validation_dogs_dir))

total_train = num_cats_tr + num_dogs_tr
total_val = num_cats_val + num_dogs_val

IMG_HEIGHT = 150
IMG_WIDTH = 150
PIPELINE_NAME = 'dogs_vs_cats_pipeline'
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'cat&dog_md.db')


print('total training cat images:', num_cats_tr)
print('total training dog images:', num_dogs_tr)
print('total validation cat images:', num_cats_val)
print('total validation dog images:', num_dogs_val)
print("--")
print("Total training images:", total_train)
print("Total validation images:", total_val)

total training cat images: 1000
total training dog images: 1000
total validation cat images: 250
total validation dog images: 250
--
Total training images: 2000
Total validation images: 500


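## Conversion of the image dataset to TFRecords

We followed the instructions in [this guide](https://ai.plainenglish.io/a-quick-and-simple-guide-to-tfrecord-c421337a6562). Each image is stored as a serialized `tf.train.Example` that holds the raw JPEG bytes, the image height, width and depth, and the label.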


In [6]:
import os
import random

import tensorflow as tf

# Set up the train and test image directories
train_dir = os.path.join(data_root, "train")
test_dir = os.path.join(data_root, "test")

# setup train and test TFRecord file
train_tfrecord='train_data.tfrecords'
test_tfrecord = 'test_data.tfrecords'

# Define the name of folders of each class
# We only have two classes in this case.
folders=['dogs', 'cats']

# List all train and test image path
train_image_path=[]
test_image_path=[]

for i in range(len(folders)):
    for file in os.listdir(os.path.join(train_dir, folders[i])):
        train_image_path.append(os.path.join(train_dir, folders[i], file))
    for file in os.listdir(os.path.join(test_dir, folders[i])):
        test_image_path.append( os.path.join(test_dir, folders[i], file))


print("Number of train images found: ", len(train_image_path))
print("Number of test images found: ", len(test_image_path))

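# Shuffle the image paths (with a fixed seed for reproducibility) so that cats
# and dogs are interleaved in the TFRecord files instead of stored class by class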
random.seed(0)
random.shuffle(train_image_path)
random.shuffle(test_image_path)

# Create train and test labels for the shuffled image paths from the file
# names (e.g. cat.123.jpg): 0 for cat and 1 for dog
train_labels=[]
test_labels=[]
for i in range(len(train_image_path)):
    if os.path.basename(train_image_path[i])[:3]=='cat':
        train_labels.append(0)
    else:
        train_labels.append(1)

for i in range(len(test_image_path)):
    if os.path.basename(test_image_path[i])[:3]=='cat':
        test_labels.append(0)
    else:
        test_labels.append(1)

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""    
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def serialize_example(image_string, label):
    ## Create a dictionary with features for images and their target labels
    image_shape = tf.io.decode_jpeg(image_string).shape

    feature = {
      'height': _int64_feature(image_shape[0]),
      'width': _int64_feature(image_shape[1]),
      'depth': _int64_feature(image_shape[2]),
      'label': _int64_feature(label),
      'image_raw': _bytes_feature(image_string),
    }
    # Wrap the features in a tf.train.Example message.
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    # Serialize the message to a binary byte string.
    return example_proto.SerializeToString()

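def write_TFRecord(image_path, label):
    """Reads one JPEG file and returns it as a serialized tf.train.Example."""
    # The context manager closes the file handle after reading.
    with open(image_path, 'rb') as f:
        image_string = f.read()
    return serialize_example(image_string, label)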

# Write the train TFRecord file.
with tf.io.TFRecordWriter(train_tfrecord) as writer:
    for image_path, label in zip(train_image_path, train_labels):
        writer.write(write_TFRecord(image_path, int(label)))
# Write the test TFRecord file.
with tf.io.TFRecordWriter(test_tfrecord) as writer:
    for image_path, label in zip(test_image_path, test_labels):
        writer.write(write_TFRecord(image_path, int(label)))

Number of train images found:  2000
Number of test images found:  500
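

`tf.io.TFRecordWriter` stores each serialized `tf.train.Example` as a length-prefixed record. Both the length and the payload are protected by masked CRC-32C checksums. The following minimal, stdlib-only sketch makes that layout concrete with a writer and a reader for the framing. The function names are hypothetical. The sketch treats payloads as opaque bytes and does not parse the `tf.train.Example` protos. It is also slow, because the CRC is computed in pure Python. In practice you would read the files back with `tf.data.TFRecordDataset`.

```python
import struct
from typing import BinaryIO, Iterator

# Lookup table for CRC-32C (Castagnoli), reflected polynomial 0x82F63B78.
_CRC32C_TABLE = []
for _n in range(256):
    _c = _n
    for _ in range(8):
        _c = (_c >> 1) ^ 0x82F63B78 if _c & 1 else _c >> 1
    _CRC32C_TABLE.append(_c)


def crc32c(data: bytes) -> int:
    """Table-driven CRC-32C of `data`."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _CRC32C_TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def masked_crc(data: bytes) -> int:
    """CRC-32C rotated right by 15 bits plus a constant, as TFRecord stores it."""
    crc = crc32c(data)
    rotated = ((crc >> 15) | (crc << 17)) & 0xFFFFFFFF
    return (rotated + 0xA282EAD8) & 0xFFFFFFFF


def write_record(stream: BinaryIO, data: bytes) -> None:
    """Append one record: length, CRC of length, payload, CRC of payload."""
    header = struct.pack('<Q', len(data))  # uint64, little-endian
    stream.write(header)
    stream.write(struct.pack('<I', masked_crc(header)))
    stream.write(data)
    stream.write(struct.pack('<I', masked_crc(data)))


def _read_exact(stream: BinaryIO, n: int) -> bytes:
    buf = stream.read(n)
    if len(buf) != n:
        raise ValueError('truncated TFRecord data')
    return buf


def read_records(stream: BinaryIO) -> Iterator[bytes]:
    """Yield each record payload, checking both CRCs."""
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of file
        if len(header) != 8:
            raise ValueError('truncated TFRecord data')
        (length_crc,) = struct.unpack('<I', _read_exact(stream, 4))
        if length_crc != masked_crc(header):
            raise ValueError('corrupt length CRC')
        (length,) = struct.unpack('<Q', header)
        data = _read_exact(stream, length)
        (data_crc,) = struct.unpack('<I', _read_exact(stream, 4))
        if data_crc != masked_crc(data):
            raise ValueError('corrupt payload CRC')
        yield data


# Usage: the record count should match the number of train images found above.
# with open('train_data.tfrecords', 'rb') as f:
#     print(sum(1 for _ in read_records(f)))
```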


In [7]:
!rm -rf dataset
!mkdir dataset
!mkdir dataset/train
!mkdir dataset/test
!mv train_data.tfrecords dataset/train
!mv test_data.tfrecords dataset/test
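

In the pipeline definition, `ImportExampleGen` is pointed at this `dataset` directory and given the split patterns `train/*` and `test/*`. These patterns are resolved relative to the input base. The sketch below shows which files each pattern picks up with the layout created above. `resolve_splits` is a hypothetical helper. It uses Python's `glob` as a stand-in for TFX's own file matching, so edge cases may differ.

```python
import glob
import os
from typing import Dict, List


def resolve_splits(input_base: str, patterns: Dict[str, str]) -> Dict[str, List[str]]:
    """Map each split name to the files its pattern matches under input_base.

    Hypothetical helper: glob approximates how ExampleGen resolves split
    patterns relative to its input base.
    """
    return {
        split: sorted(glob.glob(os.path.join(input_base, pattern)))
        for split, pattern in patterns.items()
    }


# Usage with the layout created above:
# print(resolve_splits('dataset', {'train': 'train/*', 'eval': 'test/*'}))
```

Each split should match exactly one TFRecord file. An empty list would mean the files were moved to the wrong place.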

## Pipeline definition
We define the ML pipeline here. It has three components: ImportExampleGen, which ingests the TFRecord files into the pipeline; StatisticsGen, which computes statistics over the examples; and SchemaGen, which infers a schema from those statistics.

To ingest the image dataset into the pipeline, we adapted the [CIFAR-10 example](https://github.com/tensorflow/tfx/blob/master/tfx/examples/cifar10/cifar10_pipeline_native_keras.py) from the TFX GitHub repository.

In [24]:
import os

from tfx import v1 as tfx
from tfx.components import ImportExampleGen
from tfx.proto import example_gen_pb2


def _create_pipeline(pipeline_name: str, pipeline_root: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three-component cats-vs-dogs pipeline with TFX."""
  print("Pipeline creation is running...\n\n\n")

  # Map each split to a file pattern relative to input_base: the train
  # TFRecords become the 'train' split and the test TFRecords the 'eval' split.
  input_config = example_gen_pb2.Input(splits=[
      example_gen_pb2.Input.Split(name='train', pattern='train/*'),
      example_gen_pb2.Input.Split(name='eval', pattern='test/*')
  ])

  # Brings the existing TFRecord files (serialized tf.train.Example protos)
  # into the pipeline. The dataset directory sits under the current directory.
  example_gen = ImportExampleGen(
      input_base=os.path.abspath('dataset'), input_config=input_config)

  # Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  # Generates a schema based on the generated statistics.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  # The following three components are included in the pipeline.
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

## Run the pipeline

TFX supports multiple orchestrators to run pipelines.
In this tutorial we will use `LocalDagRunner`, which is included in the TFX
Python package and runs pipelines in the local environment.
We often call TFX pipelines "DAGs" which stands for directed acyclic graph.
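
To make the DAG idea concrete: in this pipeline, `StatisticsGen` consumes the examples produced by `ImportExampleGen`, and `SchemaGen` consumes the statistics produced by `StatisticsGen`. The sketch below uses Python's standard `graphlib` module (Python 3.9+) to derive an execution order from those dependencies. It only illustrates the concept and is not how `LocalDagRunner` is implemented.

```python
from graphlib import TopologicalSorter

# Each component maps to the set of components whose outputs it consumes.
dependencies = {
    'ImportExampleGen': set(),
    'StatisticsGen': {'ImportExampleGen'},  # examples -> statistics
    'SchemaGen': {'StatisticsGen'},         # statistics -> schema
}

# static_order() yields each node only after all of its predecessors.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['ImportExampleGen', 'StatisticsGen', 'SchemaGen']
```

If a cycle were introduced, for example by making `ImportExampleGen` depend on `SchemaGen`, `static_order()` would raise `graphlib.CycleError`. No valid execution order exists in that case, which is why pipelines must be acyclic.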

`LocalDagRunner` provides fast iterations for development and debugging.
TFX also supports other orchestrators including Kubeflow Pipelines and Apache
Airflow which are suitable for production use cases.

See
[TFX on Cloud AI Platform Pipelines](https://www.tensorflow.org/tfx/tutorials/tfx/cloud-ai-platform-pipelines)
or
[TFX Airflow Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/airflow_workshop)
to learn more about other orchestration systems.

Now we create a `LocalDagRunner` and pass a `Pipeline` object created from the
function we already defined.

The pipeline runs directly, and the logs show its progress through data ingestion, statistics generation and schema inference.

In [25]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      metadata_path=METADATA_PATH))

Pipeline creation is running...







You should see "INFO:absl:Component SchemaGen is finished." at the end of the
logs if the pipeline finished successfully, because `SchemaGen` is the last
component of the pipeline.

Each component writes its output artifacts under `PIPELINE_ROOT`, which is the
`pipelines/dogs_vs_cats_pipeline` directory if you did not change the
variables in the previous steps, and the run is recorded in the ML Metadata
database at `METADATA_PATH`. You can browse the outputs with the file browser
in the left-side panel in Colab, or query them programmatically as shown in the
next section.

## Review outputs of the pipeline
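
A quick way to see which files the run produced is to walk the pipeline root directory. The following minimal sketch lists every file under a directory, relative to it. `list_outputs` is a hypothetical helper. The code makes no assumptions about how TFX lays out the subdirectories it creates.

```python
import os
from typing import List


def list_outputs(root: str) -> List[str]:
    """Return all file paths under root, relative to root, in sorted order."""
    paths = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            paths.append(os.path.relpath(os.path.join(dirpath, name), root))
    return sorted(paths)


# Usage in the notebook:
# for path in list_outputs(PIPELINE_ROOT):
#     print(path)
```

The file listing only shows what exists on disk. The MLMD-based helpers below also tell you which execution produced each artifact.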

In [26]:
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_output_artifacts(metadata, latest_execution.id)

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

In [27]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

In [29]:
# docs-infra: no-execute
visualize_artifacts(stats_artifacts)

In [28]:
visualize_artifacts(schema_artifacts)

| Feature name | Type | Presence | Valency | Domain |
|---|---|---|---|---|
| 'depth' | INT | required | | - |
| 'height' | INT | required | | - |
| 'image_raw' | BYTES | required | | - |
| 'label' | INT | required | | - |
| 'width' | INT | required | | - |


## Next steps

You can find more resources at https://www.tensorflow.org/tfx/tutorials.

Please see
[Understanding TFX Pipelines](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines)
to learn more about various concepts in TFX.
