In [None]:
import pandas as pd

In [None]:
df = pd.read_csv("realtime_simulation.csv")
df

In [None]:
# Read the dataframe
# Get count of appearances for each customer_id
customer_counts = df['customer_id'].value_counts()

# Filter for customers appearing once
single_transaction_customers = customer_counts[customer_counts == 2]

print(f"Number of customers with single transaction: {len(single_transaction_customers)}")
print("\nThese customers are:")
print(single_transaction_customers)

# # If you want to see the actual transactions for these customers:
# single_customer_transactions = df[df['customer_id'].isin(single_transaction_customers.index)]
# print("\nTheir transactions:")
# print(single_customer_transactions)

# As a percentage of total customers
total_customers = df['customer_id'].nunique()
percentage = (len(single_transaction_customers) / total_customers) * 100
print(f"\nPercentage of customers with single transaction: {percentage:.2f}%")

In [None]:
df[df["customer_id"]==16]

In [None]:
!pip install scikit-learn

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

def split_data(input_file, train_ratio=0.7):
    """Split input CSV into historical and real-time simulation data"""
    df = pd.read_csv(input_file)
    df['purchase_timestamp'] = pd.to_datetime(df['purchase_timestamp'])  # Ensure timestamp is datetime
    train_df, test_df = train_test_split(df, train_size=train_ratio, random_state=42)
    
    # Save splits to CSV
    train_df.to_csv('historical_data.csv', index=False)
    test_df.to_csv('realtime_simulation.csv', index=False)
    
    return train_df, test_df

split_data(input_file="data/test_task_data.csv", train_ratio=0.7)

links

https://www.youtube.com/watch?v=mHEUlPFT6xg&ab_channel=AmazonWebServices

https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html

https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-create-a-dataset.html

In [None]:
historical_df

In [None]:
import pandas as pd

# Read the CSV file
historical_df = pd.read_csv("historical_data.csv")

# Convert purchase_timestamp to datetime
historical_df['purchase_timestamp'] = pd.to_datetime(historical_df['purchase_timestamp'])

# Sort the dataframe by customer_id and purchase_timestamp
historical_df = historical_df.sort_values(['customer_id', 'purchase_timestamp'])

# Function to calculate average of past purchases
def past_purchases_avg(group):
    return group.shift(1).expanding().mean()

# Calculate the average of past purchases for each customer
historical_df['avg_past_purchases'] = historical_df.groupby('customer_id')['purchase_value'].transform(past_purchases_avg)

# Replace NaN (for first purchases) with 0
historical_df['avg_past_purchases'] = historical_df['avg_past_purchases'].fillna(0)

historical_df = historical_df[['customer_id', 'purchase_timestamp', 'purchase_value', 'avg_past_purchases', 'loyalty_score']]

historical_df.head(50)
# # Display the first few rows of the updated dataframe
# print(historical_df.head(20))

# # Display summary statistics
# print(historical_df.describe())

# # Verify first purchases
# first_purchases = historical_df.groupby('customer_id').first()
# print("\nFirst purchases for each customer:")
# print(first_purchases[['purchase_value', 'avg_past_purchases']].head())

# # Verify subsequent purchases
# print("\nSubsequent purchases for a sample customer:")
# print(historical_df[historical_df['customer_id'] == historical_df['customer_id'].iloc[0]].head())

In [None]:
historical_df.columns

In [None]:
print("\nSubsequent purchases for a sample customer:")
print(historical_df[historical_df['customer_id'] == historical_df['customer_id'].iloc[0]].head())

In [None]:
import pandas as pd
historical_df = pd.read_csv("historical_data.csv")
historical_df = historical_df.sort_values("purchase_timestamp")
historical_df
# # Calculate features per customer
customer_features = (
    historical_df.groupby("customer_id")
    .agg(
        {
            "purchase_value": ["mean", "last"],
        }
    )
    .reset_index()
)

customer_features.columns = [
    "customer_id",
    "avg_purchase_value",
    "latest_purchase_value",
    "loyalty_score",
]

# # Add event time
# customer_features["purchase_timestamp"] = datetime.now().isoformat()


In [None]:
customer_features

In [None]:
import pandas as pd
import time
from datetime import datetime
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import (
    FeatureDefinition,
    FeatureTypeEnum,
)
from sagemaker.session import Session

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

In [None]:
import boto3
import time
from datetime import datetime
import pandas as pd
from botocore.exceptions import ClientError
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

In [None]:
class CustomerMappingStore:
    def __init__(self, table_name, region_name):
        self.table_name = table_name
        self.dynamodb = boto3.resource('dynamodb', region_name=region_name)
        self.table = self.dynamodb.Table(table_name)

    def create_table(self):
        """Create DynamoDB table for customer_id to record_id mapping"""
        try:
            self.dynamodb.create_table(
                TableName=self.table_name,
                KeySchema=[
                    {'AttributeName': 'customer_id', 'KeyType': 'HASH'}
                ],
                AttributeDefinitions=[
                    {'AttributeName': 'customer_id', 'AttributeType': 'S'}
                ],
                BillingMode='PAY_PER_REQUEST'
            )
            print(f"Created mapping table {self.table_name}")
            time.sleep(10)  # Wait for table creation
            self.table = self.dynamodb.Table(self.table_name)
        except ClientError as e:
            if e.response['Error']['Code'] != 'ResourceInUseException':
                raise

    def batch_update_mappings(self, mappings):
        """Batch update customer_id to record_id mappings"""
        try:
            with self.table.batch_writer() as batch:
                for item in mappings:
                    batch.put_item(
                        Item={
                            'customer_id': str(item['customer_id']),
                            'record_id': str(item['record_id']),
                            'last_updated': int(time.time())
                        }
                    )
            print(f"Updated {len(mappings)} customer mappings")
        except Exception as e:
            print(f"Error batch updating mappings: {e}")
            raise

    def get_latest_record_id(self, customer_id):
        """Get the latest record_id for a customer"""
        try:
            response = self.table.get_item(
                Key={'customer_id': str(customer_id)}
            )
            if 'Item' in response:
                return response['Item']['record_id']
            return None
        except Exception as e:
            print(f"Error getting mapping for customer {customer_id}: {e}")
            return None

In [None]:
class SageMakerFeatureStore:
    def __init__(self, sagemaker_session, s3_bucket=None):
        self.sagemaker_session = sagemaker_session
        self.feature_group = None
        self.s3_bucket = s3_bucket or f"sagemaker-{self.sagemaker_session.boto_region_name}"

    @staticmethod
    def create_record_id(customer_id, timestamp):
        """Create a composite record_id from customer_id and timestamp"""
        if isinstance(timestamp, str):
            timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        timestamp_str = timestamp.strftime('%Y%m%d%H%M%S')
        return f"CUST{customer_id}_{timestamp_str}"

    def wait_for_feature_group_creation_complete(self, feature_group):
        status = feature_group.describe().get("FeatureGroupStatus")
        while status == "Creating":
            print("Waiting for Feature Group Creation")
            time.sleep(5)
            status = feature_group.describe().get("FeatureGroupStatus")
        if status != "Created":
            raise RuntimeError(f"Failed to create feature group {feature_group.name}")
        print(f"FeatureGroup {feature_group.name} successfully created.")

    def create_feature_group(self, feature_group_name, role_arn):
        """Create feature group with feature definitions"""
        self.feature_group = FeatureGroup(
            name=feature_group_name,
            feature_definitions=[
                FeatureDefinition("record_id", FeatureTypeEnum.STRING),
                FeatureDefinition("customer_id", FeatureTypeEnum.STRING),
                FeatureDefinition("purchase_timestamp", FeatureTypeEnum.STRING),
                FeatureDefinition("purchase_value", FeatureTypeEnum.FRACTIONAL),
                FeatureDefinition("avg_purchase_value", FeatureTypeEnum.FRACTIONAL),
                FeatureDefinition("loyalty_score", FeatureTypeEnum.FRACTIONAL),
            ],
            sagemaker_session=self.sagemaker_session,
        )

        self.feature_group.create(
            s3_uri=f"s3://{self.s3_bucket}/feature-store/{feature_group_name}",
            record_identifier_name="record_id",
            event_time_feature_name="purchase_timestamp",
            role_arn=role_arn,
            enable_online_store=True,
        )

        self.wait_for_feature_group_creation_complete(self.feature_group)

    def prepare_features(self, historical_df):
        """Calculate features from historical data"""
        # Convert and format timestamp
        historical_df["purchase_timestamp"] = pd.to_datetime(historical_df["purchase_timestamp"])
        historical_df["purchase_timestamp"] = historical_df["purchase_timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        # Sort the dataframe
        historical_df = historical_df.sort_values(["customer_id", "purchase_timestamp"])

        # Calculate average purchase values
        historical_df["avg_purchase_value"] = historical_df.groupby("customer_id")["purchase_value"].transform(
            lambda x: x.shift(1).expanding().mean()
        ).fillna(0)

        # Generate record IDs
        historical_df['record_id'] = historical_df.apply(
            lambda row: self.create_record_id(
                row['customer_id'],
                datetime.strptime(row['purchase_timestamp'], "%Y-%m-%dT%H:%M:%S.%fZ")
            ),
            axis=1
        )

        return historical_df[
            ["record_id", "customer_id", "purchase_timestamp", "purchase_value", 
             "avg_purchase_value", "loyalty_score"]
        ]

    def ingest_features(self, features_df):
        """Ingest features into Feature Store"""
        if self.feature_group is None:
            raise ValueError("Feature group not created. Call create_feature_group first.")
        
        self.feature_group.ingest(data_frame=features_df, max_workers=1, wait=True)
        return features_df[['customer_id', 'record_id']].to_dict('records')

    def get_record(self, record_id):
        """Retrieve a specific record from the feature store"""
        feature_store_runtime = boto3.Session().client(
            service_name="sagemaker-featurestore-runtime",
            region_name=self.sagemaker_session.boto_region_name
        )

        try:
            response = feature_store_runtime.get_record(
                FeatureGroupName=self.feature_group.name,
                RecordIdentifierValueAsString=record_id
            )
            
            features = {}
            for feature in response["Record"]:
                features[feature["FeatureName"]] = feature["ValueAsString"]
            return features

        except Exception as e:
            print(f"Error retrieving record {record_id}: {e}")
            return None

In [None]:
import sagemaker
sagemaker_session = sagemaker.Session()

# Configuration
FEATURE_GROUP_NAME = "customer_purchase_features"
MAPPING_TABLE_NAME = "customer_record_mapping"
ROLE_ARN = "arn:aws:iam::YOUR_ACCOUNT_ID:role/YOUR_ROLE_NAME"

feature_store = SageMakerFeatureStore(sagemaker_session)
mapping_store = CustomerMappingStore(MAPPING_TABLE_NAME, sagemaker_session.boto_region_name)

mapping_store.create_table()
feature_store.create_feature_group(FEATURE_GROUP_NAME, ROLE_ARN)


features_df = feature_store.prepare_features(historical_df)       
# Ingest to feature store and get mappings
mappings = feature_store.ingest_features(features_df)
# Update mapping store
mapping_store.batch_update_mappings(mappings)


record_id = mapping_store.get_latest_record_id('100')
if not record_id:
    print(None)
feature_store.get_record(record_id)


In [None]:
# from botocore.exceptions import ClientError
# class FeatureStoreManager:
#     def __init__(self, sagemaker_session, mapping_table_name="customer_record_mapping"):
#         self.sagemaker_session = sagemaker_session
#         self.feature_group = None
#         self.s3_bucket = default_s3_bucket_name
#         self.mapping_table_name = mapping_table_name
        
#         # Initialize DynamoDB client
#         self.dynamodb = boto3.resource('dynamodb', region_name=self.sagemaker_session.boto_region_name)
#         self.mapping_table = self.dynamodb.Table(mapping_table_name)

#     def create_mapping_table(self):
#         """Create DynamoDB table for customer_id to record_id mapping if it doesn't exist"""
#         try:
#             self.dynamodb.create_table(
#                 TableName=self.mapping_table_name,
#                 KeySchema=[
#                     {'AttributeName': 'customer_id', 'KeyType': 'HASH'}
#                 ],
#                 AttributeDefinitions=[
#                     {'AttributeName': 'customer_id', 'AttributeType': 'S'}
#                 ],
#                 BillingMode='PAY_PER_REQUEST'
#             )
#             print(f"Created mapping table {self.mapping_table_name}")
#             # Wait for table to be created
#             time.sleep(10)
#         except ClientError as e:
#             if e.response['Error']['Code'] != 'ResourceInUseException':
#                 raise

#     def update_customer_mapping(self, customer_id, record_id):
#         """Update the mapping of customer_id to latest record_id"""
#         try:
#             self.mapping_table.put_item(
#                 Item={
#                     'customer_id': str(customer_id),
#                     'record_id': str(record_id),
#                     'last_updated': int(time.time())
#                 }
#             )
#         except Exception as e:
#             print(f"Error updating mapping for customer {customer_id}: {e}")
#             raise

#     def batch_update_mappings(self, mappings):
#         """
#         Batch update customer_id to record_id mappings
        
#         Args:
#             mappings: List of dictionaries containing customer_id and record_id
#         """
#         try:
#             with self.mapping_table.batch_writer() as batch:
#                 for item in mappings:
#                     batch.put_item(
#                         Item={
#                             'customer_id': str(item['customer_id']),
#                             'record_id': str(item['record_id']),
#                             'last_updated': int(time.time())
#                         }
#                     )
#         except Exception as e:
#             print(f"Error batch updating mappings: {e}")
#             raise

#     def get_latest_record_id(self, customer_id):
#         """Get the latest record_id for a customer"""
#         try:
#             response = self.mapping_table.get_item(
#                 Key={'customer_id': str(customer_id)}
#             )
#             if 'Item' in response:
#                 return response['Item']['record_id']
#             return None
#         except Exception as e:
#             print(f"Error getting mapping for customer {customer_id}: {e}")
#             return None

#     @staticmethod
#     def create_record_id(customer_id, timestamp):
#         """
#         Create a composite record_id from customer_id and timestamp
#         Format: CUST{customer_id}_{timestamp}
#         """
#         if isinstance(timestamp, str):
#             # Convert ISO format to datetime
#             timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        
#         timestamp_str = timestamp.strftime('%Y%m%d%H%M%S')
#         return f"CUST{customer_id}_{timestamp_str}"

#     def wait_for_feature_group_creation_complete(self, feature_group):
#         status = feature_group.describe().get("FeatureGroupStatus")
#         while status == "Creating":
#             print("Waiting for Feature Group Creation")
#             time.sleep(5)
#             status = feature_group.describe().get("FeatureGroupStatus")
#         if status != "Created":
#             raise RuntimeError(f"Failed to create feature group {feature_group.name}")
#         print(f"FeatureGroup {feature_group.name} successfully created.")

#     def create_feature_group(self):
#         """Create feature group with updated feature definitions"""
#         self.feature_group = FeatureGroup(
#             name="customer_purchase_features",
#             feature_definitions=[
#                 FeatureDefinition("record_id", FeatureTypeEnum.STRING),
#                 FeatureDefinition("customer_id", FeatureTypeEnum.INTEGRAL),
#                 FeatureDefinition("purchase_timestamp", FeatureTypeEnum.STRING),
#                 FeatureDefinition("purchase_value", FeatureTypeEnum.FRACTIONAL),
#                 FeatureDefinition("avg_purchase_value", FeatureTypeEnum.FRACTIONAL),
#                 FeatureDefinition("loyalty_score", FeatureTypeEnum.FRACTIONAL),
#             ],
#             sagemaker_session=self.sagemaker_session,
#         )

#         self.feature_group.create(
#             s3_uri=f"s3://{self.s3_bucket}/feature-store/customer_features",
#             record_identifier_name="record_id",
#             event_time_feature_name="purchase_timestamp",
#             role_arn=role,
#             enable_online_store=True,
#         )

#         self.wait_for_feature_group_creation_complete(self.feature_group)
        
#         # Create the mapping table after feature group is created
#         self.create_mapping_table()

#     def prepare_initial_features(self, historical_df):
#         """Calculate initial features from historical data"""
#         # Convert purchase_timestamp to datetime
#         historical_df["purchase_timestamp"] = pd.to_datetime(
#             historical_df["purchase_timestamp"]
#         )

#         # Ensure purchase_timestamp is in correct ISO format
#         historical_df["purchase_timestamp"] = historical_df["purchase_timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

#         # Sort the dataframe by customer_id and purchase_timestamp
#         historical_df = historical_df.sort_values(["customer_id", "purchase_timestamp"])

#         # Function to calculate average of past purchases
#         def past_purchases_avg(group):
#             return group.shift(1).expanding().mean()

#         # Calculate the average of past purchases for each customer
#         historical_df["avg_purchase_value"] = historical_df.groupby("customer_id")[
#             "purchase_value"
#         ].transform(past_purchases_avg)

#         # Replace NaN (for first purchases) with 0
#         historical_df["avg_purchase_value"] = historical_df[
#             "avg_purchase_value"
#         ].fillna(0)

#         # Generate record IDs using the new format
#         historical_df['record_id'] = historical_df.apply(
#             lambda row: self.create_record_id(
#                 row['customer_id'], 
#                 datetime.strptime(row['purchase_timestamp'], "%Y-%m-%dT%H:%M:%S.%fZ")
#             ),
#             axis=1
#         )

#         historical_df = historical_df[
#             [   
#                 "record_id",
#                 "customer_id",
#                 "purchase_timestamp",
#                 "purchase_value",
#                 "avg_purchase_value",
#                 "loyalty_score",
#             ]
#         ]

#         return historical_df

#     def ingest_features(self, features_df):
#         """
#         Ingest features into Feature Store and update DynamoDB mappings
        
#         Args:
#             features_df: DataFrame containing features to ingest
#         """
#         # First ingest the features
#         self.feature_group.ingest(data_frame=features_df, max_workers=1, wait=True)
        
#         # After successful ingestion, update the DynamoDB mappings
#         # Group by customer_id and get the latest record for each customer
#         latest_records = features_df.sort_values('purchase_timestamp').groupby('customer_id').last().reset_index()
        
#         # Prepare mappings for batch update
#         mappings = [
#             {
#                 'customer_id': str(row['customer_id']),
#                 'record_id': str(row['record_id'])
#             }
#             for _, row in latest_records.iterrows()
#         ]
        
#         # Update DynamoDB mappings in batch
#         self.batch_update_mappings(mappings)
        
#         print(f"Ingested {len(features_df)} records and updated {len(mappings)} customer mappings")

#     def get_latest_features(self, customer_id):
#         """Retrieve latest features for a customer from online store"""
#         feature_store_runtime = boto3.Session().client(
#             service_name="sagemaker-featurestore-runtime", 
#             region_name=self.sagemaker_session.boto_region_name
#         )

#         try:
#             # Get the latest record_id from DynamoDB
#             record_id = self.get_latest_record_id(customer_id)
#             if not record_id:
#                 print(f"No record_id mapping found for customer {customer_id}")
#                 return None

#             # Get the record using the record_id
#             response = feature_store_runtime.get_record(
#                 FeatureGroupName=self.feature_group.name,
#                 RecordIdentifierValueAsString=record_id
#             )
            
#             # Convert to dictionary format
#             features = {}
#             for feature in response["Record"]:
#                 features[feature["FeatureName"]] = feature["ValueAsString"]
#             return features

#         except Exception as e:
#             print(f"Error retrieving features for customer {customer_id}: {e}")
#             return None

#     def update_customer_features(self, customer_id, new_purchase_value, new_loyalty_score):
#         """Update customer features with new purchase data"""
#         current_features = self.get_latest_features(customer_id)

#         if current_features:
#             # Calculate new averages
#             old_avg_purchase = float(current_features["avg_purchase_value"])
#             new_avg_purchase = (old_avg_purchase + new_purchase_value) / 2
#         else:
#             new_avg_purchase = new_purchase_value

#         # Format current timestamp in ISO-8601 format
#         current_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        
#         # Generate new record ID using the composite format
#         new_record_id = self.create_record_id(customer_id, datetime.now())

#         # Prepare new record
#         record = {
#             "record_id": new_record_id,
#             "customer_id": str(customer_id),
#             "purchase_timestamp": current_time,
#             "purchase_value": str(new_purchase_value),
#             "avg_purchase_value": str(new_avg_purchase),
#             "loyalty_score": str(new_loyalty_score),
#         }

#         # Update feature store
#         feature_store_runtime = boto3.Session().client(
#             service_name="sagemaker-featurestore-runtime",
#             region_name=self.sagemaker_session.boto_region_name
#         )

#         feature_store_runtime.put_record(
#             FeatureGroupName=self.feature_group.name,
#             Record=[{"FeatureName": k, "ValueAsString": v} for k, v in record.items()],
#         )

#         # Update the DynamoDB mapping
#         self.update_customer_mapping(customer_id, new_record_id)

#         return record

In [None]:
import boto3
import time
import pickle
import pandas as pd
from datetime import datetime
from sklearn.linear_model import LinearRegression
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.session import Session

In [None]:
region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

In [None]:
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "sagemaker-featurestore-demo"
role = "arn:aws:iam::210399391398:role/service-role/AmazonSageMaker-ExecutionRole-20241221T151049"

print(default_s3_bucket_name)

In [None]:
class FeatureStoreManager:
    def __init__(self, sagemaker_session):
        self.sagemaker_session = sagemaker_session
        self.feature_group = None
        self.s3_bucket = default_s3_bucket_name

    # @staticmethod
    # def create_record_id(customer_id, timestamp):
    #     """Create a composite record_id from customer_id and timestamp"""
    #     if isinstance(timestamp, str):
    #         timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
    #     timestamp_str = timestamp.strftime('%Y%m%d%H%M%S')
    #     return f"CUST{customer_id}_{timestamp_str}"

    def wait_for_feature_group_creation_complete(self, feature_group):
        status = feature_group.describe().get("FeatureGroupStatus")
        while status == "Creating":
            print("Waiting for Feature Group Creation")
            time.sleep(5)
            status = feature_group.describe().get("FeatureGroupStatus")
        if status != "Created":
            raise RuntimeError(f"Failed to create feature group {feature_group.name}")
        print(f"FeatureGroup {feature_group.name} successfully created.")

    def create_feature_group(self):
        """Create feature group with updated feature definitions"""
        self.feature_group = FeatureGroup(
            name="customer_purchase_features",
            feature_definitions=[
                #FeatureDefinition("record_id", FeatureTypeEnum.STRING),
                FeatureDefinition("customer_id", FeatureTypeEnum.STRING),
                FeatureDefinition("purchase_timestamp", FeatureTypeEnum.STRING),
                FeatureDefinition("latest_purchase_value", FeatureTypeEnum.FRACTIONAL),
                FeatureDefinition("avg_purchase_value", FeatureTypeEnum.FRACTIONAL),
                FeatureDefinition("latest_loyalty_score", FeatureTypeEnum.FRACTIONAL),
                FeatureDefinition("avg_loyalty_score", FeatureTypeEnum.FRACTIONAL),
            ],
            sagemaker_session=self.sagemaker_session,
        )

        self.feature_group.create(
            s3_uri=f"s3://{self.s3_bucket}/feature-store/customer_features",
            record_identifier_name="customer_id",
            event_time_feature_name="purchase_timestamp",
            role_arn=role,
            enable_online_store=True,
        )

        self.wait_for_feature_group_creation_complete(self.feature_group)

    def prepare_initial_features(self, historical_df):
        """Calculate initial features from historical data"""
        # Convert purchase_timestamp to datetime
        historical_df["purchase_timestamp"] = pd.to_datetime(historical_df["purchase_timestamp"])
        
        # Group by customer_id to calculate features
        customer_features = historical_df.groupby('customer_id').agg({
            'purchase_timestamp': 'max',  # Get the latest timestamp
            'purchase_value': ['last', 'mean'],  # Get latest and average purchase
            'loyalty_score': ['last', 'mean']  # Get latest and average loyalty score
        }).reset_index()
        
        # Flatten column names
        customer_features.columns = [
            'customer_id', 'purchase_timestamp', 
            'latest_purchase_value', 'avg_purchase_value',
            'latest_loyalty_score', 'avg_loyalty_score'
        ]
        
        # Format timestamp
        customer_features["purchase_timestamp"] = customer_features["purchase_timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        
        # Generate record IDs
        # customer_features['record_id'] = customer_features.apply(
        #     lambda row: self.create_record_id(
        #         row['customer_id'], 
        #         datetime.strptime(row['purchase_timestamp'], "%Y-%m-%dT%H:%M:%S.%fZ")
        #     ),
        #     axis=1
        # )

        return customer_features

    def ingest_features(self, features_df):
        """Ingest features into Feature Store"""
        self.feature_group.ingest(data_frame=features_df, max_workers=1, wait=True)
    
    # Helper to parse the feature value from the record.
    def get_feature_value(self, record, feature_name):
        return str(list(filter(lambda r: r["FeatureName"] == feature_name, record))[0]["ValueAsString"])

    def get_latest_features(self, customer_id):
        """Retrieve latest features for a customer from online store"""
        feature_store_runtime = boto3.Session().client(
            service_name="sagemaker-featurestore-runtime", 
            region_name=self.sagemaker_session.boto_region_name
        )

        # record_identifier_value = "100"

        # featurestore_runtime.get_record(
        #     FeatureGroupName="customer_purchase_features",
        #     RecordIdentifierValueAsString=record_identifier_value,
        # )

        #try:
        print(customer_id)
        response = feature_store_runtime.get_record(FeatureGroupName="customer_purchase_features", RecordIdentifierValueAsString=customer_id)
        if 'Record' not in response:
            print(f"No records found for customer {customer_id}")
            return None 
        record = response["Record"]

        customer_features = {
            "customer_id": self.get_feature_value(record, "customer_id"),
            "purchase_timestamp": self.get_feature_value(record, "purchase_timestamp"),
            "latest_purchase_value": self.get_feature_value(record, "latest_purchase_value"),
            "avg_purchase_value": self.get_feature_value(record, "avg_purchase_value"),
            "latest_loyalty_score": self.get_feature_value(record, "latest_loyalty_score"),
            "avg_loyalty_score": self.get_feature_value(record, "avg_loyalty_score"),
        }
        
        # # Sort records by record_id (which includes timestamp) to get the latest
        # records = response['Records'][0]
        # latest_record = max(records, key=lambda x: x['RecordIdentifier'])
        
        # Convert to dictionary format
        # features = {}
        # for feature in response["Record"]:
        #     features[feature["FeatureName"]] = feature["ValueAsString"]
        
        return customer_features

        # except Exception as e:
        #     print(f"Customer {customer_id} not found in feature store: {e}")
        #     return None

    def update_customer_features(self, customer_id, new_purchase_value, new_loyalty_score):
        """Update customer features with new purchase data and predicted loyalty score"""
        current_features = self.get_latest_features(customer_id)

        if current_features:
            # Calculate new averages
            old_avg_purchase = float(current_features["avg_purchase_value"])
            new_avg_purchase = (old_avg_purchase + new_purchase_value) / 2
            
            old_avg_loyalty = float(current_features["avg_loyalty_score"])
            new_avg_loyalty = (old_avg_loyalty + new_loyalty_score) / 2
        else:
            new_avg_purchase = new_purchase_value
            new_avg_loyalty = new_loyalty_score

        # Format current timestamp
        current_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        
        # Generate new record ID
        #new_record_id = self.create_record_id(customer_id, datetime.now())

        # Prepare new record
        record = {
            "customer_id": str(customer_id),
            "purchase_timestamp": current_time,
            "latest_purchase_value": str(new_purchase_value),
            "avg_purchase_value": str(new_avg_purchase),
            "latest_loyalty_score": str(new_loyalty_score),
            "avg_loyalty_score": str(new_avg_loyalty)
        }

        # Update feature store
        feature_store_runtime = boto3.Session().client(
            service_name="sagemaker-featurestore-runtime", 
            region_name=self.sagemaker_session.boto_region_name
        )

        feature_store_runtime.put_record(
            FeatureGroupName=self.feature_group.name,
            Record=[{"FeatureName": k, "ValueAsString": v} for k, v in record.items()],
        )

        return record

In [None]:
historical_df = pd.read_csv("historical_data.csv")

In [None]:
historical_df.head()

In [None]:
feature_store_manager = FeatureStoreManager(feature_store_session)

In [None]:
feature_store_manager.create_feature_group()

In [None]:
# feature_store_manager.feature_group.describe()

In [None]:
initial_features = feature_store_manager.prepare_initial_features(historical_df)
initial_features.head()

In [None]:
initial_features[initial_features['customer_id']==100]

In [None]:
feature_store_manager.ingest_features(initial_features)

In [None]:
# ## Get data of one record to verify insertion

record_identifier_value = "100"

featurestore_runtime.get_record(
    FeatureGroupName="customer_purchase_features",
    RecordIdentifierValueAsString=record_identifier_value,
)

In [None]:
# featurestore_runtime.batch_get_record(
#     Identifiers=[
#         {
#             "FeatureGroupName": "customer_purchase_features",
#             "RecordIdentifiersValueAsString": ["100"],
#         },
#     ]
# )

In [None]:
s3_client = boto3.client("s3", region_name=region)
account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

feature_group_resolved_output_s3_uri = (
    feature_store_manager.feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)
feature_group_resolved_output_s3_uri

In [None]:
feature_group_s3_prefix = feature_group_resolved_output_s3_uri.replace(
    f"s3://{default_s3_bucket_name}/", ""
)
feature_group_s3_prefix

In [None]:
from time import sleep
offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(
        Bucket=default_s3_bucket_name, Prefix=feature_group_s3_prefix
    )
    if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
    else:
        print("Waiting for data in offline store...\n")
        sleep(60)

print("Data available.")

In [None]:
class ModelTrainer:
    @staticmethod
    def build_training_dataset(feature_store_manager):
        """Build training dataset from Feature Store"""
        try:
            customer_query = feature_store_manager.feature_group.athena_query()
            customer_table = customer_query.table_name
            
            query_string = f"""
            SELECT 
                customer_id,
                latest_purchase_value,
                avg_purchase_value,
                latest_loyalty_score,
                avg_loyalty_score
            FROM "{customer_table}"
            ORDER BY customer_id, purchase_timestamp DESC
            """
            
            customer_query.run(
                query_string=query_string,
                output_location=f"s3://{default_s3_bucket_name}/query_results/"
            )
            customer_query.wait()
            train_df = customer_query.as_dataframe()
            
            # Prepare features and target
            X = train_df[['latest_purchase_value', 'avg_purchase_value', 'avg_loyalty_score']]
            y = train_df['latest_loyalty_score']  # Current loyalty score becomes target
            
            return X, y
            
        except Exception as e:
            print(f"Error in build_training_dataset: {str(e)}")
            raise

    @staticmethod
    def train_model(X, y):
        """Train a simple linear regression model"""
        model = LinearRegression()
        model.fit(X, y)
        
        # Save model locally
        with open('loyalty_predictor.pkl', 'wb') as f:
            pickle.dump(model, f)
        
        return model

# Usage example
print("\nTraining model using Feature Store data...")
trainer = ModelTrainer()
X, y = trainer.build_training_dataset(feature_store_manager)
model = trainer.train_model(X, y)

In [None]:
X

In [None]:
feature_store_manager.get_latest_features(customer_id=str(1))

In [None]:
feature_store_manager.update_customer_features(
str(1),
400,
3  # Use predicted loyalty score as the new latest_loyalty_score
)

In [None]:
class RealTimeInference:
    def __init__(self, feature_store_manager, model_path='loyalty_predictor.pkl'):
        self.feature_store_manager = feature_store_manager
        with open(model_path, 'rb') as f:
            self.model = pickle.load(f)

    def process_event(self, event):
        """Process a single real-time event"""
        customer_id = event['customer_id']
        purchase_amount = float(event['purchase_value'])
        #real_loyalty = float(event['loyalty_score'])

        # Get historical features
        customer_features = self.feature_store_manager.get_latest_features(customer_id)
        
        if customer_features:
            # in case customer already processed we enrich the data from feature store
            features = {
                'latest_purchase_value': purchase_amount,
                'avg_purchase_value': float(customer_features['avg_purchase_value']),
                #'latest_loyalty_score': float(customer_features['latest_loyalty_score']),
                'avg_loyalty_score': float(customer_features['avg_loyalty_score'])
            }
        else:
            new_customer_loyalty_score = 0
            features = {
                'latest_purchase_value': purchase_amount,
                'avg_purchase_value': purchase_amount,
                #'latest_loyalty_score': new_customer_loyalty_score,
                'avg_loyalty_score': new_customer_loyalty_score
            }

        # Make prediction
        prediction = self.model.predict([[
            features['latest_purchase_value'],
            features['avg_purchase_value'],
            #features['latest_loyalty_score'],
            features['avg_loyalty_score']
        ]])[0]

        # Update feature store with predicted loyalty score
        self.feature_store_manager.update_customer_features(
            customer_id,
            purchase_amount,
            prediction  # Use predicted loyalty score as the new latest_loyalty_score
        )

        return {
            'customer_id': customer_id,
            'predicted_loyalty': prediction,
            #'ground_truth_loyalty': real_loyalty
        }

In [None]:
df = pd.read_csv("realtime_simulation.csv")
# First, ensure the purchase_timestamp is in datetime format
df['purchase_timestamp'] = pd.to_datetime(df['purchase_timestamp'])

# Now, sort the DataFrame by purchase_timestamp in ascending order
df_sorted = df.sort_values('purchase_timestamp', ascending=True)
# If you want to update the original DataFrame
df = df_sorted

# Optionally, reset the index if needed
#df = df.reset_index(drop=True)
df

In [None]:

inferencer = RealTimeInference(feature_store_manager)
results =[]
# Iterate over the DataFrame and create a list of dictionaries
events = []
for _, row in df.iterrows():
    event = {
        'customer_id': str(row['customer_id']),
        'purchase_value': row['purchase_value'],
        #'loyalty_score': row['loyalty_score']
    }
    result = inferencer.process_event(event)
    results.append(result['predicted_loyalty'])
    events.append(event)

df['prediction'] = results

In [None]:
df

In [None]:
feature_store_manager.get_latest_features(customer_id=str(20))

In [None]:
# First, ensure the purchase_timestamp is in datetime format
df['purchase_timestamp'] = pd.to_datetime(df['purchase_timestamp'])

# Group by customer_id and get the index of the latest purchase for each customer
latest_purchase_indices = df.groupby('customer_id')['purchase_timestamp'].idxmax()

# Use these indices to select the rows with the latest purchase for each customer
latest_purchases = df.loc[latest_purchase_indices]

# Sort the result by customer_id for better readability
latest_purchases = latest_purchases.sort_values('customer_id')

# Reset the index if needed
latest_purchases = latest_purchases.reset_index(drop=True)

latest_purchases

In [None]:
feature_store_manager.get_latest_features(customer_id=str(95))

In [None]:
for _, row in latest_purchases.iterrows():
    customer = str(row['customer_id'])
    d = feature_store_manager.get_latest_features(customer_id=str(customer))
    assert d['customer_id'] == customer
    assert d['latest_purchase_value'] == str(row['purchase_value'])
    assert d['latest_loyalty_score'] == str(row['prediction'])


In [None]:
df[df['customer_id']==20]

In [None]:
d['latest_purchase_value']

In [None]:
row['purchase_value']

In [None]:
df

In [None]:
class RealTimeInference:
    def __init__(self, feature_store_manager, model_path='loyalty_predictor.pkl'):
        self.feature_store_manager = feature_store_manager
        with open(model_path, 'rb') as f:
            self.model = pickle.load(f)
    
    def process_event(self, event):
        """Process a single real-time event"""
        customer_id = event['customer_id']
        purchase_amount = event['purchase_value']
        current_loyalty = event['loyalty_score']
        
        # Get historical features
        customer_features = self.feature_store_manager.get_latest_features(customer_id)
        
        if customer_features:
            features = {
                'avg_purchase_amount': float(customer_features['avg_purchase_amount']),
                'latest_purchase_amount': purchase_amount,  # Use current purchase as latest
                'avg_loyalty_score': float(customer_features['avg_loyalty_score']),
                'latest_loyalty_score': float(customer_features['latest_loyalty_score'])
            }
        else:
            features = {
                'avg_purchase_amount': purchase_amount,
                'latest_purchase_amount': purchase_amount,
                'avg_loyalty_score': current_loyalty,
                'latest_loyalty_score': current_loyalty
            }
        
        # Make prediction
        prediction = self.model.predict([[
            features['avg_purchase_amount'],
            features['latest_purchase_amount'],
            features['avg_loyalty_score'],
            features['latest_loyalty_score']
        ]])[0]
        
        # Update feature store
        self.feature_store_manager.update_customer_features(
            customer_id,
            purchase_amount,
            current_loyalty
        )
        
        return {
            'customer_id': customer_id,
            'predicted_loyalty': prediction,
            'actual_loyalty': current_loyalty
        }


In [1]:
import pandas as pd
f=pd.read_csv("inspection_data.csv")

In [2]:
f

Unnamed: 0,customer_id,latest_purchase_timestamp,latest_purchase_value_df,predicted_loyalty_score_df,latest_purchase_value_fs,latest_loyalty_score_fs,avg_purchase_value_fs,avg_loyalty_score_fs
0,1,2022-03-16 07:23:59,398.50,4.481954,398.50,4.481954,367.421667,3.985977
1,2,2022-08-09 00:54:07,448.05,2.698069,448.05,2.698069,448.050000,2.698069
2,4,2022-04-13 22:26:04,284.28,6.958193,284.28,6.958193,218.560000,7.664097
3,5,2022-10-14 15:20:27,414.73,6.958654,414.73,6.958654,335.588333,7.609327
4,6,2022-03-08 02:39:55,44.12,4.413678,44.12,4.413678,232.135000,3.961839
...,...,...,...,...,...,...,...,...
59,93,2022-11-01 20:34:21,17.05,6.561586,17.05,6.561586,69.110000,7.162222
60,95,2022-03-17 20:43:04,36.65,4.843353,36.65,4.843353,200.718333,4.601676
61,96,2022-07-13 14:39:11,30.75,6.282132,30.75,6.282132,136.958333,6.726066
62,98,2022-01-08 15:44:58,444.19,6.312584,444.19,6.312584,443.410000,6.621292


In [63]:
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session
import boto3


feature_group_name = "customer_purchase_features"

boto_session = boto3.Session()
sagemaker_client = boto_session.client(service_name="sagemaker")
featurestore_runtime = boto_session.client(service_name="sagemaker-featurestore-runtime")



feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)
s3_bucket_name = feature_store_session.default_bucket()
# Create a FeatureGroup object for the existing feature group
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)



# Get the Athena query results as a dataframe
query = feature_group.athena_query()
customer_table = query.table_name
query.run(query_string=f"SELECT * FROM {customer_table}", output_location=f"s3://{s3_bucket_name}/feature-store/customer_features")
query.wait()
train_df = query.as_dataframe()



In [64]:
train_df

Unnamed: 0,customer_id,purchase_timestamp,latest_purchase_value,avg_purchase_value,avg_loyalty_score,latest_loyalty_score,write_time,api_invocation_time,is_deleted
0,17,2022-09-16T08:09:01.000000Z,167.33,354.890000,7.750000,5.246667,2024-12-28 19:32:24.894,2024-12-28 19:27:04.000,False
1,50,2022-08-29T10:12:49.000000Z,476.20,360.220000,9.950000,5.972500,2024-12-28 19:32:24.879,2024-12-28 19:27:09.000,False
2,46,2024-12-28T16:34:16.061517Z,328.97,328.970000,2.639977,2.639977,2024-12-28 19:39:15.433,2024-12-28 19:34:16.000,False
3,48,2022-06-24T17:25:48.000000Z,466.00,466.000000,5.480000,5.480000,2024-12-28 19:32:24.914,2024-12-28 19:27:09.000,False
4,92,2022-09-11T12:48:34.000000Z,20.99,260.225000,4.240000,4.090000,2024-12-28 19:32:24.830,2024-12-28 19:27:17.000,False
...,...,...,...,...,...,...,...,...,...
178,66,2022-11-16T09:40:56.000000Z,278.41,263.106667,9.810000,6.263333,2024-12-28 19:32:24.866,2024-12-28 19:27:12.000,False
179,33,2022-06-08T20:12:20.000000Z,115.17,115.170000,4.750000,4.750000,2024-12-28 19:32:24.833,2024-12-28 19:27:07.000,False
180,35,2024-12-28T16:33:52.196862Z,206.71,241.827500,2.880485,3.680970,2024-12-28 19:38:45.275,2024-12-28 19:33:52.000,False
181,82,2024-12-28T16:34:57.731470Z,252.89,272.420000,2.218766,3.237531,2024-12-28 19:38:45.275,2024-12-28 19:34:57.000,False


In [60]:
from core import utils
customer_id = str(44)
response = featurestore_runtime.get_record(
            FeatureGroupName="customer_purchase_features",
            RecordIdentifierValueAsString=customer_id,
        )

record = response["Record"]

customer_features = {
    "customer_id": utils.get_feature_value(record, "customer_id"),
    "purchase_timestamp": utils.get_feature_value(record, "purchase_timestamp"),
    "latest_purchase_value": utils.get_feature_value(
        record, "latest_purchase_value"
    ),
    "avg_purchase_value": utils.get_feature_value(record, "avg_purchase_value"),
    "avg_loyalty_score": utils.get_feature_value(record, "avg_loyalty_score"),
    "latest_loyalty_score": utils.get_feature_value(
        record, "latest_loyalty_score"
    ),
}
customer_features

{'customer_id': '44',
 'purchase_timestamp': '2024-12-28T16:34:38.788282Z',
 'latest_purchase_value': '405.51',
 'avg_purchase_value': '243.85',
 'avg_loyalty_score': '3.702876239800732',
 'latest_loyalty_score': '4.2531591263570885'}

In [53]:
train_df[train_df["customer_id"]==75]

Unnamed: 0,customer_id,purchase_timestamp,latest_purchase_value,avg_purchase_value,avg_loyalty_score,latest_loyalty_score,write_time,api_invocation_time,is_deleted
11,75,2024-12-28T13:34:50.309102Z,108.54,219.79,3.831694,4.323389,2024-12-28 16:39:49.254,2024-12-28 16:34:50.000,False
13,75,2024-12-28T13:36:02.414682Z,41.36,130.575,4.184726,4.537759,2024-12-28 16:39:49.254,2024-12-28 16:36:02.000,False
14,75,2024-12-28T13:36:03.727881Z,464.28,297.4275,4.501495,4.818263,2024-12-28 16:39:49.254,2024-12-28 16:36:03.000,False
57,75,2022-11-08T02:13:37.000000Z,331.04,331.04,3.34,3.34,2024-12-28 16:33:08.652,2024-12-28 16:27:35.000,False


In [23]:
len(train_df.customer_id.unique())

100

In [47]:
import pandas as pd
dd =pd.read_csv("data/inference_data.csv")

dd

Unnamed: 0,customer_id,purchase_timestamp,purchase_value,loyalty_score
0,80,2022-07-29 10:31:41,442.01,2.09
1,20,2022-06-26 19:55:11,284.25,1.95
2,13,2022-05-13 22:06:27,61.07,4.46
3,32,2022-10-23 02:07:22,465.32,4.65
4,63,2022-03-20 00:13:33,224.45,3.11
...,...,...,...,...
87,100,2022-01-29 16:35:12,40.66,2.89
88,43,2022-06-21 03:57:59,204.85,7.86
89,39,2022-04-11 17:14:50,222.57,9.19
90,93,2022-11-01 20:34:21,17.05,7.88


customer_id
75    3
44    3
69    3
39    2
28    2
     ..
79    1
95    1
25    1
37    1
43    1
Name: count, Length: 64, dtype: int64

In [49]:
dd[dd["customer_id"]==16]

Unnamed: 0,customer_id,purchase_timestamp,purchase_value,loyalty_score
76,16,2022-06-11 03:17:36,69.3,4.77
82,16,2022-04-20 11:47:32,413.66,9.6


In [None]:
# 1- Test historical data storage and retrieval:
## All the customer id in the test data must be in the feature store
## All transactions made by each customer must be in the feature store

# 2- 

In [43]:
import pandas as pd
import numpy as np
import boto3
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session

def get_offline_store_data(feature_group):
    """Fetch all data from offline feature store"""
    query = feature_group.athena_query()
    customer_table = query.table_name
    query.run(
        query_string=f"SELECT * FROM {customer_table}", 
        output_location=f"s3://{feature_group.sagemaker_session.default_bucket()}/feature-store/test_queries"
    )
    query.wait()
    return query.as_dataframe()

def get_random_customers_with_multiple_records(csv_path, min_records=2, num_customers=8):
    """Find random customers that appear multiple times in the CSV"""
    df = pd.read_csv(csv_path)
    customer_counts = df['customer_id'].value_counts()
    eligible_customers = customer_counts[customer_counts >= min_records].index.tolist()
    
    if len(eligible_customers) < num_customers:
        raise ValueError(f"Not enough customers with {min_records} or more records")
    
    return np.random.choice(eligible_customers, num_customers, replace=False).tolist()

def check_customer_records(customer_id, offline_df):
    """Check if a customer has enough records in the offline store"""
    customer_records = offline_df[offline_df['customer_id'] == customer_id]
    return len(customer_records) >= 2

# Setup Feature Store connection
feature_group_name = "customer_purchase_features"
boto_session = boto3.Session()
sagemaker_client = boto_session.client(service_name="sagemaker")
featurestore_runtime = boto_session.client(service_name="sagemaker-featurestore-runtime")
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

# Create Feature Group object
feature_group = FeatureGroup(
    name=feature_group_name, 
    sagemaker_session=feature_store_session
)

# Test data consistency
def test_feature_store_data():
    # Get random customers from CSV
    csv_path = "data/inference_data.csv"  # Replace with your CSV path
    try:
        test_customers = get_random_customers_with_multiple_records(csv_path)
        print(f"Selected customers for testing: {test_customers}")
    except ValueError as e:
        print(f"Error: {str(e)}")
        return

    # Get offline store data
    offline_df = get_offline_store_data(feature_group)
    
    # Check each customer's records
    failed_customers = []
    for customer_id in test_customers:
        if not check_customer_records(customer_id, offline_df):
            failed_customers.append(customer_id)
    
    # Print results
    if failed_customers:
        print(f"The following customers have fewer than 2 records: {failed_customers}")
    else:
        print("All selected customers have at least 2 records in the offline store")

    return len(failed_customers) == 0


success = test_feature_store_data()
print(f"Test {'passed' if success else 'failed'}")

Selected customers for testing: [20, 65, 44, 39, 69, 28, 15, 86]


All selected customers have at least 2 records in the offline store
Test passed


In [54]:
import pandas as pd
import numpy as np
from core import utils

def get_random_customers(csv_path, num_customers=10):
    """Get random customers from CSV that appear more than once"""
    df = pd.read_csv(csv_path)
    customer_counts = df['customer_id'].value_counts()
    multiple_records = customer_counts[customer_counts > 1].index.tolist()
    return np.random.choice(multiple_records, num_customers, replace=False).tolist()

def test_online_vs_csv():
    # Get random customers from CSV
    csv_path = "data/inference_data.csv"  # Update with your CSV path
    test_customers = get_random_customers(csv_path)
    df = pd.read_csv(csv_path)
    test_customers.append(9)
    test_customers.append(18)
    print("Testing customers:", test_customers)
    
    for customer_id in test_customers:
        # Get latest CSV record
        customer_df = df[df['customer_id'] == customer_id]
        latest_csv = customer_df.sort_values('purchase_timestamp', ascending=False).iloc[0]
        
        # Get online store record
        response = featurestore_runtime.get_record(
            FeatureGroupName="customer_purchase_features",
            RecordIdentifierValueAsString=str(customer_id),
        )
        record = response["Record"]
        online_features = {
            "customer_id": utils.get_feature_value(record, "customer_id"),
            "purchase_timestamp": utils.get_feature_value(record, "purchase_timestamp"),
            "latest_purchase_value": float(utils.get_feature_value(record, "latest_purchase_value")),
        }
        
        # Compare values
        csv_purchase = float(latest_csv['purchase_value'])
        online_purchase = online_features['latest_purchase_value']
        
        print(f"\nCustomer: {customer_id}")
        print(f"CSV Purchase Value: {csv_purchase}")
        print(f"Online Purchase Value: {online_purchase}")
        print(f"CSV Timestamp: {latest_csv['purchase_timestamp']}")
        print(f"Online Timestamp: {online_features['purchase_timestamp']}")
        
        if abs(csv_purchase - online_purchase) > 0.0001:
            print("❌ Values don't match!")
        else:
            print("✅ Values match!")


test_online_vs_csv()

Testing customers: [63, 16, 88, 15, 9, 86, 56, 44, 65, 13, 9, 18]

Customer: 63
CSV Purchase Value: 22.34
Online Purchase Value: 22.34
CSV Timestamp: 2022-04-11 03:17:48
Online Timestamp: 2024-12-28T13:34:29.238626Z
✅ Values match!

Customer: 16
CSV Purchase Value: 69.3
Online Purchase Value: 69.3
CSV Timestamp: 2022-06-11 03:17:36
Online Timestamp: 2024-12-28T13:34:52.254019Z
✅ Values match!

Customer: 88
CSV Purchase Value: 432.14
Online Purchase Value: 432.14
CSV Timestamp: 2022-11-10 07:44:58
Online Timestamp: 2024-12-28T13:35:46.681431Z
✅ Values match!

Customer: 15
CSV Purchase Value: 52.68
Online Purchase Value: 52.68
CSV Timestamp: 2022-11-07 08:08:23
Online Timestamp: 2024-12-28T13:35:45.806961Z
✅ Values match!

Customer: 9
CSV Purchase Value: 459.86
Online Purchase Value: 459.86
CSV Timestamp: 2022-10-09 07:23:13
Online Timestamp: 2024-12-28T13:36:05.058671Z
✅ Values match!

Customer: 86
CSV Purchase Value: 385.59
Online Purchase Value: 385.59
CSV Timestamp: 2022-07-27 15:24: