In [1]:
import os
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow_transform.tf_metadata import dataset_metadata
from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples

# Record the library versions this notebook was run with.
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX Transform version: {}'.format(tft.__version__))

TensorFlow version: 2.13.1
TFX Transform version: 1.14.0


In [2]:
# Cleaned census-income training split; the same path is read again by CsvTFXIO below.
train_data_file = 'data/train_cleaned.csv'
train_df = pd.read_csv(filepath_or_buffer=train_data_file)

In [3]:
train_df.head(n=5)  # quick look at the first rows of the raw training data


Unnamed: 0,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


In [4]:
# Raw (pre-transform) column groups used to build the parsing schema.
# NOTE: a later cell redefines CATEGORICAL_FEATURE_KEYS / NUMERIC_FEATURE_KEYS
# for preprocessing_fn (moving 'age' into a bucketized group); the lists here
# only describe how raw columns are typed.
CATEGORICAL_FEATURE_KEYS = [
    'workclass',
    'education',
    'marital_status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native_country',
]

NUMERIC_FEATURE_KEYS = [
    'age',
    'fnlwgt',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
    'education_num',
]

LABEL_KEY = 'income'

# Every raw column is a required scalar: tf.string for categoricals and the
# label, tf.float32 for numeric columns. Insertion order is categoricals,
# numerics, then the label.
RAW_DATA_FEATURE_SPEC = {
    **{name: tf.io.FixedLenFeature([], tf.string) for name in CATEGORICAL_FEATURE_KEYS},
    **{name: tf.io.FixedLenFeature([], tf.float32) for name in NUMERIC_FEATURE_KEYS},
    LABEL_KEY: tf.io.FixedLenFeature([], tf.string),
}
print(RAW_DATA_FEATURE_SPEC)

# Build the metadata once and derive the bare Schema proto from it, instead of
# converting the same feature spec twice. `raw_data_metadata` feeds the
# in-memory TFT run; `SCHEMA` feeds CsvTFXIO.
raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC)
)
SCHEMA = raw_data_metadata.schema

{'workclass': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'education': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'marital_status': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'occupation': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'relationship': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'race': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'sex': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'native_country': FixedLenFeature(shape=[], dtype=tf.string, default_value=None), 'age': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 'fnlwgt': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 'capital_gain': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 'capital_loss': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None), 'hours_per_week': FixedLenFeature(shape=[], dtype=tf.float32, default_v

In [5]:
# Both schemas come from RAW_DATA_FEATURE_SPEC, so the two dumps should match.
# Use the public `.schema` accessor rather than the private `_schema` attribute.
print(raw_data_metadata.schema)
print("\n")
print(SCHEMA)

feature {
  name: "age"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "capital_gain"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "capital_loss"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "education"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "education_num"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "fnlwgt"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "hours_per_week"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "income"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "marital_status"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "native_country"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }

In [6]:
# CsvTFXIO takes the raw Schema proto, not the DatasetMetadata wrapper.
type(SCHEMA)

tensorflow_metadata.proto.v0.schema_pb2.Schema

In [7]:
# CsvTFXIO needs the CSV column order; reuse the header pandas already parsed.
ORDERED_CSV_COLUMNS = train_df.columns.tolist()
ORDERED_CSV_COLUMNS

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'income']

In [8]:
# Build the (lazy) Beam pipeline that reads the CSV and converts rows into
# RecordBatches according to SCHEMA. Nothing executes until pipeline.run()
# in the final cell.
pipeline = beam.Pipeline()
csv_tfxio = tfxio.CsvTFXIO(
    train_data_file,
    column_names=ORDERED_CSV_COLUMNS,
    schema=SCHEMA,
    skip_header_lines=1,  # the first line of the file is the header row
    telemetry_descriptors=[],
)
raw_data = pipeline | 'TFXIORead' >> csv_tfxio.BeamSource()



In [9]:
# Alternative (disabled): read the CSV as raw text, normalize ', ' separators to ',', then decode with BeamRecordCsvTFXIO
#pipeline = beam.Pipeline()

#csv_tfxio = tfxio.BeamRecordCsvTFXIO(
#    physical_format='text', column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA)

#raw_data = (
#    pipeline
#    | 'ReadTrainData' >> beam.io.ReadFromText(
#       train_data_file, coder=beam.coders.BytesCoder())
#    | 'FixCommasTrainData' >> beam.Map(
#        lambda line: line.replace(b', ', b','))
#    | 'DecodeTrainData' >> csv_tfxio.BeamSource())

In [10]:
raw_data  # still an unevaluated PCollection; it is populated when the pipeline runs

<PCollection[[8]: TFXIORead/RawRecordToRecordBatch/CollectRecordBatchTelemetry/ProfileRecordBatches.None] at 0x1df320af280>

In [11]:
# Column groups consumed by preprocessing_fn. These override the raw-schema
# lists defined earlier (here 'age' is bucketized instead of scaled).

# String features that are vocabulary-encoded into integer indices.
CATEGORICAL_FEATURE_KEYS = [
    'education',
    'marital_status',
    'occupation',
    'race',
    'relationship',
    'workclass',
    'sex',
    'native_country',
]

# Categorical feature whose values are rewritten before vocabulary encoding.
REPLACE_KEY = 'marital_status'

# Continuous features scaled to [0, 1].
NUMERIC_FEATURE_KEYS = [
    'fnlwgt',
    'education_num',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
]

# Features bucketized into FEATURE_BUCKET_COUNT[key] buckets.
BUCKET_FEATURE_KEYS = ['age']
FEATURE_BUCKET_COUNT = {'age': 4}

# Prediction target.
LABEL_KEY = 'income'

In [12]:
def transformed_name(key):
    """Return the output feature name for raw feature ``key`` (``key`` + ``'_xf'``)."""
    suffix = '_xf'
    return key + suffix
    
def preprocessing_fn(inputs):
    """Map raw census features to model-ready ``*_xf`` features.

    First some categoricals are collapsed:
      * REPLACE_KEY (marital status) is rewritten with the ordered regex
        substitutions listed below.
      * 'workclass', 'race' and 'native_country' are reduced to two classes:
        the listed value versus 'Others'.

    Then:
      * NUMERIC_FEATURE_KEYS are scaled to [0, 1] with tft.scale_to_0_1.
      * CATEGORICAL_FEATURE_KEYS and LABEL_KEY are vocabulary-encoded with
        tft.compute_and_apply_vocabulary.
      * BUCKET_FEATURE_KEYS are bucketized into FEATURE_BUCKET_COUNT[key] buckets.

    Args:
        inputs: dict of raw feature name -> tensor, as supplied by tf.Transform.
            The dict is not modified.

    Returns:
        dict of ``transformed_name(key)`` -> transformed tensor.
    """
    # Shallow copy so the caller's dict is not mutated by the rewrites below.
    features = dict(inputs)

    # Ordered, unanchored regex rewrites for marital status. Identity rewrites
    # ('Widowed' -> 'Widowed', 'Divorced' -> 'Divorced') are omitted because
    # they cannot change any value.
    marital_status_rewrites = (
        ('Married-civ-spouse', 'Married'),
        ('Never-married', 'Single'),
        ('Separated', 'Divorced'),
        ('Married-spouse-absent', 'Divorced'),
        ('Married-AF-spouse', 'Divorced'),
    )
    marital_status = features[REPLACE_KEY]
    for pattern, rewrite in marital_status_rewrites:
        marital_status = tf.strings.regex_replace(marital_status, pattern=pattern, rewrite=rewrite)
    features[REPLACE_KEY] = marital_status

    # Collapse these features to two classes before building vocabularies.
    # tf.equal makes the element-wise comparison explicit instead of relying
    # on Tensor.__eq__ semantics.
    binary_keep_values = {
        'workclass': 'Private',
        'race': 'White',
        'native_country': 'United-States',
    }
    for key, kept_value in binary_keep_values.items():
        features[key] = tf.where(tf.equal(features[key], kept_value), kept_value, 'Others')

    outputs = {}

    # Scale these features to [0, 1].
    for key in NUMERIC_FEATURE_KEYS:
        outputs[transformed_name(key)] = tft.scale_to_0_1(features[key])

    for key in CATEGORICAL_FEATURE_KEYS:
        outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(features[key])

    for key in BUCKET_FEATURE_KEYS:
        outputs[transformed_name(key)] = tft.bucketize(features[key], FEATURE_BUCKET_COUNT[key])

    outputs[transformed_name(LABEL_KEY)] = tft.compute_and_apply_vocabulary(features[LABEL_KEY])

    return outputs

In [13]:
import pprint

In [None]:
#train_df.to_dict('records')

In [14]:

# Fit and apply preprocessing_fn on the in-memory pandas records. Unlike the
# Beam pipeline below, this in-memory run returns materialized results.
tf.get_logger().setLevel('ERROR')  # hide TF warnings raised while tracing

# AnalyzeAndTransformDataset needs a scratch directory for analyzer outputs.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset1, transform_fn1 = (
        train_df.to_dict('records'),
        raw_data_metadata,
    ) | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)

# Split the result into the transformed records and their metadata.
transformed_data1, transformed_metadata1 = transformed_dataset1



In [16]:
# Show the first 10 raw records next to their transformed counterparts.
# Convert only the rows being displayed instead of the whole DataFrame.
print('\nRaw data:\n{}\n'.format(pprint.pformat(train_df.head(10).to_dict('records'))))
print('Transformed data:\n{}'.format(pprint.pformat(transformed_data1[:10])))


Raw data:
[{'age': 39,
  'capital_gain': 2174,
  'capital_loss': 0,
  'education': 'Bachelors',
  'education_num': 13,
  'fnlwgt': 77516,
  'hours_per_week': 40,
  'income': ' <=50K',
  'marital_status': 'Never-married',
  'native_country': 'United-States',
  'occupation': 'Adm-clerical',
  'race': 'White',
  'relationship': 'Not-in-family',
  'sex': 'Male',
  'workclass': 'State-gov'},
 {'age': 50,
  'capital_gain': 0,
  'capital_loss': 0,
  'education': 'Bachelors',
  'education_num': 13,
  'fnlwgt': 83311,
  'hours_per_week': 13,
  'income': ' <=50K',
  'marital_status': 'Married-civ-spouse',
  'native_country': 'United-States',
  'occupation': 'Exec-managerial',
  'race': 'White',
  'relationship': 'Husband',
  'sex': 'Male',
  'workclass': 'Self-emp-not-inc'},
 {'age': 38,
  'capital_gain': 0,
  'capital_loss': 0,
  'education': 'HS-grad',
  'education_num': 9,
  'fnlwgt': 215646,
  'hours_per_week': 40,
  'income': ' <=50K',
  'marital_status': 'Divorced',
  'native_country': 'U

In [None]:
# Path of the constants module written by the next cell. %%writefile does not
# create parent directories, so ensure the target directory exists first;
# otherwise a fresh Restart & Run All fails here.
census_constant_module = "transformation/income_constant.py"
os.makedirs(os.path.dirname(census_constant_module) or '.', exist_ok=True)

In [None]:
%%writefile {census_constant_module}
# Column groups shared with the transform module (income_transform.py imports this file).

# String features that are vocabulary-encoded into integer indices.
CATEGORICAL_FEATURE_KEYS = [
    'education',
    'marital_status',
    'occupation',
    'race',
    'relationship',
    'workclass',
    'sex',
    'native_country',
]

# Continuous features scaled to [0, 1].
# NOTE(review): 'education_num' is excluded here but included in the notebook's
# in-line NUMERIC_FEATURE_KEYS -- confirm which version is intended.
NUMERIC_FEATURE_KEYS = [
    'fnlwgt',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
]

# Features bucketized into FEATURE_BUCKET_COUNT[key] buckets.
BUCKET_FEATURE_KEYS = ['age']
FEATURE_BUCKET_COUNT = {'age': 4}

# Prediction target.
LABEL_KEY = 'income'

def transformed_name(key):
    """Name of the transformed feature derived from raw feature ``key`` (adds ``'_xf'``)."""
    suffix = '_xf'
    return key + suffix

In [None]:
# Path of the transform module written by the next cell. %%writefile does not
# create parent directories, so ensure the target directory exists first.
census_transform_module_file = 'transformation/income_transform.py'
os.makedirs(os.path.dirname(census_transform_module_file) or '.', exist_ok=True)

In [None]:
%%writefile {census_transform_module_file}
import income_constant
import tensorflow as tf
import tensorflow_transform as tft

# Module-private aliases for the shared names in income_constant, so the rest
# of this file does not repeat the module prefix.
_LABEL_KEY = income_constant.LABEL_KEY
_NUMERIC_FEATURE_KEYS = income_constant.NUMERIC_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = income_constant.CATEGORICAL_FEATURE_KEYS
_BUCKET_FEATURE_KEYS = income_constant.BUCKET_FEATURE_KEYS
_FEATURE_BUCKET_COUNT = income_constant.FEATURE_BUCKET_COUNT
_transformed_name = income_constant.transformed_name

def preprocessing_fn(inputs):
    """Map raw census features to their transformed ``*_xf`` counterparts.

    NOTE(review): unlike the notebook's in-line preprocessing_fn, this version
    does not collapse marital_status / workclass / race / native_country
    categories -- confirm the two are meant to differ.

    Args:
        inputs: dict of raw feature name -> tensor supplied by tf.Transform.

    Returns:
        dict of transformed feature name -> tensor:
          * _NUMERIC_FEATURE_KEYS scaled to [0, 1] (tft.scale_to_0_1),
          * _CATEGORICAL_FEATURE_KEYS and _LABEL_KEY vocabulary-encoded
            (tft.compute_and_apply_vocabulary),
          * _BUCKET_FEATURE_KEYS bucketized into _FEATURE_BUCKET_COUNT[key] buckets.
    """
    transformed = {}

    for name in _NUMERIC_FEATURE_KEYS:
        transformed[_transformed_name(name)] = tft.scale_to_0_1(inputs[name])

    for name in _CATEGORICAL_FEATURE_KEYS:
        transformed[_transformed_name(name)] = tft.compute_and_apply_vocabulary(inputs[name])

    for name in _BUCKET_FEATURE_KEYS:
        bucket_count = _FEATURE_BUCKET_COUNT[name]
        transformed[_transformed_name(name)] = tft.bucketize(inputs[name], bucket_count)

    label_xf = _transformed_name(_LABEL_KEY)
    transformed[label_xf] = tft.compute_and_apply_vocabulary(inputs[_LABEL_KEY])

    return transformed

In [17]:
# TFXIO input for AnalyzeAndTransformDataset: (RecordBatch PCollection, tensor adapter config).
raw_dataset = raw_data, csv_tfxio.TensorAdapterConfig()

In [18]:
# Attach analyze + transform to the Beam pipeline (executed by pipeline.run()).
# This uses the in-notebook preprocessing_fn. The module version could be used via
# `from transformation.income_transform import preprocessing_fn`, but that module's
# top-level `import income_constant` needs transformation/ on sys.path.

tf.get_logger().setLevel('ERROR')  # hide TF warnings raised while tracing

# AnalyzeAndTransformDataset needs a scratch directory for analyzer outputs.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset2, transform_function2 = raw_dataset | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn=preprocessing_fn,
        output_record_batches=True,  # RecordBatches are encoded with RecordBatchToExamples below
    )

In [19]:
# Split the (data, metadata) pair; transformed_data2 is still a lazy PCollection.
transformed_data2, transformed_metadata2 = transformed_dataset2

In [20]:
transformed_data2  # PCollection handle only; values exist after pipeline.run()

<PCollection[[18]: AnalyzeAndTransformDataset/TransformDataset/ConvertToRecordBatch.None] at 0x1df43366980>

In [21]:
import os
os.makedirs('transformation/transformed_data', exist_ok=True)  # no error if it already exists

In [22]:
# Destination for the transformed TFRecords and the serialized transform function.
output_dir = 'transformation/transformed_data'

In [23]:
# Encode each transformed RecordBatch as tf.Example protos and write them as
# TFRecord output under output_dir.
# With output_record_batches=True, every element is a (RecordBatch, pass-through
# features dict) pair. Only the batch is encoded, so the second item is ignored.
# It is deliberately not named after the dataset metadata, which it is not.
_ = (
    transformed_data2
    | 'EncodeTrainData' >> beam.FlatMapTuple(
        lambda record_batch, _unused_passthrough: RecordBatchToExamples(record_batch))
    | 'WriteTrainData' >> beam.io.WriteToTFRecord(
        os.path.join(output_dir, 'transformed.tfrecord'))
)

In [24]:
# Persist the transform function produced by AnalyzeAndTransformDataset to output_dir.
_ = transform_function2 | 'WriteTransformFn' >> tft_beam.WriteTransformFn(output_dir)

In [25]:
# Execute every transform attached to `pipeline` (read, analyze/transform, both writes) and block until done.
result = pipeline.run().wait_until_finish()

