# Imports and Setup

In [24]:
import os 
import sys
import jsonlines
from google.cloud import aiplatform, storage
from google.protobuf import json_format
from datetime import datetime
import tensorflow as tf 
from tfx import v1 as tfx

# Key file for Google Application Default Credentials. A value that is already
# exported in the environment takes precedence; the path below is only a local
# fallback, so other machines can supply their own key without editing this cell.
os.environ.setdefault(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/media/david/warehaus1/youtube_series/proven-script.json',
)

# Google Cloud region, project and bucket used throughout the notebook.
GOOGLE_CLOUD_REGION = 'us-central1'
GOOGLE_CLOUD_PROJECT = 'proven-script-347020'
GCS_BUCKET_NAME = 'salary-pipeline-347020'

PIPELINE_NAME = 'salary-pipeline'

# Captured once when this cell runs (local time, formatted YYYYmmddHHMMSS).
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# Root path for the pipeline's artifacts.
PIPELINE_ROOT = f'gs://{GCS_BUCKET_NAME}/pipeline_root/{PIPELINE_NAME}'

# Path for the user's Python module files.
MODULE_ROOT = f'gs://{GCS_BUCKET_NAME}/pipeline_module/{PIPELINE_NAME}'

# Path for the user's input data.
DATA_ROOT = f'gs://{GCS_BUCKET_NAME}/data/{PIPELINE_NAME}'

# Name of the Vertex AI endpoint; the timestamp suffix changes each time this cell runs.
ENDPOINT_NAME = PIPELINE_NAME + '-' + TIMESTAMP


env: GOOGLE_APPLICATION_CREDENTIALS=/media/david/warehaus1/youtube_series/proven-script.json


In [5]:
# Environment setup commands, kept disabled by wrapping them in a string
# literal: nothing is installed when this cell runs, the cell only echoes the
# text. Copy the lines into their own cell to actually install the packages.
"""
!pip install tfx==1.4.0
!pip install "kfp<2"
"""


'\n!pip install tfx==1.4.0\n!pip install "kfp<2"\n'

In [6]:
# Create the bucket gs://{GCS_BUCKET_NAME} in GOOGLE_CLOUD_REGION, then copy the
# local data.csv into DATA_ROOT.
# NOTE(review): `gsutil mb` presumably reports an error once the bucket already
# exists, so this cell may not be cleanly re-runnable as written — confirm.
!gsutil mb -l {GOOGLE_CLOUD_REGION} gs://{GCS_BUCKET_NAME}
!gsutil cp data.csv {DATA_ROOT}/

Creating gs://salary-pipeline-347020/...
Copying file://data.csv [Content-Type=text/csv]...
\ [1 files][  3.2 MiB/  3.2 MiB]                                                
Operation completed over 1 objects/3.2 MiB.                                      


In [20]:
import pandas as pd

# Quick local look at the raw CSV that was uploaded for the pipeline.
# The preview shows a leading 'Unnamed: 0' column; presumably a row index that
# was saved together with the data — confirm before treating it as a feature.
df = pd.read_csv('data.csv')
df.head(n=5)

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,label
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,0
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,0
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,0
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,0
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,0


# Write module code

In [28]:
%%writefile transform.py

import tensorflow as tf 
import tensorflow_transform as tft 

# Integer-valued columns that preprocessing_fn rescales.
NUMERIC_FEATURE_KEYS = [
    'age', 'capital-gain','capital-loss',
    'education-num', 'fnlwgt', 'hours-per-week'
]

# Categorical column -> number of distinct values in its schema domain.
# Each count includes the '?' placeholder where the domain has one, so
# 'native-country' is 42 (41 named countries plus '?'), matching the domain
# that SchemaGen inferred for this dataset.
VOCAB_FEATURE_DICT ={
    'education':16, 'marital-status':7, 'native-country':42,
    'occupation':15, 'race':5, 'relationship':6, 'sex':2,
    'workclass':9
}

# Extra index buckets reserved for values that are not in the vocabulary.
NUM_OOV_BUCKETS = 2

# Name of the target column.
LABEL_KEY = 'label'

def transformed_name(key):
    """Return the output name for the transformed version of feature ``key``.

    Every hyphen in ``key`` is replaced by an underscore and the suffix
    ``_xf`` is appended, e.g. ``'capital-gain'`` -> ``'capital_gain_xf'``.
    """
    return f"{key.replace('-', '_')}_xf"

def preprocessing_fn(inputs):
    """tf.Transform callback: turn raw feature tensors into transformed ones.

    Args:
        inputs: Mapping from raw feature name to tensor. It must contain every
            key in NUMERIC_FEATURE_KEYS and VOCAB_FEATURE_DICT plus LABEL_KEY.
            (Assumes one value per example for each feature — TODO confirm
            against the schema.)

    Returns:
        Dict keyed by ``transformed_name(<raw key>)``:
        * numeric features scaled with ``tft.scale_to_0_1`` and reshaped to
          rank 1;
        * categorical features one-hot encoded with depth
          ``vocab_size + NUM_OOV_BUCKETS`` and reshaped to ``[-1, depth]``;
        * the label cast to ``tf.float32``.
    """
    outputs = {}

    for key in NUMERIC_FEATURE_KEYS:
        scaled = tft.scale_to_0_1(inputs[key])
        outputs[transformed_name(key)] = tf.reshape(scaled, [-1])

    for key, vocab_size in VOCAB_FEATURE_DICT.items():
        depth = vocab_size + NUM_OOV_BUCKETS
        # Cap the learned vocabulary at vocab_size entries so every index
        # (vocabulary entries first, then the OOV buckets) stays below `depth`.
        # Without the cap, a column with more distinct values than declared
        # would produce indices that tf.one_hot silently encodes as all zeros.
        indices = tft.compute_and_apply_vocabulary(
            inputs[key],
            top_k=vocab_size,
            num_oov_buckets=NUM_OOV_BUCKETS,
        )
        one_hot = tf.one_hot(indices, depth)
        outputs[transformed_name(key)] = tf.reshape(one_hot, [-1, depth])

    outputs[transformed_name(LABEL_KEY)] = tf.cast(inputs[LABEL_KEY], tf.float32)
    return outputs

Overwriting transform.py


# Create Pipeline

In [29]:
def _create_pipeline(pipeline_name:str, pipeline_root:str, data_root:str,
                    module_file:str, endpoint_name:str, project_id:str,
                    region:str, use_gpu:bool) -> tfx.dsl.Pipeline:
    """Assemble the TFX pipeline: ingest, statistics, schema, validation, transform.

    Args:
        pipeline_name: Name given to the resulting pipeline.
        pipeline_root: Root location handed to the pipeline as ``pipeline_root``.
        data_root: Input location passed to ``CsvExampleGen`` as ``input_base``.
        module_file: Path of the user module for the Transform component.
            When empty, the previously hard-coded location
            ``gs://salary-pipeline-347020/models/transform.py`` is used, so
            callers that pass ``''`` keep their existing behaviour.
        endpoint_name: Accepted but not used by any component defined here.
        project_id: Accepted but not used by any component defined here.
        region: Accepted but not used by any component defined here.
        use_gpu: Accepted but not used by any component defined here.

    Returns:
        A ``tfx.dsl.Pipeline`` holding, in order, the CsvExampleGen,
        StatisticsGen, SchemaGen, ExampleValidator and Transform components.
    """
    # Fallback keeps the original behaviour for callers that pass no module path.
    default_module_file = 'gs://salary-pipeline-347020/models/transform.py'
    transform_module_file = module_file or default_module_file

    # ingest data
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)

    # generate statistics
    statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])

    # generate schema
    schema_gen = tfx.components.SchemaGen(statistics=statistics_gen.outputs['statistics'])

    # identify anomalies
    validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema'],
    )

    # transform component
    transform = tfx.components.Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file=transform_module_file,
    )

    components = [
        example_gen,
        statistics_gen,
        schema_gen,
        validator,
        transform,
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
    )

# Run Pipeline: compile the pipeline definition to JSON

In [30]:
# File that receives the pipeline definition produced by the runner.
PIPELINE_DEFINITION_FILE = f'{PIPELINE_NAME}_pipeline.json'

runner_config = tfx.orchestration.experimental.KubeflowV2DagRunnerConfig()
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=runner_config,
    output_filename=PIPELINE_DEFINITION_FILE,
)

# module_file and endpoint_name are intentionally passed as empty strings here.
salary_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    module_file='',
    endpoint_name='',
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
    use_gpu=False,
)

# Bind the return value to `_` so the cell does not display it.
_ = runner.run(salary_pipeline)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.


# Submit the pipeline job to Vertex AI

In [31]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging 

# Lower the root logger threshold to INFO so the SDK's progress messages appear.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

aiplatform.init(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
)

# Build the job from the pipeline definition file and submit it.
job = pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_DEFINITION_FILE,
)
job.submit()

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/167901156608/locations/us-central1/pipelineJobs/salary-pipeline-20220512121814


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/167901156608/locations/us-central1/pipelineJobs/salary-pipeline-20220512121814


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/167901156608/locations/us-central1/pipelineJobs/salary-pipeline-20220512121814')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/167901156608/locations/us-central1/pipelineJobs/salary-pipeline-20220512121814')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/salary-pipeline-20220512121814?project=167901156608


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/salary-pipeline-20220512121814?project=167901156608


# Visualize Statistics

In [2]:
import os
import tensorflow as tf
import tensorflow_data_validation as tfdv
# Statistics artifact of one specific, already finished pipeline run.
STATS_URI = "gs://salary-pipeline-347020/pipeline_root/salary-pipeline/167901156608/salary-pipeline-20220506075236/StatisticsGen_1800290861297172480/statistics"

# Entries under the artifact whose names match 'Split-*'.
directories = tf.io.gfile.glob(os.path.join(STATS_URI, 'Split-*'))
names = map(os.path.basename, directories)

# Split name -> path of that split's FeatureStats.pb file.
splits = dict(
    (name, os.path.join(directory, 'FeatureStats.pb'))
    for name, directory in zip(names, directories)
)


In [3]:
# Display the mapping from split name to its FeatureStats.pb path.
splits 

{'Split-eval': 'gs://salary-pipeline-347020/pipeline_root/salary-pipeline/167901156608/salary-pipeline-20220506075236/StatisticsGen_1800290861297172480/statistics/Split-eval/FeatureStats.pb',
 'Split-train': 'gs://salary-pipeline-347020/pipeline_root/salary-pipeline/167901156608/salary-pipeline-20220506075236/StatisticsGen_1800290861297172480/statistics/Split-train/FeatureStats.pb'}

In [4]:
# Splits shown on the left-hand and right-hand side of the statistics comparison.
lhs_split, rhs_split = 'Split-train', 'Split-eval'

In [7]:
# Load both splits' statistics first, then render them side by side.
lhs_stats = tfdv.load_stats_binary(splits[lhs_split])
rhs_stats = tfdv.load_stats_binary(splits[rhs_split])

tfdv.visualize_statistics(
    lhs_statistics=lhs_stats,
    rhs_statistics=rhs_stats,
    lhs_name=lhs_split,
    rhs_name=rhs_split,
)

# View Schema

In [8]:
import os
import tensorflow_data_validation as tfdv
# Schema artifact of one specific, already finished pipeline run.
SCHEMA_URI = "gs://salary-pipeline-347020/pipeline_root/salary-pipeline/167901156608/salary-pipeline-20220506075236/SchemaGen_-1658473652523368448/schema"

# The schema is read from the text file 'schema.pbtxt' inside the artifact.
schema_path = os.path.join(SCHEMA_URI, 'schema.pbtxt')
schema = tfdv.load_schema_text(schema_path)
tfdv.display_schema(schema)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'age',INT,required,,-
'capital-gain',INT,required,,-
'capital-loss',INT,required,,-
'education',STRING,required,,'education'
'education-num',INT,required,,-
'fnlwgt',INT,required,,-
'hours-per-week',INT,required,,-
'label',INT,required,,-
'marital-status',STRING,required,,'marital-status'
'native-country',STRING,required,,'native-country'


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'education',"'10th', '11th', '12th', '1st-4th', '5th-6th', '7th-8th', '9th', 'Assoc-acdm', 'Assoc-voc', 'Bachelors', 'Doctorate', 'HS-grad', 'Masters', 'Preschool', 'Prof-school', 'Some-college'"
'marital-status',"'Divorced', 'Married-AF-spouse', 'Married-civ-spouse', 'Married-spouse-absent', 'Never-married', 'Separated', 'Widowed'"
'native-country',"'?', 'Cambodia', 'Canada', 'China', 'Columbia', 'Cuba', 'Dominican-Republic', 'Ecuador', 'El-Salvador', 'England', 'France', 'Germany', 'Greece', 'Guatemala', 'Haiti', 'Holand-Netherlands', 'Honduras', 'Hong', 'Hungary', 'India', 'Iran', 'Ireland', 'Italy', 'Jamaica', 'Japan', 'Laos', 'Mexico', 'Nicaragua', 'Outlying-US(Guam-USVI-etc)', 'Peru', 'Philippines', 'Poland', 'Portugal', 'Puerto-Rico', 'Scotland', 'South', 'Taiwan', 'Thailand', 'Trinadad&Tobago', 'United-States', 'Vietnam', 'Yugoslavia'"
'occupation',"'?', 'Adm-clerical', 'Armed-Forces', 'Craft-repair', 'Exec-managerial', 'Farming-fishing', 'Handlers-cleaners', 'Machine-op-inspct', 'Other-service', 'Priv-house-serv', 'Prof-specialty', 'Protective-serv', 'Sales', 'Tech-support', 'Transport-moving'"
'race',"'Amer-Indian-Eskimo', 'Asian-Pac-Islander', 'Black', 'Other', 'White'"
'relationship',"'Husband', 'Not-in-family', 'Other-relative', 'Own-child', 'Unmarried', 'Wife'"
'sex',"'Female', 'Male'"
'workclass',"'?', 'Federal-gov', 'Local-gov', 'Never-worked', 'Private', 'Self-emp-inc', 'Self-emp-not-inc', 'State-gov', 'Without-pay'"


# View anomalies

In [9]:
import os
import tensorflow_data_validation as tfdv
# Anomalies artifact of one specific, already finished pipeline run.
ANOMALIES_URI = "gs://salary-pipeline-347020/pipeline_root/salary-pipeline/167901156608/salary-pipeline-20220506075236/ExampleValidator_7564898384331407360/anomalies"
# NOTE(review): ANOMALIES_URI is passed as-is, with no file name appended
# (the schema cell, by contrast, joins 'schema.pbtxt' onto its URI). Confirm
# that load_anomalies_text accepts this location and that the artifact is
# stored in text format for the TFX version in use.
anomalies = tfdv.load_anomalies_text(ANOMALIES_URI)
tfdv.display_anomalies(anomalies)