Resolve the project when ingesting into a feature set.

Client.ingest() now determines a project before polling for readiness:
  * a FeatureSet argument contributes its own `project` attribute;
  * a feature set name (str) uses the client's current project, falling
    back to "default" when the client has none set.
The resolved project is passed to get_feature_set() inside the readiness
loop. Two e2e tests cover ingestion by FeatureSet object and by name after
set_project(), and the ingest() docstring gains a usage example.

Review notes on this revision of the patch:
  * The docstring example imports what it uses, calls datetime.now() rather
    than the deprecated pandas `pd.datetime` alias, and marks continuation
    lines with "..." as doctest syntax requires.
  * The tests build integer columns with list(range(N_ROWS)).
  * Hunk headers are recomputed for the two extra docstring lines. The
    post-image blob ids on the "index" lines predate these edits.
  * Leading whitespace was reconstructed from a copy of the patch whose
    line breaks had been collapsed; verify context lines against the tree.

diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 01c9ed83be..c39c696d99 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -836,11 +836,35 @@ def ingest(
 
         Returns:
             str: ingestion id for this dataset
+
+        Examples:
+            >>> from datetime import datetime
+            >>> import pandas as pd
+            >>> from feast import Client
+            >>>
+            >>> client = Client(core_url="localhost:6565")
+            >>> fs_df = pd.DataFrame(
+            ...     {
+            ...         "datetime": [datetime.now()],
+            ...         "driver": [1001],
+            ...         "rating": [4.3],
+            ...     }
+            ... )
+            >>> client.set_project("project1")
+            >>> client.ingest("driver", fs_df)
+            >>>
+            >>> driver_fs = client.get_feature_set(name="driver", project="project1")
+            >>> client.ingest(driver_fs, fs_df)
         """
 
         if isinstance(feature_set, FeatureSet):
             name = feature_set.name
+            project = feature_set.project
         elif isinstance(feature_set, str):
+            if self.project is not None:
+                project = self.project
+            else:
+                project = "default"
             name = feature_set
         else:
             raise Exception("Feature set name must be provided")
@@ -858,7 +882,9 @@ def ingest(
         while True:
             if timeout is not None and time.time() - current_time >= timeout:
                 raise TimeoutError("Timed out waiting for feature set to be ready")
-            fetched_feature_set: Optional[FeatureSet] = self.get_feature_set(name)
+            fetched_feature_set: Optional[FeatureSet] = self.get_feature_set(
+                name, project
+            )
             if (
                 fetched_feature_set is not None
                 and fetched_feature_set.status == FeatureSetStatus.STATUS_READY
diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py
index 5bb79bc8e8..54776f33e1 100644
--- a/tests/e2e/redis/basic-ingest-redis-serving.py
+++ b/tests/e2e/redis/basic-ingest-redis-serving.py
@@ -558,6 +558,112 @@ def try_get_features2():
     )
 
 
+@pytest.mark.timeout(600)
+@pytest.mark.run(order=16)
+def test_basic_ingest_retrieval_fs(client):
+    # Set to another project to test ingestion based on current project context
+    client.set_project(PROJECT_NAME + "_NS1")
+    driver_fs = FeatureSet(
+        name="driver_fs",
+        features=[
+            Feature(name="driver_fs_rating", dtype=ValueType.FLOAT),
+            Feature(name="driver_fs_cost", dtype=ValueType.FLOAT),
+        ],
+        entities=[Entity("driver_fs_id", ValueType.INT64)],
+        max_age=Duration(seconds=3600),
+    )
+    client.apply(driver_fs)
+
+    N_ROWS = 2
+    time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
+    driver_df = pd.DataFrame(
+        {
+            "datetime": [time_offset] * N_ROWS,
+            "driver_fs_id": list(range(N_ROWS)),
+            "driver_fs_rating": [float(i) for i in range(N_ROWS)],
+            "driver_fs_cost": [float(i) + 0.5 for i in range(N_ROWS)],
+        }
+    )
+    client.ingest(driver_fs, driver_df, timeout=600)
+    time.sleep(15)
+
+    online_request_entity = [{"driver_fs_id": 0}, {"driver_fs_id": 1}]
+    online_request_features = ["driver_fs_rating", "driver_fs_cost"]
+
+    def try_get_features():
+        response = client.get_online_features(
+            entity_rows=online_request_entity, feature_refs=online_request_features
+        )
+        return response, True
+
+    online_features_actual = wait_retry_backoff(
+        retry_fn=try_get_features,
+        timeout_secs=90,
+        timeout_msg="Timed out trying to get online feature values",
+    )
+
+    online_features_expected = {
+        "driver_fs_id": [0, 1],
+        "driver_fs_rating": [0.0, 1.0],
+        "driver_fs_cost": [0.5, 1.5],
+    }
+
+    assert online_features_actual.to_dict() == online_features_expected
+
+
+@pytest.mark.timeout(600)
+@pytest.mark.run(order=17)
+def test_basic_ingest_retrieval_str(client):
+    # Set to another project to test ingestion based on current project context
+    client.set_project(PROJECT_NAME + "_NS1")
+    customer_fs = FeatureSet(
+        name="cust_fs",
+        features=[
+            Feature(name="cust_rating", dtype=ValueType.INT64),
+            Feature(name="cust_cost", dtype=ValueType.FLOAT),
+        ],
+        entities=[Entity("cust_id", ValueType.INT64)],
+        max_age=Duration(seconds=3600),
+    )
+    client.apply(customer_fs)
+
+    N_ROWS = 2
+    time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
+    cust_df = pd.DataFrame(
+        {
+            "datetime": [time_offset] * N_ROWS,
+            "cust_id": list(range(N_ROWS)),
+            "cust_rating": list(range(N_ROWS)),
+            "cust_cost": [float(i) + 0.5 for i in range(N_ROWS)],
+        }
+    )
+    client.ingest("cust_fs", cust_df, timeout=600)
+    time.sleep(15)
+
+    online_request_entity = [{"cust_id": 0}, {"cust_id": 1}]
+    online_request_features = ["cust_rating", "cust_cost"]
+
+    def try_get_features():
+        response = client.get_online_features(
+            entity_rows=online_request_entity, feature_refs=online_request_features
+        )
+        return response, True
+
+    online_features_actual = wait_retry_backoff(
+        retry_fn=try_get_features,
+        timeout_secs=90,
+        timeout_msg="Timed out trying to get online feature values",
+    )
+
+    online_features_expected = {
+        "cust_id": [0, 1],
+        "cust_rating": [0, 1],
+        "cust_cost": [0.5, 1.5],
+    }
+
+    assert online_features_actual.to_dict() == online_features_expected
+
+
 @pytest.fixture(scope="module")
 def all_types_dataframe():
     return pd.DataFrame(