In [None]:
import warnings
warnings.filterwarnings("ignore")

import os
import feast
import merlin.models.tf as mm
import nvtabular as nvt
import numpy as np
import tensorflow as tf


from merlin.datasets.ecommerce import transform_aliccp
from merlin.schema.tags import Tags
from merlin.io.dataset import Dataset
from nvtabular.ops import *

# The line below is left commented out so this example also runs on CPU; uncomment it when running on a GPU.
# os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"

In [None]:
# Location of the pre-generated input parquet files, and the working directory
# under which the Feast feature repository lives.
DATA_DIR, BASE_DIR = "/model-data/aliccp", "/workdir"

In [None]:
# Feast feature repository: `feast apply` is run from here and the feature
# parquet files are written into its data/ subfolder.
FEATURE_REPO_DIRNAME = "feature_repo"
feature_repo_path = os.path.join(BASE_DIR, FEATURE_REPO_DIRNAME)

#### Prepare User and Item features

In [None]:
from merlin.models.utils.dataset import unique_rows_by_features

# Load the pre-generated user feature table fully into memory as one dataframe.
user_features_path = os.path.join(DATA_DIR, "user_features.parquet")
user_features = Dataset(user_features_path).to_ddf().compute()
user_features.head()

In [None]:
from datetime import datetime

# Stamp every user row with the current time in two datetime64[ns] columns,
# "datetime" and "created" (presumably the timestamp fields referenced by the
# Feast source definitions in feature_repo — TODO confirm).
for timestamp_column in ["datetime", "created"]:
    user_features[timestamp_column] = datetime.now()
    user_features[timestamp_column] = user_features[timestamp_column].astype("datetime64[ns]")
user_features.head()

In [None]:
# Write parquet file to feature_repo. Create the data/ folder first: writing
# to a path whose parent directory is missing fails on a fresh checkout.
user_features_dir = os.path.join(feature_repo_path, "data")
os.makedirs(user_features_dir, exist_ok=True)
user_features.to_parquet(os.path.join(user_features_dir, "user_features.parquet"))

In [None]:
# Load the pre-generated item feature table fully into memory as one dataframe.
item_features_path = os.path.join(DATA_DIR, "item_features.parquet")
item_features = Dataset(item_features_path).to_ddf().compute()
item_features.head()

In [None]:
# Stamp every item row with the current time in two datetime64[ns] columns,
# "datetime" and "created" (presumably the timestamp fields referenced by the
# Feast source definitions in feature_repo — TODO confirm).
for timestamp_column in ["datetime", "created"]:
    item_features[timestamp_column] = datetime.now()
    item_features[timestamp_column] = item_features[timestamp_column].astype("datetime64[ns]")
item_features.head()

In [None]:
# Write parquet file to feature_repo. Create the data/ folder first: writing
# to a path whose parent directory is missing fails on a fresh checkout.
item_features_dir = os.path.join(feature_repo_path, "data")
os.makedirs(item_features_dir, exist_ok=True)
item_features.to_parquet(os.path.join(item_features_dir, "item_features.parquet"))

#### Register features

In [None]:
# Switch the working directory into the feature repository so the Feast CLI
# operates on that repo, then register its definitions with `feast apply`.
%cd $feature_repo_path
!feast apply

In [None]:
!feast materialize 1995-01-01T01:01:01 2025-01-01T01:01:01

In [None]:
# Open the registered Feast repository and show the features of the user view.
feature_store = feast.FeatureStore(feature_repo_path)
user_feature_view = feature_store.get_feature_view("user_features")
user_feature_view.features

In [None]:
%%timeit
feature_store.get_online_features(
    features=["user_features:user_id", "user_features:user_age"],
    entity_rows=[{"user_id_raw": 1}]
).to_df()

# Fast feature retrieval from Redis!

In [None]:
import seedir as sd

# Render the feature repository's directory tree, skipping notebook/bytecode caches.
tree_options = dict(
    style="lines",
    itemlimit=10,
    depthlimit=3,
    exclude_folders=[".ipynb_checkpoints", "__pycache__"],
    sort=True,
)
sd.seedir(feature_repo_path, **tree_options)

#### Redis ANN Index Setup
Load the pre-generated item embeddings that will be written into the Redis vector index.

In [None]:
# Load the pre-generated item embeddings (presumably one row per item with an
# "item_id" column plus the embedding columns — the writer below relies on this).
item_embeddings_path = os.path.join(DATA_DIR, "item_embeddings.parquet")
item_embeddings = Dataset(item_embeddings_path).to_ddf().compute()
item_embeddings.head()

In [None]:
import asyncio
import redis.asyncio as redis
from redis.commands.search.field import VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query

# Connect an async Redis client to the address in FEATURE_STORE_ADDRESS,
# which must be exactly "host:port".
feature_store_address = os.environ["FEATURE_STORE_ADDRESS"]
host, port = feature_store_address.split(":")
redis_conn = redis.Redis(host=host, port=port)

In [None]:
INDEX_NAME = "candidate_index"
VECTOR_FIELD_NAME = "item_embedding"

vector_field = VectorField(
    VECTOR_FIELD_NAME,
    "HNSW", {
        "TYPE": "FLOAT32",
        "DIM": 64,
        "DISTANCE_METRIC": "IP",
        "INITIAL_CAP": len(item_embeddings),
    }
)

# Create ANN Index
await redis_conn.ft(INDEX_NAME).create_index(
    fields = [vector_field],
    definition= IndexDefinition(prefix=["ITEM:"], index_type=IndexType.HASH)
)

In [None]:
# Function to write item embeddings to Redis
async def write_item_embeddings(embs, n: int, redis_conn: redis.Redis):
    """Store each item's embedding as a Redis hash under the key ``ITEM:<item_id>``.

    Each hash holds the integer ``item_id`` plus, under ``VECTOR_FIELD_NAME``, the
    row's remaining columns (in column order) packed as float32 bytes.

    Args:
        embs: dataframe with an ``item_id`` column plus the embedding columns.
            cuDF frames do not support ``iterrows()``, so frames exposing
            ``to_pandas()`` are converted to pandas first.
        n: maximum number of HSET calls in flight at once.
        redis_conn: async Redis client to write to.

    Returns only after every write has completed. If a write fails, its
    exception propagates to the caller.
    """
    if hasattr(embs, "to_pandas"):
        embs = embs.to_pandas()

    semaphore = asyncio.Semaphore(n)

    async def write(row):
        async with semaphore:
            # iterrows() upcasts ints to the row's common dtype, so cast the id back to int.
            item_id = int(row["item_id"])
            entry = {
                "item_id": item_id,
                VECTOR_FIELD_NAME: np.array(row.drop("item_id").values, dtype=np.float32).tobytes(),
            }
            await redis_conn.hset(f"ITEM:{item_id}", mapping=entry)

    # The gather must be awaited. Otherwise the caller's `await` returns before any
    # write has finished and write errors are never surfaced.
    await asyncio.gather(*(write(row) for _, row in embs.iterrows()))

In [None]:
# Write embeddings to Redis ANN Index created above
await write_item_embeddings(item_embeddings, 100, redis_conn)

In [None]:
# Verify Index Construction
await redis_conn.ft(INDEX_NAME).info()

In [None]:
# Fetch an Item ID
item_ids = [key for key in await redis_conn.keys() if b"ITEM:" in key]
item_id = item_ids[0]

# Fetch a testing input vector
test_vector = await redis_conn.hget(item_id.decode("utf"), VECTOR_FIELD_NAME)

# Create a Redis VSS Query
query = Query(f"*=>[KNN 10 @{VECTOR_FIELD_NAME} $vec_param AS vector_score]")\
    .sort_by("vector_score")\
    .return_fields("id", "vector_score")\
    .dialect(2)

# Search for KNN
k_nearest_neighbors = await redis_conn.ft(INDEX_NAME).search(query, query_params={"vec_param": test_vector})
# Inspect results
k_nearest_neighbors.docs