In [None]:
import os
import re
import sys
import math
import json
import time
import warnings
import logging
import boto3
import botocore
import sagemaker
import numpy as np
import pandas as pd

from __future__ import annotations
from time import gmtime, strftime
from datetime import datetime, timezone, date
from platformdirs import site_config_dir, user_config_dir
from IPython.display import Image
from IPython.display import display
from IPython.display import FileLink, FileLinks
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import MinMaxScaler, LabelEncoder

# from sagemaker import Session
from sagemaker.session import Session
from sagemaker import get_execution_role
from sagemaker.experiments.run import Run, load_run
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.debugger import Rule, ProfilerRule, rule_configs
from sagemaker.remote_function import remote, RemoteExecutor
from sagemaker.tuner import (
    CategoricalParameter, ContinuousParameter,
    HyperparameterTuner, IntegerParameter,
)
from sagemaker.feature_store.feature_store import FeatureStore
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.inputs import FeatureParameter, TableFormatEnum, DataCatalogConfig, OnlineStoreStorageTypeEnum
from sagemaker.feature_store.feature_definition import StringFeatureDefinition
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe

In [2]:
# Notebook-wide logger. Use the real module name (``__name__`` evaluates to
# "__main__" inside a notebook) rather than the literal string '__name__'.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Re-running this cell must not stack another StreamHandler, otherwise every
# message is printed once per execution of the cell.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

In [119]:
# A single boto3 session is the source of credentials and region for every
# client and for the SageMaker session created below.
boto_session = boto3.Session()
client_sagemaker = boto_session.client("sagemaker")
client_s3 = boto_session.client("s3")
# Bind the SageMaker session to the same boto3 session so that the low-level
# clients above and the high-level SDK can never diverge in credentials/region.
sagemaker_session = sagemaker.Session(boto_session=boto_session)
sagemaker_role = sagemaker.get_execution_role(sagemaker_session=sagemaker_session)
bucket_name = sagemaker_session.default_bucket()

## Carregando dados

In [None]:
class DataHelper:
    """Stateless helpers that shape a DataFrame before it is sent to Feature Store.

    None of the helpers modify the DataFrame they receive; each returns a new
    frame so notebook cells stay idempotent when re-run.
    """

    @staticmethod
    def generate_event_timestamp() -> str:
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

        Feature Store documents string event times as ISO-8601 in UTC with a
        literal trailing ``Z``; a naive local timestamp without it does not
        match that format.
        """
        return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    @classmethod
    def create_event_time_column(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` with ``event_time`` as the first column.

        If the column is missing it is added, filled with one timestamp taken
        at call time. The input frame is left untouched.
        """
        other_columns = [col for col in df.columns if col != 'event_time']
        if 'event_time' not in df.columns:
            # assign() builds a new frame instead of writing into the caller's.
            df = df.assign(event_time=cls.generate_event_timestamp())
        return df[['event_time'] + other_columns]

    @staticmethod
    def rename_columns(col_name: str) -> str:
        """Replace ``.`` and ``-`` with ``_`` and strip trailing underscores."""
        return (
            col_name
            .replace('.', '_')
            .replace('-', '_')
            .rstrip('_')
        )

    @staticmethod
    def convert_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        """Convert columns to pandas nullable dtypes (booleans are left as-is)."""
        return df.convert_dtypes(
            infer_objects=True,
            convert_string=True,
            convert_integer=True,
            convert_boolean=False,
            convert_floating=True,
            dtype_backend="numpy_nullable",
        )

    @staticmethod
    def convert_object_to_string(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` whose ``object`` columns are ``string[python]``."""
        object_columns = df.select_dtypes(include=['object']).columns
        # astype() with a mapping returns a new frame; the input is not mutated.
        return df.astype({col: "string[python]" for col in object_columns})

    @classmethod
    def prepare_date(cls, df: pd.DataFrame, limit: bool = False) -> pd.DataFrame:
        """Run the full preparation pipeline on ``df``.

        Steps: dtype conversion, column renaming, ``event_time`` placement and
        object-to-string conversion.

        Args:
            df: Raw input frame.
            limit: When True only the first 10 rows are processed (enough to
                infer feature definitions).

        Returns:
            A new, prepared DataFrame.
        """
        if limit:
            df = df.head(10)
        return (
            cls.convert_dtypes(df)
            .rename(columns=cls.rename_columns)
            .pipe(cls.create_event_time_column)
            .pipe(cls.convert_object_to_string)
        )

In [279]:
def generate_event_timestamp() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    Feature Store documents string event times as ISO-8601 in UTC with a
    literal trailing ``Z``; a naive local timestamp does not match that format.
    """
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def rename_columns(col_name: str):
    """Normalise a column name: ``.`` and ``-`` become ``_``; trailing ``_`` is dropped."""
    normalized = col_name
    for separator in ('.', '-'):
        normalized = normalized.replace(separator, '_')
    return normalized.rstrip('_')


def convert_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame.convert_dtypes(
        df,
        infer_objects=True,
        convert_string=True,
        convert_integer=True,
        convert_boolean=False,
        convert_floating=True,
        dtype_backend="numpy_nullable",
    )

In [280]:
# Read each raw CSV and pass it through DataHelper.prepare_date
# (dtype conversion, column renaming, event_time first, object -> string).
df_clientes = pd.read_csv('data/clientes.csv').pipe(DataHelper.prepare_date)
df_pedidos = pd.read_csv('data/pedidos.csv').pipe(DataHelper.prepare_date)
df_produtos = pd.read_csv('data/produtos.csv').pipe(DataHelper.prepare_date)

In [281]:
# Sanity check: (rows, columns) of each prepared frame.
print(f"df_clientes: {df_clientes.shape}")
print(f"df_pedidos:  {df_pedidos.shape}")
print(f"df_produtos: {df_produtos.shape}")

df_clientes: (10000, 11)
df_pedidos:  (100000, 7)
df_produtos: (17001, 22)


In [282]:
# Confirm the prepared frame uses nullable / string dtypes rather than plain object.
df_clientes.dtypes

event_time        string[python]
id_cliente        string[python]
genero                     Int64
casado                     Int64
idade_18_29                Int64
idade_30_39                Int64
idade_40_49                Int64
idade_50_59                Int64
idade_60_69                Int64
idade_70_plus              Int64
num_dias_ativo           Float64
dtype: object

## Utils

In [264]:
# Show the timestamp string that is used when a frame has no event_time column.
DataHelper.generate_event_timestamp()

'2025-04-19T15:41:36'

## Classes

In [265]:
# client_fs_runtime = sm_session.boto_session.client(
#     service_name="sagemaker-featurestore-runtime",
#     region_name=sagemaker_session.boto_session.region_name
# )

# feature_store_session = sagemaker_session(
#     boto_session=boto_session,
#     sagemaker_client=client_sagemaker,
#     sagemaker_featurestore_runtime_client=client_fs_runtime,
# )

In [None]:
class FeatureStoreHandler:
    """Singleton holding the SageMaker clients and sessions used for Feature Store work.

    Every call to ``FeatureStoreHandler(...)`` returns the same object; only
    the first *successful* construction configures it, later arguments are
    ignored.
    """

    _instance: FeatureStoreHandler | None = None
    _initialized: bool = False

    def __new__(cls, *args, **kwargs) -> FeatureStoreHandler:
        """Create the shared instance on first use and return it afterwards."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, sm_session: sagemaker.Session, sm_role: str):
        """Configure the shared instance.

        Args:
            sm_session: SageMaker session whose boto session is used to build
                the clients.
            sm_role: IAM role ARN later used when creating feature groups.
        """
        if self._initialized:
            return
        self.sm_session = sm_session
        self.sm_role = sm_role
        self.client_sm = sm_session.boto_session.client(service_name="sagemaker")
        self.client_sm_fsr = self.create_client_sm_fs()
        self.feature_store_session = Session(
            boto_session=sm_session.boto_session,
            sagemaker_client=self.client_sm,
            sagemaker_featurestore_runtime_client=self.client_sm_fsr,
        )
        self.feature_store = FeatureStore(sagemaker_session=self.feature_store_session)
        self.data_helper = DataHelper()
        # Mark as initialized only after everything above succeeded; otherwise a
        # failed first attempt would leave a half-built singleton that silently
        # skips configuration on every later call.
        self._initialized = True

    def create_client_sm_fs(self):
        """Build a ``sagemaker-featurestore-runtime`` client in the session's region."""
        return self.sm_session.boto_session.client(
            service_name="sagemaker-featurestore-runtime",
            region_name=self.sm_session.boto_session.region_name
        )

    @classmethod
    def get_dataset(cls, fg_name: str | FeatureGroup, output_location: str = "s3://athena-query-result-891377318910/feature-store/"):
        """Build a dataset from a feature group and return it as a DataFrame.

        Args:
            fg_name: Feature group name, or an already constructed base object
                accepted by ``FeatureStore.create_dataset``. A name is wrapped
                in a ``FeatureGroup`` because ``create_dataset`` does not take
                plain strings as ``base``.
            output_location: S3 URI passed to ``create_dataset`` as ``output_path``.
                NOTE(review): the default is a hard-coded, account-specific bucket.

        Returns:
            Tuple ``(dataframe, query)`` as produced by ``to_dataframe()``.

        Raises:
            RuntimeError: If the handler has not been successfully instantiated yet.
        """
        # ``feature_store`` lives on the singleton instance, not on the class,
        # so resolve the instance explicitly instead of reading ``cls.feature_store``.
        handler = cls._instance
        if handler is None or not handler._initialized:
            raise RuntimeError(
                "FeatureStoreHandler must be instantiated before calling get_dataset()."
            )
        base = fg_name
        if isinstance(fg_name, str):
            base = FeatureGroup(name=fg_name, sagemaker_session=handler.feature_store_session)
        df_result, query = handler.feature_store.create_dataset(
            base=base,
            output_path=output_location
        ).to_dataframe()
        return df_result, query

In [290]:
class FeatureGroupHandler:
    """Convenience wrapper around one SageMaker ``FeatureGroup``.

    Clients, the execution role and the data helper are taken from the
    ``FeatureStoreHandler`` passed to the constructor.
    """

    def __init__(self, feature_store: FeatureStoreHandler, name: str):
        """
        Args:
            feature_store: Handler providing sessions, clients, role and data helper.
            name: Name of the feature group this object manages.
        """
        self.feature_store = feature_store
        self.name = name
        self.feature_group = FeatureGroup(name=name, sagemaker_session=self.feature_store.sm_session)
        self.definitions = None

    def create_by_df(self, df: pd.DataFrame, bucket: str, prefix: str, fg_id: str, fg_time: str = "event_time"):
        """Create the feature group, inferring feature definitions from ``df``.

        Does nothing (besides printing a message) when the group already exists.

        Args:
            df: Sample data; only the first 10 prepared rows are used for inference.
            bucket: S3 bucket of the offline store.
            prefix: Key prefix inside ``bucket``.
            fg_id: Record identifier column name.
            fg_time: Event time column name.
        """
        if self.check_if_exist():
            print("Feature Group existente!")
            return
        assert "_event_time" not in df.columns, KeyError("Renomei a coluna '_event_time'.")
        df = self.feature_store.data_helper.prepare_date(df, limit=True)
        self.definitions = self.feature_group.load_feature_definitions(data_frame=df)
        print("definitions: ", self.definitions)
        self.create(bucket, prefix, fg_id, fg_time)
        self.wait_for_creation_complete()

    def check_if_exist(self) -> bool:
        """Return True when a feature group named ``self.name`` exists.

        Uses ``describe_feature_group`` for an exact lookup; reading only the
        first page of a ``search`` call can miss the group when the account
        has more feature groups than fit in one page.
        """
        client = self.feature_store.client_sm
        try:
            client.describe_feature_group(FeatureGroupName=self.name)
        except client.exceptions.ResourceNotFound:
            return False
        return True

    def create(self, bucket: str, prefix: str, fg_id: str, fg_time: str) -> None:
        """Create the feature group with an offline store only (Glue table format).

        The offline store location is ``s3://{bucket}/{prefix}/{self.name}``.
        """
        # The group name comes from the FeatureGroup object itself, so it is
        # not passed again here.
        self.feature_group.create(
            s3_uri=f's3://{bucket}/{prefix}/{self.name}',
            record_identifier_name=fg_id,
            event_time_feature_name=fg_time,
            role_arn=self.feature_store.sm_role,
            enable_online_store=False,
            disable_glue_table_creation=False,
            table_format=TableFormatEnum.GLUE,  # ICEBERG | GLUE
        )

    def wait_for_creation_complete(self) -> None:
        """Poll every 5 seconds until the group leaves the ``Creating`` status.

        Raises:
            SystemExit: If the final status is anything other than ``Created``.
        """
        status = self.feature_group.describe().get('FeatureGroupStatus')
        while status == 'Creating':
            print(f'Waiting for feature group: {self.feature_group.name} to be created ...')
            time.sleep(5)
            status = self.feature_group.describe().get('FeatureGroupStatus')
        if status != 'Created':
            raise SystemExit(f'Failed to create feature group {self.feature_group.name}: {status}')
        print(f'FeatureGroup {self.feature_group.name} was successfully created.')

    def ingest_data(self, df: pd.DataFrame, max_workers: int | None = None, wait: bool = True) -> None:
        """Prepare ``df`` and ingest it into the feature group.

        Args:
            df: Data to ingest; it is passed through ``prepare_date`` first.
            max_workers: Number of ingestion workers; defaults to the CPU count.
            wait: Whether to block until ingestion finishes.
        """
        if not max_workers:
            # os.cpu_count() may return None when the count cannot be determined.
            max_workers = os.cpu_count() or 1
        df = self.feature_store.data_helper.prepare_date(df)
        # TODO: validate the frame against the feature definitions before ingesting.
        self.feature_group.ingest(
            data_frame=df,
            max_workers=max_workers,
            wait=wait
        )

    def get_record(self, fg_name: str) -> dict:
        """Fetch one record through the Feature Store runtime client.

        Args:
            fg_name: Despite its name, this is the record identifier *value*
                (it is sent as ``RecordIdentifierValueAsString``).

        NOTE(review): ``create`` sets ``enable_online_store=False`` while
        GetRecord is documented as an online-store read — confirm this call
        is usable for groups created through this class.
        """
        return self.feature_store.client_sm_fsr.get_record(
            FeatureGroupName=self.name,
            RecordIdentifierValueAsString=str(fg_name)
        )

    def get_record_as_df(self, fg_name: str) -> pd.DataFrame:
        """Return the record's feature entries as a DataFrame.

        The frame is empty when the response carries no ``Record`` key.
        """
        record = self.get_record(fg_name)
        return self.feature_store.data_helper.convert_dtypes(
            pd.DataFrame.from_records(record.get('Record', []))
        )

    def list_features_group(self) -> list:
        """Return the summaries of all feature groups, following ``NextToken`` pages."""
        client = self.feature_store.client_sm
        summaries = []
        request = {}
        while True:
            response = client.list_feature_groups(**request)
            summaries.extend(response.get('FeatureGroupSummaries', []))
            next_token = response.get('NextToken')
            if not next_token:
                return summaries
            request['NextToken'] = next_token

    def delete(self):
        """Delete the feature group."""
        self.feature_group.delete()

    def get_description(self):
        """Return the result of ``FeatureGroup.describe()``."""
        return self.feature_group.describe()

    def get_description_as_hive(self):
        """Return the result of ``FeatureGroup.as_hive_ddl()``."""
        return self.feature_group.as_hive_ddl()

    def get_feature_description(self, fg_name: str):
        """Return metadata of one feature; ``fg_name`` is the feature name."""
        return self.feature_group.describe_feature_metadata(fg_name)

    def query_athena(self, query: str, output_location: str = "s3://athena-query-result-891377318910/feature-store/") -> pd.DataFrame:
        """Run an Athena query against the group's table and return a DataFrame.

        Args:
            query: SQL text containing a ``{fg}`` placeholder, which is
                replaced by the double-quoted Athena table name.
            output_location: S3 URI for Athena results.
                NOTE(review): the default is a hard-coded, account-specific bucket.
        """
        fg_query = self.feature_group.athena_query()
        formated_query = query.format(fg=f'"{fg_query.table_name}"')
        print(formated_query)
        fg_query.run(query_string=formated_query, output_location=output_location)
        fg_query.wait()
        return self.feature_store.data_helper.convert_dtypes(fg_query.as_dataframe())

    def update_feature(self, fg_name: str, fg_desc: str) -> None:
        """Set the description of one feature; ``fg_name`` is the feature name."""
        self.feature_group.update_feature_metadata(
            feature_name=fg_name,
            description=fg_desc,
        )

## Feature Group

In [291]:
# Re-check frame sizes before creating the feature group (same check as after loading).
print(f"df_clientes: {df_clientes.shape}")
print(f"df_pedidos:  {df_pedidos.shape}")
print(f"df_produtos: {df_produtos.shape}")

df_clientes: (10000, 11)
df_pedidos:  (100000, 7)
df_produtos: (17001, 22)


In [292]:
# Preview the first rows that will feed the feature group.
df_clientes.head(3)

Unnamed: 0,event_time,id_cliente,genero,casado,idade_18_29,idade_30_39,idade_40_49,idade_50_59,idade_60_69,idade_70_plus,num_dias_ativo
0,2024-11-27T04:57:49.698Z,C1,0,0,0,0,0,1,0,0,0.083562
1,2024-11-27T04:57:49.702Z,C2,1,0,1,0,0,0,0,0,0.659589
2,2024-11-27T04:57:49.708Z,C3,1,1,0,0,0,0,1,0,0.402055


In [270]:
# FeatureStoreHandler is a singleton: re-running this cell returns the already
# configured instance and ignores the arguments passed here.
feature_store = FeatureStoreHandler(sm_session=sagemaker_session, sm_role=sagemaker_role)

In [293]:
# Feature group configuration: name, offline-store S3 prefix, and the
# record-identifier / event-time column names.
# NOTE(review): fs_prefix already ends with the group name and
# FeatureGroupHandler.create appends the name again to the S3 URI, so the name
# appears twice in the offline-store path — confirm this is intended.
feature_group_name = "clientes_feature_group"
fs_prefix = f"workshop_v2/feature-store/{feature_group_name}"
fg_id = "id_cliente"
fg_time = "event_time"

In [294]:
# Wrapper object bound to the configured feature group name.
feature_group = FeatureGroupHandler(feature_store=feature_store, name=feature_group_name)

In [295]:
# Create the feature group from the customers frame. create_by_df returns
# early with a message when the group already exists (as in the recorded output).
feature_group.create_by_df(
    df_clientes,
    bucket_name,
    fs_prefix,
    fg_id,
    fg_time
)

Feature Group existente!


In [109]:
# feature_group.list_features_group()

In [305]:
# feature_group.delete()

In [302]:
# `{fg}` is a placeholder that query_athena replaces with the quoted Athena table name.
query = 'SELECT * FROM {fg} LIMIT 100'

df = feature_group.query_athena(query)

SELECT * FROM "clientes_feature_group_1745077301" LIMIT 100


In [303]:
# Column types of the Athena query result.
df.dtypes

id_cliente             object
genero                 object
casado                 object
event_time             object
idade_18_29            object
idade_30_39            object
idade_40_49            object
idade_50_59            object
idade_60_69            object
idade_70_plus          object
num_dias_ativo         object
write_time             object
api_invocation_time    object
is_deleted             object
dtype: object

In [304]:
# NOTE(review): the recorded output is (0, 14), i.e. the query returned no rows.
# The ingest cell sits further down in the notebook, so on a top-to-bottom run
# this query executes before any data is ingested.
df.shape

(0, 14)

In [299]:
# Preview the query result (the recorded output shows only the header, no rows).
df.head()

Unnamed: 0,id_cliente,genero,casado,event_time,idade_18_29,idade_30_39,idade_40_49,idade_50_59,idade_60_69,idade_70_plus,num_dias_ativo,write_time,api_invocation_time,is_deleted


In [301]:
# Ingest the customers frame; ingest_data runs DataHelper.prepare_date on it
# again before calling FeatureGroup.ingest.
feature_group.ingest_data(df_clientes)

In [None]:
# Read the feature group back as a DataFrame using the values configured
# earlier in the notebook instead of placeholder name/bucket/region.
fg_data = get_feature_group_as_dataframe(
    feature_group_name=feature_group_name,
    athena_bucket=f"s3://{bucket_name}/athena_queries",
    region=boto_session.region_name,
    session=sagemaker_session,
    event_time_feature_name=fg_time,
)