From 4eafd364aba9d3ece0b5d448dcd3e20a48a7c8b2 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 28 Jan 2020 10:06:40 +0800 Subject: [PATCH 1/2] Allow users not to set max age for batch retrieval --- .../templates/single_featureset_pit_join.sql | 8 +++-- tests/e2e/bq-batch-retrieval.py | 33 +++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/serving/src/main/resources/templates/single_featureset_pit_join.sql b/serving/src/main/resources/templates/single_featureset_pit_join.sql index 1f4612b3503..f3f20828ff1 100644 --- a/serving/src/main/resources/templates/single_featureset_pit_join.sql +++ b/serving/src/main/resources/templates/single_featureset_pit_join.sql @@ -29,7 +29,8 @@ SELECT created_timestamp, {{ featureSet.entities | join(', ')}}, false AS is_entity_table -FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second) +FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' +{% if featureSet.maxAge == 0 %}{% else %}AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second){% endif %} ), /* 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as @@ -47,7 +48,7 @@ SELECT event_timestamp, {{ featureSet.entities | join(', ')}}, {% for featureName in featureSet.features %} - IF(event_timestamp >= {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}, NULL) as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %} + IF(event_timestamp >= {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp {% if featureSet.maxAge == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp{% endif %}, {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}, NULL) as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM ( SELECT @@ -72,7 +73,8 @@ SELECT {% for featureName in featureSet.features %} {{ featureName }} as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %} {% endfor %} -FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second) +FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' +{% if featureSet.maxAge == 0 %}{% else %}AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second){% endif %} ) USING ({{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, created_timestamp, {{ featureSet.entities | join(', ')}}) WHERE is_entity_table ) diff --git a/tests/e2e/bq-batch-retrieval.py b/tests/e2e/bq-batch-retrieval.py index 8616dd37a92..57205b4fd4f 100644 --- a/tests/e2e/bq-batch-retrieval.py +++ b/tests/e2e/bq-batch-retrieval.py @@ -118,6 +118,14 @@ def test_apply_all_featuresets(client): client.apply(fs1) client.apply(fs2) + no_max_age_fs = FeatureSet( + "no_max_age", + features=[Feature("feature_value8", ValueType.INT64)], + entities=[Entity("entity_id", ValueType.INT64)], + max_age=Duration(seconds=0), + ) + client.apply(no_max_age_fs) + def test_get_batch_features_with_file(client): file_fs1 = client.get_feature_set(name="file_feature_set", version=1) @@ -327,3 +335,28 @@ def test_multiple_featureset_joins(client): assert output["entity_id"].to_list() == [int(i) for i in output["feature_value6"].to_list()] assert output["other_entity_id"].to_list() == output["other_feature_value7"].to_list() + + +def test_no_max_age(client): + no_max_age_fs = client.get_feature_set(name="no_max_age", version=1) + + time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) + N_ROWS = 10 + features_8_df = pd.DataFrame( + { + "datetime": [time_offset] * N_ROWS, + "entity_id": [i for i in range(N_ROWS)], + "feature_value8": [i for i in range(N_ROWS)], + } + ) + client.ingest(no_max_age_fs, features_8_df) + + time.sleep(15) + feature_retrieval_job = client.get_batch_features( + entity_rows=features_8_df[["datetime", "entity_id"]], feature_refs=[f"{PROJECT_NAME}/feature_value8:1"] + ) + + output = feature_retrieval_job.to_dataframe() + print(output.head()) + + assert output["entity_id"].to_list() == output["feature_value1"].to_list() \ No newline at end of file From 9f071e591cce3e78f2e49cb238809a275ce4d208 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Tue, 28 Jan 2020 10:49:44 +0800 Subject: [PATCH 2/2] Fix typo in test assertion --- tests/e2e/bq-batch-retrieval.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/bq-batch-retrieval.py b/tests/e2e/bq-batch-retrieval.py index 57205b4fd4f..0cf05e77e1d 100644 --- a/tests/e2e/bq-batch-retrieval.py +++ b/tests/e2e/bq-batch-retrieval.py @@ -359,4 +359,4 @@ def test_no_max_age(client): output = feature_retrieval_job.to_dataframe() print(output.head()) - assert output["entity_id"].to_list() == output["feature_value1"].to_list() \ No newline at end of file + assert output["entity_id"].to_list() == output["feature_value8"].to_list() \ No newline at end of file