In [34]:
import os
# Point the Google client libraries at a service-account key file.
# setdefault keeps a GOOGLE_APPLICATION_CREDENTIALS value that is already set in
# the environment (CI, another machine, a different user) and only falls back to
# this developer-local path when nothing is configured.
# NOTE(review): the fallback is an absolute, user-specific path; consider removing
# it and requiring the variable to be exported before starting the kernel.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/prichoud3/Documents/MLOps-Tutorial/terraform/gcp-creds.json",
)

In [41]:
from google.api_core.client_options import ClientOptions
from google.cloud import bigquery
from google.cloud.aiplatform import Featurestore
from google.cloud.bigquery.table import TableReference

# Notebook-wide settings. `params` is the single source of truth; the
# upper-case module constants below are unpacked from it for convenience.
params = dict(
    PROJECT_ID="gcp-tutorial-400612",
    REGION="us-central1",
    ONLINE_STORE_FIXED_NODE_COUNT=1,
    TEMP_DATASET="dataset_011023",
    TEMP_SRC_TABLE="preprocessed_data",
    NEW_FEATURE_STORE_NAME="feature_store_21023",
)

PROJECT_ID = params['PROJECT_ID']
REGION = params['REGION']
ONLINE_STORE_FIXED_NODE_COUNT = params['ONLINE_STORE_FIXED_NODE_COUNT']
TEMP_DATASET = params['TEMP_DATASET']
TEMP_SRC_TABLE = params['TEMP_SRC_TABLE']
NEW_FEATURE_STORE_NAME = params['NEW_FEATURE_STORE_NAME']

# BigQuery client shared by the helper functions in this notebook; the same
# project id is passed both as the client project and as the quota project.
BQ_CLIENT_INFO = ClientOptions(quota_project_id=PROJECT_ID)
BQ_CLIENT = bigquery.client.Client(
    project=PROJECT_ID,
    client_options=BQ_CLIENT_INFO,
)

def map_dtype_to_featurestore(feature_type):
    """Translate a BigQuery column type name into a Feature Store value type.

    Args:
        feature_type: BigQuery type name as a string, compared case-sensitively
            against upper-case names (e.g. "INTEGER", "FLOAT64", "BOOLEAN") or a
            parameterized numeric form such as "NUMERIC(10, 2)".

    Returns:
        One of "STRING", "INT64", "DOUBLE", "BOOL" or "BYTES". Date/time types,
        JSON and any type not recognized here are mapped to "STRING".

    Raises:
        TypeError: if the column is an ARRAY, STRUCT or RECORD, which this
            notebook cannot ingest as a scalar feature.
    """
    integer_types = {"INTEGER", "INT", "SMALLINT", "BIGINT", "TINYINT", "BYTEINT", "INT64"}
    double_types = {"BIGNUMERIC", "NUMERIC", "DECIMAL", "BIGDECIMAL", "FLOAT64", "FLOAT"}
    # Parameterized numerics such as "NUMERIC(10, 2)" only match by prefix.
    double_prefixes = ("BIGNUMERIC", "NUMERIC", "DECIMAL", "BIGDECIMAL")
    # RECORD is the schema-level name BigQuery reports for STRUCT columns.
    nested_prefixes = ("ARRAY", "STRUCT", "RECORD")

    if feature_type == "STRING":
        return "STRING"
    if feature_type in integer_types:
        return "INT64"
    if feature_type.startswith(nested_prefixes):
        # A real exception instance is required: raising a bare string is itself
        # a TypeError in Python 3 and would hide this message.
        raise TypeError(
            "Cannot process source table having columns with datatype " + feature_type
        )
    if feature_type in double_types or feature_type.startswith(double_prefixes):
        return "DOUBLE"
    # BigQuery table schemas report booleans as "BOOLEAN"; "BOOL" is the SQL alias.
    if feature_type in ("BOOL", "BOOLEAN"):
        return "BOOL"
    if feature_type == "BYTES":
        return "BYTES"
    # Temporal types, JSON and anything unrecognized are stored as strings.
    return "STRING"


def populate_feature_store(name):
    """Return the feature store called `name`, creating and defining it if absent.

    If the store exists, its "preprocessed" entity type is looked up and returned.
    If the store lookup raises NotFound, the store is created, a "preprocessed"
    entity type is added, and one feature is created per column of the BigQuery
    source table (feature id = lower-cased column name).

    Args:
        name: Feature store id to look up or create.

    Returns:
        Tuple `(fs, preprocessed_entity_type, fs_already_exists)` where the last
        item is True when the store was found and False when it was just created.

    Raises:
        Any error other than NotFound from the store lookup (permissions, network,
        ...) is propagated instead of being mistaken for a missing store. An error
        while fetching the entity type of an existing store is propagated as well.
    """
    # Local import: google.api_core is already a dependency of this notebook.
    from google.api_core.exceptions import NotFound

    try:
        fs = Featurestore(
            featurestore_name=name,
            project=PROJECT_ID,
            location=REGION,
        )
    except NotFound:
        print('Feature Store does not exists. Creating Feature Store')
        # sync=True blocks until the store is created, so the returned object
        # can be used directly without fetching it again.
        fs = Featurestore.create(
            featurestore_id=name,
            online_store_fixed_node_count=ONLINE_STORE_FIXED_NODE_COUNT,
            project=PROJECT_ID,
            location=REGION,
            sync=True,
        )
    else:
        print('Feature Store already exists')
        preprocessed_entity_type = fs.get_entity_type(
            entity_type_id="preprocessed"
        )
        return fs, preprocessed_entity_type, True

    print(f"{fs.gca_resource=}")

    preprocessed_entity_type = fs.create_entity_type(
        entity_type_id="preprocessed",
        description="Reading of metadata from app",
    )

    # Define one feature per source column, typed from the BigQuery schema.
    source_table = BQ_CLIENT.get_table('{}.{}.{}'.format(PROJECT_ID, TEMP_DATASET, TEMP_SRC_TABLE))
    for field in source_table.schema:
        preprocessed_entity_type.create_feature(
            feature_id=field.name.lower(),
            value_type=map_dtype_to_featurestore(field.field_type),
        )

    return fs, preprocessed_entity_type, False

def get_feature_source_fields(preprocessed_entity_type):
    """Build the feature-id -> BigQuery-column mapping used for ingestion.

    Features are matched to source columns by name (case-insensitively) rather
    than by position, so the result does not depend on the order in which the
    features are listed. This mirrors how features are created in this notebook:
    each feature id is the lower-cased column name.

    Args:
        preprocessed_entity_type: Entity type whose features should be listed.

    Returns:
        Tuple `(feature_ids, feature_source_fields)`: the list of feature ids
        (ordered by creation time) and a dict mapping each feature id to the
        source column with the same lower-cased name. A feature with no such
        column is left out of the dict.
    """
    features = preprocessed_entity_type.list_features(order_by='create_time')
    feature_ids = [feature.name for feature in features]

    src_table = BQ_CLIENT.get_table(
        TableReference.from_string(
            '{}.{}.{}'.format(PROJECT_ID, TEMP_DATASET, TEMP_SRC_TABLE),
            default_project=PROJECT_ID,
        )
    )
    column_by_lower_name = {field.name.lower(): field.name for field in src_table.schema}

    feature_source_fields = {
        feature_id: column_by_lower_name[feature_id.lower()]
        for feature_id in feature_ids
        if feature_id.lower() in column_by_lower_name
    }

    print('Obtained mapping from feature store to bigquery')
    return feature_ids, feature_source_fields



def populate_features_extract_features(fs, preprocessed_entity_type, fs_already_exists):
    """Ingest the BigQuery source table into the entity type of a new store.

    Nothing is done (and no remote call is made) unless `fs_already_exists` is
    exactly False, i.e. the feature store was just created and is still empty.

    Args:
        fs: Feature store object; accepted for call-site symmetry, not used here.
        preprocessed_entity_type: Entity type that receives the feature values.
        fs_already_exists: Flag returned by `populate_feature_store`.

    Returns:
        None.

    Raises:
        Any exception from the feature lookup or the ingestion is re-raised after
        a short message is printed.
    """
    # Only an explicit False triggers ingestion, matching the original contract.
    if fs_already_exists is not False:
        return

    try:
        lofn, feature_source_fields = get_feature_source_fields(preprocessed_entity_type)
        preprocessed_entity_type.ingest_from_bq(
            feature_ids=lofn,
            feature_time="created_at",
            bq_source_uri='bq://{}.{}.{}'.format(PROJECT_ID, TEMP_DATASET, TEMP_SRC_TABLE),
            feature_source_fields=feature_source_fields,
            entity_id_field='Address',
            disable_online_serving=False,
            sync=True
        )
        print('Ingested Bigquery Source table into Feature Store')
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit pass
        # through without the misleading message.
        print('Error populating features in bigquery')
        raise



In [36]:
# Look up (or create) the feature store named by NEW_FEATURE_STORE_NAME and keep
# the store, its "preprocessed" entity type and the existed-before flag for the
# cells below.
fs, preprocessed_entity_type, fs_already_exists = populate_feature_store(NEW_FEATURE_STORE_NAME)


Feature Store does not exists. Creating Feature Store
Creating Featurestore
Create Featurestore backing LRO: projects/836048009338/locations/us-central1/featurestores/feature_store_21023/operations/1841782787997171712
Featurestore created. Resource name: projects/836048009338/locations/us-central1/featurestores/feature_store_21023
To use this Featurestore in another session:
featurestore = aiplatform.Featurestore('projects/836048009338/locations/us-central1/featurestores/feature_store_21023')
fs.gca_resource=name: "projects/836048009338/locations/us-central1/featurestores/feature_store_21023"
create_time {
  seconds: 1696198075
  nanos: 512194000
}
update_time {
  seconds: 1696198075
  nanos: 709250000
}
etag: "AMEw9yPw6cYYdJiSLou_c8YkyfwQLsX1ZKltpZYFs0Pzql-JroKxDzIft69rHxDmKHe5"
online_serving_config {
  fixed_node_count: 1
}
state: STABLE
online_storage_ttl_days: 4000

Creating EntityType
Create EntityType backing LRO: projects/836048009338/locations/us-central1/featurestores/feature

In [42]:
# Ingest the BigQuery source table into the feature store when it was just created.
# NOTE(review): the recorded output of this cell is a NotFound error; the execution
# counts (In [42] here, In [40] on the delete cell) suggest it was run after the
# store had been deleted - re-run the notebook top to bottom to confirm.
populate_features_extract_features(fs, preprocessed_entity_type, fs_already_exists)

Error populating features in bigquery


NotFound: 404 The Featurestore does not exist.

In [10]:
# Read the "rooms" feature from the "preprocessed" entity type.
# NOTE(review): entity_ids is set to the entity type id "preprocessed", while
# ingestion in this notebook uses the 'Address' column as the entity id; this
# presumably should be an Address value (the recorded output has an empty
# "rooms" cell) - confirm.
fs.get_entity_type("preprocessed").read(entity_ids="preprocessed",feature_ids=["rooms"])

Unnamed: 0,entity_id,rooms
0,preprocessed,


In [40]:
# Tear down the feature store held in `fs`. The store to delete is the one the
# object refers to, so no name is passed: the first positional parameter of
# delete() is `sync`, and the original call passed the store name into it.
# force=True also removes the store's entity types and features.
fs.delete(sync=True, force=True)

Deleting Featurestore : projects/836048009338/locations/us-central1/featurestores/feature_store_21023
Delete Featurestore  backing LRO: projects/836048009338/locations/us-central1/operations/6298094619280277504
Featurestore deleted. . Resource name: projects/836048009338/locations/us-central1/featurestores/feature_store_21023
