<a href="https://colab.research.google.com/github/armahdavi/MLOps/blob/main/ML%20Data%20Lifecycle%20in%20Production%20(Coursera)/tft_practice_apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -U tensorflow-transform
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

import pprint
import tempfile

print(f'TensorFlow version: {tf.__version__}')
print(f'TFX Transform version: {tft.__version__}')


TensorFlow version: 2.16.2
TFX Transform version: 1.16.0


In [2]:
# Sample dataset: three records, each holding two numeric features ('x', 'y')
# and one string feature ('s').
raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'},
]

In [3]:
# Describe the raw features as a DatasetMetadata object.
# The feature spec maps each key of the raw records to its type; every feature
# is a scalar (shape []): 'x' and 'y' are float32 and 's' is a string.
# `schema_from_feature_spec` converts that spec into a Schema protobuf.
raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'x': tf.io.FixedLenFeature(shape=[], dtype=tf.float32),
        'y': tf.io.FixedLenFeature(shape=[], dtype=tf.float32),
        's': tf.io.FixedLenFeature(shape=[], dtype=tf.string),
    })
)

raw_data_metadata

{'_schema': feature {
  name: "s"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
, '_output_record_batches': True}

In [4]:
def preprocessing_fn(inputs):
    """Map the raw feature columns to the transformed feature columns.

    Args:
        inputs: mapping with the keys 'x', 'y' and 's' holding the raw columns.

    Returns:
        A dict with four entries:
          * 'x_centered': 'x' minus ``tft.mean`` of 'x',
          * 'y_normalized': 'y' passed through ``tft.scale_to_0_1``,
          * 's_integerized': 's' passed through
            ``tft.compute_and_apply_vocabulary``,
          * 'x_centered_times_y_normalized': product of the first two.
    """
    # Look up all three raw columns up front so a missing key fails before
    # any transform is built.
    raw_x, raw_y, raw_s = inputs['x'], inputs['y'], inputs['s']

    # Build the individual transforms with the tft helpers.
    centered = raw_x - tft.mean(raw_x)
    scaled = tft.scale_to_0_1(raw_y)
    vocab_ids = tft.compute_and_apply_vocabulary(raw_s)

    # Assemble the output columns; the last one is a feature cross of the
    # centered and scaled values.
    outputs = dict(
        x_centered=centered,
        y_normalized=scaled,
        s_integerized=vocab_ids,
    )
    outputs['x_centered_times_y_normalized'] = centered * scaled
    return outputs

In [5]:
# Only let error-level messages through the TensorFlow logger.
tf.get_logger().setLevel('ERROR')

# The analysis step needs a scratch directory; a fresh temporary one is created
# for this run and handed to the tft_beam context.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):

    # Apache Beam pipe syntax: feed the (data, metadata) pair into the combined
    # analyze-and-transform step driven by `preprocessing_fn`. The result is a
    # pair: the transformed dataset and the transform function.
    transformed_dataset, transform_fn = (
        (raw_data, raw_data_metadata)
        | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
    )





In [6]:
# Display the pipeline result: a tuple of the transformed records and their metadata.
transformed_dataset

([{'s_integerized': 0,
   'x_centered': -1.0,
   'x_centered_times_y_normalized': -0.0,
   'y_normalized': 0.0},
  {'s_integerized': 1,
   'x_centered': 0.0,
   'x_centered_times_y_normalized': 0.0,
   'y_normalized': 0.5},
  {'s_integerized': 0,
   'x_centered': 1.0,
   'x_centered_times_y_normalized': 1.0,
   'y_normalized': 1.0}],
 BeamDatasetMetadata(dataset_metadata={'_schema': feature {
   name: "s_integerized"
   type: INT
   int_domain {
     is_categorical: true
   }
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   name: "x_centered"
   type: FLOAT
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   name: "x_centered_times_y_normalized"
   type: FLOAT
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   name: "y_normalized"
   type: FLOAT
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 , '_output_record_batches': False}, deferred_metadata=[{'_schema': feature {
   name: "s_integerized"
   type

In [7]:
# Display the transform function returned by the pipeline. Per the recorded output it
# is a tuple of a list with one path under the temporary directory (presumably where
# the transform graph was saved -- TODO confirm) and the output metadata.
transform_fn

(['/tmp/tmprgrrdn3w/tftransform_tmp/4d1fb8cde17b44eaa501568c8c266743'],
 BeamDatasetMetadata(dataset_metadata={'_schema': feature {
   name: "s_integerized"
   type: INT
   int_domain {
     is_categorical: true
   }
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   name: "x_centered"
   type: FLOAT
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   name: "x_centered_times_y_normalized"
   type: FLOAT
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   name: "y_normalized"
   type: FLOAT
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 , '_output_record_batches': False}, deferred_metadata=[{'_schema': feature {
   name: "s_integerized"
   type: INT
   int_domain {
     min: -1
     max: 1
     is_categorical: true
   }
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   name: "x_centered"
   type: FLOAT
   presence {
     min_fraction: 1.0
   }
   shape {
   }
 }
 feature {
   na

In [8]:
# Split the result tuple into the transformed records and the metadata describing them.
transformed_data, transformed_metadata = transformed_dataset
# Show only the transformed records.
transformed_data

[{'s_integerized': 0,
  'x_centered': -1.0,
  'x_centered_times_y_normalized': -0.0,
  'y_normalized': 0.0},
 {'s_integerized': 1,
  'x_centered': 0.0,
  'x_centered_times_y_normalized': 0.0,
  'y_normalized': 0.5},
 {'s_integerized': 0,
  'x_centered': 1.0,
  'x_centered_times_y_normalized': 1.0,
  'y_normalized': 1.0}]

In [9]:
# Show the metadata (schema) of the transformed features.
transformed_metadata

BeamDatasetMetadata(dataset_metadata={'_schema': feature {
  name: "s_integerized"
  type: INT
  int_domain {
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered_times_y_normalized"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y_normalized"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
, '_output_record_batches': False}, deferred_metadata=[{'_schema': feature {
  name: "s_integerized"
  type: INT
  int_domain {
    min: -1
    max: 1
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered_times_y_normalized"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y_normalized