In [1]:
# Report which Python interpreter the kernel is running.
# shutil is imported here because a later cell relies on shutil.copytree
# (its dirs_exist_ok argument needs Python 3.8+).
import sys
import shutil

sys.version

'3.8.2 (default, Jan 31 2023, 18:34:03) \n[GCC 12.2.0]'

In [None]:
#Upgrade pip
%pip install --upgrade pip

In [None]:
# Print the TensorFlow and TFX versions so outputs can be tied to a release.
import tensorflow as tf
print(f"{tf.__version__}")

# Public TFX v1 API; referred to as `tfx.*` in every later cell.
from tfx import v1 as tfx
print(f"{tfx.__version__}")



In [None]:
# Configuration for the importer_node_playground pipeline
import os

from absl import logging

# Name shared by the pipeline, its metadata folder and its serving folder.
PIPELINE_NAME = "importer_node_playground"

# Root directory for artifacts produced by the pipeline.
PIPELINE_ROOT = './artifacts'
# SQLite file backing the ML Metadata (MLMD) store.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Destination for models exported by the pipeline.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

# Directory containing the input data.
DATA_ROOT = './data/'

# Default to INFO-level logging.
logging.set_verbosity(logging.INFO)

In [None]:
import json

# Alias left over from exploration; not used by any other cell in this notebook.
importer_variable = tfx.dsl.Importer

# Describe an external Examples artifact by setting properties on an artifact
# *instance*. The earlier version wrote plain values into
# `Examples.PROPERTIES`. That is the class-level property schema shared by
# every Examples artifact in this kernel (in TFX it maps property names to
# Property type descriptors). Overwriting it with 1 / "Split_train" corrupts
# the artifact type used by CsvExampleGen, Importer and StatisticsGen.
external_examples = tfx.types.standard_artifacts.Examples()
external_examples.span = 1
external_examples.version = 1
# split_names holds a JSON-encoded list of split names. The data for split
# "train" lives in a "Split-train" subdirectory (see the viewer cells below).
external_examples.split_names = json.dumps(['train'])

# external_examples = './artifacts/CopyExampleGen/examples/'

print(external_examples.span)

In [None]:
###################################################################################
### [USE CASE 1]
### TFRecords are already created
###
### This component will:
### 1. Accept a dict of the form {'split_name': './path/to/split_name/tfrecords.gz'}
### 2. Add them to a folder that follows the Examples artifact directory structure
### 3. Import them with an Importer node to register the external resource in MLMD
###################################################################################


# Create the USE CASE 1 pipeline (the Importer node is still a TODO, see below).
def _create_pipeline(
  pipeline_name: str,
  pipeline_root: str,
  data_root: str,
  metadata_path: str,
  source_examples_artifact_uri: str = '../artifacts/',
  destination_examples_artifact_uri: str = './artifacts/',
  ) -> tfx.dsl.Pipeline:
  """Copy external artifacts into place and build the USE CASE 1 pipeline.

  Side effect: the copy happens immediately, when this function is called
  (i.e. at pipeline-construction time, not when the pipeline runs). It
  recursively copies `source_examples_artifact_uri` into
  `destination_examples_artifact_uri` and merges into existing directories.

  Args:
    pipeline_name: Name given to the pipeline.
    pipeline_root: Directory under which pipeline outputs are written.
    data_root: Directory of input files read by CsvExampleGen.
    metadata_path: Path of the SQLite file used as the MLMD store.
    source_examples_artifact_uri: Directory of externally produced artifacts
      to copy. Defaults to the previously hard-coded '../artifacts/'.
    destination_examples_artifact_uri: Directory the external artifacts are
      copied into. Defaults to the previously hard-coded './artifacts/'.

  Returns:
    A `tfx.dsl.Pipeline` that currently contains only a CsvExampleGen
    component.

  Raises:
    FileNotFoundError: If `source_examples_artifact_uri` does not exist.
  """
  # Steps 1-2: bring the external source data into the local artifact tree.
  # dirs_exist_ok=True requires Python 3.8+.
  shutil.copytree(source_examples_artifact_uri,
                  destination_examples_artifact_uri,
                  dirs_exist_ok=True)

  # Brings CSV data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # TODO(step 3): register the copied TFRecords with an Importer node, e.g.
  #   tfx.dsl.Importer(
  #       source_uri=...,
  #       artifact_type=tfx.types.standard_artifacts.Examples,
  #   ).with_id('examples_importer')
  # and feed its outputs['examples'] into tfx.components.StatisticsGen.

  components = [example_gen]

  # Use the arguments (not the PIPELINE_NAME / PIPELINE_ROOT globals) so the
  # caller actually controls the pipeline name and root.
  return tfx.dsl.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
    components=components,
  )

In [None]:
# Run the USE CASE 1 pipeline locally, configured from the constants cell.
runner = tfx.orchestration.LocalDagRunner()
use_case_1_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    metadata_path=METADATA_PATH,
)
runner.run(use_case_1_pipeline)

In [None]:
###################################################################################
### [USE CASE 2]
### Raw data only
###
### This component will:
### 1. Accept a dict of the form {'split_name': './path/to/rawdata'}
### 2. Convert these files to TFRecords
###       - Register under MLMD using Importer(?)
### 3. Format the TFRecords into the Examples artifact directory structure
###       - Register under MLMD using Importer(?)
###################################################################################
import json
from typing import Sequence


# Create pipeline to run Importer node.
# NOTE: this redefines the USE CASE 1 `_create_pipeline` from an earlier cell;
# whichever of the two cells ran last determines which version is called.
def _create_pipeline(
  pipeline_name: str,
  pipeline_root: str,
  data_root: str,
  metadata_path: str,
  examples_artifact_uri: str = './artifacts/CopyExampleGen/examples/1/',
  split_names: Sequence[str] = ('eval',),
  ) -> tfx.dsl.Pipeline:
  """Build a pipeline that imports an existing Examples artifact and computes statistics.

  Args:
    pipeline_name: Name given to the pipeline.
    pipeline_root: Directory under which pipeline outputs are written.
    data_root: Currently unused; kept so the signature matches USE CASE 1.
    metadata_path: Path of the SQLite file used as the MLMD store.
    examples_artifact_uri: Root directory of the Examples artifact to import.
      This is the directory that *contains* the `Split-<name>` folders (the
      previous version pointed at `.../Split-eval/` itself).
    split_names: Names of the splits present under `examples_artifact_uri`.
      A single string is treated as one split name.

  Returns:
    A `tfx.dsl.Pipeline` with an Importer node followed by StatisticsGen.
  """
  # TODO(step 1): accept raw-data inputs such as
  #   {'split_train': '../raw_data/split_train.csv', 'split_eval': '../raw_data/'}
  # TODO(step 2): convert these files to TFRecords without a Beam executor.

  # Guard against list('eval') -> ['e', 'v', 'a', 'l'].
  if isinstance(split_names, str):
    split_names = [split_names]

  # Register the external Examples artifact in MLMD. split_names (a
  # JSON-encoded list) tells downstream components such as StatisticsGen
  # which Split-* subdirectories of the artifact URI to read.
  importer_node = tfx.dsl.Importer(
    source_uri=examples_artifact_uri,
    artifact_type=tfx.types.standard_artifacts.Examples,
    properties={'split_names': json.dumps(list(split_names))},
    output_key='examples',
  ).with_id('CopyExampleGen')

  # Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(
    examples=importer_node.outputs['examples'])

  components = [
    importer_node,
    statistics_gen,
  ]

  # Use the arguments (not the PIPELINE_NAME / PIPELINE_ROOT globals) so the
  # caller actually controls the pipeline name and root.
  return tfx.dsl.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
    components=components,
  )

In [None]:
# [SPLIT_TRAIN] View records of the train split of the Examples artifact
# Show at most this many records (the split can be large); set to None to
# print the whole split.
NUM_TRAIN_RECORDS_TO_SHOW = 3
train_uri = os.path.join('./artifacts/CsvExampleGen/examples/1/', 'Split-train')

# Sorted so files (and therefore the records shown) come in a stable order;
# os.listdir returns entries in arbitrary order.
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in sorted(os.listdir(train_uri))]

# The split files are read as GZIP-compressed TFRecords.
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
if NUM_TRAIN_RECORDS_TO_SHOW is not None:
  dataset = dataset.take(NUM_TRAIN_RECORDS_TO_SHOW)

for tfrecord in dataset:
  # Raw serialized record
  print(tfrecord)

  serialized_example = tfrecord.numpy()
  example = tf.train.Example()

  # Parse the bytes into a tf.train.Example and print it in protobuf text
  # format (this is not JSON).
  example.ParseFromString(serialized_example)
  print(example)


In [None]:
# [SPLIT_EVAL] View the first few records of the eval split of the Examples artifact
NUM_EVAL_RECORDS_TO_SHOW = 3
eval_uri = os.path.join('./artifacts/CsvExampleGen/examples/1/', 'Split-eval')

# Sorted so the file list and the records shown are stable across runs;
# os.listdir returns entries in arbitrary order.
tfrecord_filenames = [os.path.join(eval_uri, name)
                      for name in sorted(os.listdir(eval_uri))]

print(tfrecord_filenames)

# The split files are read as GZIP-compressed TFRecords.
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

for tfrecord in dataset.take(NUM_EVAL_RECORDS_TO_SHOW):
  # Raw serialized record
  print(tfrecord)

  serialized_example = tfrecord.numpy()
  example = tf.train.Example()

  # Parse the bytes into a tf.train.Example and print it in protobuf text
  # format (this is not JSON).
  example.ParseFromString(serialized_example)
  print(example)