Skip to content

Commit

Permalink
fix: Fix DynamoDB fetches when there are entities that are not found (#…
Browse files Browse the repository at this point in the history
…2573)

* fix: Fix DynamoDB fetches when there are entities that are not found

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Remove the sort_response option from the DynamoDB config, since responses must always be sorted. Add a better test for unknown entities at different positions

Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia committed Apr 20, 2022
1 parent 20b704a commit 882328f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 21 deletions.
28 changes: 14 additions & 14 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
region: StrictStr
"""AWS Region Name"""

sort_response: bool = True
"""Whether or not to sort BatchGetItem response."""

table_name_template: StrictStr = "{project}.{table_name}"
"""DynamoDB table name template"""

Expand Down Expand Up @@ -204,9 +201,6 @@ def online_read(
"""
Retrieve feature values from the online DynamoDB store.
Note: This method is currently not optimized to retrieve a lot of data at a time
as it does sequential gets from the DynamoDB table.
Args:
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
Expand All @@ -224,7 +218,6 @@ def online_read(
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys]
batch_size = online_config.batch_size
sort_response = online_config.sort_response
entity_ids_iter = iter(entity_ids)
while True:
batch = list(itertools.islice(entity_ids_iter, batch_size))
Expand All @@ -243,20 +236,27 @@ def online_read(
response = response.get("Responses")
table_responses = response.get(table_instance.name)
if table_responses:
if sort_response:
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
)
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
)
entity_idx = 0
for tbl_res in table_responses:
entity_id = tbl_res["entity_id"]
while entity_id != batch[entity_idx]:
result.append((None, None))
entity_idx += 1
res = {}
for feature_name, value_bin in tbl_res["values"].items():
val = ValueProto()
val.ParseFromString(value_bin.value)
res[feature_name] = val
result.append((datetime.fromisoformat(tbl_res["event_ts"]), res))
else:
batch_size_nones = ((None, None),) * len(batch)
result.extend(batch_size_nones)
entity_idx += 1

# Not all entities in a batch may have responses
# Pad with remaining values in batch that were not found
batch_size_nones = ((None, None),) * (len(batch) - len(result))
result.extend(batch_size_nones)
return result

def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def online_read(
entity_keys: a list of entity keys that should be read from the FeatureStore.
requested_features: (Optional) A subset of the features that should be read from the FeatureStore.
Returns:
Data is returned as a list, one item per entity key. Each item in the list is a tuple
of event_ts for the row, and the feature data as a dict from feature names to values.
Values are returned as Value proto message.
Data is returned as a list, one item per entity key, in the same order as the entity_keys argument.
Each item in the list is a tuple of event_ts for the row, and the feature data as a dict from feature names
to values. Values are returned as Value proto message.
"""
...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
DynamoDBOnlineStoreConfig,
DynamoDBTable,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from tests.utils.online_store_utils import (
_create_n_customer_test_samples,
Expand Down Expand Up @@ -49,7 +51,6 @@ def test_online_store_config_default():
assert dynamodb_store_config.batch_size == 40
assert dynamodb_store_config.endpoint_url is None
assert dynamodb_store_config.region == aws_region
assert dynamodb_store_config.sort_response is True
assert dynamodb_store_config.table_name_template == "{project}.{table_name}"


Expand All @@ -70,20 +71,17 @@ def test_online_store_config_custom_params():
aws_region = "us-west-2"
batch_size = 20
endpoint_url = "http://localhost:8000"
sort_response = False
table_name_template = "feast_test.dynamodb_table"
dynamodb_store_config = DynamoDBOnlineStoreConfig(
region=aws_region,
batch_size=batch_size,
endpoint_url=endpoint_url,
sort_response=sort_response,
table_name_template=table_name_template,
)
assert dynamodb_store_config.type == "dynamodb"
assert dynamodb_store_config.batch_size == batch_size
assert dynamodb_store_config.endpoint_url == endpoint_url
assert dynamodb_store_config.region == aws_region
assert dynamodb_store_config.sort_response == sort_response
assert dynamodb_store_config.table_name_template == table_name_template


Expand Down Expand Up @@ -175,6 +173,42 @@ def test_online_read(repo_config, n_samples):
assert [item[1] for item in returned_items] == list(features)


@mock_dynamodb2
def test_online_read_unknown_entity(repo_config):
    """Test that online_read returns (None, None) for an entity that is not stored.

    An entity key that was never written to the table is inserted at every
    possible position of the request (beginning, middle, and end). For each
    position the result must contain one item per requested entity key, in
    the same order as the request, with ``(None, None)`` at the position of
    the unknown entity.
    """
    n_samples = 2
    table_name = f"{TABLE_NAME}_{n_samples}"
    _create_test_table(PROJECT, table_name, REGION)
    data = _create_n_customer_test_samples(n=n_samples)
    _insert_data_test_table(data, PROJECT, table_name, REGION)

    entity_keys, features, *_ = zip(*data)
    entity_keys = list(entity_keys)
    features = list(features)
    dynamodb_store = DynamoDBOnlineStore()

    # An entity key that was never inserted into the test table.
    unknown_entity_key = EntityKeyProto(
        join_keys=["customer"], entity_values=[ValueProto(string_val="12359")]
    )

    # Have the unknown entity be in the beginning, middle, and end of the list
    # of entities. The upper bound is len(entity_keys) + 1 so that the final
    # iteration inserts *after* the last known entity (the "end" case).
    for pos in range(len(entity_keys) + 1):
        entity_keys_with_unknown = deepcopy(entity_keys)
        entity_keys_with_unknown.insert(pos, unknown_entity_key)
        features_with_none = deepcopy(features)
        features_with_none.insert(pos, None)

        returned_items = dynamodb_store.online_read(
            config=repo_config,
            table=MockFeatureView(name=table_name),
            entity_keys=entity_keys_with_unknown,
        )

        assert len(returned_items) == len(entity_keys_with_unknown)
        assert [item[1] for item in returned_items] == features_with_none
        # The order should match the original entity key order
        assert returned_items[pos] == (None, None)


@mock_dynamodb2
def test_write_batch_non_duplicates(repo_config):
"""Test DynamoDBOnline Store deduplicate write batch request items."""
Expand Down

0 comments on commit 882328f

Please sign in to comment.