From 550da19ca0cc30b98c31ed7d93960f97cbe9eb55 Mon Sep 17 00:00:00 2001 From: David Heryanto Date: Tue, 7 Jan 2020 12:12:24 +0800 Subject: [PATCH] Add missing wait time after running feast apply Otherwise ingested data will not be processed because Beam job is not ready --- tests/e2e/bq-batch-retrieval.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/tests/e2e/bq-batch-retrieval.py b/tests/e2e/bq-batch-retrieval.py index 6b654f9a70e..d7a717bed3b 100644 --- a/tests/e2e/bq-batch-retrieval.py +++ b/tests/e2e/bq-batch-retrieval.py @@ -17,7 +17,12 @@ from google.protobuf.duration_pb2 import Duration from pandavro import to_avro -pd.set_option('display.max_columns', None) +pd.set_option("display.max_columns", None) + +# How long we should wait for the Beam job to be ready for ingestion after running `feast apply`. +# When using DirectRunner, 20 seconds is a reasonable time from past observations. +WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY = 20 + @pytest.fixture(scope="module") def core_url(pytestconfig): @@ -62,6 +67,7 @@ def test_get_batch_features_with_file(client): ) client.apply(file_fs1) + time.sleep(WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY) file_fs1 = client.get_feature_set(name="file_feature_set", version=1) N_ROWS = 10 @@ -99,6 +105,7 @@ def test_get_batch_features_with_gs_path(client, gcs_path): ) client.apply(gcs_fs1) + time.sleep(WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY) gcs_fs1 = client.get_feature_set(name="gcs_feature_set", version=1) N_ROWS = 10 @@ -149,7 +156,7 @@ def test_order_by_creation_time(client): max_age=Duration(seconds=100), ) client.apply(proc_time_fs) - time.sleep(10) + time.sleep(WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY) proc_time_fs = client.get_feature_set(name="processing_time", version=1) time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) @@ -188,13 +195,17 @@ def test_additional_columns_in_entity_table(client): max_age=Duration(seconds=100), ) client.apply(add_cols_fs) - time.sleep(10) + time.sleep(WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY) add_cols_fs = client.get_feature_set(name="additional_columns", version=1) N_ROWS = 10 time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) features_df = pd.DataFrame( - {"datetime": [time_offset] * N_ROWS, "entity_id": [i for i in range(N_ROWS)], "feature_value": ["abc"] * N_ROWS} + { + "datetime": [time_offset] * N_ROWS, + "entity_id": [i for i in range(N_ROWS)], + "feature_value": ["abc"] * N_ROWS, + } ) client.ingest(add_cols_fs, features_df) @@ -225,7 +236,7 @@ def test_point_in_time_correctness_join(client): max_age=Duration(seconds=100), ) client.apply(historical_fs) - time.sleep(10) + time.sleep(WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY) historical_fs = client.get_feature_set(name="historical", version=1) time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) @@ -271,11 +282,11 @@ def test_multiple_featureset_joins(client): ) client.apply(fs1) - time.sleep(10) + time.sleep(WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY) fs1 = client.get_feature_set(name="feature_set_1", version=1) client.apply(fs2) - time.sleep(10) + time.sleep(WAIT_TIME_IN_SECONDS_FOR_FEAST_APPLY) fs2 = client.get_feature_set(name="feature_set_2", version=1) N_ROWS = 10