<a href="https://colab.research.google.com/github/KevinTheRainmaker/MLOps/blob/main/MLOps_07.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#데이터 전처리 using TFT

In [1]:
!pip install -q tensorflow-transform

[K     |████████████████████████████████| 422 kB 5.1 MB/s 
[K     |████████████████████████████████| 9.9 MB 42.7 MB/s 
[K     |████████████████████████████████| 19.1 MB 1.2 MB/s 
[K     |████████████████████████████████| 48 kB 5.4 MB/s 
[K     |████████████████████████████████| 151 kB 56.6 MB/s 
[K     |████████████████████████████████| 63 kB 1.5 MB/s 
[K     |████████████████████████████████| 2.3 MB 36.7 MB/s 
[K     |████████████████████████████████| 247 kB 66.2 MB/s 
[K     |████████████████████████████████| 45 kB 3.1 MB/s 
[K     |████████████████████████████████| 180 kB 72.6 MB/s 
[K     |████████████████████████████████| 110 kB 46.8 MB/s 
[K     |████████████████████████████████| 255 kB 74.6 MB/s 
[K     |████████████████████████████████| 435 kB 67.4 MB/s 
[K     |████████████████████████████████| 171 kB 77.1 MB/s 
[K     |████████████████████████████████| 183 kB 73.1 MB/s 
[K     |████████████████████████████████| 144 kB 61.0 MB/s 
[K     |██████████████████████

In [2]:
import tensorflow_transform as tft

def preprocessing_fn(inputs):
  x = inputs['x']
  x_normalized = tft.scale_to_0_1(x)

  return {
      'x_xf': x_normalized
  }

In [3]:
import tensorflow as tf

def preprocessing_fn(raw_image):
  raw_image = tf.reshape(raw_image, [-1])

  # JPEG Decoding
  img_rgb = tf.io.decode_jpeg(raw_image, channels=3)

  # RGB to Gray
  img_gray = tf.image.rgb_to_grayscale(img_rgb)
  img = tf.image.convert_image_dtype(img_gray, tf.float32)

  # Re-sizing the image to 300*300
  resized_img = tf.image.resize_with_pad(
      img,
      target_height=300,
      target_width=300
  )

  img_grayscale = tf.image.rgb_to_grayscale(resized_img)

  return tf.reshape(img_grayscale, [-1, 300, 300, 1])

In [4]:
raw_data = [
            {'x':   1.20},
            {'x':   2.99},
            {'x': 100.00}
]

In [5]:
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'x':tf.io.FixedLenFeature([], tf.float32),
    })
)

In [6]:
import tempfile
import tensorflow_transform.beam.impl as tft_beam

# 이전에 정의한 preprocessing_fn()
def preprocessing_fn(inputs):
  x = inputs['x']
  x_normalized = tft.scale_to_0_1(x)

  return {
      'x_xf': x_normalized
  }

with beam.Pipeline() as pipeline:
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):

    tfrecord_file = 'TFRecords_File.tfrecord'
    raw_data = (
        pipeline|beam.io.ReadFromTFRecord(tfrecord_file)
    )

    transformed_dataset, transform_fn = (
        (raw_data, raw_data_metadata)|tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
    )

FileNotFoundError: ignored

In [None]:
transformed_data, transformed_metadata = transformed_dataset

print(transformed_data)

In [7]:
# [
#  {'x_xf': 0.0},
#  {'x_xf': 0.018117407},
#  {'x_xf': 1.0}
# ]

In [7]:
import tensorflow as tf
import tensorflow_transform as tft

LABEL_KEY = 'consumer_disputed'

# 'feature_name':feature_dimension
ONE_HOT_FEATURES = {
    'product':11,
    'sub+product':45,
    'company_response':5,
    'state':60,
    'issue':90
}

# 'feature_name':bucket_count
BUCKET_FEATURES = {
    'zip_code':10
}

# 'feature_name':non-defined value
TEXT_FEATURES = {
    'consumer_complaint_narrative':None
}

In [8]:
def transformed_name(key):
  return key + '_xf'

In [9]:
def fill_in_missing(x):
  default_value = '' if x.dtype == tf.string or to_string else 0
  if type(x) == tf.SparseTensor:
    x = tf.sparse.to_dense(
        tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
        default_value
    )

  return tf.squeeze(x, axis=1)

In [11]:
def convert_num_to_one_hot(label_tensor, num_labels=2):
  one_hot_tensor = tf.one_hot(label_tensor, num_labels)

  return tf.reshape(one_hot_tensor, [-1, num_labels])

In [13]:
def convert_zip_code(zip_code):
  if zip_code == '':
    zip_code = '00000'
  zip_code = tf.strings.regex_replace(zip_code, r'X{0.5}', '0')
  zip_code = tf.strings.to_number(zip_code, out_type=tf.float32)

  return zip_code

In [14]:
def preprocessing_fn(inputs):
  outputs = {}
  for key in ONE_HOT_FEATURES. keys():
    dim = ONE_HOT_FEATURES[key]
    index = tft.compute_and_apply_vocabulary(
        fill_in_missing(inputs[key]), top_k=dim+1        
    )
    outputs[transformed_name(key)] = convert_num_to_one_hot(
        index, num_labels=dim+1
    )

    return outputs

In [None]:
for key, bucket_count in BUCKET_FEATURES.items():
  temp_feature = tft.bucketize(
      convert_zip_code(fill_in_missing(inputs[key])),
      bucket_count,
      always_return_num_quantiles=False
  )
  outputs[transformed_name(key)] = convert_num_to_one_hot(
      temp_feature,
      num_labels=bucket_count+1
  )

In [None]:
for key in TEXT_FEATURES.keys():
  outputs[transformed_name(key)] = fill_in_missing(inputs[key])

  outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY]) 

In [None]:
from typing import Union

import tensorflow as tf
import tensorflow_transform as tft

LABEL_KEY = 'consumer_disputed'

# 'feature_name':feature_dimension
ONE_HOT_FEATURES = {
    'product':11,
    'sub+product':45,
    'company_response':5,
    'state':60,
    'issue':90
}

# 'feature_name':bucket_count
BUCKET_FEATURES = {
    'zip_code':10
}

# 'feature_name':non-defined value
TEXT_FEATURES = {
    'consumer_complaint_narrative':None
}

def transformed_name(key):
  return key + '_xf'

def fill_in_missing(x):
  default_value = '' if x.dtype == tf.string or to_string else 0
  if type(x) == tf.SparseTensor:
    x = tf.sparse.to_dense(
        tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
        default_value
    )

  return tf.squeeze(x, axis=1)

def convert_num_to_oh(label_tensor, num_labels=2):
  one_hot_tensor = tf.one_hot(label_tensor, num_labels)

  return tf.reshape(one_hot_tensor, [-1, num_labels])

def convert_zip_code(zip_code):
  if zip_code == '':
    zip_code = '00000'
  zip_code = tf.strings.regex_replace(zip_code, r'X{0.5}', '0')
  zip_code = tf.strings.to_number(zip_code, out_type=tf.float32)

  return zip_code

def preprocessing_fn(inputs):
  outputs = {}
  for key in ONE_HOT_FEATURES. keys():
    dim = ONE_HOT_FEATURES[key]
    index = tft.compute_and_apply_vocabulary(
        fill_in_missing(inputs[key]), top_k=dim+1        
    )
    outputs[transformed_name(key)] = convert_num_to_oh(
        index, num_labels=dim+1
    )

    return outputs

  for key, bucket_count in BUCKET_FEATURES.items():
    temp_feature = tft.bucketize(
        convert_zip_code(fill_in_missing(inputs[key])),
        bucket_count,
        always_return_num_quantiles=False
    )
    outputs[transformed_name(key)] = convert_num_to_oh(
        temp_feature,
        num_labels=bucket_count+1
    )
  
  for key in TEXT_FEATURES.keys():
    outputs[transformed_name(key)] = fill_in_missing(inputs[key])

    outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY])

  return outputs

In [None]:
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath('module_file_name.py')
)

context.run(transform)