# House Price - Data Processing with TF.Transform

This notebook will report all the required code to create and export a data processing pipeline using Apache Beam and TF.Transform.

Whenever possible the notebook will be coded to support cloud run on GCP and local run.

## Set global parameters

In [1]:
PROJECT = 'gcp-playground' # change to your project_Id
BUCKET = 'gcs-cloudml'     # change to your bucket name
REGION = 'region'          # change to your region
ROOT_DIR = 'houseprice_tft'# directory where the output is stored locally or on GCS

RUN_LOCAL = True # if True, the DirectRunner is used, else DataflowRunner

## Import required modules

In [2]:
import os

os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['ROOT_DIR'] = ROOT_DIR
os.environ['RUN_LOCAL'] = str(RUN_LOCAL)

In [3]:
import os
import pandas as pd
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft

from tensorflow_transform.beam import impl
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.beam.tft_beam_io import transform_fn_io

#Apache Beam
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

## Create raw data metadata

In [5]:
CATEGORICAL_FEATURE_NAMES = ['POSTED_BY', 'UNDER_CONSTRUCTION', 'RERA', 
                             'BHK_OR_RK', 'READY_TO_MOVE', 'RESALE', 
                             'ADDRESS']
CATEGORICAL_FEATURE_MAX_VALUES = [24]

NUMERIC_FEATURE_NAMES = ['BHK_NO.',  'SQUARE_FT', 'LONGITUDE','LATITUDE']
BUCKET_FEATURE_BUCKET_COUNT = [100]

TARGET_FEATURE_NAME = "TARGET(PRICE_IN_LACS)"

In [6]:
def create_raw_metadata():  
    
    raw_data_schema = {}
    
 
    # target feature schema
    raw_data_schema[TARGET_FEATURE_NAME]= dataset_schema.ColumnSchema(
        tf.float32, [], dataset_schema.FixedColumnRepresentation())
    
    # categorical features schema
    raw_data_schema.update({ column_name : dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation())
                            for column_name in CATEGORICAL_FEATURE_NAMES})
    
    # numerical features schema
    raw_data_schema.update({ column_name : dataset_schema.ColumnSchema(
        tf.float32, [], dataset_schema.FixedColumnRepresentation())
                            for column_name in NUMERIC_FEATURE_NAMES})
    
    # create dataset_metadata given raw_schema
    raw_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.Schema(raw_data_schema))
    
    return raw_metadata

## Read data

In [8]:
def raw_ingestion(raw_data):
    """
    Raw data manipulation in pandas
    """
    print(raw_data)
    raw_data["ADDRESS"] = raw_data["ADDRESS"].split(",")[-1]
    return raw_data

In [9]:
def read_from_csv(pipeline, input_data, step):
    
    raw = ( pipeline | '{} - Read Data from csv file'.format(step) >> beam.io.ReadFromText(input_data))
    
    print(raw)
    # Perform ingestion with pandas function
    ingested =  raw_ingestion(raw)
    # Deferred DataFrames can also be converted back to schema'd PCollections
    raw_beam = to_pcollection(ingested, include_indexes=True)
    
    raw_metadata = create_raw_metadata()
    raw_dataset = (raw_beam, raw_metadata)
    return raw_dataset

## Transformation Pipeline

In [10]:
def run_transformation_pipeline(args):
    
    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **args)
    
    runner = args['runner']
    transformed_data_location = args['transformed_data_location']
    transform_artefact_location = args['transform_artefact_location']
    temporary_dir = args['temporary_dir']
    debug = args['debug']
    
    print("Sink transformed data files location: {}".format(transformed_data_location))
    print("Sink transform artefact location: {}".format(transform_artefact_location))
    print("Temporary directory: {}".format(temporary_dir))
    print("Runner: {}".format(runner))
    print("Debug enabled: {}".format(debug))

    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        with impl.Context(temporary_dir):
            
            # Preprocess train data
            step = 'train'
            # Read raw train data from BQ
            raw_dataset = read_from_csv(pipeline,"data/train.csv", step)

### Pipeline parameters

In [11]:
%%writefile requirements.txt
tensorflow-transform==0.8.0

Overwriting requirements.txt


In [12]:
from datetime import datetime


OUTPUT_DIR = ROOT_DIR if RUN_LOCAL==True else "gs://{}/{}".format(BUCKET,ROOT_DIR)
TRANSFORM_ARTEFACTS_DIR = os.path.join(OUTPUT_DIR,'transform')
TRANSFORMED_DATA_DIR = os.path.join(OUTPUT_DIR,'transformed')
TEMP_DIR = os.path.join(OUTPUT_DIR, 'tmp')

runner = 'DirectRunner' if RUN_LOCAL == True else 'DataflowRunner'

job_name = 'preprocess-house-price-data-tft-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))

args = {
    
    'job_name': job_name,
    'runner': runner,
    'transformed_data_location':  TRANSFORMED_DATA_DIR,
    'transform_artefact_location':  TRANSFORM_ARTEFACTS_DIR,
    'temporary_dir': TEMP_DIR,
    'debug':False,
    
    'project': PROJECT,
    'region': REGION,
    'staging_location': os.path.join(OUTPUT_DIR, 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'worker_machine_type': 'n1-standard-1',
    'requirements_file': 'requirements.txt',
}

### Runner

In [13]:
try: 
    tf.gfile.DeleteRecursively(TRANSFORMED_DATA_DIR)
    tf.gfile.DeleteRecursively(TRANSFORM_ARTEFACTS_DIR)
    tf.gfile.DeleteRecursively(TEMP_DIR)
    print('previous transformation files deleted!')
except:
    pass

print('Launching {} job {} ... hang on'.format(runner, job_name))
print("")
run_transformation_pipeline(args)
print("Done!")



Launching DirectRunner job preprocess-house-price-data-tft-210122-154508 ... hang on

Sink transformed data files location: houseprice_tft/transformed
Sink transform artefact location: houseprice_tft/transform
Temporary directory: houseprice_tft/tmp
Runner: DirectRunner
Debug enabled: False


PCollection[train - Read Data from csv file/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn).None]
PCollection[train - Read Data from csv file/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn).None]


TypeError: 'PCollection' object is not subscriptable