<a href="https://colab.research.google.com/github/friedelj/ML540/blob/main/JFriedel_USD540_Assignment3_1b.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# JFriedel — Assignment 3_1 — 6-2-25

In [None]:
import boto3
import sagemaker

# Record the boto3 version loaded in this kernel before attempting the upgrade.
original_boto3_version = boto3.__version__
# NOTE(review): boto3 is already imported above, so an upgrade installed here only
# takes effect after a kernel restart; re-run the notebook after the first install.
%pip install 'boto3>1.17.21'

In [None]:
from sagemaker.session import Session

# Resolve the region from the default boto3 configuration, then build one boto3
# session pinned to that region; every client below is created from it.
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client("sagemaker", region_name=region)
featurestore_runtime = boto_session.client("sagemaker-featurestore-runtime", region_name=region)

# SageMaker session wired to the explicit clients above (used for Feature Store work).
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

In [None]:
# You can modify the following to use a bucket of your choosing
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "sagemaker-featurestore-asmt3"

print(default_s3_bucket_name)

In [None]:
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print(role)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io

# Region-local S3 client, plus the example bucket and the object keys of the two CSVs.
s3_client = boto3.client("s3", region_name=region)

fraud_detection_bucket_name = "sagemaker-example-files-prod-{}".format(region)
identity_file_key = "housing_gmaps_data_raw.csv"
transaction_file_key = "housing.csv"

# Data Cleaning

In [None]:
import pandas as pd
import numpy as np
from scipy.spatial import cKDTree
from sklearn.neighbors import NearestNeighbors
import re
from datetime import datetime
import time

In [None]:
# Read the housing table and the gmaps (geocoded) table from the working directory.
df_housing, df_gmaps = (
    pd.read_csv(csv_path) for csv_path in ('housing.csv', 'housing_gmaps_data_raw.csv')
)

In [None]:
# Attach a postal_code to each housing row by exact (longitude, latitude) match.
# The gmaps lookup is de-duplicated on the join keys first: if a coordinate pair
# appears more than once there, a plain left merge would duplicate housing rows
# and silently inflate the dataset. validate='many_to_one' makes pandas raise if
# the lookup keys are ever not unique.
# Any postal_code from an earlier run of this cell is dropped first so that a
# re-run does not produce postal_code_x / postal_code_y columns.
postal_lookup = df_gmaps[['longitude', 'latitude', 'postal_code']].drop_duplicates(
    subset=['longitude', 'latitude']
)
df_housing = df_housing.drop(columns=['postal_code'], errors='ignore').merge(
    postal_lookup,
    on=['longitude', 'latitude'],
    how='left',
    validate='many_to_one',
)

# Save the updated df_housing to a CSV file
df_housing.to_csv('housing_with_postal_code.csv', index=False)

In [None]:
# Null count for every column of the housing table, then one overall yes/no flag.
missing_per_column = df_housing.isna().sum()
print("Missing values per column:\n", missing_per_column)

any_missing = bool(missing_per_column.any())
print("\nIs there any missing data in the file?:", any_missing)

In [None]:
# Rounded mean total_bedrooms for each postal_code (rows with no postal_code are excluded).
avg_bedrooms_by_postal = df_housing.groupby('postal_code')['total_bedrooms'].mean().round()


def fill_bedrooms(row):
    """Return the row's total_bedrooms, or its postal-code group average when missing.

    Rows whose postal_code is missing or has no group average stay NaN.
    Kept for callers that apply it row-wise; the cell below uses the equivalent
    vectorized form instead.
    """
    if pd.isna(row['total_bedrooms']):
        return avg_bedrooms_by_postal.get(row['postal_code'], np.nan)
    return row['total_bedrooms']


# Vectorized equivalent of df_housing.apply(fill_bedrooms, axis=1): map every row's
# postal_code to its group average and use it only where total_bedrooms is missing.
# NOTE(review): postal codes are imputed in a later cell, so rows missing both
# values remain NaN here; consider running the postal-code fill first.
df_housing['total_bedrooms'] = df_housing['total_bedrooms'].fillna(
    df_housing['postal_code'].map(avg_bedrooms_by_postal)
)

# Save the modified DataFrame to CSV
df_housing.to_csv('housing_bedrooms_filled.csv', index=False)

In [None]:
# Re-count nulls per housing column after the bedroom fill, plus an overall flag.
missing_per_column = df_housing.isna().sum()
print("Missing values per column:\n", missing_per_column)

any_missing = bool(missing_per_column.any())
print("\nIs there any missing data in the file?:", any_missing)

In [None]:
# Impute missing postal_code values from the nearest housing row that has one.
has_postal = df_housing['postal_code'].notna()
known = df_housing.loc[has_postal].copy()
unknown = df_housing.loc[~has_postal].copy()

# KD-tree over the known rows' (latitude, longitude); k=1 returns the single nearest.
# Distance is plain Euclidean in degrees, i.e. an approximate geographic nearness.
tree = cKDTree(known[['latitude', 'longitude']])
distances, indices = tree.query(unknown[['latitude', 'longitude']], k=1)

closest_postal_codes = known['postal_code'].to_numpy()[indices]
df_housing.loc[unknown.index, 'postal_code'] = closest_postal_codes

df_housing.to_csv('housing_postal_filled.csv', index=False)

In [None]:
# Re-count nulls per housing column after the postal-code fill, plus an overall flag.
missing_per_column = df_housing.isna().sum()
print("Missing values per column:\n", missing_per_column)

any_missing = bool(missing_per_column.any())
print("\nIs there any missing data in the file?:", any_missing)

In [None]:
# Add new column by dividing total_bedrooms by households and rounding the result
df_housing['bedrooms_per_household'] = (df_housing['total_bedrooms'] / df_housing['households']).round()

# Save the updated DataFrame to CSV
df_housing.to_csv('housing_bedrooms_per_household.csv', index=False)

In [None]:
df_housing.head(10)

In [None]:
# Display total missing values per column
missing_per_column = df_housing.isnull().sum()
print("Missing values per column:\n", missing_per_column)

# Check if any value is missing in the entire DataFrame
any_missing = df_housing.isnull().values.any()
print("\nIs there any missing data in the file?:", any_missing)

In [None]:
# Display total missing values per column
missing_per_column = df_gmaps.isnull().sum()
print("Missing values per column:\n", missing_per_column)

# Check if any value is missing in the entire DataFrame
any_missing = df_gmaps.isnull().values.any()
print("\nIs there any missing data in the file?:", any_missing)

In [None]:
# Replace empty strings with NaN (if needed)
df_gmaps['administrative_area_level_1-political'].replace('', pd.NA, inplace=True)

# Fill missing values with 'California'
df_gmaps['administrative_area_level_1-political'].fillna('California', inplace=True)

# Save the modified DataFrame to a CSV file
df_gmaps.to_csv('df_gmaps_filled.csv', index=False)

In [None]:
# Replace empty strings with NaN (if needed)
df_gmaps['postal_code_suffix'].replace('', pd.NA, inplace=True)

# Fill missing values with 9999 and convert the column to integer
df_gmaps['postal_code_suffix'] = df_gmaps['postal_code_suffix'].fillna(9999).astype(int)

# Save the modified DataFrame to a CSV file
df_gmaps.to_csv('df_gmaps_filled.csv', index=False)

In [None]:
# Replace empty strings with NaN (if needed)
df_gmaps['street_number'].replace('', pd.NA, inplace=True)

# Fill missing values with "0" (as a string to match text-based route values)
df_gmaps['street_number'] = df_gmaps['street_number'].fillna('0')

# Save the modified DataFrame to a CSV file
df_gmaps.to_csv('df_gmaps_filled.csv', index=False)

In [None]:
# Impute missing administrative_area_level_2 values from the nearest gmaps row
# that has one, using Euclidean distance over (latitude, longitude).
admin2_col = 'administrative_area_level_2-political'
df_missing = df_gmaps[df_gmaps[admin2_col].isna()]
df_known = df_gmaps[df_gmaps[admin2_col].notna()]

# NearestNeighbors.kneighbors() raises on an empty query set, so only fit and
# query when something is missing (e.g. not when re-running after a full fill).
if not df_missing.empty:
    nn = NearestNeighbors(n_neighbors=1, algorithm='ball_tree')
    nn.fit(df_known[['latitude', 'longitude']])
    distances, indices = nn.kneighbors(df_missing[['latitude', 'longitude']])
    df_gmaps.loc[df_missing.index, admin2_col] = df_known[admin2_col].to_numpy()[indices.ravel()]

# Save the modified DataFrame to a CSV
df_gmaps.to_csv('df_gmaps_filled.csv', index=False)

In [None]:
def extract_route(address):
    """Return the street name that follows a house number in ``address``.

    Finds the first run of digits that is followed by whitespace and returns the
    text after it up to the next comma, with surrounding whitespace stripped.
    Returns None when ``address`` is missing or contains no such pattern.
    """
    if pd.isna(address):
        return None
    found = re.search(r'\d+\s+([^,]+)', address)
    if found is None:
        return None
    return found.group(1).strip()

# Replace empty strings with NaN if necessary
df_gmaps['route'].replace('', pd.NA, inplace=True)

# Fill missing 'route' values
df_gmaps['route'] = df_gmaps.apply(
    lambda row: extract_route(row['address']) if pd.isna(row['route']) else row['route'],
    axis=1
)

# Save the modified DataFrame to a CSV file
df_gmaps.to_csv('df_gmaps_filled.csv', index=False)

In [None]:
# Replace empty strings with NaN (if applicable)
df_gmaps['route'].replace('', pd.NA, inplace=True)

# Fill missing values in 'route' with the default string
df_gmaps['route'] = df_gmaps['route'].fillna('strret name not filled in')

# Save the modified DataFrame to a CSV file
df_gmaps.to_csv('df_gmaps_filled.csv', index=False)

In [None]:
def _fill_from_nearest(df, column, coord_cols=('latitude', 'longitude')):
    """Fill missing values of ``df[column]`` in place from the nearest row that has one.

    Empty strings are treated as missing. "Nearest" means Euclidean distance over
    ``coord_cols``, found with a ball tree. Returns without doing anything when no
    value is missing, because NearestNeighbors.kneighbors rejects an empty query set.
    """
    coord_cols = list(coord_cols)
    df[column] = df[column].replace('', np.nan)
    is_missing = df[column].isna()
    if not is_missing.any():
        return
    known = df.loc[~is_missing]
    nbrs = NearestNeighbors(n_neighbors=1, algorithm='ball_tree').fit(known[coord_cols])
    _, nearest = nbrs.kneighbors(df.loc[is_missing, coord_cols])
    # Positional lookup into the known rows, aligned with the missing rows' order.
    df.loc[is_missing, column] = known[column].to_numpy()[nearest.ravel()]


# Impute locality, postal code and neighborhood, in that order, each from the
# nearest gmaps row that has a value (replaces three copy-pasted cells).
for column in ('locality-political', 'postal_code', 'neighborhood-political'):
    _fill_from_nearest(df_gmaps, column)

# Save the updated DataFrame
df_gmaps.to_csv('df_gmaps_filled.csv', index=False)

In [None]:
# Display total missing values per column
missing_per_column = df_gmaps.isnull().sum()
print("Missing values per column:\n", missing_per_column)

# Check if any value is missing in the entire DataFrame
any_missing = df_gmaps.isnull().values.any()
print("\nIs there any missing data in the file?:", any_missing)

In [None]:
# Keep only the first 12 columns
df_gmaps = df_gmaps.iloc[:, :12]

# Save the result to a new CSV
df_gmaps.to_csv('housing_gmaps_data_trimmed.csv', index=False)

# Display the result
print(df_gmaps.head())

In [None]:
# Display total missing values per column
missing_per_column = df_gmaps.isnull().sum()
print("Missing values per column:\n", missing_per_column)

# Check if any value is missing in the entire DataFrame
any_missing = df_gmaps.isnull().values.any()
print("\nIs there any missing data in the file?:", any_missing)

In [None]:
df_gmaps.head(10)

In [None]:
# One row per distinct, non-null neighborhood name (NaNs are dropped), sorted A-Z.
df_neighborhood = (
    pd.DataFrame({'neighborhood': df_gmaps['neighborhood-political'].dropna().unique()})
    .sort_values(by='neighborhood')
    .reset_index(drop=True)
)

print(df_neighborhood.head(10))
df_neighborhood.to_csv('neighborhood.csv', index=False)

In [None]:
# Give every housing row the neighborhood of its nearest gmaps row, measured in
# raw (latitude, longitude) space.
gmaps_coords = df_gmaps[['latitude', 'longitude']].to_numpy()
housing_coords = df_housing[['latitude', 'longitude']].to_numpy()

nn_model = NearestNeighbors(n_neighbors=1, algorithm='ball_tree').fit(gmaps_coords)
distances, indices = nn_model.kneighbors(housing_coords)

matched_neighborhoods = df_gmaps['neighborhood-political'].to_numpy()[indices[:, 0]]
df_housing['neighborhood'] = matched_neighborhoods

df_housing.to_csv('df_housing_appended.csv', index=False)

In [None]:
# Group df_housing by 'neighborhood' and calculate the average of 'median_house_age'
average_house_age = df_housing.groupby('neighborhood')['housing_median_age'].mean().reset_index()

# Rename the column to match the target column name
average_house_age.columns = ['neighborhood', 'median-house-age']

# Merge the average values into df_neighborhood
df_neighborhood = df_neighborhood.merge(average_house_age, on='neighborhood', how='left')

# Save the updated DataFrame to a CSV file
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Get current PC time
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# Add 'event_time' column with current time for each row
df_neighborhood['event_time'] = current_time

# Save to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
def convert_to_age_range(age):
    """Bucket a numeric age into its decade label, e.g. 37.4 -> "30 to 39 years".

    The value is truncated with int() before bucketing. Missing values (anything
    pd.isna treats as missing) map to None.
    """
    if pd.isna(age):
        return None  # or a default value like "Unknown"
    decade_start = (int(age) // 10) * 10
    return f"{decade_start} to {decade_start + 9} years"

# Replace each neighborhood's mean age with its decade label.
df_neighborhood['median-house-age'] = df_neighborhood['median-house-age'].map(convert_to_age_range)
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Mean households per neighborhood, merged in as 'total-households'.
average_total_households = (
    df_housing.groupby('neighborhood')['households']
    .mean()
    .rename('total-households')
    .reset_index()
)
df_neighborhood = df_neighborhood.merge(average_total_households, on='neighborhood', how='left')
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Calculate average median_house_value per neighborhood, capped at 500000
avg_median_house_value = (
    df_housing.groupby('neighborhood')['median_house_value']
    .mean()
    .clip(upper=500000)  # Cap at 500,000
    .reset_index()
)

# Rename column to match the target in df_neighborhood
avg_median_house_value.columns = ['neighborhood', 'median-house-value']

# Merge this result into df_neighborhood
df_neighborhood = df_neighborhood.merge(avg_median_house_value, on='neighborhood', how='left')

# Save the modified DataFrame to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Round the 'medium-house-value' column to 0 decimal places and convert to int
df_neighborhood['median-house-value'] = df_neighborhood['median-house-value'].round(0).astype('Int64')

# Save the modified DataFrame to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Mean households per neighborhood as 'total-households'.
# This repeats an earlier cell. Merging a column that already exists makes pandas
# emit 'total-households_x' / 'total-households_y' instead of a single column, so
# any existing copy is dropped before merging; the cell is now safe to re-run.
average_total_households = df_housing.groupby('neighborhood')['households'].mean().reset_index()

# Rename the column for merging
average_total_households.columns = ['neighborhood', 'total-households']

df_neighborhood = (
    df_neighborhood
    .drop(columns=['total-households'], errors='ignore')
    .merge(average_total_households, on='neighborhood', how='left')
)

# Save the updated DataFrame to a CSV file
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Compute average bedrooms_per_household per neighborhood in df_housing
avg_bedrooms_per_household = (
    df_housing.groupby('neighborhood')['bedrooms_per_household']
    .mean()
    .reset_index()
)

# Rename the column for clarity
avg_bedrooms_per_household.columns = ['neighborhood', 'bedrooms-per-household']

# Merge into df_neighborhood
df_neighborhood = df_neighborhood.merge(avg_bedrooms_per_household, on='neighborhood', how='left')

# Save the modified DataFrame
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Round the column to the nearest integer
df_neighborhood['bedrooms-per-household'] = df_neighborhood['bedrooms-per-household'].round(0).astype('Int64')

# Save the modified DataFrame to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Expand ocean_proximity into one indicator column per category value.
df_housing = pd.get_dummies(df_housing, columns=['ocean_proximity'])
df_housing.to_csv('housing_ocean_proximity_encoded.csv', index=False)

print(df_housing.head())

# The same encoded table is also written under the generic name used later.
df_housing.to_csv('df_housing.csv', index=False)

In [None]:
# Group df_housing to ensure one row per neighborhood, taking the first occurrence or mode
df_ocean = df_housing[['neighborhood', 'ocean_proximity_<1H OCEAN']].dropna().drop_duplicates(subset='neighborhood')

# Merge with df_neighborhood on the 'neighborhood' column
df_neighborhood = df_neighborhood.merge(df_ocean, on='neighborhood', how='left')

# Save to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Group df_housing to ensure one row per neighborhood, taking the first occurrence or mode
df_ocean = df_housing[['neighborhood', 'ocean_proximity_INLAND']].dropna().drop_duplicates(subset='neighborhood')

# Merge with df_neighborhood on the 'neighborhood' column
df_neighborhood = df_neighborhood.merge(df_ocean, on='neighborhood', how='left')

# Save to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Group df_housing to ensure one row per neighborhood, taking the first occurrence or mode
df_ocean = df_housing[['neighborhood', 'ocean_proximity_ISLAND']].dropna().drop_duplicates(subset='neighborhood')

# Merge with df_neighborhood on the 'neighborhood' column
df_neighborhood = df_neighborhood.merge(df_ocean, on='neighborhood', how='left')

# Save to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Group df_housing to ensure one row per neighborhood, taking the first occurrence or mode
df_ocean = df_housing[['neighborhood', 'ocean_proximity_NEAR BAY']].dropna().drop_duplicates(subset='neighborhood')

# Merge with df_neighborhood on the 'neighborhood' column
df_neighborhood = df_neighborhood.merge(df_ocean, on='neighborhood', how='left')

# Save to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
# Group df_housing to ensure one row per neighborhood, taking the first occurrence or mode
df_ocean = df_housing[['neighborhood', 'ocean_proximity_NEAR OCEAN']].dropna().drop_duplicates(subset='neighborhood')

# Merge with df_neighborhood on the 'neighborhood' column
df_neighborhood = df_neighborhood.merge(df_ocean, on='neighborhood', how='left')

# Save to CSV
df_neighborhood.to_csv('df_neighborhood.csv', index=False)

In [None]:
df_neighborhood.head(10)

In [None]:
# Print the rows of a few spot-check neighborhoods.
neighborhoods_of_interest = ["Brooktree", "Fisherman's Wharf", "Los Osos"]
filtered_df = df_neighborhood.loc[df_neighborhood['neighborhood'].isin(neighborhoods_of_interest)]
print(filtered_df)

# ----------

# Setup SageMaker FeatureStore

In [None]:
# NOTE(review): repeats the setup cell at the top of the notebook.
import boto3
import sagemaker

# Records the boto3 version currently loaded in this kernel.
original_boto3_version = boto3.__version__
# NOTE(review): boto3 is already imported, so an upgrade installed here only takes
# effect after a kernel restart.
%pip install 'boto3>1.17.21'

In [None]:
# Inspect Dataset
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io

s3_client = boto3.client("s3", region_name=region)

# Bucket holding the two CSVs (this notebook's replacement for the example's
# fraud_detection_bucket_name).
# NOTE(review): confirm these housing CSVs actually exist in this example bucket.
home_value_prediction = "sagemaker-example-files-prod-{}".format(region)
identity_file_key = "housing_gmaps_data_raw.csv"
transaction_file_key = "housing.csv"

# Fetch both objects first, then parse each body into a DataFrame.
identity_data_object, transaction_data_object = (
    s3_client.get_object(Bucket=home_value_prediction, Key=object_key)
    for object_key in (identity_file_key, transaction_file_key)
)
identity_data, transaction_data = (
    pd.read_csv(io.BytesIO(s3_object["Body"].read()))
    for s3_object in (identity_data_object, transaction_data_object)
)


In [None]:
# This cell was a SyntaxError: a session-setup snippet had been pasted into the
# middle of `identity_data.head()`. Both parts are restored as valid code below.
from sagemaker.session import Session

# Rebuild the region-pinned boto3 session and the SageMaker / Feature Store clients
# (same setup as at the top of the notebook).
region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

# Preview the identity (gmaps) table loaded in the previous cell.
identity_data.head()

## S3 Bucket Setup For The OfflineStore

In [None]:
# You can modify the following to use a bucket of your choosing
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "sagemaker-featurestore-demo"

print(default_s3_bucket_name)

In [None]:
from sagemaker import get_execution_role

# You can modify the following to use a role of your choosing. See the documentation for how to create this.
role = get_execution_role()
print(role)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io

s3_client = boto3.client("s3", region_name=region)

In [None]:
# Bucket and object keys of the cleaned gmaps / housing CSVs.
# NOTE(review): the cleaning section saves files such as df_housing.csv and
# housing_gmaps_data_trimmed.csv locally, not these names; confirm the cleaned
# files were uploaded to this bucket under these keys.
fraud_detection_bucket_name = "sagemaker-example-files-prod-{}".format(region)
identity_file_key = "housing_gmaps_data_cleaned.csv"
transaction_file_key = "housing_cleaned.csv"

In [None]:
# List the keys in the data bucket to confirm the CSVs read by the next cell exist.
# Uses the bucket variable rather than a literal placeholder string, so this lists
# the same bucket the next cell reads from. list_objects_v2 returns one page
# (at most 1000 keys).
response = s3_client.list_objects_v2(Bucket=fraud_detection_bucket_name)
for obj in response.get("Contents", []):
    print(obj["Key"])

In [None]:
# Fetch both cleaned CSV objects, then parse each body into a DataFrame.
identity_data_object, transaction_data_object = (
    s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=object_key)
    for object_key in (identity_file_key, transaction_file_key)
)
identity_data, transaction_data = (
    pd.read_csv(io.BytesIO(s3_object["Body"].read()))
    for s3_object in (identity_data_object, transaction_data_object)
)

In [None]:
import os

# Get current working directory
current_directory = os.getcwd()
print("Current Directory:", current_directory)

In [None]:
# List all files and directories in the current directory
files = os.listdir()
print("Files and directories:", files)

In [None]:
identity_data.head()

In [None]:
transaction_data.head()

In [None]:
# Point the bucket/key names back at the raw CSVs.
# NOTE(review): the DataFrames are already loaded, and nothing later in the visible
# notebook reads these names again — this cell looks removable.
home_value_prediction = "sagemaker-example-files-prod-{}".format(region)
identity_file_key = "housing_gmaps_data_raw.csv"
transaction_file_key = "housing.csv"

## Feature Engineering

In [None]:
import re

# Round floats to 5 decimal places, then replace missing values with 0, in both tables.
identity_data = identity_data.round(5).fillna(0)
transaction_data = transaction_data.round(5).fillna(0)

# Feature transformations for this dataset are applied before ingestion into FeatureStore.
# One-hot encode total_rooms and total_bedrooms (column prefixes "house_value" / "postal_code").
# NOTE(review): both are numeric columns, so this creates one indicator column per
# distinct value, and the prefixes do not name the source columns — confirm intent.
encoded_house_value = pd.get_dummies(transaction_data["total_rooms"], prefix="house_value")
encoded_postal_code = pd.get_dummies(transaction_data["total_bedrooms"], prefix="postal_code")

transformed_transaction_data = pd.concat(
    [transaction_data, encoded_house_value, encoded_postal_code], axis=1
)


def _to_feature_name(column):
    """Return ``column`` converted to a valid Feature Store feature name.

    Feature names may only contain letters, digits, '-' and '_', must start and end
    with a letter or digit, and are at most 64 characters long. Every other
    character (space, '.', '<', ...) becomes '_' and leading/trailing separators
    are trimmed.
    """
    name = re.sub(r"[^A-Za-z0-9_-]", "_", str(column)).strip("_-")
    return name[:64].rstrip("_-")


# Blank space (and '.', '<', ...) is not allowed in feature names; dummy columns such as
# "house_value_880.0" would make feature-group creation fail, so sanitize every column.
# (This cell previously ended with an unclosed, no-op rename call — a SyntaxError.)
identity_data = identity_data.rename(columns=_to_feature_name)
transformed_transaction_data = transformed_transaction_data.rename(columns=_to_feature_name)
assert identity_data.columns.is_unique and transformed_transaction_data.columns.is_unique, (
    "sanitized feature names collide"
)

In [None]:
identity_data.head()

In [None]:
transformed_transaction_data.head()

### Ingest Data into FeatureStore

In [None]:
# Define FeatureGroups

In [None]:
from time import gmtime, strftime, sleep

identity_feature_group_name = "identity-feature-group-" + strftime("%d-%H-%M-%S", gmtime())
transaction_feature_group_name = "transaction-feature-group-" + strftime("%d-%H-%M-%S", gmtime())

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

identity_feature_group = FeatureGroup(
    name=identity_feature_group_name, sagemaker_session=feature_store_session
)
transaction_feature_group = FeatureGroup(
    name=transaction_feature_group_name, sagemaker_session=feature_store_session
)

In [None]:
import time

# One ingestion timestamp (whole seconds since the epoch) shared by every record.
current_time_sec = int(round(time.time()))

# Cast boolean (one-hot) columns to 0/1 integers before schema detection.
bool_columns = transformed_transaction_data.select_dtypes(include='bool').columns
transformed_transaction_data[bool_columns] = transformed_transaction_data[bool_columns].astype(int)


def cast_object_to_string(data_frame):
    """Convert every object-dtype column of ``data_frame`` to pandas' ``string`` dtype, in place.

    The SageMaker FeatureStore Python SDK maps the ``string`` dtype to the String feature type.
    """
    for label in data_frame.columns:
        if data_frame.dtypes[label] == "object":
            data_frame[label] = data_frame[label].astype("str").astype("string")


cast_object_to_string(identity_data)
cast_object_to_string(transformed_transaction_data)

# record identifier and event time feature names
record_identifier_feature_name = "TransactionID"
event_time_feature_name = "EventTime"

# The record identifier must be one of each group's features. TransactionID comes from
# the fraud-detection example this notebook was adapted from; when a table lacks it, add
# a row-position id so the feature definitions contain the identifier.
# NOTE(review): the later Athena join matches identity and transaction rows on this id,
# which only pairs related rows if both tables share the same row order — confirm.
for frame in (identity_data, transformed_transaction_data):
    if record_identifier_feature_name not in frame.columns:
        frame.insert(0, record_identifier_feature_name, range(len(frame)))

# Event time as a float64 column with the same value on every row. Assigning the scalar
# fills each frame's own rows, instead of building a Series sized from another frame
# and relying on index alignment.
identity_data[event_time_feature_name] = float(current_time_sec)
transformed_transaction_data[event_time_feature_name] = float(current_time_sec)

# load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
identity_feature_group.load_feature_definitions(data_frame=identity_data)
# output is suppressed
transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data)
# output is suppressed

#### Create FeatureGroups in SageMaker FeatureStore

In [None]:
def wait_for_feature_group_creation_complete(feature_group):
    """Poll ``feature_group`` every 5 seconds until it leaves the "Creating" state.

    Prints a progress line for each poll that still reports "Creating", and a
    success line once the group is created.

    Raises:
        RuntimeError: if the final status is anything other than "Created".
    """
    while True:
        status = feature_group.describe().get("FeatureGroupStatus")
        if status != "Creating":
            break
        print("Waiting for Feature Group Creation")
        time.sleep(5)
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")


# Create both groups with the same offline-store location, identifier and event-time
# features and role, with the online store enabled; then wait for each to finish.
for feature_group in (identity_feature_group, transaction_feature_group):
    feature_group.create(
        s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        enable_online_store=True,
    )

for feature_group in (identity_feature_group, transaction_feature_group):
    wait_for_feature_group_creation_complete(feature_group=feature_group)

#### Confirm the FeatureGroups have been created using the DescribeFeatureGroup and ListFeatureGroups APIs

In [None]:
# Show the identity group's describe() output (status, offline-store config, ...).
identity_feature_group.describe()

In [None]:
# Show the transaction group's describe() output.
transaction_feature_group.describe()

In [None]:
sagemaker_client.list_feature_groups()  # use boto client to list FeatureGroups

#### PutRecords into the FeatureGroups

In [None]:
# Ingest identity_data into the identity group with 3 parallel workers;
# wait=True blocks until the ingestion finishes.
identity_feature_group.ingest(data_frame=identity_data, max_workers=3, wait=True)

In [None]:
# Same for the transformed transaction table, with 5 parallel workers.
transaction_feature_group.ingest(data_frame=transformed_transaction_data, max_workers=5, wait=True)

To confirm that the data has been ingested, retrieve a record from the online store:

In [None]:
record_identifier_value = str(2990130)

featurestore_runtime.get_record(
    FeatureGroupName=transaction_feature_group_name,
    RecordIdentifierValueAsString=record_identifier_value,
)

In [None]:
featurestore_runtime.batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": identity_feature_group_name,
            "RecordIdentifiersValueAsString": ["2990130"],
        },
        {
            "FeatureGroupName": transaction_feature_group_name,
            "RecordIdentifiersValueAsString": ["2990130"],
        },
    ]
)

The SageMaker Python SDK's FeatureGroup class can also generate Hive DDL commands (`as_hive_ddl()`).
The table schema is generated from the feature definitions: columns are named after the features, and their data types are inferred from the feature types.

In [None]:
print(identity_feature_group.as_hive_ddl())

In [None]:
account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

identity_feature_group_resolved_output_s3_uri = (
    identity_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

transaction_feature_group_resolved_output_s3_uri = (
    transaction_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

identity_feature_group = FeatureGroup(name="your-identity-feature-group-name", sagemaker_session=sagemaker_session)


transaction_feature_group_s3_prefix = transaction_feature_group_resolved_output_s3_uri.replace(
    f"s3://{default_s3_bucket_name}/", ""
)

offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(
        Bucket=default_s3_bucket_name, Prefix=transaction_feature_group_s3_prefix
    )
    if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
    else:
        print("Waiting for data in offline store...\n")
        sleep(60)

print("Data available.")

In [None]:
#XXXXXXXXXXXXXXXXXXXXXXXXx
from sagemaker import Session

sagemaker_session = Session()
client = sagemaker_session.sagemaker_client

response = client.list_feature_groups()
for fg in response["FeatureGroupSummaries"]:
    print(fg["FeatureGroupName"])

In [None]:
print(transaction_feature_group.as_hive_ddl())

In [None]:
account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

# S3 locations the Feature Store resolved for each group's offline store.
identity_feature_group_resolved_output_s3_uri = (
    identity_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

transaction_feature_group_resolved_output_s3_uri = (
    transaction_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

# Keep using the identity_feature_group created above. Rebinding it here to
# FeatureGroup(name="your-identity-feature-group-name", ...) would silently point the
# Athena queries below at a feature group that does not exist.

transaction_feature_group_s3_prefix = transaction_feature_group_resolved_output_s3_uri.replace(
    f"s3://{default_s3_bucket_name}/", ""
)

# Poll once a minute until the offline store holds data (more than one object under
# the prefix), giving up after OFFLINE_STORE_MAX_WAIT_MINUTES rather than looping forever.
OFFLINE_STORE_MAX_WAIT_MINUTES = 30
offline_store_contents = None
for _attempt in range(OFFLINE_STORE_MAX_WAIT_MINUTES):
    objects_in_bucket = s3_client.list_objects(
        Bucket=default_s3_bucket_name, Prefix=transaction_feature_group_s3_prefix
    )
    if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
        break
    print("Waiting for data in offline store...\n")
    sleep(60)

if offline_store_contents is None:
    raise TimeoutError(
        f"No offline-store data under s3://{default_s3_bucket_name}/"
        f"{transaction_feature_group_s3_prefix} after {OFFLINE_STORE_MAX_WAIT_MINUTES} minutes"
    )

print("Data available.")

### Build Training Dataset

In [None]:
from sagemaker import Session

# List every feature group visible to this session, following NextToken across pages.
sagemaker_session = Session()
client = sagemaker_session.sagemaker_client

print("Available Feature Groups:")
list_kwargs = {}
while True:
    response = client.list_feature_groups(**list_kwargs)
    for fg in response["FeatureGroupSummaries"]:
        print("-", fg["FeatureGroupName"])
    next_token = response.get("NextToken")
    if not next_token:
        break
    list_kwargs["NextToken"] = next_token

In [None]:
# Athena tables that back each feature group's offline store.
identity_query = identity_feature_group.athena_query()
transaction_query = transaction_feature_group.athena_query()
identity_table = identity_query.table_name
transaction_table = transaction_query.table_name

# Left-join each transaction row to its identity row on the shared transactionid column.
query_string = (
    f'SELECT * FROM "{transaction_table}" LEFT JOIN "{identity_table}" '
    f'ON "{transaction_table}".transactionid = "{identity_table}".transactionid'
)
print("Running " + query_string)

# Execute the join, block until Athena finishes, then load the result as a DataFrame.
identity_query.run(
    query_string=query_string,
    output_location=f"s3://{default_s3_bucket_name}/{prefix}/query_results/",
)
identity_query.wait()
dataset = identity_query.as_dataframe()

dataset

In [None]:
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

# Path of the CSV Athena wrote for the join query run in the previous cell. That query used
# output_location=.../query_results/, and Athena names the file <QueryExecutionId>.csv.
# (No placeholder feature group is created or queried here: a FeatureGroup named
# "your-identity-feature-group-name" does not exist, so its athena_query() fails.)
query_execution = identity_query.get_query_execution()
query_result = (
    f"s3://{default_s3_bucket_name}/{prefix}/query_results/"
    f"{query_execution['QueryExecution']['QueryExecutionId']}.csv"
)
print(query_result)

# Select useful columns for training, with the target column first. SageMaker XGBoost CSV
# input expects the label in column 0 and no header row.
# NOTE(review): feature names cannot contain spaces, and Athena lowercases column names, so
# "ocean_proximity_<1H OCEAN"-style names may not exist in `dataset`. Confirm against
# `dataset.columns` from the previous cell.
training_columns = [
    "house_value",
    "neighborhood",
    "postal_code",
    "number_of_rooms",
    "number_of_bedrooms",
    "ocean_proximity_<1H OCEAN",
    "ocean_proximity_INLAND",
    "ocean_proximity_ISLAND",
    "ocean_proximity_NEAR BAY",
    "ocean_proximity_NEAR OCEAN",
]
dataset = dataset[training_columns]

# Write to CSV in S3 without headers or an index column.
dataset.to_csv("dataset.csv", header=False, index=False)
s3_client.upload_file("dataset.csv", default_s3_bucket_name, prefix + "/training_input/dataset.csv")
dataset_uri_prefix = f"s3://{default_s3_bucket_name}/{prefix}/training_input/"

dataset

### Train and Deploy the Model

#     Retrieve the SageMaker XGBoost container image

In [None]:
# URI of the SageMaker-managed XGBoost 1.0-1 container for this region.
training_image = sagemaker.image_uris.retrieve(framework="xgboost", region=region, version="1.0-1")

#     Construct a SageMaker generic estimator using the SageMaker XGBoost container

In [None]:
from sagemaker.estimator import Estimator

# Model artifacts are written under <bucket>/<prefix>/training_output.
training_output_path = f"s3://{default_s3_bucket_name}/{prefix}/training_output"

# One ml.m5.xlarge instance, 5 GB volume, 3600 s run limit, File input mode.
estimator_settings = dict(
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=5,
    max_run=3600,
    input_mode="File",
    output_path=training_output_path,
    sagemaker_session=feature_store_session,
)
training_model = Estimator(training_image, role, **estimator_settings)

#       Set hyperparameters

In [None]:
# The label column (house_value) is presumably a continuous price, not a 0/1 flag. XGBoost's
# "binary:logistic" requires labels in [0, 1], so use squared-error regression instead.
training_model.set_hyperparameters(objective="reg:squarederror", num_round=50)

#       Specify training dataset

In [None]:
# Every object under the training_input/ prefix is fed to the "train" channel as CSV.
train_data = sagemaker.inputs.TrainingInput(
    s3_data=dataset_uri_prefix,
    s3_data_type="S3Prefix",
    content_type="text/csv",
    distribution="FullyReplicated",
)
data_channels = dict(train=train_data)

# Start training

In [None]:
# Launch the training job, block until it completes, and stream its logs.
training_model.fit(inputs=data_channels, wait=True, logs=True)

## Set up Hosting for the Model

In [None]:
# Host the trained model on a single real-time endpoint instance.
predictor = training_model.deploy(instance_type="ml.m5.xlarge", initial_instance_count=1)

## SageMaker FeatureStore During Inference

In [None]:
# Incoming inference request.
# NOTE(review): 3450774 is the same record id used by the fraud-detection exercise below.
# Confirm it exists as a record identifier in this section's feature groups.
transaction_id = str(3450774)


# Helper to parse the feature value from the record.
def get_feature_value(record, feature_name):
    """Return the value of ``feature_name`` from a Feature Store ``get_record`` record.

    Args:
        record: the ``Record`` list returned by ``get_record``; each entry is a dict with
            ``FeatureName`` and ``ValueAsString`` keys. The first matching entry wins.
        feature_name: name of the feature to look up.

    Returns:
        The entry's ``ValueAsString`` converted to ``str``.

    Raises:
        KeyError: if no entry in ``record`` has that ``FeatureName`` (clearer than the bare
            IndexError a ``[0]`` on an empty filter result would give).
    """
    match = next((entry for entry in record if entry["FeatureName"] == feature_name), None)
    if match is None:
        raise KeyError(f"Feature {feature_name!r} not found in record")
    return str(match["ValueAsString"])


# Latest online-store records for `transaction_id` from both feature groups.
transaction_response = featurestore_runtime.get_record(
    FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=transaction_id
)
transaction_record = transaction_response.get("Record", [])

identity_response = featurestore_runtime.get_record(
    FeatureGroupName=identity_feature_group_name, RecordIdentifierValueAsString=transaction_id
)
identity_record = identity_response.get("Record", [])

# Features in exactly the order of the training CSV columns that follow the label. The label
# ("house_value") must not be sent at inference time. The fraud-example id_01..id_05 features
# are also omitted because the training CSV does not contain them, so sending them would give
# the model the wrong number of features.
inference_feature_names = [
    "neighborhood",
    "postal_code",
    "number_of_rooms",
    "number_of_bedrooms",
    "ocean_proximity_<1H OCEAN",
    "ocean_proximity_INLAND",
    "ocean_proximity_ISLAND",
    "ocean_proximity_NEAR BAY",
    "ocean_proximity_NEAR OCEAN",
]

# Search the transaction record first, then the identity record, so each feature is read from
# whichever group holds it.
combined_record = transaction_record + identity_record
inference_request = [get_feature_value(combined_record, name) for name in inference_feature_names]

inference_request

In [None]:
import json

# Send the features as a single CSV row and decode the JSON-parsable response body.
csv_payload = ",".join(inference_request)
results = predictor.predict(csv_payload, initial_args={"ContentType": "text/csv"})
prediction = json.loads(results)
print(prediction)

## Cleanup Resources

In [None]:
# Tear down the endpoint together with its endpoint configuration (the SDK default).
predictor.delete_endpoint(delete_endpoint_config=True)

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

# Delete by the names the groups were created with. Earlier cells may have re-bound
# `identity_feature_group` to a placeholder ("your-identity-feature-group-name"), so deleting
# through that object would fail and leave the real identity group behind.
for feature_group_name in (identity_feature_group_name, transaction_feature_group_name):
    FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session).delete()

In [None]:
# restore original boto3 version
%pip install 'boto3=={}'.format(original_boto3_version)

# Exercise

In [None]:
import boto3
import sagemaker

original_boto3_version = boto3.__version__
%pip install 'boto3>1.17.21

In [None]:
from sagemaker.session import Session

# Use the notebook's default region for every client below.
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

# SageMaker API client plus the Feature Store runtime client used for get_record/batch_get_record.
sagemaker_client = boto_session.client(service_name="sagemaker", region_name=region)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime",
    region_name=region,
)

# SageMaker session wired to the clients above.
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

In [None]:
# Default SageMaker bucket for this session; replace with another bucket name if preferred.
default_s3_bucket_name = feature_store_session.default_bucket()
# Key prefix for offline-store data, Athena results and training artifacts in this exercise.
prefix = "sagemaker-featurestore-demo"

print(default_s3_bucket_name)

In [None]:
from sagemaker import get_execution_role

# IAM role ARN the notebook runs under; substitute another role ARN if required
# (see the SageMaker documentation on creating execution roles).
role = get_execution_role()
print(role)

In [None]:
import io

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

s3_client = boto3.client("s3", region_name=region)

# Public SageMaker example bucket and the two CSV objects of the synthetic fraud dataset.
fraud_detection_bucket_name = f"sagemaker-example-files-prod-{region}"
_fraud_dataset_dir = "datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/"
identity_file_key = _fraud_dataset_dir + "sampled_identity.csv"
transaction_file_key = _fraud_dataset_dir + "sampled_transactions.csv"

# Fetch both objects (identity first), then parse each body into a DataFrame.
identity_data_object, transaction_data_object = (
    s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=object_key)
    for object_key in (identity_file_key, transaction_file_key)
)
identity_data, transaction_data = (
    pd.read_csv(io.BytesIO(s3_object["Body"].read()))
    for s3_object in (identity_data_object, transaction_data_object)
)

In [None]:
# First five rows of the raw identity data.
identity_data.head(n=5)

In [None]:
# First five rows of the raw transaction data.
transaction_data.head(n=5)

In [None]:
# Round numeric values to 5 decimals and replace missing values with 0 in both frames.
identity_data = identity_data.round(5).fillna(0)
transaction_data = transaction_data.round(5).fillna(0)

# Feature transformations are applied before ingestion into Feature Store:
# one-hot encode card4 (prefix card_bank) and card6 (prefix card_type).
encoded_card_bank = pd.get_dummies(transaction_data["card4"], prefix="card_bank")
encoded_card_type = pd.get_dummies(transaction_data["card6"], prefix="card_type")

# Feature names may not contain blanks, so fix the one dummy column that has a space.
transformed_transaction_data = pd.concat(
    [transaction_data, encoded_card_type, encoded_card_bank], axis=1
).rename(columns={"card_bank_american express": "card_bank_american_express"})

In [None]:
# Identity data after rounding / fillna.
identity_data.head(n=5)

In [None]:
# Transaction data with the one-hot card columns appended.
transformed_transaction_data.head(n=5)

In [None]:
from time import gmtime, strftime, sleep

# Take the timestamp once so both feature-group names share the same suffix. Two separate
# strftime(gmtime()) calls can straddle a second boundary and disagree.
feature_group_suffix = strftime("%d-%H-%M-%S", gmtime())
identity_feature_group_name = "identity-feature-group-" + feature_group_suffix
transaction_feature_group_name = "transaction-feature-group-" + feature_group_suffix

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

# FeatureGroup objects for the identity and transaction groups, both bound to the same session.
identity_feature_group, transaction_feature_group = (
    FeatureGroup(name=group_name, sagemaker_session=feature_store_session)
    for group_name in (identity_feature_group_name, transaction_feature_group_name)
)

In [None]:
import time

# Whole epoch seconds, used as the EventTime value for every record.
current_time_sec = int(round(time.time()))

# Store boolean (one-hot) columns as 0/1 integers; presumably needed because Feature Store's
# dtype inference has no boolean type. TODO confirm.
bool_columns = transformed_transaction_data.select_dtypes(include="bool").columns
transformed_transaction_data[bool_columns] = transformed_transaction_data[bool_columns].astype(int)


def cast_object_to_string(data_frame):
    """Convert each ``object``-dtype column of ``data_frame`` to the pandas ``string`` dtype.

    Values are first stringified with ``astype("str")``, so missing values become the text
    ``"None"``/``"nan"``. The frame is modified in place and nothing is returned.
    """
    object_columns = [name for name in data_frame.columns if data_frame.dtypes[name] == "object"]
    for name in object_columns:
        data_frame[name] = data_frame[name].astype("str").astype("string")


# Object columns become pandas "string" so the Feature Store SDK maps them to String features.
for frame_to_cast in (identity_data, transformed_transaction_data):
    cast_object_to_string(frame_to_cast)

# Names of the record-identifier and event-time features.
record_identifier_feature_name = "TransactionID"
event_time_feature_name = "EventTime"

# Every row gets the same float64 EventTime (current_time_sec).
identity_data[event_time_feature_name] = pd.Series(
    [current_time_sec] * len(identity_data), dtype="float64"
)
transformed_transaction_data[event_time_feature_name] = pd.Series(
    [current_time_sec] * len(transaction_data), dtype="float64"
)

# Let the SDK infer each group's feature definitions from the DataFrame dtypes.
identity_feature_group.load_feature_definitions(data_frame=identity_data)
transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data)

In [None]:
def wait_for_feature_group_creation_complete(feature_group):
    """Block until ``feature_group`` is no longer in the "Creating" state.

    Calls ``feature_group.describe()`` every 5 seconds while the status is "Creating".

    Raises:
        RuntimeError: if the final status is anything other than "Created".
    """
    while True:
        status = feature_group.describe().get("FeatureGroupStatus")
        if status != "Creating":
            break
        print("Waiting for Feature Group Creation")
        time.sleep(5)
    if status == "Created":
        print(f"FeatureGroup {feature_group.name} successfully created.")
        return
    raise RuntimeError(f"Failed to create feature group {feature_group.name}")


# Both groups share the offline-store location, record identifier, event-time feature and
# role, and both have the online store enabled.
shared_create_args = dict(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True,
)
identity_feature_group.create(**shared_create_args)
transaction_feature_group.create(**shared_create_args)

for group_to_wait_on in (identity_feature_group, transaction_feature_group):
    wait_for_feature_group_creation_complete(feature_group=group_to_wait_on)

In [None]:
identity_description = identity_feature_group.describe()
identity_description

In [None]:
transaction_description = transaction_feature_group.describe()
transaction_description

In [None]:
# List feature groups through the boto3 SageMaker client (a single call, i.e. the first page).
feature_group_listing = sagemaker_client.list_feature_groups()
feature_group_listing

In [None]:
# Write identity rows into the feature group using 3 workers, waiting for completion.
identity_feature_group.ingest(identity_data, max_workers=3, wait=True)

In [None]:
# Write transaction rows into the feature group using 5 workers, waiting for completion.
transaction_feature_group.ingest(transformed_transaction_data, max_workers=5, wait=True)

In [None]:
# Read one transaction back from the online store by its record identifier.
record_identifier_value = "2990130"

featurestore_runtime.get_record(
    RecordIdentifierValueAsString=record_identifier_value,
    FeatureGroupName=transaction_feature_group_name,
)

In [None]:
# Fetch the same record id from the identity and transaction groups in one call.
featurestore_runtime.batch_get_record(
    Identifiers=[
        {"FeatureGroupName": group_name, "RecordIdentifiersValueAsString": ["2990130"]}
        for group_name in (identity_feature_group_name, transaction_feature_group_name)
    ]
)

In [None]:
# Hive DDL describing the identity group's offline-store table.
identity_hive_ddl = identity_feature_group.as_hive_ddl()
print(identity_hive_ddl)

In [None]:
account_id = boto3.client("sts").get_caller_identity()["Account"]
print(account_id)

# Resolved S3 locations of each feature group's offline store.
identity_feature_group_resolved_output_s3_uri = (
    identity_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

transaction_feature_group_resolved_output_s3_uri = (
    transaction_feature_group.describe()
    .get("OfflineStoreConfig")
    .get("S3StorageConfig")
    .get("ResolvedOutputS3Uri")
)

# `identity_feature_group` is intentionally NOT re-bound here: replacing it with a placeholder
# FeatureGroup("your-identity-feature-group-name") would make the later athena_query() and
# delete() calls target a feature group that does not exist.

transaction_feature_group_s3_prefix = transaction_feature_group_resolved_output_s3_uri.replace(
    f"s3://{default_s3_bucket_name}/", ""
)

# The offline store is populated some time after ingestion, so poll S3 until more than one
# object exists under the transaction group's prefix, with a bounded total wait.
offline_store_poll_sec = 60
offline_store_timeout_sec = 30 * 60
offline_store_waited_sec = 0
offline_store_contents = None
while offline_store_contents is None:
    objects_in_bucket = s3_client.list_objects(
        Bucket=default_s3_bucket_name, Prefix=transaction_feature_group_s3_prefix
    )
    if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
        offline_store_contents = objects_in_bucket["Contents"]
    elif offline_store_waited_sec >= offline_store_timeout_sec:
        raise TimeoutError(
            f"No offline-store data under s3://{default_s3_bucket_name}/"
            f"{transaction_feature_group_s3_prefix} after {offline_store_waited_sec} s"
        )
    else:
        print("Waiting for data in offline store...\n")
        sleep(offline_store_poll_sec)
        offline_store_waited_sec += offline_store_poll_sec

print("Data available.")

In [None]:
from sagemaker import Session

# List every feature group visible to this session, following NextToken across pages.
sagemaker_session = Session()
client = sagemaker_session.sagemaker_client

print("Available Feature Groups:")
list_kwargs = {}
while True:
    response = client.list_feature_groups(**list_kwargs)
    for fg in response["FeatureGroupSummaries"]:
        print("-", fg["FeatureGroupName"])
    next_token = response.get("NextToken")
    if not next_token:
        break
    list_kwargs["NextToken"] = next_token

In [None]:
# Athena tables that back each feature group's offline store.
identity_query = identity_feature_group.athena_query()
transaction_query = transaction_feature_group.athena_query()
identity_table = identity_query.table_name
transaction_table = transaction_query.table_name

# Left-join each transaction row to its identity row on the shared transactionid column.
query_string = (
    f'SELECT * FROM "{transaction_table}" LEFT JOIN "{identity_table}" '
    f'ON "{transaction_table}".transactionid = "{identity_table}".transactionid'
)
print("Running " + query_string)

# Execute the join, block until Athena finishes, then load the result as a DataFrame.
identity_query.run(
    query_string=query_string,
    output_location=f"s3://{default_s3_bucket_name}/{prefix}/query_results/",
)
identity_query.wait()
dataset = identity_query.as_dataframe()

dataset

In [None]:
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

# Path of the CSV Athena wrote for the join query run in the previous cell. That query used
# output_location=.../query_results/, and Athena names the file <QueryExecutionId>.csv.
# (No placeholder feature group is created or queried here: a FeatureGroup named
# "your-identity-feature-group-name" does not exist, so its athena_query() fails.)
query_execution = identity_query.get_query_execution()
query_result = (
    f"s3://{default_s3_bucket_name}/{prefix}/query_results/"
    f"{query_execution['QueryExecution']['QueryExecutionId']}.csv"
)
print(query_result)

# Select useful columns for training, with the target column (isfraud) first. SageMaker
# XGBoost CSV input expects the label in column 0 and no header row.
# Athena returns lowercase column names, hence e.g. "transactiondt".
training_columns = [
    "isfraud",
    "transactiondt",
    "transactionamt",
    "card1",
    "card2",
    "card3",
    "card5",
    "card_type_credit",
    "card_type_debit",
    "card_bank_american_express",
    "card_bank_discover",
    "card_bank_mastercard",
    "card_bank_visa",
    "id_01",
    "id_02",
    "id_03",
    "id_04",
    "id_05",
]
dataset = dataset[training_columns]

# Write to CSV in S3 without headers or an index column.
dataset.to_csv("dataset.csv", header=False, index=False)
s3_client.upload_file("dataset.csv", default_s3_bucket_name, prefix + "/training_input/dataset.csv")
dataset_uri_prefix = f"s3://{default_s3_bucket_name}/{prefix}/training_input/"

dataset

In [None]:
# URI of the SageMaker-managed XGBoost 1.0-1 container for this region.
training_image = sagemaker.image_uris.retrieve(framework="xgboost", region=region, version="1.0-1")

In [None]:
from sagemaker.estimator import Estimator

# Model artifacts are written under <bucket>/<prefix>/training_output.
training_output_path = f"s3://{default_s3_bucket_name}/{prefix}/training_output"

# One ml.m5.xlarge instance, 5 GB volume, 3600 s run limit, File input mode.
estimator_settings = dict(
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=5,
    max_run=3600,
    input_mode="File",
    output_path=training_output_path,
    sagemaker_session=feature_store_session,
)
training_model = Estimator(training_image, role, **estimator_settings)

In [None]:
# isfraud is a 0/1 label, so train a binary logistic model for 50 boosting rounds.
xgb_hyperparameters = {"objective": "binary:logistic", "num_round": 50}
training_model.set_hyperparameters(**xgb_hyperparameters)

In [None]:
# Every object under the training_input/ prefix is fed to the "train" channel as CSV.
train_data = sagemaker.inputs.TrainingInput(
    s3_data=dataset_uri_prefix,
    s3_data_type="S3Prefix",
    content_type="text/csv",
    distribution="FullyReplicated",
)
data_channels = dict(train=train_data)

In [None]:
# Launch the training job, block until it completes, and stream its logs.
training_model.fit(inputs=data_channels, wait=True, logs=True)

In [None]:
# Host the trained model on a single real-time endpoint instance.
predictor = training_model.deploy(instance_type="ml.m5.xlarge", initial_instance_count=1)

In [None]:
# Incoming inference request.
transaction_id = str(3450774)


# Helper to parse the feature value from the record.
def get_feature_value(record, feature_name):
    """Return the value of ``feature_name`` from a Feature Store ``get_record`` record.

    Args:
        record: the ``Record`` list returned by ``get_record``; each entry is a dict with
            ``FeatureName`` and ``ValueAsString`` keys. The first matching entry wins.
        feature_name: name of the feature to look up.

    Returns:
        The entry's ``ValueAsString`` converted to ``str``.

    Raises:
        KeyError: if no entry in ``record`` has that ``FeatureName`` (clearer than the bare
            IndexError a ``[0]`` on an empty filter result would give).
    """
    match = next((entry for entry in record if entry["FeatureName"] == feature_name), None)
    if match is None:
        raise KeyError(f"Feature {feature_name!r} not found in record")
    return str(match["ValueAsString"])


# Online-store record for this transaction from the transaction feature group.
transaction_response = featurestore_runtime.get_record(
    FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=transaction_id
)
transaction_record = transaction_response["Record"]

# Transaction features, in the same order as the training CSV columns after the isfraud label.
transaction_feature_names = [
    "TransactionDT",
    "TransactionAmt",
    "card1",
    "card2",
    "card3",
    "card5",
    "card_type_credit",
    "card_type_debit",
    "card_bank_american_express",
    "card_bank_discover",
    "card_bank_mastercard",
    "card_bank_visa",
]
transaction_test_data = [
    get_feature_value(transaction_record, feature) for feature in transaction_feature_names
]

# Identity features id_01 .. id_05 for the same record id.
identity_response = featurestore_runtime.get_record(
    FeatureGroupName=identity_feature_group_name, RecordIdentifierValueAsString=transaction_id
)
identity_record = identity_response["Record"]
id_test_data = [get_feature_value(identity_record, f"id_0{index}") for index in range(1, 6)]

# One CSV row: transaction features followed by identity features.
inference_request = transaction_test_data + id_test_data

inference_request

In [None]:
import json

# Send the features as a single CSV row and decode the JSON-parsable response body.
csv_payload = ",".join(inference_request)
results = predictor.predict(csv_payload, initial_args={"ContentType": "text/csv"})
prediction = json.loads(results)
print(prediction)

In [None]:
# Tear down the endpoint together with its endpoint configuration (the SDK default).
predictor.delete_endpoint(delete_endpoint_config=True)

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

# Delete by the names the groups were created with. Earlier cells may have re-bound
# `identity_feature_group` to a placeholder ("your-identity-feature-group-name"), so deleting
# through that object would fail and leave the real identity group behind.
for feature_group_name in (identity_feature_group_name, transaction_feature_group_name):
    FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session).delete()

In [None]:
# restore original boto3 version
%pip install 'boto3=={}'.format(original_boto3_version)