diff --git a/.gitignore b/.gitignore index 0e63d2c505..9ac0639356 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ build src/*.egg-info .cache -.coverage +.coverage* sagemaker_venv* *.egg-info .tox diff --git a/doc/api/utility/featuregroup_utils.rst b/doc/api/utility/featuregroup_utils.rst new file mode 100644 index 0000000000..35bc8b9f09 --- /dev/null +++ b/doc/api/utility/featuregroup_utils.rst @@ -0,0 +1,7 @@ +FeatureGroup Utilities +---------------------- + +.. automodule:: sagemaker.feature_store.feature_utils + :members: + :undoc-members: + :show-inheritance: diff --git a/src/sagemaker/feature_store/feature_group.py b/src/sagemaker/feature_store/feature_group.py index 9bcf7e26bd..f271cedfd2 100644 --- a/src/sagemaker/feature_store/feature_group.py +++ b/src/sagemaker/feature_store/feature_group.py @@ -138,9 +138,14 @@ def get_query_execution(self) -> Dict[str, Any]: query_execution_id=self._current_query_execution_id ) - def as_dataframe(self) -> DataFrame: + def as_dataframe(self, **kwargs) -> DataFrame: """Download the result of the current query and load it into a DataFrame. + Args: + **kwargs (object): key arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + Returns: A pandas DataFrame contains the query result. """ @@ -161,7 +166,9 @@ def as_dataframe(self) -> DataFrame: query_execution_id=self._current_query_execution_id, filename=output_filename, ) - return pd.read_csv(output_filename, delimiter=",") + + kwargs.pop("delimiter", None) + return pd.read_csv(filepath_or_buffer=output_filename, delimiter=",", **kwargs) @attr.s diff --git a/src/sagemaker/feature_store/feature_utils.py b/src/sagemaker/feature_store/feature_utils.py new file mode 100644 index 0000000000..ed35555b48 --- /dev/null +++ b/src/sagemaker/feature_store/feature_utils.py @@ -0,0 +1,341 @@ +# Copyright Amazon.com, Inc. or its affiliates. 
All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Utilities for working with FeatureGroups and FeatureStores.""" +from __future__ import absolute_import + +import re +import logging + +from typing import Union +from pathlib import Path + +import pandas +import boto3 +from pandas import DataFrame, Series, read_csv + +from sagemaker.feature_store.feature_group import FeatureGroup +from sagemaker.session import Session + +logger = logging.getLogger(__name__) + + +def get_session_from_role(region: str, assume_role: str = None) -> Session: + """Method used to get the :class:`sagemaker.session.Session` from a region and/or a role. + + Description: + If invoked from a session with a role that lacks permissions, it can temporarily + assume another role to perform certain tasks. + If `assume_role` is not specified it will attempt to use the default sagemaker + execution role to get the session to use the Feature Store runtime client. 
+ + Args: + assume_role (str): (Optional) role name to be assumed + region (str): region name + + Returns: + :class:`sagemaker.session.Session` + """ + boto_session = boto3.Session(region_name=region) + + # It will try to assume the role specified + if assume_role: + sts = boto_session.client("sts", region_name=region) + + credentials = sts.assume_role( + RoleArn=assume_role, RoleSessionName="SagemakerExecution" + ).get("Credentials", {}) + + access_key_id = credentials.get("AccessKeyId", None) + secret_access_key = credentials.get("SecretAccessKey", None) + session_token = credentials.get("SessionToken", None) + + boto_session = boto3.session.Session( + region_name=region, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + aws_session_token=session_token, + ) + + sagemaker_session = Session( + boto_session=boto_session, + sagemaker_client=boto_session.client("sagemaker"), + sagemaker_runtime_client=boto_session.client("sagemaker-runtime"), + sagemaker_featurestore_runtime_client=boto_session.client( + service_name="sagemaker-featurestore-runtime" + ), + ) + + return sagemaker_session + + +def get_feature_group_as_dataframe( + feature_group_name: str, + athena_bucket: str, + query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}" + WHERE is_deleted=False """, + role: str = None, + region: str = None, + session=None, + event_time_feature_name: str = None, + latest_ingestion: bool = True, + verbose: bool = True, + **kwargs, +) -> DataFrame: + """:class:`sagemaker.feature_store.feature_group.FeatureGroup` as :class:`pandas.DataFrame` + + Examples: + >>> from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe + >>> + >>> region = "eu-west-1" + >>> fg_data = get_feature_group_as_dataframe(feature_group_name="feature_group", + >>> athena_bucket="s3://bucket/athena_queries", + >>> region=region, + >>> event_time_feature_name="EventTimeId" + >>> ) + >>> + >>> type(fg_data) + + + Description: + Method to run an 
athena query over a + :class:`sagemaker.feature_store.feature_group.FeatureGroup` in a Feature Store + to retrieve its data. It needs the :class:`sagemaker.session.Session` linked to a role + or the region and/or role used to work with Feature Stores (it uses the module + `sagemaker.feature_store.feature_utils.get_session_from_role` + to get the session). + + Args: + region (str): region of the target Feature Store + feature_group_name (str): feature store name + query (str): query to run. By default, it will take the latest ingest with data that + wasn't deleted. If latest_ingestion is False it will take all the data + in the feature group that wasn't deleted. It needs to use the keyword + "#{table}" to refer to the FeatureGroup name. e.g.: + 'SELECT * FROM "sagemaker_featurestore"."#{table}"' + It must not end by ';'. + athena_bucket (str): Amazon S3 bucket for running the query + role (str): role to be assumed to extract data from feature store. If not specified + the default sagemaker execution role will be used. + session (str): :class:`sagemaker.session.Session` + of SageMaker used to work with the feature store. Optional, with + role and region parameters it will infer the session. + event_time_feature_name (str): eventTimeId feature. Mandatory only if the + latest ingestion is True. + latest_ingestion (bool): if True it will get the data only from the latest ingestion. + If False it will take whatever is specified in the query, or + if not specify it, it will get all the data that wasn't deleted. + verbose (bool): if True show messages, if False is silent. + **kwargs (object): key arguments used for the method pandas.read_csv to be able to + have a better tuning on data. 
For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + Returns: + :class:`pandas.DataFrame`: dataset with the data retrieved from feature group + """ + + logger.setLevel(logging.WARNING) + if verbose: + logger.setLevel(logging.INFO) + + if latest_ingestion: + if event_time_feature_name is not None: + query += str( + f"AND {event_time_feature_name}=(SELECT " + f"MAX({event_time_feature_name}) FROM " + '"sagemaker_featurestore"."#{table}")' + ) + else: + exc = Exception( + "Argument event_time_feature_name must be specified " + "when using latest_ingestion=True." + ) + logger.exception(exc) + raise exc + + query += ";" + + if session is not None: + sagemaker_session = session + elif region is not None: + sagemaker_session = get_session_from_role(region=region, assume_role=role) + else: + exc = Exception("Argument Session or role and region must be specified.") + logger.exception(exc) + raise exc + + msg = f"Feature Group used: {feature_group_name}" + logger.info(msg) + + fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session) + + sample_query = fg.athena_query() + query_string = re.sub(r"#\{(table)\}", sample_query.table_name, query) + + msg = f"Running query:\n\t{sample_query} \n\n\t-> Save on bucket {athena_bucket}\n" + logger.info(msg) + + sample_query.run(query_string=query_string, output_location=athena_bucket) + + sample_query.wait() + + # run Athena query. The output is loaded to a Pandas dataframe. 
+ dataset = sample_query.as_dataframe(**kwargs) + + msg = f"Data shape retrieve from {feature_group_name}: {dataset.shape}" + logger.info(msg) + + return dataset + + +def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame: + """Formats the column names for :class:`sagemaker.feature_store.feature_group.FeatureGroup` + + Description: + Module to format correctly the name of the columns of a DataFrame + to later generate the features names of a Feature Group + + Args: + data (:class:`pandas.DataFrame`): dataframe used + + Returns: + :class:`pandas.DataFrame` + """ + data.rename(columns=lambda x: x.replace(" ", "_").replace(".", "").lower()[:62], inplace=True) + return data + + +def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame: + """Cast properly pandas object types to strings + + Description: + Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to + a valid string type recognized by Feature Groups. + + Args: + data_frame: dataframe used + Returns: + pandas.DataFrame + """ + for label in data_frame.select_dtypes(["object", "O"]).columns.tolist(): + data_frame[label] = data_frame[label].astype("str").astype("string") + return data_frame + + +def prepare_fg_from_dataframe_or_file( + dataframe_or_path: Union[str, Path, pandas.DataFrame], + feature_group_name: str, + role: str = None, + region: str = None, + session=None, + record_id: str = "record_id", + event_id: str = "data_as_of_date", + verbose: bool = False, + **kwargs, +) -> FeatureGroup: + """Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup` + + Description: + Function to prepare a :class:`pandas.DataFrame` read from a path to a csv file or pass it + directly to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`. + The path to the file needs proper dtypes, feature names and mandatory features (record_id, + event_id). 
+ It needs the :class:`sagemaker.session.Session` linked to a role + or the region and/or role used to work with Feature Stores (it uses the module + `sagemaker.feature_store.feature_utils.get_session_from_role` + to get the session). + If record_id or event_id are not specified it will create ones + by default with the names 'record_id' and 'data_as_of_date'. + + Args: + feature_group_name (str): feature group name + dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data + verbose (bool) : True for displaying messages, False for silent method. + record_id (str, 'record_id'): (Optional) Feature identifier of the rows. If specified each + value of that feature has to be unique. If not specified or + record_id='record_id', then it will create a new feature from + the index of the pandas.DataFrame. + event_id (str) : (Optional) Feature with the time of the creation of data rows. + If not specified it will create one with the current time + called `data_as_of_date` + role (str) : role used to get the session. + region (str) : region used to get the session. + session (str): session of SageMaker used to work with the feature store + **kwargs (object): key arguments used for the method pandas.read_csv to be able to + have a better tuning on data. For more info read: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + + Returns: + :class:`sagemaker.feature_store.feature_group.FeatureGroup`: + FG prepared with all the methods and definitions properly defined + """ + + logger.setLevel(logging.WARNING) + if verbose: + logger.setLevel(logging.INFO) + + if isinstance(dataframe_or_path, DataFrame): + data = dataframe_or_path + elif isinstance(dataframe_or_path, (str, Path)): + kwargs.pop("filepath_or_buffer", None) + data = read_csv(filepath_or_buffer=dataframe_or_path, **kwargs) + else: + exc = Exception( + str( + f"Invalid type {type(dataframe_or_path)} for " + "argument dataframe_or_path. 
\nParameter must be" + " of type pandas.DataFrame or string" + ) + ) + logger.exception(exc) + raise exc + + # Formatting cols + data = _format_column_names(data=data) + data = _cast_object_to_string(data_frame=data) + + if record_id == "record_id" and record_id not in data.columns: + data[record_id] = data.index + + lg_uniq = len(data[record_id].unique()) + lg_id = len(data[record_id]) + + if lg_id != lg_uniq: + exc = Exception( + str( + f"Record identifier {record_id} have {abs(lg_id - lg_uniq)} " + "duplicated rows. \nRecord identifier must be unique" + " in each row." + ) + ) + logger.exception(exc) + raise exc + + if event_id not in data.columns: + import time + + current_time_sec = int(round(time.time())) + data[event_id] = Series([current_time_sec] * lg_id, dtype="float64") + + if session is not None: + sagemaker_session = session + elif role is not None and region is not None: + sagemaker_session = get_session_from_role(region=region, assume_role=role) + else: + exc = Exception("Argument Session or role and region must be specified.") + logger.exception(exc) + raise exc + + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session) + + feature_group.load_feature_definitions(data_frame=data) + + return feature_group diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index ec301cce1d..cdb242ff67 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -12,9 +12,9 @@ # language governing permissions and limitations under the License. 
from __future__ import absolute_import +import datetime import json import time -import datetime import dateutil.parser as date_parser from contextlib import contextmanager @@ -24,6 +24,7 @@ import pytest from pandas import DataFrame +from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe from sagemaker.feature_store.feature_definition import FractionalFeatureDefinition from sagemaker.feature_store.feature_group import FeatureGroup from sagemaker.feature_store.feature_store import FeatureStore @@ -1348,6 +1349,82 @@ def _wait_for_feature_group_update(feature_group: FeatureGroup): print(f"FeatureGroup {feature_group.name} successfully updated.") +def test_get_feature_group_with_region( + feature_store_session, + region_name, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + feature_group.ingest( + data_frame=pandas_data_frame, max_workers=3, max_processes=2, wait=True + ) + + dataset = get_feature_group_as_dataframe( + feature_group_name=feature_group_name, + region=str(region_name), + event_time_feature_name="feature3", + latest_ingestion=True, + athena_bucket=f"{offline_store_s3_uri}/query", + verbose=False, + ) + + assert isinstance(dataset, DataFrame) + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + + +def test_get_feature_group_with_session( + feature_store_session, + role, + feature_group_name, + offline_store_s3_uri, + pandas_data_frame, +): + feature_group = FeatureGroup(name=feature_group_name, 
sagemaker_session=feature_store_session) + feature_group.load_feature_definitions(data_frame=pandas_data_frame) + + with cleanup_feature_group(feature_group): + output = feature_group.create( + s3_uri=offline_store_s3_uri, + record_identifier_name="feature1", + event_time_feature_name="feature3", + role_arn=role, + enable_online_store=True, + ) + _wait_for_feature_group_create(feature_group) + + feature_group.ingest( + data_frame=pandas_data_frame, max_workers=3, max_processes=2, wait=True + ) + + dataset = get_feature_group_as_dataframe( + feature_group_name=feature_group_name, + session=feature_store_session, + event_time_feature_name="feature3", + latest_ingestion=True, + athena_bucket=f"{offline_store_s3_uri}/query", + verbose=False, + low_memory=False, + ) # Using kwargs to pass a parameter to pandas.read_csv + + assert isinstance(dataset, DataFrame) + assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}") + + @contextmanager def cleanup_feature_group(feature_group: FeatureGroup): try: diff --git a/tests/unit/sagemaker/feature_store/test_feature_group.py b/tests/unit/sagemaker/feature_store/test_feature_group.py index d9fd16f313..9f3482ba3b 100644 --- a/tests/unit/sagemaker/feature_store/test_feature_group.py +++ b/tests/unit/sagemaker/feature_store/test_feature_group.py @@ -752,7 +752,7 @@ def test_athena_query_as_dataframe(read_csv, sagemaker_session_mock, query): query_execution_id="query_id", filename="tmp/query_id.csv", ) - read_csv.assert_called_with("tmp/query_id.csv", delimiter=",") + read_csv.assert_called_with(filepath_or_buffer="tmp/query_id.csv", delimiter=",") @patch("tempfile.gettempdir", Mock(return_value="tmp")) diff --git a/tests/unit/sagemaker/feature_store/test_feature_utils.py b/tests/unit/sagemaker/feature_store/test_feature_utils.py new file mode 100644 index 0000000000..db6d90b77b --- /dev/null +++ b/tests/unit/sagemaker/feature_store/test_feature_utils.py @@ -0,0 +1,125 @@ +# Copyright Amazon.com, Inc. 
or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +# language governing permissions and limitations under the License. +"""Test for Feature Group Utils""" +from __future__ import absolute_import + +import pandas as pd +import pytest +from mock import Mock + +from sagemaker.feature_store.feature_utils import ( + _cast_object_to_string, + prepare_fg_from_dataframe_or_file, + get_feature_group_as_dataframe, +) +from sagemaker.feature_store.feature_definition import ( + FeatureTypeEnum, +) +from sagemaker.feature_store.feature_group import ( + FeatureGroup, +) + + +class PicklableMock(Mock): + """Mock class use for tests""" + + def __reduce__(self): + """Method from class Mock""" + return (Mock, ()) + + +@pytest.fixture +def sagemaker_session_mock(): + """Fixture Mock class""" + return Mock() + + +def test_convert_unsupported_types_to_supported(sagemaker_session_mock): + feature_group = FeatureGroup(name="FailedGroup", sagemaker_session=sagemaker_session_mock) + df = pd.DataFrame( + { + "float": pd.Series([2.0], dtype="float64"), + "int": pd.Series([2], dtype="int64"), + "object": pd.Series(["f1"], dtype="object"), + } + ) + # Converting object or O type to string + df = _cast_object_to_string(data_frame=df) + + feature_definitions = feature_group.load_feature_definitions(data_frame=df) + types = [fd.feature_type for fd in feature_definitions] + + assert types == [ + FeatureTypeEnum.FRACTIONAL, + FeatureTypeEnum.INTEGRAL, + FeatureTypeEnum.STRING, + ] + + +def 
test_prepare_fg_from_dataframe(sagemaker_session_mock): + very_long_name = "long" * 20 + df = pd.DataFrame( + { + "space feature": pd.Series([2.0], dtype="float64"), + "dot.feature": pd.Series([2], dtype="int64"), + very_long_name: pd.Series(["f1"], dtype="string"), + } + ) + + feature_group = prepare_fg_from_dataframe_or_file( + dataframe_or_path=df, + session=sagemaker_session_mock, + feature_group_name="testFG", + ) + + names = [fd.feature_name for fd in feature_group.feature_definitions] + types = [fd.feature_type for fd in feature_group.feature_definitions] + + assert names == [ + "space_feature", + "dotfeature", + very_long_name[:62], + "record_id", + "data_as_of_date", + ] + assert types == [ + FeatureTypeEnum.FRACTIONAL, + FeatureTypeEnum.INTEGRAL, + FeatureTypeEnum.STRING, + FeatureTypeEnum.INTEGRAL, + FeatureTypeEnum.FRACTIONAL, + ] + + +def test_get_fg_latest_without_eventid(sagemaker_session_mock): + with pytest.raises(Exception): + get_feature_group_as_dataframe( + session=sagemaker_session_mock, + feature_group_name="testFG", + athena_bucket="s3://test", + latest_ingestion=True, + event_time_feature_name=None, + ) + + +def test_get_fg_without_sess_role_region(sagemaker_session_mock): + with pytest.raises(Exception): + get_feature_group_as_dataframe( + session=None, + region=None, + role=None, + feature_group_name="testFG", + athena_bucket="s3://test", + latest_ingestion=False, + )