In [None]:
# Install a pinned libcudnn8 build (8.1.0.77-1+cuda11.2) with apt.
# --allow-change-held-packages lets apt change the package even if it is marked as held.
!apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2

In [None]:
# Install TFX into the notebook runtime.
# NOTE(review): the version is not pinned, so a fresh runtime may resolve a different
# TFX release than the one this notebook was written against.
!pip install tfx 

In [None]:
import json 
import os
from pathlib import Path

# Kaggle API credentials. The KAGGLE_USERNAME / KAGGLE_KEY environment variables
# take precedence so real secrets do not have to be saved inside the notebook;
# the placeholder strings are used only when those variables are unset.
api_key = {
    'username': os.environ.get('KAGGLE_USERNAME', "your_kaggle_username"),
    'key': os.environ.get('KAGGLE_KEY', "your_kaggle_api_key"),
}

# Directory in which the credentials file is written (created if missing).
kaggle_path = Path('/root/.kaggle')
os.makedirs(kaggle_path, exist_ok=True)

# Serialize the credential dict to kaggle.json.
credentials_file = kaggle_path/'kaggle.json'
with open(credentials_file, 'w') as handl:
    json.dump(api_key, handl)

# The mode must be an octal literal: 0o600 means read/write for the owner only.
# A decimal 600 equals 0o1130, which sets an unrelated combination of bits.
os.chmod(credentials_file, 0o600)


In [None]:
# Download the "dogs-vs-cats" competition archive with the Kaggle CLI
# (the recorded output shows it is saved as /content/dogs-vs-cats.zip).
!kaggle competitions download -c dogs-vs-cats

Downloading dogs-vs-cats.zip to /content
100% 811M/812M [00:06<00:00, 127MB/s]
100% 812M/812M [00:06<00:00, 132MB/s]


In [None]:
# Extract the downloaded competition archive into /content/.
!unzip /content/dogs-vs-cats.zip -d /content/

Archive:  /content/dogs-vs-cats.zip
  inflating: /content/sampleSubmission.csv  
  inflating: /content/test1.zip      
  inflating: /content/train.zip      


In [None]:
# Extract the training archive into /content/; a later cell reads the images from /content/train.
!unzip /content/train.zip -d /content/

In [None]:
import os

# Orchestrator choice and pipeline identity.
runner_type = 'beam'
pipeline_name = 'cat_dog'
pipeline_dir = '/content/tfx'

# Pipeline inputs: the example data directory and the user module file.
data_dir = os.path.join(pipeline_dir, 'data')
module_file = os.path.join(pipeline_dir, 'components', 'module.py')

# Everything the pipeline produces lives under <pipeline_dir>/output/<pipeline_name>.
output_base = os.path.join(pipeline_dir, 'output', pipeline_name)
pipeline_root, serving_model_dir = (
    os.path.join(output_base, leaf) for leaf in ('pipeline_root', pipeline_name)
)

# SQLite file used for the pipeline metadata store.
metadata_path = os.path.join(pipeline_root, 'metadata.sqlite')

In [None]:
import io 
import numpy as np
from PIL import Image
import tensorflow as tf
import cv2
import os


# Directory containing the extracted training images.
base_path = "/content/train"

# os.listdir returns entries in arbitrary, filesystem-dependent order, so taking a
# slice of the raw listing later would select a different subset on each runtime.
# Sorting and then shuffling with a fixed seed gives a reproducible order while
# keeping the entries mixed instead of grouped alphabetically.
import random
filenames = sorted(os.listdir(base_path))
random.Random(42).shuffle(filenames)

def generate_label_from_path(image_path):
    """Derive the class label from an image's file name.

    Args:
        image_path: Path to an image whose file name starts with its class
            name, e.g. ``/content/train/cat.0.jpg``.

    Returns:
        1 if the file name starts with "cat", otherwise 0.
    """
    # Look at the file name itself instead of a fixed character offset, so the
    # result does not depend on the length of the parent directory path.
    file_name = os.path.basename(image_path)
    return 1 if file_name.startswith("cat") else 0


def _bytes_feature(value):
    """Wrap a single string / bytes value in a `tf.train.Feature` holding a `BytesList`.

    A value whose type matches what `tf.constant` produces is first converted
    with `.numpy()`; any other value is used as given.
    """
    tensor_cls = type(tf.constant(0))  # concrete tensor class returned by tf.constant
    payload = value.numpy() if isinstance(value, tensor_cls) else value
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[payload]))

def _int64_feature(value):
    """Wrap a single bool / enum / int / uint value in a `tf.train.Feature` holding an `Int64List`."""
    int_list = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=int_list)


def serialize_array(array):
    """Serialize `array` with `tf.io.serialize_tensor` and return the result."""
    return tf.io.serialize_tensor(array)

def image_label_to_tf_train(image, label):
    """Build a `tf.train.Example` from an image and its integer label.

    The example carries two features: 'raw_image' (the image serialized via
    `serialize_array` and stored as bytes) and 'label' (stored as int64).
    """
    features = tf.train.Features(feature={
        'raw_image': _bytes_feature(serialize_array(image)),
        'label': _int64_feature(label),
    })
    return tf.train.Example(features=features)


tfrecord_filename = '/content/tfx/data/images.tfrecord'

# TFRecordWriter does not create missing parent directories, so make sure the
# output directory exists before the file is opened.
os.makedirs(os.path.dirname(tfrecord_filename), exist_ok=True)

count = 0
# The context manager closes the writer on exit; no explicit close() is needed.
with tf.io.TFRecordWriter(tfrecord_filename) as writer:
    # Only the first 5000 directory entries are converted.
    for img_path in filenames[:5000]:
        image_path = os.path.join(base_path, img_path)

        # cv2.imread returns None (rather than raising) when a file cannot be
        # read or decoded; skip such entries instead of crashing in cv2.resize.
        img = cv2.imread(image_path)
        if img is None:
            print(f"Skipping unreadable image: {image_path}")
            continue

        # Resize every image to 100x100 so all serialized tensors share one shape.
        img = cv2.resize(img, (100, 100))

        label = generate_label_from_path(image_path)
        example = image_label_to_tf_train(img, label)
        writer.write(example.SerializeToString())
        count = count + 1
print(f"Wrote {count} elements to TFRecord")
    

Wrote 5000 elements to TFRecord


In [None]:
from tfx import components
import tensorflow_model_analysis as tfma 
from tfx.components import (ImportExampleGen, Evaluator, ExampleValidator, Pusher, SchemaGen, StatisticsGen, Trainer, Transform)
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.proto import example_gen_pb2
from tfx import v1 as tfx
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
import os
import absl




def init_components(data_root, module_file, serving_model_dir,
                     training_steps=2000, eval_steps=200):
  """Create the TFX components that make up the pipeline.

  Args:
    data_root: Directory passed to ImportExampleGen as `input_base`.
    module_file: Path of the user module given to both Transform and Trainer.
    serving_model_dir: Filesystem directory the Pusher uses as its push destination.
    training_steps: `num_steps` for the Trainer's TrainArgs.
    eval_steps: `num_steps` for the Trainer's EvalArgs.

  Returns:
    A list containing, in order: example_gen, statistics_gen, schema_gen,
    example_validator, transform, trainer, evaluator, pusher.
  """
  # Split the imported examples into train / eval / test with hash buckets 6 : 2 : 2.
  split_buckets = (('train', 6), ('eval', 2), ('test', 2))
  splits = [
      example_gen_pb2.SplitConfig.Split(name=split_name, hash_buckets=buckets)
      for split_name, buckets in split_buckets
  ]
  output_config = example_gen_pb2.Output(
      split_config=example_gen_pb2.SplitConfig(splits=splits))
  example_gen = ImportExampleGen(input_base=data_root, output_config=output_config)
  examples = example_gen.outputs['examples']

  # Statistics -> schema -> validation chain over the raw examples.
  statistics_gen = StatisticsGen(examples=examples)
  statistics = statistics_gen.outputs['statistics']

  schema_gen = SchemaGen(statistics=statistics, infer_feature_shape=True)
  schema = schema_gen.outputs['schema']

  example_validator = ExampleValidator(statistics=statistics, schema=schema)

  # Transform and Trainer both load their user code from the same module file.
  transform = Transform(examples=examples, schema=schema, module_file=module_file)

  trainer = Trainer(
      module_file=module_file,
      transformed_examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'],
      schema=schema,
      train_args=trainer_pb2.TrainArgs(num_steps=training_steps),
      eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps),
  )

  # NOTE(review): this resolver is constructed but neither added to the returned
  # list nor passed to the Evaluator, so it has no effect on the pipeline as
  # written - confirm whether a baseline-model comparison was intended.
  model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
  )

  # Single metric with a value threshold: lower bound 0.8 on SparseCategoricalAccuracy.
  accuracy_threshold = tfma.MetricThreshold(
      value_threshold=tfma.GenericValueThreshold(lower_bound={'value': 0.8}))
  accuracy_metric = tfma.MetricConfig(
      class_name='SparseCategoricalAccuracy', threshold=accuracy_threshold)
  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='label')],
      slicing_specs=[tfma.SlicingSpec()],
      metrics_specs=[tfma.MetricsSpec(metrics=[accuracy_metric])],
  )

  # The Evaluator is fed the Transform component's transformed examples.
  evaluator = Evaluator(
      examples=transform.outputs['transformed_examples'],
      model=trainer.outputs['model'],
      eval_config=eval_config,
  )

  # The Pusher receives the trained model and the Evaluator's blessing output.
  push_destination = pusher_pb2.PushDestination(
      filesystem=pusher_pb2.PushDestination.Filesystem(
          base_directory=serving_model_dir))
  pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      push_destination=push_destination,
  )

  return [
      example_gen,
      statistics_gen,
      schema_gen,
      example_validator,
      transform,
      trainer,
      evaluator,
      pusher,
  ]
      

In [None]:
import absl
from tfx.orchestration import metadata, pipeline

def init_beam_pipeline(components, pipeline_root, direct_num_workers):
  """Assemble the TFX pipeline object to be executed by the Beam runner.

  Reads the notebook-level globals `pipeline_name` and `metadata_path`.

  Args:
    components: List of TFX components to include in the pipeline.
    pipeline_root: Root directory passed to the pipeline.
    direct_num_workers: Value for the Beam `--direct_num_workers` argument.

  Returns:
    A `tfx.orchestration.pipeline.Pipeline` with caching disabled and a
    SQLite metadata connection at `metadata_path`.
  """
  # Import under local aliases: the run cell rebinds the notebook-level name
  # `pipeline` to the Pipeline object returned by this function, so looking up
  # `pipeline.Pipeline` through globals raises AttributeError when the run cell
  # is executed again without re-running this cell's imports.
  from tfx.orchestration import metadata as tfx_metadata
  from tfx.orchestration import pipeline as tfx_pipeline

  absl.logging.info("Pipeline root set to:{}".format(pipeline_root))
  beam_arg = [
      "--direct_num_workers={}".format(direct_num_workers),
  ]

  return tfx_pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      enable_cache=False,
      metadata_connection_config=tfx_metadata.sqlite_metadata_connection_config(metadata_path),
      beam_pipeline_args=beam_arg,
  )

In [None]:
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# Build the component list with explicit step counts, wrap it in a pipeline
# configured for 2 direct-runner workers, and execute it with the Beam DAG runner.
components = init_components(
    data_root=data_dir,
    module_file=module_file,
    serving_model_dir=serving_model_dir,
    training_steps=3000,
    eval_steps=2000,
)
pipeline = init_beam_pipeline(
    components=components,
    pipeline_root=pipeline_root,
    direct_num_workers=2,
)
BeamDagRunner().run(pipeline)