From 08415bf8fa6b7d26ce0aab2a2cbc38e67fed3bf0 Mon Sep 17 00:00:00 2001 From: Mizanur Rahman Date: Tue, 17 May 2022 04:45:22 -0700 Subject: [PATCH 1/6] feature: support update feature group and update feature metadata --- src/sagemaker/feature_store/feature_group.py | 59 +++++++++++ src/sagemaker/feature_store/inputs.py | 23 +++++ src/sagemaker/session.py | 64 +++++++++++- tests/integ/test_feature_store.py | 99 ++++++++++++++++++- .../feature_store/test_feature_store.py | 48 +++++++++ .../sagemaker/feature_store/test_inputs.py | 5 + 6 files changed, 296 insertions(+), 2 deletions(-) diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index 41bdcd764c..ffd3598a6d 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -53,6 +53,7 @@ OfflineStoreConfig, DataCatalogConfig, FeatureValue, + FeatureParameter ) logger = logging.getLogger(__name__) @@ -537,6 +538,64 @@ def describe(self, next_token: str = None) -> Dict[str, Any]: feature_group_name=self.name, next_token=next_token ) + def update(self, feature_additions: Sequence[FeatureDefinition]) -> Dict[str, Any]: + """Update a FeatureGroup and add new features from the given feature definitions. + + Args: + feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated. + Returns: + Response dict from service. + """ + + return self.sagemaker_session.update_feature_group( + feature_group_name=self.name, + feature_additions=[ + feature_addition.to_dict() for feature_addition in feature_additions + ], + ) + + def update_feature_metadata( + self, + feature_name: str, + description: str = None, + parameter_additions: Sequence[FeatureParameter] = [], + parameter_removals: Sequence[str] = [], + ) -> Dict[str, Any]: + """Update a feature metadata and add/remove metadata. + + Args: + feature_name (str): name of the feature to update. + description (str): description of the feature to update. + parameter_additions (Sequence[Dict[str, str]): list of feature parameter to be added. + parameter_removals (Sequence[str]): list of feature parameter key to be removed. + Returns: + Response dict from service. + """ + + return self.sagemaker_session.update_feature_metadata( + feature_group_name=self.name, + feature_name=feature_name, + description=description, + parameter_additions=[ + parameter_addition.to_dict() + for parameter_addition in parameter_additions + ], + parameter_removals=parameter_removals, + ) + + def describe_feature_metadata(self, feature_name: str) -> Dict[str, Any]: + """Describe feature metadata by feature name. + + Args: + feature_name (str): name of the feature. + Returns: + Response dict from service. + """ + + return self.sagemaker_session.describe_feature_metadata( + feature_group_name=self.name, feature_name=feature_name + ) + def load_feature_definitions( self, data_frame: DataFrame, diff --git a/src/sagemaker/feature_store/inputs.py b/src/sagemaker/feature_store/inputs.py index 1f31caa4ae..405d189c7a 100644 --- a/src/sagemaker/feature_store/inputs.py +++ b/src/sagemaker/feature_store/inputs.py @@ -207,3 +207,26 @@ def to_dict(self) -> Dict[str, Any]: FeatureName=self.feature_name, ValueAsString=self.value_as_string, ) + +@attr.s +class FeatureParameter(Config): + """FeatureParameter for FeatureStore. + + Attributes: + key (str): key of the parameter. + value (str): value of the parameter. + """ + + key: str = attr.ib(default=None) + value: str = attr.ib(default=None) + + def to_dict(self) -> Dict[str, Any]: + """Construct a dictionary based on the attributes provided. + + Returns: + dict represents the attributes. + """ + return Config.construct_dict( + Key=self.key, + Value=self.value, + ) diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index 263e3597f2..b93bb9afe9 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -4048,7 +4048,7 @@ def describe_feature_group( """Describe a FeatureGroup by name in FeatureStore service. Args: - feature_group_name (str): name of the FeatureGroup to descibe. + feature_group_name (str): name of the FeatureGroup to describe. next_token (str): next_token to get next page of features. Returns: Response dict from service. @@ -4057,6 +4057,68 @@ def describe_feature_group( kwargs = dict(FeatureGroupName=feature_group_name) update_args(kwargs, NextToken=next_token) return self.sagemaker_client.describe_feature_group(**kwargs) + + def update_feature_group( + self, feature_group_name: str, feature_additions: Sequence[Dict[str, str]] + ) -> Dict[str, Any]: + """Update a FeatureGroup and add new features from the given feature definitions. + + Args: + feature_group_name (str): name of the FeatureGroup to update. + feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated. + Returns: + Response dict from service. + """ + + return self.sagemaker_client.update_feature_group( + FeatureGroupName=feature_group_name, FeatureAdditions=feature_additions + ) + + def update_feature_metadata( + self, + feature_group_name: str, + feature_name: str, + description: str = None, + parameter_additions: Sequence[Dict[str, str]] = None, + parameter_removals: Sequence[str] = None, + ) -> Dict[str, Any]: + """Update a feature metadata and add/remove metadata. + + Args: + feature_group_name (str): name of the FeatureGroup to update. + feature_name (str): name of the feature to update. + description (str): description of the feature to update. + parameter_additions (Sequence[Dict[str, str]): list of feature parameter to be added. + parameter_removals (Sequence[Dict[str, str]): list of feature parameter to be removed. + Returns: + Response dict from service. + """ + + request = { + "FeatureGroupName": feature_group_name, + "FeatureName": feature_name, + "ParameterAdditions": parameter_additions, + "ParameterRemovals": parameter_removals, + } + if description is not None: + request["Description"] = description + return self.sagemaker_client.update_feature_metadata(**request) + + def describe_feature_metadata( + self, feature_group_name: str, feature_name: str + ) -> Dict[str, Any]: + """Describe feature metadata by feature name in FeatureStore service. + + Args: + feature_group_name (str): name of the FeatureGroup. + feature_name (str): name of the feature. + Returns: + Response dict from service. + """ + + return self.sagemaker_client.describe_feature_metadata( + FeatureGroupName=feature_group_name, FeatureName=feature_name + ) def put_record( self, diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 15c1db41ab..59cb4ca806 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -22,9 +22,17 @@ import pytest from pandas import DataFrame -from sagemaker.feature_store.feature_group import FeatureGroup +from sagemaker.feature_store.feature_group import ( + FeatureGroup, + FeatureParameter +) from sagemaker.feature_store.inputs import FeatureValue from sagemaker.session import get_execution_role, Session +from sagemaker.feature_store.feature_definition import ( + FractionalFeatureDefinition, + IntegralFeatureDefinition, + StringFeatureDefinition, +) from tests.integ.timeout import timeout BUCKET_POLICY = { @@ -237,6 +245,95 @@ def test_create_feature_store( assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") +def test_update_feature_group( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup( + name=feature_group_name, sagemaker_session=feature_store_session + ) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + new_feature_name = "new_feature" + new_features = [FractionalFeatureDefinition(feature_name=new_feature_name)] + feature_group.update(new_features) + time.sleep(10) + feature_definitions = feature_group.describe().get("FeatureDefinitions") + assert ( + any( + [ + True + for elem in feature_definitions + if new_feature_name in elem.values() + ] + ) + == True + ) + + +def test_feature_metadata( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup( + name=feature_group_name, sagemaker_session=feature_store_session + ) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + parameter_additions = [ + FeatureParameter(key="key1", value="value1"), + FeatureParameter(key="key2", value="value2"), + ] + description = "test description" + feature_name = "feature1" + feature_group.update_feature_metadata( + feature_name=feature_name, + description=description, + parameter_additions=parameter_additions, + ) + describe_feature_metadata = feature_group.describe_feature_metadata( + feature_name=feature_name + ) + assert description == describe_feature_metadata.get("Description") + assert 2 == len(describe_feature_metadata.get("Parameters")) + + parameter_removals = ["key1"] + feature_group.update_feature_metadata( + feature_name=feature_name, parameter_removals=parameter_removals + ) + describe_feature_metadata = feature_group.describe_feature_metadata( + feature_name=feature_name + ) + assert description == describe_feature_metadata.get("Description") + assert 1 == len(describe_feature_metadata.get("Parameters")) + + def test_ingest_without_string_feature( feature_store_session, role, diff --git a/tests/unit/sagemaker/feature_store/test_feature_store.py b/tests/unit/sagemaker/feature_store/test_feature_store.py index ef6a36980b..01e2de0974 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_store.py +++ b/tests/unit/sagemaker/feature_store/test_feature_store.py @@ -31,6 +31,9 @@ AthenaQuery, IngestionError, ) +from sagemaker.feature_store.inputs import ( + FeatureParameter +) class PicklableMock(Mock): @@ -154,6 +157,51 @@ def test_feature_store_describe(sagemaker_session_mock): ) +def test_feature_store_update(sagemaker_session_mock, feature_group_dummy_definitions): + feature_group = FeatureGroup( + name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock + ) + feature_group.update(feature_group_dummy_definitions) + sagemaker_session_mock.update_feature_group.assert_called_with( + feature_group_name="MyFeatureGroup", + feature_additions=[fd.to_dict() for fd in feature_group_dummy_definitions], + ) + + +def test_feature_metadata_update(sagemaker_session_mock): + feature_group = FeatureGroup( + name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock + ) + + parameter_additions = [FeatureParameter(key="key1", value="value1")] + parameter_removals = ["key2"] + + feature_group.update_feature_metadata( + feature_name="Feature1", + description="DummyDescription", + parameter_additions=parameter_additions, + parameter_removals=parameter_removals, + ) + sagemaker_session_mock.update_feature_metadata.assert_called_with( + feature_group_name="MyFeatureGroup", + feature_name="Feature1", + description="DummyDescription", + parameter_additions=[pa.to_dict() for pa in parameter_additions], + parameter_removals=parameter_removals + ) + + +def test_feature_metadata_describe(sagemaker_session_mock): + feature_group = FeatureGroup( + name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock + ) + + feature_group.describe_feature_metadata(feature_name="Feature1") + sagemaker_session_mock.describe_feature_metadata.assert_called_with( + feature_group_name="MyFeatureGroup", feature_name="Feature1" + ) + + def test_put_record(sagemaker_session_mock): feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) feature_group.put_record(record=[]) diff --git a/tests/unit/sagemaker/feature_store/test_inputs.py b/tests/unit/sagemaker/feature_store/test_inputs.py index d111cc0c00..cc3cef1631 100644 --- a/tests/unit/sagemaker/feature_store/test_inputs.py +++ b/tests/unit/sagemaker/feature_store/test_inputs.py @@ -19,6 +19,7 @@ S3StorageConfig, DataCatalogConfig, OfflineStoreConfig, + FeatureParameter ) @@ -83,3 +84,7 @@ def test_offline_data_store_config(): "DisableGlueTableCreation": False, } ) + +def test_feature_metadata(): + config = FeatureParameter(key="key", value="value") + assert ordered(config.to_dict()) == ordered({"Key": "key", "Value": "value"}) From 9d7ae9ed5a60aa458f0503d4ca3be3bc146adedc Mon Sep 17 00:00:00 2001 From: Mizanur Rahman Date: Tue, 17 May 2022 14:20:31 -0700 Subject: [PATCH 2/6] code reformatted --- src/sagemaker/feature_store/feature_group.py | 14 +++++++++----- src/sagemaker/feature_store/inputs.py | 1 + src/sagemaker/session.py | 2 +- tests/integ/test_feature_store.py | 5 +---- .../sagemaker/feature_store/test_feature_store.py | 6 ++---- tests/unit/sagemaker/feature_store/test_inputs.py | 3 ++- 6 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index ffd3598a6d..dcd8171169 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -53,7 +53,7 @@ OfflineStoreConfig, DataCatalogConfig, FeatureValue, - FeatureParameter + FeatureParameter, ) logger = logging.getLogger(__name__) @@ -558,8 +558,8 @@ def update_feature_metadata( self, feature_name: str, description: str = None, - parameter_additions: Sequence[FeatureParameter] = [], - parameter_removals: Sequence[str] = [], + parameter_additions: Sequence[FeatureParameter] = None, + parameter_removals: Sequence[str] = None, ) -> Dict[str, Any]: """Update a feature metadata and add/remove metadata. @@ -579,8 +579,12 @@ def update_feature_metadata( parameter_additions=[ parameter_addition.to_dict() for parameter_addition in parameter_additions - ], - parameter_removals=parameter_removals, + ] + if parameter_additions is not None + else [], + parameter_removals=parameter_removals + if parameter_removals is not None + else [], ) def describe_feature_metadata(self, feature_name: str) -> Dict[str, Any]: diff --git a/src/sagemaker/feature_store/inputs.py b/src/sagemaker/feature_store/inputs.py index 405d189c7a..75cb99b5f6 100644 --- a/src/sagemaker/feature_store/inputs.py +++ b/src/sagemaker/feature_store/inputs.py @@ -208,6 +208,7 @@ def to_dict(self) -> Dict[str, Any]: ValueAsString=self.value_as_string, ) + @attr.s class FeatureParameter(Config): """FeatureParameter for FeatureStore. diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index b93bb9afe9..053fce05dc 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -4057,7 +4057,7 @@ def describe_feature_group( kwargs = dict(FeatureGroupName=feature_group_name) update_args(kwargs, NextToken=next_token) return self.sagemaker_client.describe_feature_group(**kwargs) - + def update_feature_group( self, feature_group_name: str, feature_additions: Sequence[Dict[str, str]] ) -> Dict[str, Any]: diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 59cb4ca806..601ca942ba 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -22,10 +22,7 @@ import pytest from pandas import DataFrame -from sagemaker.feature_store.feature_group import ( - FeatureGroup, - FeatureParameter -) +from sagemaker.feature_store.feature_group import FeatureGroup, FeatureParameter from sagemaker.feature_store.inputs import FeatureValue from sagemaker.session import get_execution_role, Session from sagemaker.feature_store.feature_definition import ( diff --git a/tests/unit/sagemaker/feature_store/test_feature_store.py b/tests/unit/sagemaker/feature_store/test_feature_store.py index 01e2de0974..31e3ab5085 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_store.py +++ b/tests/unit/sagemaker/feature_store/test_feature_store.py @@ -31,9 +31,7 @@ AthenaQuery, IngestionError, ) -from sagemaker.feature_store.inputs import ( - FeatureParameter -) +from sagemaker.feature_store.inputs import FeatureParameter class PicklableMock(Mock): @@ -187,7 +185,7 @@ def test_feature_metadata_update(sagemaker_session_mock): feature_name="Feature1", description="DummyDescription", parameter_additions=[pa.to_dict() for pa in parameter_additions], - parameter_removals=parameter_removals + parameter_removals=parameter_removals, ) diff --git a/tests/unit/sagemaker/feature_store/test_inputs.py b/tests/unit/sagemaker/feature_store/test_inputs.py index cc3cef1631..322a049309 100644 --- a/tests/unit/sagemaker/feature_store/test_inputs.py +++ b/tests/unit/sagemaker/feature_store/test_inputs.py @@ -19,7 +19,7 @@ S3StorageConfig, DataCatalogConfig, OfflineStoreConfig, - FeatureParameter + FeatureParameter, ) @@ -85,6 +85,7 @@ def test_offline_data_store_config(): } ) + def test_feature_metadata(): config = FeatureParameter(key="key", value="value") assert ordered(config.to_dict()) == ordered({"Key": "key", "Value": "value"}) From 5fe3c252dac3e68d68314005db4dbfdf35892eac Mon Sep 17 00:00:00 2001 From: Mizanur Rahman Date: Tue, 17 May 2022 17:33:13 -0700 Subject: [PATCH 3/6] Bug fix and format --- tests/integ/test_feature_store.py | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 601ca942ba..449ee1607e 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -25,11 +25,7 @@ from sagemaker.feature_store.feature_group import FeatureGroup, FeatureParameter from sagemaker.feature_store.inputs import FeatureValue from sagemaker.session import get_execution_role, Session -from sagemaker.feature_store.feature_definition import ( - FractionalFeatureDefinition, - IntegralFeatureDefinition, - StringFeatureDefinition, -) +from sagemaker.feature_store.feature_definition import FractionalFeatureDefinition from tests.integ.timeout import timeout BUCKET_POLICY = { @@ -255,7 +251,7 @@ def test_update_feature_group( feature_group.load_feature_definitions(data_frame=pandas_data_frame) with cleanup_feature_group(feature_group): - output = feature_group.create( + feature_group.create( s3_uri=offline_store_s3_uri, record_identifier_name="feature1", event_time_feature_name="feature3", @@ -269,15 +265,8 @@ def test_update_feature_group( feature_group.update(new_features) time.sleep(10) feature_definitions = feature_group.describe().get("FeatureDefinitions") - assert ( - any( - [ - True - for elem in feature_definitions - if new_feature_name in elem.values() - ] - ) - == True + assert any( + [True for elem in feature_definitions if new_feature_name in elem.values()] ) @@ -294,7 +283,7 @@ def test_feature_metadata( feature_group.load_feature_definitions(data_frame=pandas_data_frame) with cleanup_feature_group(feature_group): - output = feature_group.create( + feature_group.create( s3_uri=offline_store_s3_uri, record_identifier_name="feature1", event_time_feature_name="feature3", From f77bd64f0c5743601c38e108146f740f9c49e600 Mon Sep 17 00:00:00 2001 From: Mizanur Rahman Date: Wed, 18 May 2022 13:01:22 -0700 Subject: [PATCH 4/6] Test commit, will not be included in PR --- src/sagemaker/feature_store/feature_group.py | 63 -------------- src/sagemaker/feature_store/inputs.py | 24 ------ src/sagemaker/session.py | 62 -------------- tests/integ/test_feature_store.py | 85 +------------------ .../feature_store/test_feature_store.py | 46 ---------- .../sagemaker/feature_store/test_inputs.py | 6 -- 6 files changed, 1 insertion(+), 285 deletions(-) diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index dcd8171169..41bdcd764c 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -53,7 +53,6 @@ OfflineStoreConfig, DataCatalogConfig, FeatureValue, - FeatureParameter, ) logger = logging.getLogger(__name__) @@ -538,68 +537,6 @@ def describe(self, next_token: str = None) -> Dict[str, Any]: feature_group_name=self.name, next_token=next_token ) - def update(self, feature_additions: Sequence[FeatureDefinition]) -> Dict[str, Any]: - """Update a FeatureGroup and add new features from the given feature definitions. - - Args: - feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated. - Returns: - Response dict from service. - """ - - return self.sagemaker_session.update_feature_group( - feature_group_name=self.name, - feature_additions=[ - feature_addition.to_dict() for feature_addition in feature_additions - ], - ) - - def update_feature_metadata( - self, - feature_name: str, - description: str = None, - parameter_additions: Sequence[FeatureParameter] = None, - parameter_removals: Sequence[str] = None, - ) -> Dict[str, Any]: - """Update a feature metadata and add/remove metadata. - - Args: - feature_name (str): name of the feature to update. - description (str): description of the feature to update. - parameter_additions (Sequence[Dict[str, str]): list of feature parameter to be added. - parameter_removals (Sequence[str]): list of feature parameter key to be removed. - Returns: - Response dict from service. - """ - - return self.sagemaker_session.update_feature_metadata( - feature_group_name=self.name, - feature_name=feature_name, - description=description, - parameter_additions=[ - parameter_addition.to_dict() - for parameter_addition in parameter_additions - ] - if parameter_additions is not None - else [], - parameter_removals=parameter_removals - if parameter_removals is not None - else [], - ) - - def describe_feature_metadata(self, feature_name: str) -> Dict[str, Any]: - """Describe feature metadata by feature name. - - Args: - feature_name (str): name of the feature. - Returns: - Response dict from service. - """ - - return self.sagemaker_session.describe_feature_metadata( - feature_group_name=self.name, feature_name=feature_name - ) - def load_feature_definitions( self, data_frame: DataFrame, diff --git a/src/sagemaker/feature_store/inputs.py b/src/sagemaker/feature_store/inputs.py index 75cb99b5f6..1f31caa4ae 100644 --- a/src/sagemaker/feature_store/inputs.py +++ b/src/sagemaker/feature_store/inputs.py @@ -207,27 +207,3 @@ def to_dict(self) -> Dict[str, Any]: FeatureName=self.feature_name, ValueAsString=self.value_as_string, ) - - -@attr.s -class FeatureParameter(Config): - """FeatureParameter for FeatureStore. - - Attributes: - key (str): key of the parameter. - value (str): value of the parameter. - """ - - key: str = attr.ib(default=None) - value: str = attr.ib(default=None) - - def to_dict(self) -> Dict[str, Any]: - """Construct a dictionary based on the attributes provided. - - Returns: - dict represents the attributes. - """ - return Config.construct_dict( - Key=self.key, - Value=self.value, - ) diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index 68e71e870f..b37ac1ca58 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -4064,68 +4064,6 @@ def describe_feature_group( update_args(kwargs, NextToken=next_token) return self.sagemaker_client.describe_feature_group(**kwargs) - def update_feature_group( - self, feature_group_name: str, feature_additions: Sequence[Dict[str, str]] - ) -> Dict[str, Any]: - """Update a FeatureGroup and add new features from the given feature definitions. - - Args: - feature_group_name (str): name of the FeatureGroup to update. - feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated. - Returns: - Response dict from service. - """ - - return self.sagemaker_client.update_feature_group( - FeatureGroupName=feature_group_name, FeatureAdditions=feature_additions - ) - - def update_feature_metadata( - self, - feature_group_name: str, - feature_name: str, - description: str = None, - parameter_additions: Sequence[Dict[str, str]] = None, - parameter_removals: Sequence[str] = None, - ) -> Dict[str, Any]: - """Update a feature metadata and add/remove metadata. - - Args: - feature_group_name (str): name of the FeatureGroup to update. - feature_name (str): name of the feature to update. - description (str): description of the feature to update. - parameter_additions (Sequence[Dict[str, str]): list of feature parameter to be added. - parameter_removals (Sequence[Dict[str, str]): list of feature parameter to be removed. - Returns: - Response dict from service. - """ - - request = { - "FeatureGroupName": feature_group_name, - "FeatureName": feature_name, - "ParameterAdditions": parameter_additions, - "ParameterRemovals": parameter_removals, - } - if description is not None: - request["Description"] = description - return self.sagemaker_client.update_feature_metadata(**request) - - def describe_feature_metadata( - self, feature_group_name: str, feature_name: str - ) -> Dict[str, Any]: - """Describe feature metadata by feature name in FeatureStore service. - - Args: - feature_group_name (str): name of the FeatureGroup. - feature_name (str): name of the feature. - Returns: - Response dict from service. - """ - - return self.sagemaker_client.describe_feature_metadata( - FeatureGroupName=feature_group_name, FeatureName=feature_name - ) - def put_record( self, feature_group_name: str, diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 449ee1607e..15c1db41ab 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -22,10 +22,9 @@ import pytest from pandas import DataFrame -from sagemaker.feature_store.feature_group import FeatureGroup, FeatureParameter +from sagemaker.feature_store.feature_group import FeatureGroup from sagemaker.feature_store.inputs import FeatureValue from sagemaker.session import get_execution_role, Session -from sagemaker.feature_store.feature_definition import FractionalFeatureDefinition from tests.integ.timeout import timeout BUCKET_POLICY = { @@ -238,88 +237,6 @@ def test_create_feature_store( assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") -def test_update_feature_group( - feature_store_session, - role, - feature_group_name, - offline_store_s3_uri, - pandas_data_frame, -): - feature_group = FeatureGroup( - name=feature_group_name, sagemaker_session=feature_store_session - ) - feature_group.load_feature_definitions(data_frame=pandas_data_frame) - - with cleanup_feature_group(feature_group): - feature_group.create( - s3_uri=offline_store_s3_uri, - record_identifier_name="feature1", - event_time_feature_name="feature3", - role_arn=role, - enable_online_store=True, - ) - _wait_for_feature_group_create(feature_group) - - new_feature_name = "new_feature" - new_features = [FractionalFeatureDefinition(feature_name=new_feature_name)] - feature_group.update(new_features) - time.sleep(10) - feature_definitions = feature_group.describe().get("FeatureDefinitions") - assert any( - [True for elem in feature_definitions if new_feature_name in elem.values()] - ) - - -def test_feature_metadata( - feature_store_session, - role, - feature_group_name, - offline_store_s3_uri, - pandas_data_frame, -): - feature_group = FeatureGroup( - name=feature_group_name, sagemaker_session=feature_store_session - ) - feature_group.load_feature_definitions(data_frame=pandas_data_frame) - - with cleanup_feature_group(feature_group): - feature_group.create( - s3_uri=offline_store_s3_uri, - record_identifier_name="feature1", - event_time_feature_name="feature3", - role_arn=role, - enable_online_store=True, - ) - _wait_for_feature_group_create(feature_group) - - parameter_additions = [ - FeatureParameter(key="key1", value="value1"), - FeatureParameter(key="key2", value="value2"), - ] - description = "test description" - feature_name = "feature1" - feature_group.update_feature_metadata( - feature_name=feature_name, - description=description, - parameter_additions=parameter_additions, - ) - describe_feature_metadata = feature_group.describe_feature_metadata( - feature_name=feature_name - ) - assert description == describe_feature_metadata.get("Description") - assert 2 == len(describe_feature_metadata.get("Parameters")) - - parameter_removals = ["key1"] - feature_group.update_feature_metadata( - feature_name=feature_name, parameter_removals=parameter_removals - ) - describe_feature_metadata = feature_group.describe_feature_metadata( - feature_name=feature_name - ) - assert description == describe_feature_metadata.get("Description") - assert 1 == len(describe_feature_metadata.get("Parameters")) - - def test_ingest_without_string_feature( feature_store_session, role, diff --git a/tests/unit/sagemaker/feature_store/test_feature_store.py b/tests/unit/sagemaker/feature_store/test_feature_store.py index 31e3ab5085..ef6a36980b 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_store.py +++ b/tests/unit/sagemaker/feature_store/test_feature_store.py @@ -31,7 +31,6 @@ AthenaQuery, IngestionError, ) -from sagemaker.feature_store.inputs import FeatureParameter class PicklableMock(Mock): @@ -155,51 +154,6 @@ def test_feature_store_describe(sagemaker_session_mock): ) -def test_feature_store_update(sagemaker_session_mock, feature_group_dummy_definitions): - feature_group = FeatureGroup( - name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock - ) - feature_group.update(feature_group_dummy_definitions) - sagemaker_session_mock.update_feature_group.assert_called_with( - feature_group_name="MyFeatureGroup", - feature_additions=[fd.to_dict() for fd in feature_group_dummy_definitions], - ) - - -def test_feature_metadata_update(sagemaker_session_mock): - feature_group = FeatureGroup( - name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock - ) - - parameter_additions = [FeatureParameter(key="key1", value="value1")] - parameter_removals = ["key2"] - - feature_group.update_feature_metadata( - feature_name="Feature1", - description="DummyDescription", - parameter_additions=parameter_additions, - parameter_removals=parameter_removals, - ) - sagemaker_session_mock.update_feature_metadata.assert_called_with( - feature_group_name="MyFeatureGroup", - feature_name="Feature1", - description="DummyDescription", - parameter_additions=[pa.to_dict() for pa in parameter_additions], - parameter_removals=parameter_removals, - ) - - -def test_feature_metadata_describe(sagemaker_session_mock): - feature_group = FeatureGroup( - name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock - ) - - feature_group.describe_feature_metadata(feature_name="Feature1") - sagemaker_session_mock.describe_feature_metadata.assert_called_with( - feature_group_name="MyFeatureGroup", feature_name="Feature1" - ) - - def test_put_record(sagemaker_session_mock): feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) feature_group.put_record(record=[]) diff --git a/tests/unit/sagemaker/feature_store/test_inputs.py b/tests/unit/sagemaker/feature_store/test_inputs.py index 322a049309..d111cc0c00 100644 --- a/tests/unit/sagemaker/feature_store/test_inputs.py +++ b/tests/unit/sagemaker/feature_store/test_inputs.py @@ -19,7 +19,6 @@ S3StorageConfig, DataCatalogConfig, OfflineStoreConfig, - FeatureParameter, ) @@ -84,8 +83,3 @@ def test_offline_data_store_config(): "DisableGlueTableCreation": False, } ) - - -def test_feature_metadata(): - config = FeatureParameter(key="key", value="value") - assert ordered(config.to_dict()) == ordered({"Key": "key", "Value": "value"}) From 2ba00dfd476fe020bc0336d6c3c93c8bc14a94a9 Mon Sep 17 00:00:00 2001 From: Mizanur Rahman Date: Wed, 18 May 2022 20:17:58 -0700 Subject: [PATCH 5/6] Final version --- src/sagemaker/feature_store/feature_group.py | 61 ++++++++++++++ src/sagemaker/feature_store/inputs.py | 24 ++++++ src/sagemaker/session.py | 66 +++++++++++++++ tests/integ/test_feature_store.py | 80 ++++++++++++++++++- .../feature_store/test_feature_store.py | 47 +++++++++++ .../sagemaker/feature_store/test_inputs.py | 6 ++ tests/unit/test_session.py | 53 ++++++++++++ 7 files changed, 336 insertions(+), 1 deletion(-) diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index 41bdcd764c..f09b264e0f 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -53,6 +53,7 @@ OfflineStoreConfig, DataCatalogConfig, FeatureValue, + FeatureParameter, ) logger = logging.getLogger(__name__) @@ -537,6 +538,66 @@ def describe(self, next_token: str = None) -> Dict[str, Any]: feature_group_name=self.name, next_token=next_token ) + def update(self, feature_additions: Sequence[FeatureDefinition]) -> Dict[str, Any]: + """Update a FeatureGroup and add new features from the given feature definitions. + + Args: + feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated. + + Returns: + Response dict from service. + """ + + return self.sagemaker_session.update_feature_group( + feature_group_name=self.name, + feature_additions=[ + feature_addition.to_dict() for feature_addition in feature_additions + ], + ) + + def update_feature_metadata( + self, + feature_name: str, + description: str = None, + parameter_additions: Sequence[FeatureParameter] = None, + parameter_removals: Sequence[str] = None, + ) -> Dict[str, Any]: + """Update a feature metadata and add/remove metadata. + + Args: + feature_name (str): name of the feature to update. + description (str): description of the feature to update. + parameter_additions (Sequence[Dict[str, str]): list of feature parameter to be added. + parameter_removals (Sequence[str]): list of feature parameter key to be removed. + + Returns: + Response dict from service. + """ + return self.sagemaker_session.update_feature_metadata( + feature_group_name=self.name, + feature_name=feature_name, + description=description, + parameter_additions=[ + parameter_addition.to_dict() for parameter_addition in parameter_additions + ] + if parameter_additions is not None + else [], + parameter_removals=parameter_removals if parameter_removals is not None else [], + ) + + def describe_feature_metadata(self, feature_name: str) -> Dict[str, Any]: + """Describe feature metadata by feature name. + + Args: + feature_name (str): name of the feature. + Returns: + Response dict from service. + """ + + return self.sagemaker_session.describe_feature_metadata( + feature_group_name=self.name, feature_name=feature_name + ) + def load_feature_definitions( self, data_frame: DataFrame, diff --git a/src/sagemaker/feature_store/inputs.py b/src/sagemaker/feature_store/inputs.py index 1f31caa4ae..75cb99b5f6 100644 --- a/src/sagemaker/feature_store/inputs.py +++ b/src/sagemaker/feature_store/inputs.py @@ -207,3 +207,27 @@ def to_dict(self) -> Dict[str, Any]: FeatureName=self.feature_name, ValueAsString=self.value_as_string, ) + + +@attr.s +class FeatureParameter(Config): + """FeatureParameter for FeatureStore. + + Attributes: + key (str): key of the parameter. + value (str): value of the parameter. + """ + + key: str = attr.ib(default=None) + value: str = attr.ib(default=None) + + def to_dict(self) -> Dict[str, Any]: + """Construct a dictionary based on the attributes provided. + + Returns: + dict represents the attributes. + """ + return Config.construct_dict( + Key=self.key, + Value=self.value, + ) diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index b37ac1ca58..9ba94c7264 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -4064,6 +4064,72 @@ def describe_feature_group( update_args(kwargs, NextToken=next_token) return self.sagemaker_client.describe_feature_group(**kwargs) + def update_feature_group( + self, feature_group_name: str, feature_additions: Sequence[Dict[str, str]] + ) -> Dict[str, Any]: + """Update a FeatureGroup and add new features from the given feature definitions. + + Args: + feature_group_name (str): name of the FeatureGroup to update. + feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated. + Returns: + Response dict from service. + """ + + return self.sagemaker_client.update_feature_group( + FeatureGroupName=feature_group_name, FeatureAdditions=feature_additions + ) + + def update_feature_metadata( + self, + feature_group_name: str, + feature_name: str, + description: str = None, + parameter_additions: Sequence[Dict[str, str]] = None, + parameter_removals: Sequence[str] = None, + ) -> Dict[str, Any]: + """Update a feature metadata and add/remove metadata. + + Args: + feature_group_name (str): name of the FeatureGroup to update. + feature_name (str): name of the feature to update. + description (str): description of the feature to update. + parameter_additions (Sequence[Dict[str, str]): list of feature parameter to be added. + parameter_removals (Sequence[Dict[str, str]): list of feature parameter to be removed. + Returns: + Response dict from service. + """ + + request = { + "FeatureGroupName": feature_group_name, + "FeatureName": feature_name, + } + + if description is not None: + request["Description"] = description + if parameter_additions is not None: + request["ParameterAdditions"] = parameter_additions + if parameter_removals is not None: + request["ParameterRemovals"] = parameter_removals + + return self.sagemaker_client.update_feature_metadata(**request) + + def describe_feature_metadata( + self, feature_group_name: str, feature_name: str + ) -> Dict[str, Any]: + """Describe feature metadata by feature name in FeatureStore service. + + Args: + feature_group_name (str): name of the FeatureGroup. + feature_name (str): name of the feature. + Returns: + Response dict from service. + """ + + return self.sagemaker_client.describe_feature_metadata( + FeatureGroupName=feature_group_name, FeatureName=feature_name + ) + def put_record( self, feature_group_name: str, diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 15c1db41ab..b94bde1145 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -22,8 +22,9 @@ import pytest from pandas import DataFrame +from sagemaker.feature_store.feature_definition import FractionalFeatureDefinition from sagemaker.feature_store.feature_group import FeatureGroup -from sagemaker.feature_store.inputs import FeatureValue +from sagemaker.feature_store.inputs import FeatureValue, FeatureParameter from sagemaker.session import get_execution_role, Session from tests.integ.timeout import timeout @@ -237,6 +238,83 @@ def test_create_feature_store( assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") +def test_update_feature_group( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + new_feature_name = "new_feature" + new_features = [FractionalFeatureDefinition(feature_name=new_feature_name)] + feature_group.update(new_features) + time.sleep(10) + feature_definitions = feature_group.describe().get("FeatureDefinitions") + assert any([True for elem in feature_definitions if new_feature_name in elem.values()]) + + +def test_feature_metadata( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + parameter_additions = [ + FeatureParameter(key="key1", value="value1"), + FeatureParameter(key="key2", value="value2"), + ] + description = "test description" + feature_name = "feature1" + feature_group.update_feature_metadata( + feature_name=feature_name, + description=description, + parameter_additions=parameter_additions, + ) + describe_feature_metadata = feature_group.describe_feature_metadata( + feature_name=feature_name + ) + print(describe_feature_metadata) + assert description == describe_feature_metadata.get("Description") + assert 2 == len(describe_feature_metadata.get("Parameters")) + + parameter_removals = ["key1"] + feature_group.update_feature_metadata( + feature_name=feature_name, parameter_removals=parameter_removals + ) + describe_feature_metadata = feature_group.describe_feature_metadata( + feature_name=feature_name + ) + assert description == describe_feature_metadata.get("Description") + assert 1 == len(describe_feature_metadata.get("Parameters")) + + def test_ingest_without_string_feature( feature_store_session, role, diff --git a/tests/unit/sagemaker/feature_store/test_feature_store.py b/tests/unit/sagemaker/feature_store/test_feature_store.py index ef6a36980b..8f2f0eb3f9 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_store.py +++ b/tests/unit/sagemaker/feature_store/test_feature_store.py @@ -31,6 +31,7 @@ AthenaQuery, IngestionError, ) +from sagemaker.feature_store.inputs import FeatureParameter class PicklableMock(Mock): @@ -154,6 +155,52 @@ def test_feature_store_describe(sagemaker_session_mock): ) +def test_feature_store_update(sagemaker_session_mock, feature_group_dummy_definitions): + feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) + feature_group.update(feature_group_dummy_definitions) + sagemaker_session_mock.update_feature_group.assert_called_with( + feature_group_name="MyFeatureGroup", + feature_additions=[fd.to_dict() for fd in feature_group_dummy_definitions], + ) + + +def test_feature_metadata_update(sagemaker_session_mock): + feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) + + parameter_additions = [FeatureParameter(key="key1", value="value1")] + parameter_removals = ["key2"] + + feature_group.update_feature_metadata( + feature_name="Feature1", + description="TestDescription", + parameter_additions=parameter_additions, + parameter_removals=parameter_removals, + ) + sagemaker_session_mock.update_feature_metadata.assert_called_with( + feature_group_name="MyFeatureGroup", + feature_name="Feature1", + description="TestDescription", + parameter_additions=[pa.to_dict() for pa in parameter_additions], + parameter_removals=parameter_removals, + ) + feature_group.update_feature_metadata(feature_name="Feature1", description="TestDescription") + sagemaker_session_mock.update_feature_metadata.assert_called_with( + feature_group_name="MyFeatureGroup", + feature_name="Feature1", + description="TestDescription", + parameter_additions=[], + parameter_removals=[], + ) + + +def test_feature_metadata_describe(sagemaker_session_mock): + feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) + feature_group.describe_feature_metadata(feature_name="Feature1") + sagemaker_session_mock.describe_feature_metadata.assert_called_with( + feature_group_name="MyFeatureGroup", feature_name="Feature1" + ) + + def test_put_record(sagemaker_session_mock): feature_group = FeatureGroup(name="MyFeatureGroup", sagemaker_session=sagemaker_session_mock) feature_group.put_record(record=[]) diff --git a/tests/unit/sagemaker/feature_store/test_inputs.py b/tests/unit/sagemaker/feature_store/test_inputs.py index d111cc0c00..322a049309 100644 --- a/tests/unit/sagemaker/feature_store/test_inputs.py +++ b/tests/unit/sagemaker/feature_store/test_inputs.py @@ -19,6 +19,7 @@ S3StorageConfig, DataCatalogConfig, OfflineStoreConfig, + FeatureParameter, ) @@ -83,3 +84,8 @@ def test_offline_data_store_config(): "DisableGlueTableCreation": False, } ) + + +def test_feature_metadata(): + config = FeatureParameter(key="key", value="value") + assert ordered(config.to_dict()) == ordered({"Key": "key", "Value": "value"}) diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 4523253a7f..591c333d6e 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -2457,6 +2457,59 @@ def test_feature_group_describe(sagemaker_session): ) +def test_feature_group_update(sagemaker_session, feature_group_dummy_definitions): + sagemaker_session.update_feature_group( + feature_group_name="MyFeatureGroup", + feature_additions=feature_group_dummy_definitions, + ) + assert sagemaker_session.sagemaker_client.update_feature_group.called_with( + FeatureGroupName="MyFeatureGroup", + FeatureAdditions=feature_group_dummy_definitions, + ) + + +def test_feature_metadata_update(sagemaker_session): + parameter_additions = [ + { + "key": "TestKey", + "value": "TestValue", + } + ] + parameter_removals = ["TestKey"] + + sagemaker_session.update_feature_metadata( + feature_group_name="TestFeatureGroup", + feature_name="TestFeature", + description="TestDescription", + parameter_additions=parameter_additions, + parameter_removals=parameter_removals, + ) + assert sagemaker_session.sagemaker_client.update_feature_group.called_with( + feature_group_name="TestFeatureGroup", + FeatureName="TestFeature", + Description="TestDescription", + ParameterAdditions=parameter_additions, + ParameterRemovals=parameter_removals, + ) + sagemaker_session.update_feature_metadata( + feature_group_name="TestFeatureGroup", + feature_name="TestFeature", + ) + assert sagemaker_session.sagemaker_client.update_feature_group.called_with( + feature_group_name="TestFeatureGroup", + FeatureName="TestFeature", + ) + + +def test_feature_metadata_describe(sagemaker_session): + sagemaker_session.describe_feature_metadata( + feature_group_name="MyFeatureGroup", feature_name="TestFeature" + ) + assert sagemaker_session.sagemaker_client.describe_feature_metadata.called_with( + FeatureGroupName="MyFeatureGroup", FeatureName="TestFeature" + ) + + def test_start_query_execution(sagemaker_session): athena_mock = Mock() sagemaker_session.boto_session.client( From a740eff3d91caa3a90da0791d0735fef3065fb2c Mon Sep 17 00:00:00 2001 From: Mizanur Rahman Date: Wed, 15 Jun 2022 12:40:37 -0700 Subject: [PATCH 6/6] Added wait for feature group update --- tests/integ/test_feature_store.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index b94bde1145..73f6cc9104 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -261,7 +261,7 @@ def test_update_feature_group( new_feature_name = "new_feature" new_features = [FractionalFeatureDefinition(feature_name=new_feature_name)] feature_group.update(new_features) - time.sleep(10) + _wait_for_feature_group_update(feature_group) feature_definitions = feature_group.describe().get("FeatureDefinitions") assert any([True for elem in feature_definitions if new_feature_name in elem.values()]) @@ -382,6 +382,18 @@ def _wait_for_feature_group_create(feature_group: FeatureGroup): print(f"FeatureGroup {feature_group.name} successfully created.") +def _wait_for_feature_group_update(feature_group: FeatureGroup): + status = feature_group.describe().get("LastUpdateStatus").get("Status") + while status == "InProgress": + print("Waiting for Feature Group Update") + time.sleep(5) + status = feature_group.describe().get("LastUpdateStatus").get("Status") + if status != "Successful": + print(feature_group.describe()) + raise RuntimeError(f"Failed to update feature group {feature_group.name}") + print(f"FeatureGroup {feature_group.name} successfully updated.") + + @contextmanager def cleanup_feature_group(feature_group: FeatureGroup): try: