In [1]:
from google.cloud import bigquery
import time, logging

In [2]:
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Metrics,
    Artifact,
    Model,
    ClassificationMetrics
)

In [3]:
@component(packages_to_install=["google-cloud-bigquery==2.24.1"])
def data_selector(
    query: str,
    bq_project_id: str,
    bq_dataset_id: str,
    bq_table_id: str,
    bq_location: str,
    table_dataset: Output[Dataset]
) -> None:
    """Materialise the result of a BigQuery query into a table and expose it as a Dataset artifact.

    The query result is written to ``<bq_project_id>.<bq_dataset_id>.<bq_table_id>``
    using WRITE_EMPTY, so the job fails if that table already contains data.

    Args:
        query: SQL whose result is written to the destination table.
        bq_project_id: Project that owns the destination table and runs the job.
        bq_dataset_id: Dataset of the destination table.
        bq_table_id: Name of the destination table.
        bq_location: BigQuery location passed to the client.
        table_dataset: Output artifact; its path is set to ``bq://<table>`` and
            ``metadata['table_name']`` to the fully-qualified table id.

    Raises:
        RuntimeError: if the finished job still reports errors, or the
            resulting table has no rows.
        Errors raised by the BigQuery client while running the query are
        logged and re-raised unchanged.
    """
    # Lightweight KFP components run this function's source on its own, so
    # every import it needs must live inside the function body.
    from google.cloud import bigquery
    import logging

    bq_query_data_table = "{project}.{dataset}.{table}".format(
        project=bq_project_id,
        dataset=bq_dataset_id,
        table=bq_table_id)

    client = bigquery.Client(project=bq_project_id, location=bq_location)

    overwrite_table = False
    job_config = bigquery.QueryJobConfig(
        write_disposition=(bigquery.job.WriteDisposition.WRITE_TRUNCATE if overwrite_table
                           else bigquery.job.WriteDisposition.WRITE_EMPTY),
        destination=bq_query_data_table)

    # Bound before the try so the handler below cannot raise NameError when
    # client.query() itself fails before a job object exists.
    query_job = None
    try:
        query_job = client.query(query=query, job_config=job_config)
        query_job.result()  # waits for the job; raises if the job failed
        if query_job.errors:
            raise RuntimeError(
                "BigQuery job writing to {} reported errors: {}".format(
                    bq_query_data_table, query_job.errors))
    except Exception:
        logging.error("Query into %s failed; job errors: %s",
                      bq_query_data_table,
                      None if query_job is None else query_job.errors)
        raise

    table = client.get_table(bq_query_data_table)  # API request
    # Downstream feature retrieval needs at least one read instance.
    if table.num_rows == 0:
        raise RuntimeError(
            "BQ table {} has no rows. Ensure that your query returns results: {}".format(
                bq_query_data_table, query))

    table_dataset.path = "bq://{}".format(bq_query_data_table)
    table_dataset.metadata['table_name'] = bq_query_data_table

    print(vars(table_dataset))

    return None


# View table properties
#print("Table schema: {}".format(table.schema))
#print("Table description: {}".format(table.description))
#print("Table has {} rows".format(table.num_rows))

In [11]:
from google_cloud_pipeline_components import aiplatform as gcc_aip

import kfp.dsl as dsl
from kfp import components
from kfp.v2 import compiler

# Query/table settings kept at module level. The pipeline does not read these:
# it receives its values as runtime parameters when the job is submitted.
# BUCKET_NAME is the only value baked into the compiled template (pipeline root).
query = """
    SELECT planet as planets, terrestrial_date as timestamp, 5 as pt
        FROM `feature-store-mars21.mars.three_planets_tmp` WHERE 1=1
    """

bq_location, bq_project_id = 'US', "feature-store-mars21"
bq_dataset_id, bq_table_id = "mars", "tmp-table-v14"
bq_export_table_id = "training-v1"

BUCKET_NAME = "gs://feature-store-mars21"


# One-step pipeline: `data_selector` materialises `query` into
# `<bq_project_id>.<bq_dataset_id>.<bq_table_id>`.
# (Documented with comments rather than a docstring so the compiled spec stays the same.)
@dsl.pipeline(
    name='bq-fs-export',
    description='',
    pipeline_root="{}/xgb-pl".format(BUCKET_NAME),
)
def pipeline(
    query: str,
    bq_project_id: str,
    bq_dataset_id: str,
    bq_table_id: str,
    bq_location: str
):
    data_selector(
        query=query,
        bq_project_id=bq_project_id,
        bq_dataset_id=bq_dataset_id,
        bq_table_id=bq_table_id,
        bq_location=bq_location,
    )


# Write the pipeline template consumed by PipelineJob(template_path="pl.json").
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pl.json",
)

In [4]:
# Runtime parameter values for the compiled pipeline; keys match the
# pipeline function's parameter names.
params = {
    'query': """
    SELECT planet as planets, terrestrial_date as timestamp, 5 as pt
        FROM `feature-store-mars21.mars.three_planets_tmp` WHERE 1=1
    """,
    'bq_location': 'US',
    'bq_project_id': "feature-store-mars21",
    'bq_dataset_id': "mars",
    'bq_table_id': "tmp-table-v15",
}

In [5]:
from google.cloud.aiplatform.pipeline_jobs import PipelineJob


# Submit the compiled template with the runtime parameters above.
# sync=False returns immediately; progress is reported through the
# aiplatform logger (see the INFO lines in the cell output).
pl = PipelineJob(
    template_path="pl.json",
    parameter_values=params,
    display_name='bq-fs-export',
    location='us-central1',
)
pl.run(sync=False)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/202835066335/locations/us-central1/pipelineJobs/bq-fs-export-20220208142302
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/202835066335/locations/us-central1/pipelineJobs/bq-fs-export-20220208142302')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/bq-fs-export-20220208142302?project=202835066335
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/202835066335/locations/us-central1/pipelineJobs/bq-fs-export-20220208142302 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/202835066335/locations/us-central1/pipelineJobs/bq-fs-export-2022

In [23]:
from google.cloud import bigquery
import logging

# Local replay of the data_selector step so the result table (`table`) can be
# inspected by the cells below. `query` is defined here so this cell does not
# depend on a variable left over from another cell.
query = """
    SELECT planet as planets, terrestrial_date as timestamp, 5 as pt
        FROM `feature-store-mars21.mars.three_planets_tmp` WHERE 1=1
    """

bq_location = 'US'
bq_project_id = "feature-store-mars21"
bq_dataset_id = "mars"
# Bumped between runs: WRITE_EMPTY below refuses to write into a table that already has data.
bq_table_id = "tmp-table-v21"

bq_export_table_id = "training-v1"

BUCKET_NAME = "gs://feature-store-mars21"

bq_query_data_table = "{project}.{dataset}.{table}".format(
    project=bq_project_id,
    dataset=bq_dataset_id,
    table=bq_table_id)

client = bigquery.Client(project=bq_project_id, location=bq_location)

overwrite_table = False
job_config = bigquery.QueryJobConfig(
    write_disposition=(bigquery.job.WriteDisposition.WRITE_TRUNCATE if overwrite_table
                       else bigquery.job.WriteDisposition.WRITE_EMPTY),
    destination=bq_query_data_table)

# Bound before the try so the handler cannot raise NameError (or report a
# stale job from a previous run) when client.query() itself fails.
query_job = None
try:
    query_job = client.query(query=query, job_config=job_config)
    query_job.result()  # waits for the job; raises if the job failed
    if query_job.errors:
        raise RuntimeError(
            "BigQuery job writing to {} reported errors: {}".format(
                bq_query_data_table, query_job.errors))
except Exception:
    logging.error("Query into %s failed; job errors: %s",
                  bq_query_data_table,
                  None if query_job is None else query_job.errors)
    raise

# Mirrors the artifact fields the pipeline component sets.
table_dataset_metadata = {}

table = client.get_table(bq_query_data_table)  # API request; `table.schema` is used below
table_dataset_path = "bq://{}".format(bq_query_data_table)
table_dataset_metadata['table_name'] = bq_query_data_table

if table.num_rows == 0:
    raise Exception("BQ table {} has no rows. Ensure that your query returns results: {}".format(bq_query_data_table, query))

In [24]:
# OrderedDict makes the column order explicit: the next cell relies on it to
# tell entity columns (before `timestamp`) from pass-through columns (after).
from collections import OrderedDict

schema = OrderedDict([(field.name, field.field_type) for field in table.schema])

In [25]:
# Split the table's columns around the mandatory `timestamp` column:
# columns before it are entity-type ids, columns after it are passed through.
entity_type_cols = []
pass_through_cols = []
reading_entity_types = True  # flips to False once the timestamp column is seen
for key, value in schema.items():
    if key == 'timestamp':
        reading_entity_types = False
        if value != "TIMESTAMP":
            raise ValueError("timestamp column must be of type TIMESTAMP")
    elif reading_entity_types:
        entity_type_cols.append(key)
    else:
        pass_through_cols.append(key)

if reading_entity_types:  # still True: no timestamp column was found
    raise ValueError("timestamp column missing from BQ table. It is required for feature store data retrieval")

In [26]:
# Entity-type columns parsed from the BQ schema (columns before `timestamp`).
entity_type_cols

['planets']

In [27]:
# Columns after `timestamp` in the BQ schema.
pass_through_cols

['pt']

In [28]:
# Validate that every entity-type column exists as an entity type in the feature store

In [29]:
fs_location = 'us-central1'
fs_project = 'feature-store-mars21'
fs_featurestore_name = 'universe'

fs_path = 'projects/{fs_project}/locations/{fs_location}/featurestores/{fs_name}'.format(
    fs_project=fs_project,
    fs_location=fs_location,
    fs_name=fs_featurestore_name)

from google.cloud.aiplatform_v1beta1 import FeaturestoreServiceClient

# Regional Vertex AI endpoint matching the featurestore's location.
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(fs_location)

admin_client = FeaturestoreServiceClient(
    client_options={"api_endpoint": API_ENDPOINT})

# Iterate the pager itself so every page is fetched; its `.entity_types`
# attribute would only expose the first page of results.
fs_entities = [entity_type.name.split('/')[-1]
               for entity_type in admin_client.list_entity_types(parent=fs_path)]

missing_entities = set(entity_type_cols).difference(fs_entities)
if missing_entities:
    raise ValueError(
        "Table column(s) {} before timestamp column do not match entities in feature store {} ".format(
            sorted(missing_entities), fs_entities))

NotFound: 404 The Featurestore does not exist.

In [577]:
# Batch-read feature values for the BQ read instances and export the result to a new BQ table

In [578]:
# features to retrieve for each entity type
my_features  = {'planets': ["avg_max_temp_5d", "arr_max_temp_3d", "min_temp_std"]}
feature_diff = set(my_features.keys()).difference(entity_type_cols)
if len(feature_diff)>0:
    raise LookupError("Features requested for entities {} that does not exist in filtering query columns: {} ".format(feature_diff, query))

In [579]:


# Build the BatchReadFeatureValues request. Read instances come from the
# table materialised earlier; joined feature values go to a new BQ table.
# The proto modules are imported here so the cell runs on a fresh kernel.
from google.cloud.aiplatform_v1beta1.types import feature_selector as feature_selector_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2

bq_export_data_table = "{project}.{dataset}.{table}".format(
    project=bq_project_id,
    dataset=bq_dataset_id,
    table=bq_export_table_id)

# One spec per entity type, selecting the requested features by id.
entity_type_specs_arr = [
    featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
        entity_type_id=ent_type,
        feature_selector=feature_selector_pb2.FeatureSelector(
            id_matcher=feature_selector_pb2.IdMatcher(ids=features_arr)))
    for ent_type, features_arr in my_features.items()
]

# Columns after `timestamp` in the read-instance table are copied into the output as-is.
pass_through_fields = [
    featurestore_service_pb2.BatchReadFeatureValuesRequest.PassThroughField(field_name=col)
    for col in pass_through_cols
]

batch_serving_request = featurestore_service_pb2.BatchReadFeatureValuesRequest(
    featurestore=fs_path,
    bigquery_read_instances=io_pb2.BigQuerySource(
        input_uri="bq://{}".format(bq_query_data_table)),
    # The destination table must not exist yet: the service answers 409 otherwise.
    destination=featurestore_service_pb2.FeatureValueDestination(
        bigquery_destination=io_pb2.BigQueryDestination(
            output_uri='bq://{}'.format(bq_export_data_table))),
    pass_through_fields=pass_through_fields,
    entity_type_specs=entity_type_specs_arr,
)

In [580]:
%%time
from google.api_core.exceptions import GoogleAPIError

# batch_read_feature_values starts a long-running operation; .result() blocks
# until the export finishes and raises if it failed.
try:
    print(admin_client.batch_read_feature_values(batch_serving_request).result())
except GoogleAPIError as ex:
    # Show service-side failures (e.g. 409 when the destination table already
    # exists) without a traceback. Programming errors such as NameError are
    # no longer swallowed.
    print(ex)

409 Destination Table `bq://feature-store-mars21.mars.training-v1` must not exist.
CPU times: user 0 ns, sys: 4.2 ms, total: 4.2 ms
Wall time: 999 ms
