Skip to content

Commit

Permalink
fix: Support timestamp in Vertex SDK write_feature_values()
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 525289489
  • Loading branch information
vertex-sdk-bot authored and Copybara-Service committed Apr 18, 2023
1 parent 9a5c4be commit 4b0722c
Show file tree
Hide file tree
Showing 2 changed files with 372 additions and 15 deletions.
148 changes: 138 additions & 10 deletions google/cloud/aiplatform/featurestore/_entity_type.py
Expand Up @@ -18,6 +18,7 @@
import datetime
from typing import Dict, List, Optional, Sequence, Tuple, Union
import uuid
from google.protobuf import timestamp_pb2

from google.auth import credentials as auth_credentials
from google.protobuf import field_mask_pb2
Expand Down Expand Up @@ -1575,6 +1576,7 @@ def write_feature_values(
],
"pd.DataFrame", # type: ignore # noqa: F821 - skip check for undefined name 'pd'
],
feature_time: Union[str, datetime.datetime] = None,
) -> "EntityType": # noqa: F821
"""Streaming ingestion. Write feature values directly to Feature Store.
Expand All @@ -1584,7 +1586,8 @@ def write_feature_values(
featurestore_id="my_featurestore_id",
)
# writing feature values from a pandas DataFrame
# writing feature values from a pandas DataFrame without feature timestamp column.
# In this case, current timestamp will be applied to all data.
my_dataframe = pd.DataFrame(
data = [
{"entity_id": "movie_01", "average_rating": 4.9}
Expand All @@ -1597,7 +1600,40 @@ def write_feature_values(
instances=my_df
)
# writing feature values from a Python dict
# writing feature values from a pandas DataFrame with feature timestamp column
# Example of datetime creation.
feature_time = datetime.datetime(year=2022, month=1, day=1, hour=11, minute=59, second=59)
or
feature_time_str = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
feature_time = datetime.datetime.strptime(feature_time_str, "%Y-%m-%d %H:%M:%S.%f")
my_dataframe = pd.DataFrame(
data = [
{"entity_id": "movie_01", "average_rating": 4.9,
"feature_timestamp": feature_time}
],
columns=["entity_id", "average_rating", "feature_timestamp"],
)
my_dataframe = my_df.set_index("entity_id")
my_entity_type.write_feature_values(
instances=my_df, feature_time="feature_timestamp"
)
# writing feature values with a timestamp. The timestamp will be applied to the entire Dataframe.
my_dataframe = pd.DataFrame(
data = [
{"entity_id": "movie_01", "average_rating": 4.9}
],
columns=["entity_id", "average_rating"],
)
my_dataframe = my_df.set_index("entity_id")
my_entity_type.write_feature_values(
instances=my_df, feature_time=feature_time
)
# writing feature values from a Python dict without timestamp column.
# In this case, current timestamp will be applied to all data.
my_data_dict = {
"movie_02" : {"average_rating": 3.7}
}
Expand All @@ -1606,16 +1642,40 @@ def write_feature_values(
instances=my_data_dict
)
# writing feature values from a Python dict with timestamp column
my_data_dict = {
"movie_02" : {"average_rating": 3.7, "feature_timestamp": timestmap}}
}
my_entity_type.write_feature_values(
instances=my_data_dict, feature_time="feature_timestamp"
)
# writing feature values from a Python dict and apply the same Feature_Timestamp
my_data_dict = {
"movie_02" : {"average_rating": 3.7}
}
my_entity_type.write_feature_values(
instances=my_data_dict, feature_time=feature_time
)
# writing feature values from a list of WriteFeatureValuesPayload objects
payloads = [
gca_featurestore_online_service.WriteFeatureValuesPayload(
entity_id="movie_03",
feature_values=gca_featurestore_online_service.FeatureValue(
double_value=4.9
)
feature_values={
"average_rating": featurestore_online_service.FeatureValue(
string_value="test",
metadata=featurestore_online_service.FeatureValue.Metadata(
generate_time=timestmap
)
}
}
)
]
# when instance is WriteFeatureValuesPayload,
# feature_time param of write_feature_values() is ignored.
my_entity_type.write_feature_values(
instances=payloads
)
Expand All @@ -1641,18 +1701,27 @@ def write_feature_values(
in the pandas Dataframe represents an entity, which has an entity ID
and its associated feature values. Currently, a single payload can be
written in a single request.
feature_time Union[str, datetime.datetime]:
Optional. Either column name in DataFrame or Dict which contains timestamp value,
or datetime to apply to the entire DataFrame or Dict.
Timestamp will be applied to generate_timestmap in all FeatureValue.
If not provided, curreent timestamp is used. This param is not used
when instances is List[WriteFeatureValuesPayload].
Returns:
EntityType - The updated EntityType object.
"""

if isinstance(instances, Dict):
payloads = self._generate_payloads(instances=instances)
payloads = self._generate_payloads(
instances=instances, feature_time=feature_time
)
elif isinstance(instances, List):
payloads = instances
else:
instances_dict = instances.to_dict(orient="index")
payloads = self._generate_payloads(instances=instances_dict)
payloads = self._generate_payloads(
instances=instances_dict, feature_time=feature_time
)

_LOGGER.log_action_start_against_resource(
"Writing",
Expand Down Expand Up @@ -1688,6 +1757,7 @@ def _generate_payloads(
],
],
],
feature_time: Union[str, datetime.datetime] = None,
) -> List[gca_featurestore_online_service.WriteFeatureValuesPayload]:
"""Helper method used to generate GAPIC WriteFeatureValuesPayloads from
a Python dict.
Expand All @@ -1696,18 +1766,39 @@ def _generate_payloads(
instances (Dict[str, Dict[str, Union[int, str, float, bool, bytes,
List[int], List[str], List[float], List[bool]]]]):
Required. Dict mapping entity IDs to their corresponding features.
feature_time Union[str, datetime.datetime]:
Optional. Either string representing column name which stores
feature timestamp, or timestamp to apply to entire DataFrame or
Dict.
Returns:
List[gca_featurestore_online_service.WriteFeatureValuesPayload] -
A list of WriteFeatureValuesPayload objects ready to be written to the Feature Store.
"""
payloads = []
timestamp_to_all_field = None
if feature_time and cls._is_timestamp(feature_time):
# timestamp_to_all_field will be applied to all FeatureValues.
timestamp_to_all_field = feature_time

for entity_id, features in instances.items():
feature_values = {}
for feature_id, value in features.items():
if feature_id == feature_time:
continue
feature_value = cls._convert_value_to_gapic_feature_value(
feature_id=feature_id, value=value
)
# Create a FeatureValue Metadata with generate_time if
# valid feature_time param is provided.
timestamp = cls._apply_feature_timestamp(
cls, features, timestamp_to_all_field, feature_time
)
if timestamp:
feature_value.metadata = (
gca_featurestore_online_service.FeatureValue.Metadata(
generate_time=timestamp
)
)
feature_values[feature_id] = feature_value
payload = gca_featurestore_online_service.WriteFeatureValuesPayload(
entity_id=entity_id, feature_values=feature_values
Expand All @@ -1716,6 +1807,43 @@ def _generate_payloads(

return payloads

@staticmethod
def _apply_feature_timestamp(
cls,
features: Union[
int,
str,
float,
bool,
bytes,
List[int],
List[str],
List[float],
List[bool],
],
timestamp_to_all_field: datetime.datetime = None,
feature_time: str = None,
) -> Union[datetime.datetime, timestamp_pb2.Timestamp]:
if feature_time is None:
return None
if timestamp_to_all_field:
return timestamp_to_all_field

# Return a timestamp in Dict or Dataframe if it is valid.
if feature_time in features.keys() and cls._is_timestamp(
features[feature_time]
):
return features[feature_time]
return None

@staticmethod
def _is_timestamp(
timestamp: Union[datetime.datetime, timestamp_pb2.Timestamp]
) -> bool:
return isinstance(timestamp, datetime.datetime) or isinstance(
timestamp, timestamp_pb2.Timestamp
)

@classmethod
def _convert_value_to_gapic_feature_value(
cls,
Expand Down

0 comments on commit 4b0722c

Please sign in to comment.