# Featurestore - Mars21
## Feature retrieval (online & batch)
Christos Aniftos \
Soeren Petersen

In [100]:
from google.api_core import operations_v1
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.cloud.aiplatform_v1beta1.types.feature import Feature
from google.cloud.aiplatform_v1beta1 import FeaturestoreServiceClient
from google.cloud.aiplatform_v1beta1 import FeaturestoreOnlineServingServiceClient
from google.cloud.aiplatform_v1beta1.types import entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import feature_selector as feature_selector_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_online_service as featurestore_online_service_pb2

In [101]:
PROJECT_ID=!gcloud config get-value project # returns default project id 
PROJECT_ID=PROJECT_ID[0]

LOCATION = 'europe-west4'
API_ENDPOINT = LOCATION+"-aiplatform.googleapis.com" 
FEATURESTORE_ID = "telco"
ENTITY_CUSTOMER="customer"
ENTITY_PHONE="phone"

## Define clients for FS admin and data management

In [102]:
# Create admin_client for CRUD and data_client for reading feature values.
admin_client = FeaturestoreServiceClient(
    client_options={"api_endpoint": API_ENDPOINT})

data_client = FeaturestoreOnlineServingServiceClient(
    client_options={"api_endpoint": API_ENDPOINT})

In [103]:
LOC_PATH = admin_client.common_location_path(PROJECT_ID, LOCATION)
FS_PATH = admin_client.featurestore_path(PROJECT_ID, LOCATION, FEATURESTORE_ID)
ENTITY_CUSTOMER_PATH = admin_client.entity_type_path(PROJECT_ID, LOCATION, FEATURESTORE_ID, ENTITY_CUSTOMER)
FEATURE_CUSTOMER_PATH = admin_client.feature_path(PROJECT_ID, LOCATION, FEATURESTORE_ID, ENTITY_CUSTOMER, '{}')
ENTITY_PHONE_PATH = admin_client.entity_type_path(PROJECT_ID, LOCATION, FEATURESTORE_ID, ENTITY_PHONE)
FEATURE_PHONE_PATH = admin_client.feature_path(PROJECT_ID, LOCATION, FEATURESTORE_ID, ENTITY_PHONE, '{}')

print("Location: \t", LOC_PATH)
print("Feature Store: \t", FS_PATH)
print("Entity customer: \t", FEATURE_CUSTOMER_PATH)
print("Feature customer: \t",FEATURE_CUSTOMER_PATH)
print("Entity phone: \t", ENTITY_PHONE_PATH)
print("Feature phone: \t",FEATURE_PHONE_PATH)

Location: 	 projects/myfirstproject-226013/locations/europe-west4
Feature Store: 	 projects/myfirstproject-226013/locations/europe-west4/featurestores/universe
Entity customer: 	 projects/myfirstproject-226013/locations/europe-west4/featurestores/universe/entityTypes/customer/features/{}
Feature customer: 	 projects/myfirstproject-226013/locations/europe-west4/featurestores/universe/entityTypes/customer/features/{}
Entity phone: 	 projects/myfirstproject-226013/locations/europe-west4/featurestores/universe/entityTypes/phone
Feature phone: 	 projects/myfirstproject-226013/locations/europe-west4/featurestores/universe/entityTypes/phone/features/{}


## Read Values from FS Online Storage - Real time!

In [104]:
feature_selector_customer = feature_selector_pb2.FeatureSelector(
    id_matcher=feature_selector_pb2.IdMatcher(
        ids=["senior_citizen", "partner", "monthly_charges_avg_customer_clustered", "monthly_charges", "payment_method"]))

feature_selector_phone = feature_selector_pb2.FeatureSelector(
    id_matcher=feature_selector_pb2.IdMatcher(
        ids=["approx_price_euro"]))

In [110]:
%%time
customer_id = "9708-HPXWZ"
data_client.read_feature_values(
    featurestore_online_service_pb2.ReadFeatureValuesRequest(
        entity_type=ENTITY_CUSTOMER_PATH,
        entity_id= customer_id,
        feature_selector=feature_selector_customer))

CPU times: user 4.01 ms, sys: 0 ns, total: 4.01 ms
Wall time: 847 ms


header {
  entity_type: "projects/478111835512/locations/europe-west4/featurestores/universe/entityTypes/customer"
  feature_descriptors {
    id: "senior_citizen"
  }
  feature_descriptors {
    id: "partner"
  }
  feature_descriptors {
    id: "monthly_charges_avg_customer_clustered"
  }
  feature_descriptors {
    id: "monthly_charges"
  }
  feature_descriptors {
    id: "payment_method"
  }
}
entity_view {
  entity_id: "9708-HPXWZ"
  data {
    value {
      bool_value: true
      metadata {
        generate_time {
          seconds: 1629721572
          nanos: 772000000
        }
      }
    }
  }
  data {
    value {
      bool_value: false
      metadata {
        generate_time {
          seconds: 1629721572
          nanos: 772000000
        }
      }
    }
  }
  data {
    value {
      double_value: 45.4
      metadata {
        generate_time {
          seconds: 1629721572
          nanos: 772000000
        }
      }
    }
  }
  data {
    value {
      double_value: 45.4
 

In [111]:
%%time
customer_id = "Apple - iPhone 4"
data_client.read_feature_values(
    featurestore_online_service_pb2.ReadFeatureValuesRequest(
        entity_type=ENTITY_PHONE_PATH,
        entity_id= customer_id,
        feature_selector=feature_selector_phone))

CPU times: user 3.39 ms, sys: 0 ns, total: 3.39 ms
Wall time: 814 ms


header {
  entity_type: "projects/478111835512/locations/europe-west4/featurestores/universe/entityTypes/phone"
  feature_descriptors {
    id: "approx_price_euro"
  }
}
entity_view {
  entity_id: "Apple - iPhone 4"
  data {
    value {
      int64_value: 200
      metadata {
        generate_time {
          seconds: 1629721591
          nanos: 270000000
        }
      }
    }
  }
}

In [96]:
%%time
response_stream = data_client.streaming_read_feature_values(
    featurestore_online_service_pb2.StreamingReadFeatureValuesRequest(
        entity_type=ENTITY_PATH,
        entity_ids=["9708-HPXWZ", "2523-EWWZL"],
        feature_selector=feature_selector_customer))

for response in response_stream:
  print(response)

header {
  entity_type: "projects/478111835512/locations/europe-west4/featurestores/universe/entityTypes/customer"
  feature_descriptors {
    id: "senior_citizen"
  }
  feature_descriptors {
    id: "partner"
  }
  feature_descriptors {
    id: "monthly_charges_avg_customer_clustered"
  }
  feature_descriptors {
    id: "monthly_charges"
  }
  feature_descriptors {
    id: "payment_method"
  }
}

entity_view {
  entity_id: "2523-EWWZL"
  data {
    value {
      bool_value: false
      metadata {
        generate_time {
          seconds: 1629721572
          nanos: 772000000
        }
      }
    }
  }
  data {
    value {
      bool_value: true
      metadata {
        generate_time {
          seconds: 1629721572
          nanos: 772000000
        }
      }
    }
  }
  data {
    value {
      double_value: 73.93813559322037
      metadata {
        generate_time {
          seconds: 1629721572
          nanos: 772000000
        }
      }
    }
  }
  data {
    value {
      double

## Read Values from BATCH storage

In [112]:
!gsutil cp feature-request.csv gs://$PROJECT_ID/telco-churn/feature-request.csv

Copying file://feature-request.csv [Content-Type=text/csv]...
/ [1 files][  263.0 B/  263.0 B]                                                
Operation completed over 1 objects/263.0 B.                                      


In [113]:
EXPORT_TABLE_URI = "bq://{}.telco.training_churn_data_v6".format(PROJECT_ID)
FEATURE_REQ_CSV_PATH = "gs://{}/telco-churn/feature-request.csv".format(PROJECT_ID)

In [114]:
batch_serving_request = featurestore_service_pb2.BatchReadFeatureValuesRequest(
    featurestore=FS_PATH,
    csv_read_instances=io_pb2.CsvSource(
        gcs_source=io_pb2.GcsSource(uris=[FEATURE_REQ_CSV_PATH])),
    
    # Output info
    destination=featurestore_service_pb2.FeatureValueDestination(
        bigquery_destination=io_pb2.BigQueryDestination(
            # output to BigQuery table
            output_uri=EXPORT_TABLE_URI)),
    #destination=featurestore_service_pb2.FeatureValueDestination(
    #    tfrecord_destination=io_pb2.CsvDestination(
    #        gcs_destination=EXPORT_TF_PATH)),
   

    # Select features to read
    entity_type_specs=[
        featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
            # read feature values of features subscriber_type and duration_minutes from "bikes"
            entity_type_id=ENTITY_CUSTOMER, 
            feature_selector=feature_selector_customer
        ),
        featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
            # read feature values of features subscriber_type and duration_minutes from "bikes"
            entity_type_id=ENTITY_PHONE, 
            feature_selector=feature_selector_phone
        ),
    ])

In [115]:
%%time
try:
    print(admin_client.batch_read_feature_values(batch_serving_request).result())
except Exception as ex:
    print(ex)


CPU times: user 33.4 ms, sys: 0 ns, total: 33.4 ms
Wall time: 47.2 s
