fix: updated location in create methods, changed args to data_source and feature_time for oneof, changed method validate_id to _is_resource_id, changed method validate_value_type to return None
morgandu committed Dec 10, 2021
1 parent 720cf0f commit 75ac97e
Showing 5 changed files with 183 additions and 224 deletions.
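At a glance, the change to the ingestion entry points reads as below. This is a hedged sketch, not code from the commit: the table, column, and feature names are hypothetical, and `entity_type` is assumed to be an existing EntityType instance.

    # Before this commit: the feature timestamp was selected through one of two
    # mutually exclusive keywords (feature_time_field vs. feature_time).
    entity_type.ingest_from_bq(
        bq_source_uri="bq://my-project.my_dataset.my_table",
        feature_ids=["my_feature_id_1"],
        feature_time_field="update_time",
    )

    # After this commit: a single required `feature_time` accepts either a source
    # column name (str) or a datetime.datetime value, and leads the signature.
    entity_type.ingest_from_bq(
        feature_ids=["my_feature_id_1"],
        feature_time="update_time",
        bq_source_uri="bq://my-project.my_dataset.my_table",
    )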
181 changes: 79 additions & 102 deletions google/cloud/aiplatform/featurestore/entity_type.py
@@ -476,11 +476,11 @@ def create(
System reserved label keys are prefixed with
"aiplatform.googleapis.com/" and are immutable.
project (str):
Optional. Project to create EntityType in. If not set, project
set in aiplatform.init will be used.
Optional. Project to create EntityType in if `featurestore_name` is passed a featurestore ID.
If not set, project set in aiplatform.init will be used.
location (str):
Optional. Location to create EntityType in. If not set, location
set in aiplatform.init will be used.
Optional. Location to create EntityType in if `featurestore_name` is passed a featurestore ID.
If not set, location set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to create EntityTypes. Overrides
credentials set in aiplatform.init.
@@ -503,15 +503,24 @@ def create(
location=location,
)

gapic_entity_type = gca_entity_type.EntityType()

if labels:
utils.validate_labels(labels)
gapic_entity_type.labels = labels

gapic_entity_type = gca_entity_type.EntityType(
description=description, labels=labels,
if description:
gapic_entity_type.description = description

api_client = cls._instantiate_client(
location=featurestore_utils.CompatFeaturestoreServiceClient.parse_featurestore_path(
path=featurestore_name
)[
"location"
],
credentials=credentials,
)

api_client = cls._instantiate_client(location=location, credentials=credentials)

created_entity_type_lro = api_client.create_entity_type(
parent=featurestore_name,
entity_type=gapic_entity_type,
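As a hedged illustration of the hunk above (project, location, and resource IDs are hypothetical; parameter names follow the docstring shown in this diff): when `featurestore_name` is a full resource name, the client location is now parsed from that name, so `project`/`location` only come into play when a bare featurestore ID is passed.

    from google.cloud import aiplatform
    from google.cloud.aiplatform.featurestore import EntityType

    # Full resource name: "us-central1" is parsed out of featurestore_name itself.
    entity_type = EntityType.create(
        entity_type_id="my_entity_type_id",
        featurestore_name=(
            "projects/my-project/locations/us-central1/featurestores/my_featurestore_id"
        ),
    )

    # Bare featurestore ID: project and location fall back to the arguments
    # or to whatever was set in aiplatform.init.
    aiplatform.init(project="my-project", location="us-central1")
    entity_type = EntityType.create(
        entity_type_id="my_entity_type_id",
        featurestore_name="my_featurestore_id",
    )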
@@ -733,12 +742,9 @@ def batch_create_features(
def _validate_and_get_import_feature_values_request(
self,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource],
feature_source_fields: Optional[Dict[str, str]] = None,
avro_source: Optional[gca_io.AvroSource] = None,
bigquery_source: Optional[gca_io.BigQuerySource] = None,
csv_source: Optional[gca_io.CsvSource] = None,
feature_time_field: Optional[str] = None,
feature_time: Optional[datetime.datetime] = None,
entity_id_field: Optional[str] = None,
disable_online_serving: Optional[bool] = None,
worker_count: Optional[int] = None,
@@ -749,6 +755,18 @@ def _validate_and_get_import_feature_values_request(
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
feature_time (Union[str, datetime.datetime]):
Required. The feature_time can be one of:
- The source column that holds the Feature
timestamp for all Feature values in each entity.
- A single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.
data_source (Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource]):
Required. The data_source can be one of:
- AvroSource
- BigQuerySource
- CsvSource
feature_source_fields (Dict[str, str]):
Optional. User-defined dictionary mapping the ID of each Feature whose values are being imported
to the source column from which those values are read.
@@ -774,24 +792,6 @@ def _validate_and_get_import_feature_values_request(
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
avro_source (gca_io.AvroSource):
Optional. This field is a member of `oneof`_ ``source``.
bigquery_source (gca_io.BigQuerySource):
Optional. This field is a member of `oneof`_ ``source``.
csv_source (gca_io.CsvSource):
Optional. This field is a member of `oneof`_ ``source``.
feature_time_field (str):
Optional. Source column that holds the Feature
timestamp for all Feature values in each entity.
This field is a member of `oneof`_ ``feature_time_source``.
feature_time (datetime.datetime):
Optional. Single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.
This field is a member of `oneof`_ ``feature_time_source``.
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
@@ -813,8 +813,8 @@ def _validate_and_get_import_feature_values_request(
Returns:
gca_featurestore_service.ImportFeatureValuesRequest - request message for importing feature values
Raises:
ValueError if no source or more than one source is provided
ValueError if no feature_time_source or more than one feature_time_source is provided
ValueError if data_source type is not supported
ValueError if feature_time type is not supported
"""
feature_specs = []
for feature_id in set(feature_ids):
@@ -837,28 +837,29 @@ def _validate_and_get_import_feature_values_request(
entity_type=self.resource_name, feature_specs=feature_specs,
)

# oneof source
if avro_source and not bigquery_source and not csv_source:
import_feature_values_request.avro_source = avro_source
elif not avro_source and bigquery_source and not csv_source:
import_feature_values_request.bigquery_source = bigquery_source
elif not avro_source and not bigquery_source and csv_source:
import_feature_values_request.csv_source = csv_source
if isinstance(data_source, gca_io.AvroSource):
import_feature_values_request.avro_source = data_source
elif isinstance(data_source, gca_io.BigQuerySource):
import_feature_values_request.bigquery_source = data_source
elif isinstance(data_source, gca_io.CsvSource):
import_feature_values_request.csv_source = data_source
else:
raise ValueError(
"One and only one of `avro_source`, `bigquery_source`, and `csv_source` need to be passed. "
f"The type of `data_source` field should be: "
f"`gca_io.AvroSource`, `gca_io.BigQuerySource`, or `gca_io.CsvSource`, "
f"get {type(data_source)} instead. "
)

# oneof feature_time_source
if feature_time_field and not feature_time:
import_feature_values_request.feature_time_field = feature_time_field
elif not feature_time_field and feature_time:
if isinstance(feature_time, str):
import_feature_values_request.feature_time_field = feature_time
elif isinstance(feature_time, datetime.datetime):
import_feature_values_request.feature_time = utils.get_timestamp_proto(
time=feature_time
)
else:
raise ValueError(
"One and only one of `feature_time_field` and `feature_time` need to be passed. "
f"The type of `feature_time` field should be: `str` or `datetime.datetime`, "
f"get {type(feature_time)} instead. "
)

if entity_id_field is not None:
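The oneof selection above is driven purely by argument type. The sketch below restates its effect on the request message; the compat import paths are assumptions based on how the diff refers to `gca_featurestore_service` and `utils`.

    import datetime

    from google.cloud.aiplatform import utils
    from google.cloud.aiplatform.compat.types import (
        featurestore_service as gca_featurestore_service,
    )

    # A str feature_time populates the `feature_time_field` member of the
    # feature_time_source oneof (the name of a source column) ...
    request = gca_featurestore_service.ImportFeatureValuesRequest()
    request.feature_time_field = "update_time"

    # ... while a datetime.datetime populates `feature_time` with a Timestamp proto,
    # mirroring the utils.get_timestamp_proto call above.
    request = gca_featurestore_service.ImportFeatureValuesRequest()
    request.feature_time = utils.get_timestamp_proto(time=datetime.datetime(2021, 12, 10))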
@@ -918,14 +919,13 @@ def _import_feature_values(

def ingest_from_bq(
self,
bq_source_uri: str,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
bq_source_uri: str,
batch_create_feature_configs: Optional[
Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]
] = None,
feature_source_fields: Optional[Dict[str, str]] = None,
feature_time_field: Optional[str] = None,
feature_time: Optional[datetime.datetime] = None,
entity_id_field: Optional[str] = None,
disable_online_serving: Optional[bool] = None,
worker_count: Optional[int] = None,
@@ -935,14 +935,21 @@ def ingest_from_bq(
"""Ingest feature values from BigQuery.
Args:
bq_source_uri (str):
Required. BigQuery URI to the input table.
Example:
'bq://project.dataset.table_name'
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
feature_time (Union[str, datetime.datetime]):
Required. The feature_time can be one of:
- The source column that holds the Feature
timestamp for all Feature values in each entity.
- A single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.
bq_source_uri (str):
Required. BigQuery URI to the input table.
Example:
'bq://project.dataset.table_name'
batch_create_feature_configs (Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]):
Optional. A user-defined Dict containing configurations for feature creation when features need to be created before ingestion.
Defaults to None when the features already exist in the Featurestore.
@@ -995,18 +1002,6 @@ def ingest_from_bq(
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
feature_time_field (str):
Optional. Source column that holds the Feature
timestamp for all Feature values in each entity.
This field is a member of `oneof`_ ``feature_time_source``.
feature_time (datetime.datetime):
Optional. Single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.
This field is a member of `oneof`_ ``feature_time_source``.
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
@@ -1040,20 +1035,15 @@ def ingest_from_bq(
self.batch_create_features(
feature_configs=batch_create_feature_configs,
request_metadata=request_metadata,
sync=sync,
)

if not sync:
self.wait()

bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
feature_ids=feature_ids,
feature_source_fields=feature_source_fields,
bigquery_source=bigquery_source,
feature_time_field=feature_time_field,
feature_time=feature_time,
data_source=bigquery_source,
feature_source_fields=feature_source_fields,
entity_id_field=entity_id_field,
disable_online_serving=disable_online_serving,
worker_count=worker_count,
@@ -1067,15 +1057,14 @@ def ingest_from_gcs(

def ingest_from_gcs(
self,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
gcs_source_uris: Union[str, List[str]],
gcs_source_type: str,
feature_ids: List[str],
batch_create_feature_configs: Optional[
Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]
] = None,
feature_source_fields: Optional[Dict[str, str]] = None,
feature_time_field: Optional[str] = None,
feature_time: Optional[datetime.datetime] = None,
entity_id_field: Optional[str] = None,
disable_online_serving: Optional[bool] = None,
worker_count: Optional[int] = None,
@@ -1085,6 +1074,17 @@ def ingest_from_gcs(
"""Ingest feature values from GCS.
Args:
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
feature_time (Union[str, datetime.datetime]):
Required. The feature_time can be one of:
- The source column that holds the Feature
timestamp for all Feature values in each entity.
- A single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.
gcs_source_uris (Union[str, List[str]]):
Required. Google Cloud Storage URI(-s) to the
input file(s). May contain wildcards. For more
@@ -1097,10 +1097,6 @@ def ingest_from_gcs(
gcs_source_type (str):
Required. The type of the input file(s) provided by `gcs_source_uris`,
the value of gcs_source_type can only be either `csv`, or `avro`.
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
batch_create_feature_configs (Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]):
Optional. A user-defined Dict containing configurations for feature creation when features need to be created before ingestion.
Defaults to None when the features already exist in the Featurestore.
@@ -1153,18 +1149,6 @@ def ingest_from_gcs(
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
feature_time_field (str):
Optional. Source column that holds the Feature
timestamp for all Feature values in each entity.
This field is a member of `oneof`_ ``feature_time_source``.
feature_time (datetime.datetime):
Optional. Single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.
This field is a member of `oneof`_ ``feature_time_source``.
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
@@ -1200,12 +1184,8 @@ def ingest_from_gcs(
self.batch_create_features(
feature_configs=batch_create_feature_configs,
request_metadata=request_metadata,
sync=sync,
)

if not sync:
self.wait()

if gcs_source_type not in featurestore_utils.GCS_SOURCE_TYPE:
raise ValueError(
"Only %s are supported gcs_source_type, not `%s`. "
@@ -1219,19 +1199,16 @@ def ingest_from_gcs(
gcs_source_uris = [gcs_source_uris]
gcs_source = gca_io.GcsSource(uris=gcs_source_uris)

csv_source, avro_source = None, None
if gcs_source_type == "csv":
csv_source = gca_io.CsvSource(gcs_source=gcs_source)
data_source = gca_io.CsvSource(gcs_source=gcs_source)
if gcs_source_type == "avro":
avro_source = gca_io.AvroSource(gcs_source=gcs_source)
data_source = gca_io.AvroSource(gcs_source=gcs_source)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
feature_ids=feature_ids,
feature_source_fields=feature_source_fields,
avro_source=avro_source,
csv_source=csv_source,
feature_time_field=feature_time_field,
feature_time=feature_time,
data_source=data_source,
feature_source_fields=feature_source_fields,
entity_id_field=entity_id_field,
disable_online_serving=disable_online_serving,
worker_count=worker_count,
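A hedged usage sketch for the reworked `ingest_from_gcs` (bucket, file, and column names are hypothetical; `entity_type` is assumed to be an existing EntityType instance): `gcs_source_type` selects between CsvSource and AvroSource, while `feature_time` again takes either a column name or a datetime.

    entity_type.ingest_from_gcs(
        feature_ids=["my_feature_id_1", "my_feature_id_2"],
        feature_time="update_time",                  # timestamp column in the source files
        gcs_source_uris="gs://my_bucket/my_file.csv",
        gcs_source_type="csv",                       # only "csv" or "avro" are accepted
        entity_id_field="entity_id",
    )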
16 changes: 11 additions & 5 deletions google/cloud/aiplatform/featurestore/feature.py
@@ -513,7 +513,7 @@ def create(
Example: "projects/123/locations/us-central1/featurestores/my_featurestore_id/entityTypes/my_entity_type_id"
or "my_entity_type_id" when project and location are initialized or passed, with featurestore_id passed.
featurestore_id (str):
Optional. Featurestore to create Feature in.
Optional. Featurestore to create Feature in if `entity_type_name` is passed an entity_type ID.
description (str):
Optional. Description of the Feature.
labels (Dict[str, str]):
@@ -531,11 +531,11 @@ def create(
System reserved label keys are prefixed with
"aiplatform.googleapis.com/" and are immutable.
project (str):
Optional. Project to create Feature in. If not set, project
set in aiplatform.init will be used.
Optional. Project to create Feature in if `entity_type_name` is passed an entity_type ID.
If not set, project set in aiplatform.init will be used.
location (str):
Optional. Location to create Feature in. If not set, location
set in aiplatform.init will be used.
Optional. Location to create Feature in if `entity_type_name` is passed an entity_type ID.
If not set, location set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to create Features. Overrides
credentials set in aiplatform.init.
@@ -563,13 +563,19 @@ def create(
project=project,
location=location,
)
location = featurestore_utils.CompatFeaturestoreServiceClient.parse_entity_type_path(
path=entity_type_name
)[
"location"
]

feature_config = featurestore_utils._FeatureConfig(
feature_id=feature_id,
value_type=value_type,
description=description,
labels=labels,
)

create_feature_request = feature_config.get_create_feature_request()
create_feature_request.parent = entity_type_name

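The same location handling now applies to `Feature.create` via `parse_entity_type_path`, sketched below as a hedged example: resource IDs are hypothetical, and the import path and the "INT64" value_type string are assumptions rather than taken from this diff.

    from google.cloud.aiplatform.featurestore import Feature

    # With a full entity_type resource name, "us-central1" is parsed from the name;
    # with a bare entity type ID, featurestore_id plus project/location are needed.
    feature = Feature.create(
        feature_id="my_feature_id_1",
        value_type="INT64",
        entity_type_name=(
            "projects/my-project/locations/us-central1/featurestores/"
            "my_featurestore_id/entityTypes/my_entity_type_id"
        ),
        description="Illustrative feature",
    )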
