Commit b1e4003

fixed some things
Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>
franciscojavierarceo committed Jun 17, 2024
1 parent 98054cb commit b1e4003
Showing 1 changed file with 41 additions and 59 deletions.
100 changes: 41 additions & 59 deletions sdk/python/feast/feature_server.py
@@ -4,7 +4,7 @@
import traceback
import warnings
from contextlib import asynccontextmanager
from typing import List, Optional, Any, Union
from typing import Any, List, Optional

import pandas as pd
from dateutil import parser
@@ -47,8 +47,8 @@ class MaterializeIncrementalRequest(BaseModel):


def get_app(
store: "feast.FeatureStore",
registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL,
store: "feast.FeatureStore",
registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL,
):
proto_json.patch()
# Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down
@@ -164,14 +164,17 @@ def get_predictions_endpoint(body=Depends(get_body)):

predictions = _get_predictions(
store=store,
features=features,
entity=body["entities"],
model_name=body["model_name"],
inputs=features,
log_features=body["log_features"],
force_recompute=body["force_recompute"],
log_features=body["log_features"],
)

return Response(content=json.dumps(predictions), media_type="application/json")
return Response(
content=json.dumps(predictions), media_type="application/json"
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
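
For context, the handler above reads model_name, entities, force_recompute, and log_features from the posted JSON, and resolves features from either a feature_service name or an explicit feature list (see _get_features_from_body further down in this diff). A minimal client sketch consistent with those fields — the "/predict" path, host, port, and every field value are illustrative assumptions, not part of this commit:

import requests

# Hypothetical request against a locally running feature server.
# The "/predict" path and all field values below are assumed for illustration.
body = {
    "model_name": "driver_ranking",                 # name of a registered FeatureService
    "entities": {"driver_id": 1001},                # entity row(s) to score
    "features": ["driver_hourly_stats:conv_rate"],  # or: "feature_service": "driver_ranking"
    "force_recompute": True,                        # recompute instead of reading stored predictions
    "log_features": True,                           # push scored features to the offline store
}

resp = requests.post("http://localhost:6566/predict", json=body, timeout=10)
print(resp.json())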
@@ -216,7 +219,6 @@ def materialize_incremental(body=Depends(get_body)):
if sys.platform != "win32":
import gunicorn.app.base


class FeastServeApplication(gunicorn.app.base.BaseApplication):
def __init__(self, store: "feast.FeatureStore", **options):
self._app = get_app(
@@ -238,13 +240,13 @@ def load(self):


def start_server(
store: "feast.FeatureStore",
host: str,
port: int,
no_access_log: bool,
workers: int,
keep_alive_timeout: int,
registry_ttl_sec: int,
store: "feast.FeatureStore",
host: str,
port: int,
no_access_log: bool,
workers: int,
keep_alive_timeout: int,
registry_ttl_sec: int,
):
if sys.platform != "win32":
FeastServeApplication(
@@ -266,62 +268,42 @@ def _get_features_from_body(store: "feast.FeatureStore", body: Request):
body = json.loads(body)
# Initialize parameters for FeatureStore.get_online_features(...) call
if "feature_service" in body:
features = store.get_feature_service(
body["feature_service"], allow_cache=True
)
features = store.get_feature_service(body["feature_service"], allow_cache=True)
else:
features = body["features"]
return features, body

def _get_predictions(
store: "feast.FeatureStore",
entities: pd.DataFrame,
model_name: str,
model: Any,
inputs: Union[pd.DataFrame, List[dict[str, Any]]],
log_features: bool = True,
force_recompute: bool = True,
) -> Union[float, List[float], List[str]]:
# Convert inputs to DataFrame if necessary
if isinstance(inputs, dict):
df_inputs = pd.DataFrame([inputs])
elif isinstance(inputs, list):
df_inputs = pd.DataFrame(inputs)
else:
raise ValueError("Inputs must be a dictionary or a list of dictionaries")

def _get_predictions(
store: "feast.FeatureStore",
features: str,
entities: pd.DataFrame,
model_name: str,
model: Any,
force_recompute: bool = True,
log_features: bool = True,
) -> pd.DataFrame:
# Get model features
model_features = store.get_feature_service(model_name)
if force_recompute:
# Fetch features from the offline store
features = store.get_online_features(
prediction_df = store.get_online_features(
entity_rows=[entities],
feature_refs=features,
).to_dict()

features = store._get_online_features(
features=features,
entity_values=entities,
full_feature_names=full_feature_names,
).to_dict()
# Merge features with inputs
df_inputs = pd.merge(df_inputs, features, on="entity_id")
score_df = df_inputs[["feature1", "feature2", "feature3"]]
).to_df()
# Predict using the model
predictions = model.predict(score_df)
predictions = model.predict(prediction_df)
prediction_df["predictions"] = predictions
# Log features to the offline store if needed (on computations)
if log_features:
# Assuming store is a global variable or can be accessed here
store.push(
push_source_name=model_features, df=prediction_df, to=PushMode.OFFLINE
)
else:
store.get_online_features(
entity_rows=[entity],
features=[
"model_name:prediction",
]
)

# Log features to the offline store if needed
if log_features:
# Assuming store is a global variable or can be accessed here
store.push(
push_source_name="model_predictions",
df=df_inputs.assign(predictions=predictions.tolist()),
to=PushMode.OFFLINE
prediction_df = store.get_online_features(
entity_rows=[entities],
features=model_features,
)

return predictions.tolist()
return prediction_df
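
Taken together, the rewritten _get_predictions resolves the feature service registered under model_name, pulls the current online feature values into a DataFrame, scores that frame with the supplied model, and, when log_features is set, pushes the scored frame to the offline store. A self-contained sketch of that flow, assuming a scikit-learn-style model and a hypothetical push source name — a reading of the intent, not the commit's exact code:

from typing import Any, Dict, List

import pandas as pd

import feast
from feast.data_source import PushMode


def predict_with_online_features(
    store: "feast.FeatureStore",
    model: Any,                       # assumed to expose a scikit-learn-style .predict(df)
    model_name: str,                  # name of the FeatureService describing the model's inputs
    entity_rows: List[Dict[str, Any]],
    push_source_name: str = "model_predictions",  # hypothetical push source
    log_features: bool = True,
) -> pd.DataFrame:
    # Resolve the feature service that defines the model's input features.
    feature_service = store.get_feature_service(model_name)

    # Fetch the latest online feature values for the given entities.
    prediction_df = store.get_online_features(
        features=feature_service,
        entity_rows=entity_rows,
    ).to_df()

    # Score the retrieved features and attach the result to the frame.
    prediction_df["predictions"] = model.predict(prediction_df)

    # Optionally log the scored frame to the offline store through a push source.
    if log_features:
        store.push(
            push_source_name=push_source_name,
            df=prediction_df,
            to=PushMode.OFFLINE,
        )

    return prediction_df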
