From 1ed07beb1ea09ebd376ebe39d8860262065dd44a Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 9 Mar 2023 12:16:49 +0100 Subject: [PATCH 01/30] feature: utilities to faciliate working with Feature Groups --- src/sagemaker/feature_store/feature_utils.py | 296 +++++++++++++++++++ 1 file changed, 296 insertions(+) create mode 100644 src/sagemaker/feature_store/feature_utils.py diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py new file mode 100644 index 0000000000..addfdc1334 --- /dev/null +++ b/src/sagemaker/feature_store/feature_utils.py @@ -0,0 +1,296 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Utilities for working with FeatureGroups and FeatureStores.""" +from __future__ import absolute_import + +import re +import logging + +from typing import Union +from pathlib import Path + +import pandas +import boto3 +from pandas import DataFrame, Series, read_csv + +from sagemaker.feature_store.feature_group import FeatureGroup +from sagemaker.session import Session + +logger = logging.getLogger(__name__) + + +def get_session_from_role(region: str, assume_role: str = None) -> Session: + """Method use to get the :class:`sagemaker.session.Session` from a role and a region. + Helpful in case it's invoke from a session with a role without permission it can assume + another role temporarily to perform certain tasks. + Args: + assume_role: role name + region: region name + Returns: + """ + boto_session = boto3.Session(region_name=region) + + # It will try to assume the role specified + if assume_role: + sts = boto_session.client( + "sts", region_name=region, endpoint_url="https://sts.eu-west-1.amazonaws.com" + ) + + metadata = sts.assume_role(RoleArn=assume_role, RoleSessionName="SagemakerExecution") + + access_key_id = metadata["Credentials"]["AccessKeyId"] + secret_access_key = metadata["Credentials"]["SecretAccessKey"] + session_token = metadata["Credentials"]["SessionToken"] + + boto_session = boto3.session.Session( + region_name=region, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + aws_session_token=session_token, + ) + + # Sessions + sagemaker_client = boto_session.client("sagemaker") + sagemaker_runtime = boto_session.client("sagemaker-runtime") + runtime_client = boto_session.client(service_name="sagemaker-featurestore-runtime") + sagemaker_session = Session( + boto_session=boto_session, + sagemaker_client=sagemaker_client, + sagemaker_runtime_client=sagemaker_runtime, + sagemaker_featurestore_runtime_client=runtime_client, + ) + + return sagemaker_session + + +def get_feature_group_as_dataframe( + feature_group_name: str, + athena_bucket: str, + query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}" + WHERE is_deleted=False """, + role: str = None, + region: str = None, + session=None, + event_time_feature_name: str = None, + latest_ingestion: bool = True, + verbose: bool = True, + **pandas_read_csv_kwargs, +) -> DataFrame: + """Get a :class:`sagemaker.feature_store.feature_group.FeatureGroup` as a pandas.DataFrame + Description: + Method to run an athena query over a Feature Group in a Feature Store + to retrieve its data.It needs the sagemaker.Session linked to a role + or the role and region used to work Feature Stores.Returns a dataframe + with the data. + Args: + region (str): region of the target Feature Store + feature_group_name (str): feature store name + query (str): query to run. By default, it will take the latest ingest with data that + wasn't deleted. If latest_ingestion is False it will take all the data + in the feature group that wasn't deleted. It needs to use the keyword + "#{table}" to refer to the FeatureGroup name. e.g.: + 'SELECT * FROM "sagemaker_featurestore"."#{table}"' + athena_bucket (str): Amazon S3 bucket for running the query + role (str): role of the account used to extract data from feature store + session (str): :class:`sagemaker.session.Session` + of SageMaker used to work with the feature store + event_time_feature_name (str): eventTimeId feature. Mandatory only if the + latest ingestion is True + latest_ingestion (bool): if True it will get the data only from the latest ingestion. + If False it will take whatever is specified in the query, or + if not specify it, it will get all the data that wasn't deleted. + verbose (bool): if True show messages, if False is silent. + Returns: + dataset (pandas.DataFrame): dataset with the data retrieved from feature group + """ + + logger.setLevel(logging.WARNING) + if verbose: + logger.setLevel(logging.INFO) + + if latest_ingestion: + if event_time_feature_name is not None: + query += str( + f"AND {event_time_feature_name}=(SELECT " + + f"MAX({event_time_feature_name}) FROM " + + '"sagemaker_featurestore"."#{table}")' + ) + else: + exc = Exception( + "Argument event_time_feature_name must be specified " + "when using latest_ingestion=True." + ) + logger.exception(exc) + raise exc + query += ";" + + if session is not None: + sagemaker_session = session + elif role is not None and region is not None: + sagemaker_session = get_session_from_role(region=region) + else: + exc = Exception("Argument Session or role and region must be specified.") + logger.exception(exc) + raise exc + + msg = f"Feature Group used: {feature_group_name}" + logger.info(msg) + + fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session) + + sample_query = fg.athena_query() + query_string = re.sub(r"#\{(table)\}", sample_query.table_name, query) + + msg = f"Running query:\n\t{sample_query} \n\n\t-> Save on bucket {athena_bucket}\n" + logger.info(msg) + + sample_query.run(query_string=query_string, output_location=athena_bucket) + + sample_query.wait() + + # run Athena query. The output is loaded to a Pandas dataframe. + dataset = sample_query.as_dataframe(**pandas_read_csv_kwargs) + + msg = f"Data shape retrieve from {feature_group_name}: {dataset.shape}" + logger.info(msg) + + return dataset + + +def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame: + """Formats the column names for :class:`sagemaker.feature_store.feature_group.FeatureGroup` + Description: + Module to format correctly the name of the columns of a DataFrame + to later generate the features names of a Feature Group + Args: + data (pandas.DataFrame): dataframe used + Returns: + pandas.DataFrame + """ + data.rename(columns=lambda x: x.replace(" ", "_").replace(".", "").lower()[:62], inplace=True) + return data + + +def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame: + """Cast properly pandas object types to strings + Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to + a valid string type recognized by Feature Groups. + Args: + data_frame: dataframe used + Returns: + pandas.DataFrame + """ + for label in data_frame.select_dtypes(["object", "O"]).columns.tolist(): + data_frame[label] = data_frame[label].astype("str").astype("string") + return data_frame + + +def prepare_fg_from_dataframe_or_file( + dataframe_or_path: Union[str, Path, pandas.DataFrame], + feature_group_name: str, + role: str = None, + region: str = None, + session=None, + record_id: str = "record_id", + event_id: str = "data_as_of_date", + verbose: bool = False, + **pandas_read_csv_kwargs +) -> FeatureGroup: + """Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup` + Description: + Function to prepare a dataframe for creating a Feature Group from a pandas.DataFrame + or a path to a file with proper dtypes, feature names and mandatory features (record_id, + event_id). It needs the sagemaker.Session linked to a role or the role and region used + to work Feature Stores. If record_id or event_id are not specified it will create ones + by default with the names 'record_id' and 'data_as_of_date'. + Args: + **pandas_read_csv_kwargs (object): + feature_group_name (str): feature group name + dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data + verbose (bool) : True for displaying messages, False for silent method. + record_id (str, 'record_id'): (Optional) Feature identifier of the rows. If specified each + value of that feature has to be unique. If not specified or + record_id='record_id', then it will create a new feature from + the index of the pandas.DataFrame. + event_id (str) : (Optional) Feature with the time of the creation of data rows. + If not specified it will create one with the current time + called `data_as_of_date` + role (str) : role used to get the session. + region (str) : region used to get the session. + session (str): session of SageMaker used to work with the feature store + Returns: + :class:`sagemaker.feature_store.feature_group.FeatureGroup`: FG prepared with all + the methods and definitions properly defined + """ + + logger.setLevel(logging.WARNING) + if verbose: + logger.setLevel(logging.INFO) + + if isinstance(dataframe_or_path, DataFrame): + data = dataframe_or_path + elif isinstance(dataframe_or_path, str): + pandas_read_csv_kwargs.pop("filepath_or_buffer", None) + data = read_csv(filepath_or_buffer=dataframe_or_path, **pandas_read_csv_kwargs) + else: + exc = Exception( + str( + f"Invalid type {type(dataframe_or_path)} for " + "argument dataframe_or_path. \nParameter must be" + " of type pandas.DataFrame or string" + ) + ) + logger.exception(exc) + raise exc + + # Formatting cols + data = _format_column_names(data=data) + data = _cast_object_to_string(data_frame=data) + + if record_id == "record_id" and record_id not in data.columns: + data[record_id] = data.index + + lg_uniq = len(data[record_id].unique()) + lg_id = len(data[record_id]) + + if lg_id != lg_uniq: + exc = Exception( + str( + f"Record identifier {record_id} have {abs(lg_id - lg_uniq)} " + "duplicated rows. \nRecord identifier must be unique" + " in each row." + ) + ) + logger.exception(exc) + raise exc + + if event_id not in data.columns: + import time + + current_time_sec = int(round(time.time())) + data[event_id] = Series([current_time_sec] * lg_id, dtype="float64") + + if session is not None: + sagemaker_session = session + elif role is not None and region is not None: + sagemaker_session = get_session_from_role(region=region) + else: + exc = Exception("Argument Session or role and region must be specified.") + logger.exception(exc) + raise exc + + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session) + + feature_group.load_feature_definitions(data_frame=data) + + return feature_group From cd6a48cc1c1c909df832229c3054d074b91cb0c4 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 9 Mar 2023 12:31:04 +0100 Subject: [PATCH 02/30] feature: utilities to facilitate working with Feature Groups --- doc/api/utility/featuregroup_utils.rst | 7 ++ src/sagemaker/feature_store/feature_group.py | 12 ++- tests/integ/test_feature_store.py | 79 +++++++++++++- .../feature_store/test_feature_utils.py | 101 ++++++++++++++++++ 4 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 doc/api/utility/featuregroup_utils.rst create mode 100644 tests/unit/sagemaker/feature_store/test_feature_utils.py diff --git a/doc/api/utility/featuregroup_utils.rst b/doc/api/utility/featuregroup_utils.rst new file mode 100644 index 0000000000..a41fc6ee9d --- /dev/null +++ b/doc/api/utility/featuregroup_utils.rst @@ -0,0 +1,7 @@ +FeatureGroup Utilities +---------------------- + +.. automodule:: sagemaker.feature_group_utils + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index 9bcf7e26bd..bebda84690 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -138,9 +138,15 @@ def get_query_execution(self) -> Dict[str, Any]: query_execution_id=self._current_query_execution_id ) - def as_dataframe(self) -> DataFrame: + def as_dataframe(self, **kwargs) -> DataFrame: """Download the result of the current query and load it into a DataFrame. + Args: + kwargs: key arguments used for the method pandas.read_csv to be able to have + a better tuning on data. + For more info read + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + Returns: A pandas DataFrame contains the query result. """ @@ -161,7 +167,9 @@ def as_dataframe(self) -> DataFrame: query_execution_id=self._current_query_execution_id, filename=output_filename, ) - return pd.read_csv(output_filename, delimiter=",") + + kwargs.pop("delimiter", None) + return pd.read_csv(filepath_or_buffer=output_filename, delimiter=",", **kwargs) @attr.s diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index ec301cce1d..36005fe12a 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -12,9 +12,9 @@ # language governing permissions and limitations under the License. from __future__ import absolute_import +import datetime import json import time -import datetime import dateutil.parser as date_parser from contextlib import contextmanager @@ -24,6 +24,7 @@ import pytest from pandas import DataFrame +from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe from sagemaker.feature_store.feature_definition import FractionalFeatureDefinition from sagemaker.feature_store.feature_group import FeatureGroup from sagemaker.feature_store.feature_store import FeatureStore @@ -1348,6 +1349,82 @@ def _wait_for_feature_group_update(feature_group: FeatureGroup): print(f"FeatureGroup {feature_group.name} successfully updated.") +def test_get_feature_group_with_role_region( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + feature_group.ingest( + data_frame=pandas_data_frame, max_workers=3, max_processes=2, wait=True + ) + + dataset = get_feature_group_as_dataframe( + feature_group_name=feature_group_name, + region=region_name, + role=role, + event_time_feature_name="feature3", + latest_ingestion=True, + athena_bucket=f"{offline_store_s3_uri}/query", + ) + + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + assert not dataset.empty + assert isinstance(dataset, DataFrame) + + +def test_get_feature_group_with_session( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + feature_group.ingest( + data_frame=pandas_data_frame, max_workers=3, max_processes=2, wait=True + ) + + dataset = get_feature_group_as_dataframe( + feature_group_name=feature_group_name, + session=feature_store_session, + event_time_feature_name="feature3", + latest_ingestion=True, + athena_bucket=f"{offline_store_s3_uri}/query", + low_memory=False, + ) # Using kwargs to pass a parameter to pandas.read_csv + + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + assert not dataset.empty + assert isinstance(dataset, DataFrame) + + @contextmanager def cleanup_feature_group(feature_group: FeatureGroup): try: diff --git a/tests/unit/sagemaker/feature_store/test_feature_utils.py b/tests/unit/sagemaker/feature_store/test_feature_utils.py new file mode 100644 index 0000000000..31058bc37d --- /dev/null +++ b/tests/unit/sagemaker/feature_store/test_feature_utils.py @@ -0,0 +1,101 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +# language governing permissions and limitations under the License. +"""Test for Feature Group Utils""" +from __future__ import absolute_import + +import pandas as pd +import pytest +from mock import Mock + +from sagemaker.feature_store.feature_utils import ( + _cast_object_to_string, + prepare_fg_from_dataframe_or_file, +) +from sagemaker.feature_store.feature_definition import ( + FeatureTypeEnum, +) +from sagemaker.feature_store.feature_group import ( + FeatureGroup, +) + + +class PicklableMock(Mock): + """Mock class use for tests""" + + def __reduce__(self): + """Method from class Mock""" + return (Mock, ()) + + +@pytest.fixture +def sagemaker_session_mock(): + """Fixture Mock class""" + return Mock() + + +def test_convert_unsupported_types_to_supported(sagemaker_session_mock): + feature_group = FeatureGroup(name="FailedGroup", sagemaker_session=sagemaker_session_mock) + df = pd.DataFrame( + { + "float": pd.Series([2.0], dtype="float64"), + "int": pd.Series([2], dtype="int64"), + "object": pd.Series(["f1"], dtype="object"), + } + ) + # Converting object or O type to string + df = _cast_object_to_string(data_frame=df) + + feature_definitions = feature_group.load_feature_definitions(data_frame=df) + types = [fd.feature_type for fd in feature_definitions] + + assert types == [ + FeatureTypeEnum.FRACTIONAL, + FeatureTypeEnum.INTEGRAL, + FeatureTypeEnum.STRING, + ] + + +def test_prepare_fg_from_dataframe(sagemaker_session_mock): + very_long_name = "long" * 20 + df = pd.DataFrame( + { + "space feature": pd.Series([2.0], dtype="float64"), + "dot.feature": pd.Series([2], dtype="int64"), + very_long_name: pd.Series(["f1"], dtype="string"), + } + ) + + feature_group = prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + session=sagemaker_session_mock, + feature_group_name="testFG", + ) + + names = [fd.feature_name for fd in feature_group.feature_definitions] + types = [fd.feature_type for fd in feature_group.feature_definitions] + + assert names == [ + "space_feature", + "dotfeature", + very_long_name[:62], + "record_id", + "data_as_of_date", + ] + assert types == [ + FeatureTypeEnum.FRACTIONAL, + FeatureTypeEnum.INTEGRAL, + FeatureTypeEnum.STRING, + FeatureTypeEnum.INTEGRAL, + FeatureTypeEnum.FRACTIONAL, + ] From 6aa67bc6025064118045975028d1a068b1573537 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 9 Mar 2023 13:02:34 +0100 Subject: [PATCH 03/30] fix: docstring style and linting --- doc/api/utility/featuregroup_utils.rst | 2 +- src/sagemaker/feature_store/feature_utils.py | 68 ++++++++++++-------- 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/doc/api/utility/featuregroup_utils.rst b/doc/api/utility/featuregroup_utils.rst index a41fc6ee9d..41683ccf04 100644 --- a/doc/api/utility/featuregroup_utils.rst +++ b/doc/api/utility/featuregroup_utils.rst @@ -1,7 +1,7 @@ FeatureGroup Utilities ---------------------- -.. automodule:: sagemaker.feature_group_utils +.. automodule:: sagemaker.feature_store.feature_utils :members: :undoc-members: :show-inheritance: \ No newline at end of file diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index addfdc1334..899161fde1 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -31,11 +31,15 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: """Method use to get the :class:`sagemaker.session.Session` from a role and a region. - Helpful in case it's invoke from a session with a role without permission it can assume - another role temporarily to perform certain tasks. + + Description: + Helpful in case it's invoke from a session with a role without permission it can assume + another role temporarily to perform certain tasks. + Args: assume_role: role name region: region name + Returns: """ boto_session = boto3.Session(region_name=region) @@ -74,24 +78,26 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: def get_feature_group_as_dataframe( - feature_group_name: str, - athena_bucket: str, - query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}" - WHERE is_deleted=False """, - role: str = None, - region: str = None, - session=None, - event_time_feature_name: str = None, - latest_ingestion: bool = True, - verbose: bool = True, - **pandas_read_csv_kwargs, + feature_group_name: str, + athena_bucket: str, + query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}" + WHERE is_deleted=False """, + role: str = None, + region: str = None, + session=None, + event_time_feature_name: str = None, + latest_ingestion: bool = True, + verbose: bool = True, + **pandas_read_csv_kwargs, ) -> DataFrame: """Get a :class:`sagemaker.feature_store.feature_group.FeatureGroup` as a pandas.DataFrame + Description: Method to run an athena query over a Feature Group in a Feature Store to retrieve its data.It needs the sagemaker.Session linked to a role or the role and region used to work Feature Stores.Returns a dataframe with the data. + Args: region (str): region of the target Feature Store feature_group_name (str): feature store name @@ -110,6 +116,7 @@ def get_feature_group_as_dataframe( If False it will take whatever is specified in the query, or if not specify it, it will get all the data that wasn't deleted. verbose (bool): if True show messages, if False is silent. + Returns: dataset (pandas.DataFrame): dataset with the data retrieved from feature group """ @@ -121,8 +128,8 @@ def get_feature_group_as_dataframe( if latest_ingestion: if event_time_feature_name is not None: query += str( - f"AND {event_time_feature_name}=(SELECT " + - f"MAX({event_time_feature_name}) FROM " + + f"AND {event_time_feature_name}=(SELECT " + f"MAX({event_time_feature_name}) FROM " '"sagemaker_featurestore"."#{table}")' ) else: @@ -169,11 +176,14 @@ def get_feature_group_as_dataframe( def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame: """Formats the column names for :class:`sagemaker.feature_store.feature_group.FeatureGroup` + Description: Module to format correctly the name of the columns of a DataFrame to later generate the features names of a Feature Group + Args: data (pandas.DataFrame): dataframe used + Returns: pandas.DataFrame """ @@ -183,8 +193,11 @@ def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame: def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame: """Cast properly pandas object types to strings - Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to - a valid string type recognized by Feature Groups. + + Description: + Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to + a valid string type recognized by Feature Groups. + Args: data_frame: dataframe used Returns: @@ -196,23 +209,25 @@ def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame: def prepare_fg_from_dataframe_or_file( - dataframe_or_path: Union[str, Path, pandas.DataFrame], - feature_group_name: str, - role: str = None, - region: str = None, - session=None, - record_id: str = "record_id", - event_id: str = "data_as_of_date", - verbose: bool = False, - **pandas_read_csv_kwargs + dataframe_or_path: Union[str, Path, pandas.DataFrame], + feature_group_name: str, + role: str = None, + region: str = None, + session=None, + record_id: str = "record_id", + event_id: str = "data_as_of_date", + verbose: bool = False, + **pandas_read_csv_kwargs, ) -> FeatureGroup: """Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup` + Description: Function to prepare a dataframe for creating a Feature Group from a pandas.DataFrame or a path to a file with proper dtypes, feature names and mandatory features (record_id, event_id). It needs the sagemaker.Session linked to a role or the role and region used to work Feature Stores. If record_id or event_id are not specified it will create ones by default with the names 'record_id' and 'data_as_of_date'. + Args: **pandas_read_csv_kwargs (object): feature_group_name (str): feature group name @@ -228,6 +243,7 @@ def prepare_fg_from_dataframe_or_file( role (str) : role used to get the session. region (str) : region used to get the session. session (str): session of SageMaker used to work with the feature store + Returns: :class:`sagemaker.feature_store.feature_group.FeatureGroup`: FG prepared with all the methods and definitions properly defined From fabd4fd977f192131a8e43d858bfb8de48e75d59 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 9 Mar 2023 13:42:08 +0100 Subject: [PATCH 04/30] doc: added more doc and examples --- src/sagemaker/feature_store/feature_group.py | 5 +- src/sagemaker/feature_store/feature_utils.py | 48 ++++++++++++++------ 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index bebda84690..f271cedfd2 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -142,9 +142,8 @@ def as_dataframe(self, **kwargs) -> DataFrame: """Download the result of the current query and load it into a DataFrame. Args: - kwargs: key arguments used for the method pandas.read_csv to be able to have - a better tuning on data. - For more info read + **kwargs (object): key arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html Returns: diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index 899161fde1..3c3936d231 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -88,10 +88,24 @@ def get_feature_group_as_dataframe( event_time_feature_name: str = None, latest_ingestion: bool = True, verbose: bool = True, - **pandas_read_csv_kwargs, + **kwargs, ) -> DataFrame: """Get a :class:`sagemaker.feature_store.feature_group.FeatureGroup` as a pandas.DataFrame + Examples: + >>> from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe + >>> + >>> region = "eu-west-1" + >>> fg_data = get_feature_group_as_dataframe(feature_group_name="feature_group", + >>> athena_bucket="s3://bucket/athena_queries", + >>> region=region, + >>> event_time_feature_name="EventTimeId" + >>> ) + >>> + >>> type(fg_data) + + >>> + Description: Method to run an athena query over a Feature Group in a Feature Store to retrieve its data.It needs the sagemaker.Session linked to a role @@ -106,17 +120,22 @@ def get_feature_group_as_dataframe( in the feature group that wasn't deleted. It needs to use the keyword "#{table}" to refer to the FeatureGroup name. e.g.: 'SELECT * FROM "sagemaker_featurestore"."#{table}"' + It must not end by ';'. athena_bucket (str): Amazon S3 bucket for running the query - role (str): role of the account used to extract data from feature store - session (str): :class:`sagemaker.session.Session` - of SageMaker used to work with the feature store + role (str): role to be assumed to extract data from feature store. If not specified + the default sagemaker execution role will be used. + session (str): `:obj:sagemaker.session.Session` + of SageMaker used to work with the feature store. Optional, with + role and region parameters it will infer the session. event_time_feature_name (str): eventTimeId feature. Mandatory only if the - latest ingestion is True + latest ingestion is True. latest_ingestion (bool): if True it will get the data only from the latest ingestion. If False it will take whatever is specified in the query, or if not specify it, it will get all the data that wasn't deleted. verbose (bool): if True show messages, if False is silent. - + **kwargs (object): key arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html Returns: dataset (pandas.DataFrame): dataset with the data retrieved from feature group """ @@ -139,12 +158,13 @@ def get_feature_group_as_dataframe( ) logger.exception(exc) raise exc + query += ";" if session is not None: sagemaker_session = session - elif role is not None and region is not None: - sagemaker_session = get_session_from_role(region=region) + elif region is not None: + sagemaker_session = get_session_from_role(region=region, assume_role=role) else: exc = Exception("Argument Session or role and region must be specified.") logger.exception(exc) @@ -166,7 +186,7 @@ def get_feature_group_as_dataframe( sample_query.wait() # run Athena query. The output is loaded to a Pandas dataframe. - dataset = sample_query.as_dataframe(**pandas_read_csv_kwargs) + dataset = sample_query.as_dataframe(**kwargs) msg = f"Data shape retrieve from {feature_group_name}: {dataset.shape}" logger.info(msg) @@ -217,7 +237,7 @@ def prepare_fg_from_dataframe_or_file( record_id: str = "record_id", event_id: str = "data_as_of_date", verbose: bool = False, - **pandas_read_csv_kwargs, + **kwargs, ) -> FeatureGroup: """Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup` @@ -229,7 +249,9 @@ def prepare_fg_from_dataframe_or_file( by default with the names 'record_id' and 'data_as_of_date'. Args: - **pandas_read_csv_kwargs (object): + **kwargs (object): key arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html feature_group_name (str): feature group name dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data verbose (bool) : True for displaying messages, False for silent method. @@ -256,8 +278,8 @@ def prepare_fg_from_dataframe_or_file( if isinstance(dataframe_or_path, DataFrame): data = dataframe_or_path elif isinstance(dataframe_or_path, str): - pandas_read_csv_kwargs.pop("filepath_or_buffer", None) - data = read_csv(filepath_or_buffer=dataframe_or_path, **pandas_read_csv_kwargs) + kwargs.pop("filepath_or_buffer", None) + data = read_csv(filepath_or_buffer=dataframe_or_path, **kwargs) else: exc = Exception( str( From 6ee4e0340c1b8f42b8a56a682ed8717db60f6adb Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Thu, 9 Mar 2023 19:16:27 +0100 Subject: [PATCH 05/30] fix: black check --- src/sagemaker/feature_store/feature_utils.py | 26 ++++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index 3c3936d231..9cd950a0c7 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -47,14 +47,16 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: # It will try to assume the role specified if assume_role: sts = boto_session.client( - "sts", region_name=region, endpoint_url="https://sts.eu-west-1.amazonaws.com" + "sts", region_name=region, endpoint_url=f"https://sts.{region}.amazonaws.com" ) - metadata = sts.assume_role(RoleArn=assume_role, RoleSessionName="SagemakerExecution") + credentials = sts.assume_role( + RoleArn=assume_role, RoleSessionName="SagemakerExecution" + ).get("Credentials", {}) - access_key_id = metadata["Credentials"]["AccessKeyId"] - secret_access_key = metadata["Credentials"]["SecretAccessKey"] - session_token = metadata["Credentials"]["SessionToken"] + access_key_id = credentials.get("AccessKeyId", None) + secret_access_key = credentials.get("SecretAccessKey", None) + session_token = credentials.get("SessionToken", None) boto_session = boto3.session.Session( region_name=region, @@ -63,15 +65,13 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: aws_session_token=session_token, ) - # Sessions - sagemaker_client = boto_session.client("sagemaker") - sagemaker_runtime = boto_session.client("sagemaker-runtime") - runtime_client = boto_session.client(service_name="sagemaker-featurestore-runtime") sagemaker_session = Session( boto_session=boto_session, - sagemaker_client=sagemaker_client, - sagemaker_runtime_client=sagemaker_runtime, - sagemaker_featurestore_runtime_client=runtime_client, + sagemaker_client=boto_session.client("sagemaker"), + sagemaker_runtime_client=boto_session.client("sagemaker-runtime"), + sagemaker_featurestore_runtime_client=boto_session.client( + service_name="sagemaker-featurestore-runtime" + ), ) return sagemaker_session @@ -81,7 +81,7 @@ def get_feature_group_as_dataframe( feature_group_name: str, athena_bucket: str, query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}" - WHERE is_deleted=False """, + WHERE is_deleted=False """, role: str = None, region: str = None, session=None, From 5560de3c5efe39ddd21801b06bead54de34aadbd Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Thu, 9 Mar 2023 19:36:13 +0100 Subject: [PATCH 06/30] fix: integ tests --- tests/integ/test_feature_store.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 36005fe12a..8cda668303 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -1382,9 +1382,9 @@ def test_get_feature_group_with_role_region( athena_bucket=f"{offline_store_s3_uri}/query", ) + assert not dataset.empty + assert isinstance(dataset, DataFrame) assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") - assert not dataset.empty - assert isinstance(dataset, DataFrame) def test_get_feature_group_with_session( @@ -1420,9 +1420,9 @@ def test_get_feature_group_with_session( low_memory=False, ) # Using kwargs to pass a parameter to pandas.read_csv + assert not dataset.empty + assert isinstance(dataset, DataFrame) assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") - assert not dataset.empty - assert isinstance(dataset, DataFrame) @contextmanager From d936c29de30a89fe67bd70f9bed3d9313b1791b7 Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Thu, 9 Mar 2023 19:40:00 +0100 Subject: [PATCH 07/30] fix: linting --- src/sagemaker/feature_store/feature_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index 9cd950a0c7..af1a4d90d1 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -81,7 +81,7 @@ def get_feature_group_as_dataframe( feature_group_name: str, athena_bucket: str, query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}" - WHERE is_deleted=False """, + WHERE is_deleted=False """, role: str = None, region: str = None, session=None, From 5390e499393a6f8b7cfd14045c14aa491c5e5218 Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Thu, 9 Mar 2023 23:00:30 +0100 Subject: [PATCH 08/30] fix: boto session default endpoint url --- src/sagemaker/feature_store/feature_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index af1a4d90d1..25fe22ff74 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -47,7 +47,7 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: # It will try to assume the role specified if assume_role: sts = boto_session.client( - "sts", region_name=region, endpoint_url=f"https://sts.{region}.amazonaws.com" + "sts", region_name=region ) credentials = sts.assume_role( From b587fc5203ddc5a87eddc4e9ad720f25274df5ab Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 21:37:01 +0100 Subject: [PATCH 09/30] fix: black checks --- src/sagemaker/feature_store/feature_utils.py | 4 +--- tests/integ/test_feature_store.py | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index 25fe22ff74..1195a61a49 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -46,9 +46,7 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: # It will try to assume the role specified if assume_role: - sts = boto_session.client( - "sts", region_name=region - ) + sts = boto_session.client("sts", region_name=region) credentials = sts.assume_role( RoleArn=assume_role, RoleSessionName="SagemakerExecution" diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 8cda668303..ec6a48aff9 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -1380,6 +1380,7 @@ def test_get_feature_group_with_role_region( event_time_feature_name="feature3", latest_ingestion=True, athena_bucket=f"{offline_store_s3_uri}/query", + verbose=False, ) assert not dataset.empty @@ -1417,6 +1418,7 @@ def test_get_feature_group_with_session( event_time_feature_name="feature3", latest_ingestion=True, athena_bucket=f"{offline_store_s3_uri}/query", + verbose=False, low_memory=False, ) # Using kwargs to pass a parameter to pandas.read_csv From 553200990e412ef74041b7f667e292ca4d69725c Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 21:51:27 +0100 Subject: [PATCH 10/30] fix: integ test --- tests/integ/test_feature_store.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index ec6a48aff9..248d7d424b 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -1351,6 +1351,7 @@ def _wait_for_feature_group_update(feature_group: FeatureGroup): def test_get_feature_group_with_role_region( feature_store_session, + region_name, role, feature_group_name, offline_store_s3_uri, @@ -1358,6 +1359,7 @@ def test_get_feature_group_with_role_region( ): feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) feature_group.load_feature_definitions(data_frame=pandas_data_frame) + region = feature_store_session.boto_session.region_name with cleanup_feature_group(feature_group): output = feature_group.create( @@ -1375,7 +1377,7 @@ def test_get_feature_group_with_role_region( dataset = get_feature_group_as_dataframe( feature_group_name=feature_group_name, - region=region_name, + region=str(region), role=role, event_time_feature_name="feature3", latest_ingestion=True, @@ -1383,7 +1385,6 @@ def test_get_feature_group_with_role_region( verbose=False, ) - assert not dataset.empty assert isinstance(dataset, DataFrame) assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") @@ -1422,7 +1423,6 @@ def test_get_feature_group_with_session( low_memory=False, ) # Using kwargs to pass a parameter to pandas.read_csv - assert not dataset.empty assert isinstance(dataset, DataFrame) assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") From b5b511fce701ee575324ce9333d270cc792b8d7c Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 21:58:30 +0100 Subject: [PATCH 11/30] fix: integ test --- tests/integ/test_feature_store.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 248d7d424b..537fbdb672 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -1359,7 +1359,6 @@ def test_get_feature_group_with_role_region( ): feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) feature_group.load_feature_definitions(data_frame=pandas_data_frame) - region = feature_store_session.boto_session.region_name with cleanup_feature_group(feature_group): output = feature_group.create( @@ -1377,7 +1376,7 @@ def test_get_feature_group_with_role_region( dataset = get_feature_group_as_dataframe( feature_group_name=feature_group_name, - region=str(region), + region=str(region_name), role=role, event_time_feature_name="feature3", latest_ingestion=True, From ffcb3c219958807487dde06ce8a624f57e78d0cc Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 22:34:07 +0100 Subject: [PATCH 12/30] docs: added better docs --- src/sagemaker/feature_store/feature_utils.py | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index 1195a61a49..a212c5cc0b 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -33,14 +33,15 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: """Method use to get the :class:`sagemaker.session.Session` from a role and a region. Description: - Helpful in case it's invoke from a session with a role without permission it can assume - another role temporarily to perform certain tasks. + If invoked from a session with a role that lacks permissions, it can temporarily + assume another role to perform certain tasks. Args: - assume_role: role name - region: region name + assume_role (str): (Optional) role name to be assumed + region (str): region name Returns: + :class:`sagemaker.session.Session` """ boto_session = boto3.Session(region_name=region) @@ -135,7 +136,7 @@ def get_feature_group_as_dataframe( have a better tuning on data. For more info read: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html Returns: - dataset (pandas.DataFrame): dataset with the data retrieved from feature group + :class:`pandas.DataFrame`: dataset with the data retrieved from feature group """ logger.setLevel(logging.WARNING) @@ -203,7 +204,7 @@ def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame: data (pandas.DataFrame): dataframe used Returns: - pandas.DataFrame + :class:`pandas.DataFrame` """ data.rename(columns=lambda x: x.replace(" ", "_").replace(".", "").lower()[:62], inplace=True) return data @@ -247,9 +248,6 @@ def prepare_fg_from_dataframe_or_file( by default with the names 'record_id' and 'data_as_of_date'. Args: - **kwargs (object): key arguments used for the method pandas.read_csv to be able to - have a better tuning on data. For more info read: - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html feature_group_name (str): feature group name dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data verbose (bool) : True for displaying messages, False for silent method. @@ -263,10 +261,13 @@ def prepare_fg_from_dataframe_or_file( role (str) : role used to get the session. region (str) : region used to get the session. session (str): session of SageMaker used to work with the feature store + **kwargs (object): key arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html Returns: - :class:`sagemaker.feature_store.feature_group.FeatureGroup`: FG prepared with all - the methods and definitions properly defined + :class:`sagemaker.feature_store.feature_group.FeatureGroup`: + FG prepared with all the methods and definitions properly defined """ logger.setLevel(logging.WARNING) From ee59c3bc652f4158fddbc8c2129578123c243367 Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 22:40:10 +0100 Subject: [PATCH 13/30] docs: added better docs --- src/sagemaker/feature_store/feature_utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index a212c5cc0b..b10775b993 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -30,11 +30,13 @@ def get_session_from_role(region: str, assume_role: str = None) -> Session: - """Method use to get the :class:`sagemaker.session.Session` from a role and a region. + """Method used to get the :class:`sagemaker.session.Session` from a region and/or a role. Description: If invoked from a session with a role that lacks permissions, it can temporarily assume another role to perform certain tasks. + If `assume_role` is not specified it will attempt to use the default sagemaker + execution role to get the session to use the Feature Store runtime client. Args: assume_role (str): (Optional) role name to be assumed @@ -103,7 +105,6 @@ def get_feature_group_as_dataframe( >>> >>> type(fg_data) - >>> Description: Method to run an athena query over a Feature Group in a Feature Store From 1d10a0ddcd9f53d1e1ab237884c80786d2c74c54 Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 23:03:52 +0100 Subject: [PATCH 14/30] docs: added more docs --- src/sagemaker/feature_store/feature_utils.py | 30 +++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index b10775b993..13205bf8e2 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -91,7 +91,8 @@ def get_feature_group_as_dataframe( verbose: bool = True, **kwargs, ) -> DataFrame: - """Get a :class:`sagemaker.feature_store.feature_group.FeatureGroup` as a pandas.DataFrame + """Get data from a :class:`sagemaker.feature_store.feature_group.FeatureGroup` as + a :class:`pandas.DataFrame` Examples: >>> from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe @@ -107,10 +108,12 @@ def get_feature_group_as_dataframe( Description: - Method to run an athena query over a Feature Group in a Feature Store - to retrieve its data.It needs the sagemaker.Session linked to a role - or the role and region used to work Feature Stores.Returns a dataframe - with the data. + Method to run an athena query over a + :class:`sagemaker.feature_store.feature_group.FeatureGroup` in a Feature Store + to retrieve its data. It needs the :class:`sagemaker.session.Session` linked to a role + or the region and/or role used to work with Feature Stores (it uses the module + :module:`sagemaker.feature_store.feature_utils.get_session_from_role` + to get the session). Args: region (str): region of the target Feature Store @@ -124,7 +127,7 @@ def get_feature_group_as_dataframe( athena_bucket (str): Amazon S3 bucket for running the query role (str): role to be assumed to extract data from feature store. If not specified the default sagemaker execution role will be used. - session (str): `:obj:sagemaker.session.Session` + session (str): :class:`sagemaker.session.Session` of SageMaker used to work with the feature store. Optional, with role and region parameters it will infer the session. event_time_feature_name (str): eventTimeId feature. Mandatory only if the @@ -202,7 +205,7 @@ def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame: to later generate the features names of a Feature Group Args: - data (pandas.DataFrame): dataframe used + data (:class:`pandas.DataFrame`): dataframe used Returns: :class:`pandas.DataFrame` @@ -242,10 +245,15 @@ def prepare_fg_from_dataframe_or_file( """Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup` Description: - Function to prepare a dataframe for creating a Feature Group from a pandas.DataFrame - or a path to a file with proper dtypes, feature names and mandatory features (record_id, - event_id). It needs the sagemaker.Session linked to a role or the role and region used - to work Feature Stores. If record_id or event_id are not specified it will create ones + Function to prepare a :class:`pandas.DataFrame` read from a path to a csv file or pass it + directly to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`. + The path to the file needs proper dtypes, feature names and mandatory features (record_id, + event_id). + It needs the :class:`sagemaker.session.Session` linked to a role + or the region and/or role used to work with Feature Stores (it uses the module + :module:`sagemaker.feature_store.feature_utils.get_session_from_role` + to get the session). + If record_id or event_id are not specified it will create ones by default with the names 'record_id' and 'data_as_of_date'. Args: From 9eb872e23f67030c459167ce2833a0963238ea3e Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 23:07:41 +0100 Subject: [PATCH 15/30] fix: docstyle --- src/sagemaker/feature_store/feature_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index 13205bf8e2..8ab800eb77 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -91,8 +91,7 @@ def get_feature_group_as_dataframe( verbose: bool = True, **kwargs, ) -> DataFrame: - """Get data from a :class:`sagemaker.feature_store.feature_group.FeatureGroup` as - a :class:`pandas.DataFrame` + """:class:`sagemaker.feature_store.feature_group.FeatureGroup` as :class:`pandas.DataFrame` Examples: >>> from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe From e26a6c92ba73d7d1a765d99ffafbf923b3cd1a7a Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Fri, 10 Mar 2023 23:20:40 +0100 Subject: [PATCH 16/30] fix: sphinx --- src/sagemaker/feature_store/feature_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py index 8ab800eb77..ed35555b48 100644 --- a/src/sagemaker/feature_store/feature_utils.py +++ b/src/sagemaker/feature_store/feature_utils.py @@ -111,7 +111,7 @@ def get_feature_group_as_dataframe( :class:`sagemaker.feature_store.feature_group.FeatureGroup` in a Feature Store to retrieve its data. It needs the :class:`sagemaker.session.Session` linked to a role or the region and/or role used to work with Feature Stores (it uses the module - :module:`sagemaker.feature_store.feature_utils.get_session_from_role` + `sagemaker.feature_store.feature_utils.get_session_from_role` to get the session). Args: @@ -250,7 +250,7 @@ def prepare_fg_from_dataframe_or_file( event_id). It needs the :class:`sagemaker.session.Session` linked to a role or the region and/or role used to work with Feature Stores (it uses the module - :module:`sagemaker.feature_store.feature_utils.get_session_from_role` + `sagemaker.feature_store.feature_utils.get_session_from_role` to get the session). If record_id or event_id are not specified it will create ones by default with the names 'record_id' and 'data_as_of_date'. From ee1db55ce93fbdc2d24a0042f0c0a62d8c9a00cb Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Mon, 13 Mar 2023 21:40:17 +0100 Subject: [PATCH 17/30] fix: doc8 missing EOL --- doc/api/utility/featuregroup_utils.rst | 2 +- tests/integ/test_feature_store.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/doc/api/utility/featuregroup_utils.rst b/doc/api/utility/featuregroup_utils.rst index 41683ccf04..35bc8b9f09 100644 --- a/doc/api/utility/featuregroup_utils.rst +++ b/doc/api/utility/featuregroup_utils.rst @@ -4,4 +4,4 @@ FeatureGroup Utilities .. automodule:: sagemaker.feature_store.feature_utils :members: :undoc-members: - :show-inheritance: \ No newline at end of file + :show-inheritance: diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 537fbdb672..cdb242ff67 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -1349,7 +1349,7 @@ def _wait_for_feature_group_update(feature_group: FeatureGroup): print(f"FeatureGroup {feature_group.name} successfully updated.") -def test_get_feature_group_with_role_region( +def test_get_feature_group_with_region( feature_store_session, region_name, role, @@ -1377,7 +1377,6 @@ def test_get_feature_group_with_role_region( dataset = get_feature_group_as_dataframe( feature_group_name=feature_group_name, region=str(region_name), - role=role, event_time_feature_name="feature3", latest_ingestion=True, athena_bucket=f"{offline_store_s3_uri}/query", From 418da0e221001ce2b04e419c5116289f29c04ea3 Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Tue, 14 Mar 2023 16:39:50 +0100 Subject: [PATCH 18/30] fix: test_feature_group --- tests/unit/sagemaker/feature_store/test_feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sagemaker/feature_store/test_feature_group.py b/tests/unit/sagemaker/feature_store/test_feature_group.py index d9fd16f313..9f3482ba3b 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_group.py +++ b/tests/unit/sagemaker/feature_store/test_feature_group.py @@ -752,7 +752,7 @@ def test_athena_query_as_dataframe(read_csv, sagemaker_session_mock, query): query_execution_id="query_id", filename="tmp/query_id.csv", ) - read_csv.assert_called_with("tmp/query_id.csv", delimiter=",") + read_csv.assert_called_with(filepath_or_buffer="tmp/query_id.csv", delimiter=",") @patch("tempfile.gettempdir", Mock(return_value="tmp")) From b5669f62593c5b5994a762b293971c6e7803b80f Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Tue, 14 Mar 2023 17:05:02 +0100 Subject: [PATCH 19/30] feature: added more unit tests --- .../feature_store/test_feature_utils.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/unit/sagemaker/feature_store/test_feature_utils.py b/tests/unit/sagemaker/feature_store/test_feature_utils.py index 31058bc37d..e095b42e11 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_utils.py +++ b/tests/unit/sagemaker/feature_store/test_feature_utils.py @@ -21,6 +21,7 @@ from sagemaker.feature_store.feature_utils import ( _cast_object_to_string, prepare_fg_from_dataframe_or_file, + get_feature_group_as_dataframe ) from sagemaker.feature_store.feature_definition import ( FeatureTypeEnum, @@ -99,3 +100,26 @@ def test_prepare_fg_from_dataframe(sagemaker_session_mock): FeatureTypeEnum.INTEGRAL, FeatureTypeEnum.FRACTIONAL, ] + + +def test_get_fg_latest_without_eventid(sagemaker_session_mock): + with pytest.raises(Exception): + get_feature_group_as_dataframe( + session=sagemaker_session_mock, + feature_group_name="testFG", + athena_bucket="s3://test", + latest_ingestion=True, + event_time_feature_name=None + ) + + +def test_get_fg_without_sess_role_region(sagemaker_session_mock): + with pytest.raises(Exception): + get_feature_group_as_dataframe( + session=None, + region=None, + role=None, + feature_group_name="testFG", + athena_bucket="s3://test", + latest_ingestion=False + ) From b588b6280e37a33a058bb5694b868078acd9edbf Mon Sep 17 00:00:00 2001 From: JoseJuan98 Date: Tue, 14 Mar 2023 17:08:19 +0100 Subject: [PATCH 20/30] fix: black --- tests/unit/sagemaker/feature_store/test_feature_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/sagemaker/feature_store/test_feature_utils.py b/tests/unit/sagemaker/feature_store/test_feature_utils.py index e095b42e11..db6d90b77b 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_utils.py +++ b/tests/unit/sagemaker/feature_store/test_feature_utils.py @@ -21,7 +21,7 @@ from sagemaker.feature_store.feature_utils import ( _cast_object_to_string, prepare_fg_from_dataframe_or_file, - get_feature_group_as_dataframe + get_feature_group_as_dataframe, ) from sagemaker.feature_store.feature_definition import ( FeatureTypeEnum, @@ -109,7 +109,7 @@ def test_get_fg_latest_without_eventid(sagemaker_session_mock): feature_group_name="testFG", athena_bucket="s3://test", latest_ingestion=True, - event_time_feature_name=None + event_time_feature_name=None, ) @@ -121,5 +121,5 @@ def test_get_fg_without_sess_role_region(sagemaker_session_mock): role=None, feature_group_name="testFG", athena_bucket="s3://test", - latest_ingestion=False + latest_ingestion=False, ) From a14fd6565598cb4a97835a5bd4360145525dc1d6 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:48:25 +0200 Subject: [PATCH 21/30] fix: version invalid format --- .coverage.fedora.13188.403474 | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 new file mode 100644 index 0000000000..e69de29bb2 From 22d8228d1dc034436d19559e9446feeaff4bff2b Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:51:58 +0200 Subject: [PATCH 22/30] fix: rm .coverage files --- .coverage.fedora.13188.403474 | 0 .gitignore | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/.gitignore b/.gitignore index 0e63d2c505..9ac0639356 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ build src/*.egg-info .cache -.coverage +.coverage* sagemaker_venv* *.egg-info .tox From f12767beaa56d7a7687a89369a4da010ba31d935 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:48:25 +0200 Subject: [PATCH 23/30] fix: version invalid format --- .coverage.fedora.13188.403474 | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 new file mode 100644 index 0000000000..e69de29bb2 From 314fd1691fb2935b345994a2b8891f4d4c17c194 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:51:58 +0200 Subject: [PATCH 24/30] fix: rm .coverage files --- .coverage.fedora.13188.403474 | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 deleted file mode 100644 index e69de29bb2..0000000000 From c450cfd62ced77e433f03a59632eec88f7630791 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Tue, 6 Jun 2023 11:54:57 +0200 Subject: [PATCH 25/30] fix: DeprecationWarning: 'source deactivate' is deprecated. Use 'conda deactivate'. --- tests/scripts/run-notebook-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scripts/run-notebook-test.sh b/tests/scripts/run-notebook-test.sh index 8f33fbc97c..22cbc36792 100755 --- a/tests/scripts/run-notebook-test.sh +++ b/tests/scripts/run-notebook-test.sh @@ -75,7 +75,7 @@ for env in base /home/ec2-user/anaconda3/envs/*; do $ENV_PYTHON -m pip install --upgrade pip $ENV_PYTHON -m pip install "$TARBALL_DIRECTORY/sagemaker.tar.gz" - sudo -u ec2-user -E sh -c 'source /home/ec2-user/anaconda3/bin/deactivate' + sudo -u ec2-user -E sh -c 'conda deactivate' echo "Update of $env is complete." done From 45e407da9932835be90c86fd6eb9697de24fab31 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:48:25 +0200 Subject: [PATCH 26/30] fix: version invalid format --- .coverage.fedora.13188.403474 | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 new file mode 100644 index 0000000000..e69de29bb2 From 52ff5ab9088283cc91c507278f8235e7733a9d4c Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:51:58 +0200 Subject: [PATCH 27/30] fix: rm .coverage files --- .coverage.fedora.13188.403474 | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 deleted file mode 100644 index e69de29bb2..0000000000 From b6012e12b6afcf33b31de5615d46ad562f554749 Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:48:25 +0200 Subject: [PATCH 28/30] fix: version invalid format --- .coverage.fedora.13188.403474 | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 new file mode 100644 index 0000000000..e69de29bb2 From eae39afefdbb6b014435a05af6e2caf205850fba Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Thu, 18 May 2023 10:51:58 +0200 Subject: [PATCH 29/30] fix: rm .coverage files --- .coverage.fedora.13188.403474 | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .coverage.fedora.13188.403474 diff --git a/.coverage.fedora.13188.403474 b/.coverage.fedora.13188.403474 deleted file mode 100644 index e69de29bb2..0000000000 From ca543d9f501a642f1501bdac0476cf8fa0bf81ba Mon Sep 17 00:00:00 2001 From: Jose Juan Pena Date: Fri, 15 Sep 2023 20:28:55 +0200 Subject: [PATCH 30/30] fix: revert notebooks test command --- tests/scripts/run-notebook-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scripts/run-notebook-test.sh b/tests/scripts/run-notebook-test.sh index 22cbc36792..8f33fbc97c 100755 --- a/tests/scripts/run-notebook-test.sh +++ b/tests/scripts/run-notebook-test.sh @@ -75,7 +75,7 @@ for env in base /home/ec2-user/anaconda3/envs/*; do $ENV_PYTHON -m pip install --upgrade pip $ENV_PYTHON -m pip install "$TARBALL_DIRECTORY/sagemaker.tar.gz" - sudo -u ec2-user -E sh -c 'conda deactivate' + sudo -u ec2-user -E sh -c 'source /home/ec2-user/anaconda3/bin/deactivate' echo "Update of $env is complete." done