In [1]:
import pandas as pd
from generated import (
    house_area_pb2,
    house_prices_pb2,
)
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message
import google.auth
import google.auth.transport.requests
import httpx
import functools
import time
import datetime
from google.cloud import bigquery
from threading import Thread
# from google.cloud import aiplatform_v1 as aiplatform
from google.cloud import aiplatform
from google.api_core.exceptions import ResourceExhausted
from tenacity import retry, wait_fixed


In [2]:
feature_topic = "projects/ml-lab-324709/topics/house_area"
price_topic = "projects/ml-lab-324709/topics/house_prices"

## Start Feature Ingestion Thread

In [3]:
# fs = aiplatform.Featurestore(
#         "projects/ml-lab-324709"
#         "/locations/europe-west1"
#         "/featurestores/realestate"
#     )
# entity = fs.get_entity_type(entity_type_id="house")


In [4]:
# df = pd.read_csv("home-data-for-ml-course/train.csv")
# df["ts"] = datetime.datetime.now()
# # entity id must be a string
# df["house_id"] = df.Id.astype(str)

# df = df[0:100].copy()

# entity.ingest_from_df(
#     feature_ids=["flr_one_sq_feet", "flr_two_sq_feet"],
#     feature_time="ts",
#     feature_source_fields={
#         "flr_one_sq_feet": "1stFlrSF",
#         "flr_two_sq_feet": "2ndFlrSF",
#     },
#     entity_id_field="house_id",
#     df_source=df,
# )

In [3]:
class StreamHouseFeaturesToFS(Thread):
    def __init__(self, subscription_name: str) -> None:
        super().__init__()
        self.__n_messages_processed = 0
        fs = aiplatform.Featurestore(
                "projects/ml-lab-324709"
                "/locations/europe-west1"
                "/featurestores/realestate"
            )
        self.entity = fs.get_entity_type(entity_type_id="house")
        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscription_path = self.subscriber.subscription_path(
            project="ml-lab-324709",
            subscription=subscription_name,
        )
        self.__should_stop = False
        self.timeout = datetime.timedelta(seconds=5).total_seconds()

    @property
    def n_messages_processed(self) -> int:
        return self.__n_messages_processed
    
    def stop(self) -> None:
        self.__should_stop = True

    def process_message(self, message: Message) -> None:
        house = house_area_pb2.HouseArea()
        house.ParseFromString(message.data)
        # payload = WriteFeatureValuesPayload(
        #     entity_id=house.house_id,
        #     feature_values = {
        #         "flr_one_sq_feet": FeatureValue(
        #             int64_value=house.flr_one_sq_feet
        #         ),
        #         "flr_two_sq_feet": FeatureValue(
        #             int64_value=house.flr_two_sq_feet
        #         ),
        #     },
        # )
        payload = {
            house.house_id: {
                "flr_one_sq_feet": house.flr_one_sq_feet,
                "flr_two_sq_feet": house.flr_two_sq_feet,
            }
        }
        self.entity.write_feature_values(instances=payload)
        
        message.ack()
        self.__n_messages_processed += 1

    def run(self) -> None:
        streaming_pull_future = self.subscriber.subscribe(
            subscription=self.subscription_path,
            callback=self.process_message
        )
        with self.subscriber:
            while True:
                try:
                    streaming_pull_future.result(timeout=self.timeout)
                except TimeoutError:
                    if self.__should_stop:
                        streaming_pull_future.cancel()
                        streaming_pull_future.result()
                        break


In [4]:
stream_threads = [
    StreamHouseFeaturesToFS(subscription_name="house_area_to_fs"),
]
for thread in stream_threads:
    thread.start()

# streaming = StreamHouseFeaturesToFS(subscription_name="house_area_to_fs")
# streaming.run()

## Publish data to Pub/Sub

In [None]:
publisher = pubsub_v1.PublisherClient()
df = pd.read_csv("home-data-for-ml-course/train.csv")
df = df[0:20].copy()
for idx, row in df.iterrows():
    utc_ts = int(time.mktime(
        datetime.datetime.utcnow().timetuple()
    ) * 1e6)  # python returns ts in seconds, bigquery expects micro-sec.
    # fill HouseFeatures
    feature_row = house_area_pb2.HouseArea()
    feature_row.house_id = str(row.Id)
    feature_row.house_valuation_timestamp = utc_ts
    feature_row.flr_one_sq_feet = row["1stFlrSF"]
    feature_row.flr_two_sq_feet = row["2ndFlrSF"]
    # fill HousePrice
    sales_row = house_prices_pb2.HousePrice()
    sales_row.house_id = str(row.Id)
    sales_row.sale_timestamp = utc_ts
    sales_row.sale_price = row.SalePrice
    # Publish messages
    _ = publisher.publish(feature_topic, 
                          feature_row.SerializeToString())
    _ = publisher.publish(price_topic, 
                          sales_row.SerializeToString())


Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityType feature values: projects/670967409083/locations/europe-west1/featurestores/realestate/entityTypes/house
Writing EntityTy

In [6]:
(stream_threads[0].n_messages_processed, stream_threads[0].is_alive())

(20, True)

In [7]:
stream_threads[0].n_messages_processed

20

In [8]:
for thread in stream_threads:
    thread.stop()
    
for thread in stream_threads:
    thread.join()

stream_threads[0].is_alive()

False

## Online serving

In [9]:
fs = aiplatform.Featurestore(
        "projects/ml-lab-324709"
        "/locations/europe-west1"
        "/featurestores/realestate"
    )
entity = fs.get_entity_type(entity_type_id="house")
entity.read(entity_ids="1")

AttributeError: _pb

In [20]:
entity.read(entity_ids="1")

AttributeError: _pb

In [1]:
from google.cloud import aiplatform_v1beta1 as aiplatform

In [2]:
client = aiplatform.services.featurestore_service.FeaturestoreServiceClient()
fs_name = (
    "projects/ml-lab-324709"
    "/locations/europe-west1"
    "/featurestores/realestate2" 
) 
fs = client.get_featurestore(name=fs_name)
# request = aiplatform.GetFeaturestoreRequest(
# )
# fs = client.get_featurestore(name=(
#         "projects/ml-lab-324709"
#         "/locations/europe-west1"
#         "/featurestores/realestate2"        
#     ))

E0128 15:55:52.425052000 8029278208 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value


Unknown: None Stream removed