# Featurestore - Mars21
## Data Injestion
Christos Aniftos \
Soeren Petersen

In [5]:
from google.api_core import operations_v1
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.cloud.aiplatform_v1beta1.types.feature import Feature
from google.cloud.aiplatform_v1beta1 import FeaturestoreServiceClient
from google.cloud.aiplatform_v1beta1 import FeaturestoreOnlineServingServiceClient
from google.cloud.aiplatform_v1beta1.types import entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import feature_selector as feature_selector_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_online_service as featurestore_online_service_pb2

In [6]:
PROJECT_ID = "feature-store-mars21" # Change to your project id
LOCATION = "us-central1" 
API_ENDPOINT = LOCATION+"-aiplatform.googleapis.com" 
FEATURESTORE_ID = "universe"
ENTITY="customer"

## Define clients for FS admin and data management

In [7]:
# Create admin_client for CRUD 
admin_client = FeaturestoreServiceClient(
    client_options={"api_endpoint": API_ENDPOINT})


In [8]:
LOC_PATH = admin_client.common_location_path(PROJECT_ID, LOCATION)
FS_PATH = admin_client.featurestore_path(PROJECT_ID, LOCATION, FEATURESTORE_ID)
ENTITY_PATH = admin_client.entity_type_path(PROJECT_ID, LOCATION, FEATURESTORE_ID, ENTITY)
FEATURE_PATH = admin_client.feature_path(PROJECT_ID, LOCATION, FEATURESTORE_ID, ENTITY, '{}')

print("Location: \t", LOC_PATH)
print("Feature Store: \t", FS_PATH)
print("Entity: \t", ENTITY_PATH)
print("Feature: \t",FEATURE_PATH)

Location: 	 projects/feature-store-mars21/locations/us-central1
Feature Store: 	 projects/feature-store-mars21/locations/us-central1/featurestores/universe
Entity: 	 projects/feature-store-mars21/locations/us-central1/featurestores/universe/entityTypes/customer
Feature: 	 projects/feature-store-mars21/locations/us-central1/featurestores/universe/entityTypes/customer/features/{}


## Injest

#### prepare aggregated data to injest- this creates a temporary bq table with the features

In [9]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

query = """
CREATE OR REPLACE TABLE `{project_id}.telco.churn_features_tmp` 
AS SELECT 
t1.* EXCEPT(TotalCharges),
SAFE_CAST(t1.TotalCharges AS FLOAT64) as TotalCharges, 
IF(t1.gender='Male',True,False) male,
IF(t1.gender='Female',True,False) female,
ML.STANDARD_SCALER(t1.MonthlyCharges) OVER() as MonthlyCharges_std,
MonthlyCharges_avg_customer_clustered,
CURRENT_TIMESTAMP() as update_time 
FROM `{project_id}.telco.churn` t1
LEFT JOIN (SELECT 
    AVG(MonthlyCharges) MonthlyCharges_avg_customer_clustered,
    SeniorCitizen,
    Partner,
    PhoneService,
    InternetService,
    DeviceProtection,
    TechSupport,
    StreamingTV,
    StreamingMovies,
    PaymentMethod
FROM `myfirstproject-226013.telco.churn`  Group by 
SeniorCitizen,
Partner,
PhoneService,
InternetService,
DeviceProtection,
TechSupport,
StreamingTV,
StreamingMovies,
PaymentMethod) t2 ON
t1.SeniorCitizen=t2.SeniorCitizen AND 
t1.Partner=t2.Partner AND 
t1.PhoneService=t2.PhoneService AND 
t1.InternetService=t2.InternetService AND 
t1.DeviceProtection=t2.DeviceProtection AND 
t1.TechSupport=t2.TechSupport AND 
t1.StreamingTV=t2.StreamingTV AND 
t1.StreamingMovies=t2.StreamingMovies AND 
t1.PaymentMethod=t2.PaymentMethod
""".format(project_id=PROJECT_ID)
query_job = client.query(query)  # Make an API request.

In [10]:
query_job.result()

NotFound: 404 Not found: Dataset feature-store-mars21:telco was not found in location EU

(job ID: 443e84c9-59ea-4e7c-88e6-2d89bff249af)

                           -----Query Job SQL Follows-----                            

    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |
   1:
   2:CREATE OR REPLACE TABLE `feature-store-mars21.telco.churn_features_tmp` 
   3:AS SELECT 
   4:t1.* EXCEPT(TotalCharges),
   5:SAFE_CAST(t1.TotalCharges AS FLOAT64) as TotalCharges, 
   6:IF(t1.gender='Male',1,0) male,
   7:IF(t1.gender='Female',1,0) female,
   8:ML.STANDARD_SCALER(t1.MonthlyCharges) OVER() as MonthlyCharges_std,
   9:TotalCharges_avg_customer_clustered,
  10:MonthlyCharges_avg_customer_clustered,
  11:FROM `feature-store-mars21.telco.churn` t1
  12:LEFT JOIN (SELECT 
  13:    AVG(SAFE_CAST(TotalCharges AS FLOAT64)) TotalCharges_avg_customer_clustered, 
  14:    AVG(MonthlyCharges) MonthlyCharges_avg_customer_clustered,
  15:    SeniorCitizen,
  16:    Partner,
  17:    PhoneService,
  18:    InternetService,
  19:    DeviceProtection,
  20:    TechSupport,
  21:    StreamingTV,
  22:    StreamingMovies,
  23:    PaymentMethod
  24:FROM `myfirstproject-226013.telco.churn`  Group by 
  25:SeniorCitizen,
  26:Partner,
  27:PhoneService,
  28:InternetService,
  29:DeviceProtection,
  30:TechSupport,
  31:StreamingTV,
  32:StreamingMovies,
  33:PaymentMethod) t2 ON
  34:t1.SeniorCitizen=t2.SeniorCitizen AND 
  35:t1.Partner=t2.Partner AND 
  36:t1.PhoneService=t2.PhoneService AND 
  37:t1.InternetService=t2.InternetService AND 
  38:t1.DeviceProtection=t2.DeviceProtection AND 
  39:t1.TechSupport=t2.TechSupport AND 
  40:t1.StreamingTV=t2.StreamingTV AND 
  41:t1.StreamingMovies=t2.StreamingMovies AND 
  42:t1.PaymentMethod=t2.PaymentMethod
    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |    .    |

### Supported injest modes

avro_source (google.cloud.aiplatform_v1beta1.types.AvroSource):

bigquery_source (google.cloud.aiplatform_v1beta1.types.BigQuerySource):

csv_source (google.cloud.aiplatform_v1beta1.types.CsvSource):

In [11]:
ENTITY_PATH

'projects/feature-store-mars21/locations/us-central1/featurestores/universe/entityTypes/customer'

### Injest from temp table to feature store

In [8]:
import_request = featurestore_service_pb2.ImportFeatureValuesRequest(
    entity_type=ENTITY_PATH,
    bigquery_source=io_pb2.BigQuerySource(
        input_uri="bq://{project_id}.telco.churn".format(project_id=PROJECT_ID)
    ),
    feature_specs=[
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="gender", source_field="gender"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="SeniorCitizen", source_field="SeniorCitizen"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="Partner", source_field="Partner"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="Dependents", source_field="Dependents"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="tenure", source_field="tenure"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="PhoneService", source_field="PhoneService"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="MultipleLines", source_field="MultipleLines"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="InternetService", source_field="InternetService"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="OnlineSecurity", source_field="OnlineSecurity"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="OnlineBackup", source_field="OnlineBackup"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="DeviceProtection", source_field="DeviceProtection"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="TechSupport", source_field="TechSupport"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="StreamingTV", source_field="StreamingTV"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="StreamingMovies", source_field="StreamingMovies"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="Contract", source_field="Contract"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="PaperlessBilling", source_field="PaperlessBilling"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="PaymentMethod", source_field="PaymentMethod"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="MonthlyCharges", source_field="MonthlyCharges"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="male", source_field="male"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="female", source_field="female"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="MonthlyCharges_std", source_field="MonthlyCharges_std"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="TotalCharges", source_field="TotalCharges"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id="MonthlyCharges_avg_customer_clustered", source_field="MonthlyCharges_avg_customer_clustered")
    ],
    entity_id_field="customerID",
    feature_time_field="update_time",
    worker_count=1)

In [9]:
%%time
admin_client.import_feature_values(import_request).result()

CPU times: user 90.6 ms, sys: 32.2 ms, total: 123 ms
Wall time: 6min 48s


imported_entity_count: 3
imported_feature_value_count: 45