# Featurestore - Mars21
## Data Ingestion
Christos Aniftos \
Soeren Petersen

In [None]:
from google.api_core import operations_v1
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.cloud.aiplatform_v1beta1.types.feature import Feature
from google.cloud.aiplatform_v1beta1 import FeaturestoreServiceClient
from google.cloud.aiplatform_v1beta1 import FeaturestoreOnlineServingServiceClient
from google.cloud.aiplatform_v1beta1.types import entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import feature_selector as feature_selector_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_online_service as featurestore_online_service_pb2

In [None]:
# Google Cloud project and region used by every client in this notebook.
PROJECT_ID = "PROJECT_ID"  # Change to your project id
LOCATION = "us-central1"

# Regional aiplatform API endpoint, derived from LOCATION.
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(LOCATION)

# Ids of the featurestore and of the entity type that features are written to.
FEATURESTORE_ID = "universe"
ENTITY = "planets"

## Define clients for Feature Store admin and data management

In [None]:
# Admin client for featurestore CRUD operations, pointed at the regional
# endpoint configured in API_ENDPOINT.
admin_client = FeaturestoreServiceClient(
    client_options=dict(api_endpoint=API_ENDPOINT),
)

In [None]:
# Build the resource names used by later requests. The featurestore, entity
# type and feature paths all share the same (project, location, featurestore)
# prefix, so it is spelled out once.
_featurestore_ids = (PROJECT_ID, LOCATION, FEATURESTORE_ID)

LOC_PATH = admin_client.common_location_path(PROJECT_ID, LOCATION)
FS_PATH = admin_client.featurestore_path(*_featurestore_ids)
ENTITY_PATH = admin_client.entity_type_path(*_featurestore_ids, ENTITY)
# The feature id is passed as a literal '{}' placeholder -- presumably filled
# in later with str.format for a concrete feature.
FEATURE_PATH = admin_client.feature_path(*_featurestore_ids, ENTITY, '{}')

# Echo the resolved names so a wrong project/region is obvious at a glance.
for _label, _resource_name in (
    ("Location: \t", LOC_PATH),
    ("Feature Store: \t", FS_PATH),
    ("Entity: \t", ENTITY_PATH),
    ("Feature: \t", FEATURE_PATH),
):
    print(_label, _resource_name)

## Ingest

#### Prepare aggregated data to ingest - this creates a temporary BigQuery table with the features

In [None]:
from google.cloud import bigquery

# Feature timestamps (UTC). Only date_1 is referenced in this cell; date_2 and
# date_3 are defined here but not used by the query below.
date_1 = "2012-09-02 00:00:00+00:00"
date_2 = "2012-10-02 00:00:00+00:00"
date_3 = "2012-11-02 00:00:00+00:00"

# Construct a BigQuery client object.
client = bigquery.Client()

# (planet id, multiplier) pairs. Every "planet" is derived from the same Mars
# weather table: sol, ls, min_temp, max_temp and pressure are multiplied by
# the given factor (1 leaves the Mars readings unchanged).
_PLANETS = (("mars", 1), ("pluto", 2), ("jupyter", 3))

# Trailing per-planet window ending at the current row. N PRECEDING plus the
# current row spans N + 1 rows.
_TRAILING_WINDOW = (
    "PARTITION BY planet ORDER BY terrestrial_date "
    "ROWS BETWEEN {preceding} PRECEDING AND CURRENT ROW"
)

# One SELECT per planet: raw columns, standard-scaled columns, trailing
# averages and trailing arrays. Rows with NaN readings are dropped first.
_PLANET_SELECT = """
SELECT
    *,
    ML.STANDARD_SCALER(sol) OVER() sol_std,
    ML.STANDARD_SCALER(min_temp) OVER() min_temp_std,
    ML.STANDARD_SCALER(max_temp) OVER() max_temp_std,
    ML.STANDARD_SCALER(pressure) OVER() pressure_std,
    ML.STANDARD_SCALER(ls) OVER() ls_std,
    AVG(pressure) OVER ({win_5d}) pressure_5_days,
    AVG(min_temp) OVER ({win_5d}) min_temp_5_days,
    AVG(max_temp) OVER ({win_5d}) max_temp_5_days,
    AVG(ls) OVER ({win_5d}) ls_5_days,
    ARRAY_AGG(pressure) OVER ({win_3d}) pressure_last_3_days,
    ARRAY_AGG(ls) OVER ({win_3d}) ls_last_3_days,
    ARRAY_AGG(min_temp) OVER ({win_3d}) min_temp_last_3_days,
    ARRAY_AGG(max_temp) OVER ({win_3d}) max_temp_last_3_days,
FROM
(SELECT
    id,
    CAST(terrestrial_date AS TIMESTAMP) as terrestrial_date,
    {scale}*sol sol,
    {scale}*ls ls,
    month,
    {scale}*min_temp min_temp,
    {scale}*max_temp max_temp,
    {scale}*pressure pressure,
    atmo_opacity,
    '{planet}' AS planet
 FROM `feature-store-mars21.mars.weather`
 WHERE IS_NAN(min_temp) IS False
 and IS_NAN(max_temp) IS False
 and IS_NAN(ls) IS False
 and IS_NAN(pressure) IS False)
"""


def _planet_select(planet, scale):
    """Return the feature-engineering SELECT for one planet.

    Args:
        planet: Value written to the ``planet`` column (the entity id).
        scale: Integer factor applied to the numeric weather readings.
    """
    return _PLANET_SELECT.format(
        planet=planet,
        scale=scale,
        # 4 PRECEDING + current row = 5 rows, matching the *_5_days column
        # names (the previous frame of 5 PRECEDING covered 6 rows).
        win_5d=_TRAILING_WINDOW.format(preceding=4),
        # 2 PRECEDING + current row = 3 rows for the *_last_3_days arrays.
        win_3d=_TRAILING_WINDOW.format(preceding=2),
    )


# The date filter is applied outside the UNION so the window functions are
# computed over each planet's full history before the snapshot row is picked.
query = """
CREATE OR REPLACE TABLE mars.planets_tmp AS
SELECT * FROM ({planet_selects})
 WHERE terrestrial_date=TIMESTAMP("{snapshot}")
""".format(
    planet_selects="\nUNION ALL\n".join(
        _planet_select(planet, scale) for planet, scale in _PLANETS
    ),
    snapshot=date_1,
)
query_job = client.query(query)  # Make an API request.

In [None]:
# Wait for the CREATE OR REPLACE TABLE job started by client.query() to
# complete, so the staging table exists before it is imported.
query_job.result()

### Supported ingest modes

avro_source (google.cloud.aiplatform_v1beta1.types.AvroSource):

bigquery_source (google.cloud.aiplatform_v1beta1.types.BigQuerySource):

csv_source (google.cloud.aiplatform_v1beta1.types.CsvSource):

### Ingest from temp table to Feature Store

In [None]:
# (feature id in the featurestore, column name in the staging table).
# Raw and *_std columns keep their name; the trailing aggregates are renamed
# to avg_*_5d / arr_*_3d.
_FEATURE_SOURCE_FIELDS = (
    ("ls", "ls"),
    ("month", "month"),
    ("max_temp", "max_temp"),
    ("min_temp", "min_temp"),
    ("pressure", "pressure"),
    ("atmo_opacity", "atmo_opacity"),
    ("max_temp_std", "max_temp_std"),
    ("min_temp_std", "min_temp_std"),
    ("pressure_std", "pressure_std"),
    ("avg_pressure_5d", "pressure_5_days"),
    ("avg_min_temp_5d", "min_temp_5_days"),
    ("avg_max_temp_5d", "max_temp_5_days"),
    ("arr_pressure_3d", "pressure_last_3_days"),
    ("arr_min_temp_3d", "min_temp_last_3_days"),
    ("arr_max_temp_3d", "max_temp_last_3_days"),
)

# Import request: reads the staging table and writes the listed features to
# the entity type at ENTITY_PATH, keyed by planet and stamped with
# terrestrial_date.
import_request = featurestore_service_pb2.ImportFeatureValuesRequest(
    entity_type=ENTITY_PATH,
    bigquery_source=io_pb2.BigQuerySource(
        # Must be the table written by the staging query (mars.planets_tmp);
        # the previous URI named three_planets_tmp, which this notebook never
        # creates. NOTE(review): assumes the BigQuery client's default project
        # is feature-store-mars21 -- confirm.
        input_uri="bq://feature-store-mars21.mars.planets_tmp"
    ),
    feature_specs=[
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(
            id=feature_id, source_field=source_column)
        for feature_id, source_column in _FEATURE_SOURCE_FIELDS
    ],
    entity_id_field="planet",
    feature_time_field="terrestrial_date",
    worker_count=5)

In [None]:
%%time
# Submit the import request and wait on the returned operation via .result().
admin_client.import_feature_values(import_request).result()