From 920147a187a5f283fb03932049ee1fec668de29f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 27 Feb 2025 03:22:08 +0000 Subject: [PATCH 1/2] Add chat application to feature server Co-Authored-By: Francisco Javier Arceo --- MANIFEST.in | 1 + sdk/python/feast/feature_server.py | 107 +++++++++++++++++++- sdk/python/feast/static/chat/index.html | 129 ++++++++++++++++++++++++ 3 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 sdk/python/feast/static/chat/index.html diff --git a/MANIFEST.in b/MANIFEST.in index c9828633d9d..c43708cdc6f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -7,3 +7,4 @@ prune examples graft sdk/python/feast/ui/build graft sdk/python/feast/embedded_go/lib +recursive-include sdk/python/feast/static * diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 023ce7d1115..434efa7e44b 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,17 +1,29 @@ +import asyncio +import os import sys import threading import time import traceback from contextlib import asynccontextmanager +from importlib import resources as importlib_resources from typing import Any, Dict, List, Optional import pandas as pd import psutil from dateutil import parser -from fastapi import Depends, FastAPI, Request, Response, status +from fastapi import ( + Depends, + FastAPI, + Request, + Response, + WebSocket, + WebSocketDisconnect, + status, +) from fastapi.concurrency import run_in_threadpool from fastapi.logger import logger from fastapi.responses import JSONResponse +from fastapi.staticfiles import StaticFiles from google.protobuf.json_format import MessageToDict from prometheus_client import Gauge, start_http_server from pydantic import BaseModel @@ -78,6 +90,15 @@ class GetOnlineFeaturesRequest(BaseModel): query_string: Optional[str] = None +class ChatMessage(BaseModel): + role: str + content: str + + +class ChatRequest(BaseModel): + messages: List[ChatMessage] + + def _get_features(request: GetOnlineFeaturesRequest, store: "feast.FeatureStore"): if request.feature_service: feature_service = store.get_feature_service( @@ -113,6 +134,35 @@ def get_app( store: "feast.FeatureStore", registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL, ): + """ + Creates a FastAPI app that can be used to start a feature server. + + Args: + store: The FeatureStore to use for serving features + registry_ttl_sec: The TTL in seconds for the registry cache + + Returns: + A FastAPI app + + Example: + ```python + from feast import FeatureStore + + store = FeatureStore(repo_path="feature_repo") + app = get_app(store) + ``` + + The app provides the following endpoints: + - `/get-online-features`: Get online features + - `/retrieve-online-documents`: Retrieve online documents + - `/push`: Push features to the feature store + - `/write-to-online-store`: Write to the online store + - `/health`: Health check + - `/materialize`: Materialize features + - `/materialize-incremental`: Materialize features incrementally + - `/chat`: Chat UI + - `/ws/chat`: WebSocket endpoint for chat + """ proto_json.patch() # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down registry_proto = None @@ -297,6 +347,21 @@ async def health(): else Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) ) + @app.post("/chat") + async def chat(request: ChatRequest): + # Process the chat request + # For now, just return dummy text + return {"response": "This is a dummy response from the Feast feature server."} + + @app.get("/chat") + async def chat_ui(): + # Serve the chat UI + static_dir_ref = importlib_resources.files(__spec__.parent) / "static/chat" # type: ignore[name-defined, arg-type] + with importlib_resources.as_file(static_dir_ref) as static_dir: + with open(os.path.join(static_dir, "index.html")) as f: + content = f.read() + return Response(content=content, media_type="text/html") + @app.post("/materialize", dependencies=[Depends(inject_user_details)]) def materialize(request: MaterializeRequest) -> None: for feature_view in request.feature_views or []: @@ -337,6 +402,46 @@ async def rest_exception_handler(request: Request, exc: Exception): content=str(exc), ) + # Chat WebSocket connection manager + class ConnectionManager: + def __init__(self): + self.active_connections: List[WebSocket] = [] + + async def connect(self, websocket: WebSocket): + await websocket.accept() + self.active_connections.append(websocket) + + def disconnect(self, websocket: WebSocket): + self.active_connections.remove(websocket) + + async def send_message(self, message: str, websocket: WebSocket): + await websocket.send_text(message) + + manager = ConnectionManager() + + @app.websocket("/ws/chat") + async def websocket_endpoint(websocket: WebSocket): + await manager.connect(websocket) + try: + while True: + message = await websocket.receive_text() + # Process the received message (currently unused but kept for future implementation) + # For now, just return dummy text + response = f"You sent: '{message}'. This is a dummy response from the Feast feature server." + + # Stream the response word by word + words = response.split() + for word in words: + await manager.send_message(word + " ", websocket) + await asyncio.sleep(0.1) # Add a small delay between words + except WebSocketDisconnect: + manager.disconnect(websocket) + + # Mount static files + static_dir_ref = importlib_resources.files(__spec__.parent) / "static" # type: ignore[name-defined, arg-type] + with importlib_resources.as_file(static_dir_ref) as static_dir: + app.mount("/static", StaticFiles(directory=static_dir), name="static") + return app diff --git a/sdk/python/feast/static/chat/index.html b/sdk/python/feast/static/chat/index.html new file mode 100644 index 00000000000..302c3b55b6a --- /dev/null +++ b/sdk/python/feast/static/chat/index.html @@ -0,0 +1,129 @@ + + + + + + Feast Chat + + + +
+
Hello! How can I help you today?
+
+
+ + +
+ + + From 1f4274fa50f12989277abaa2d11921e27f3942ed Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 27 Feb 2025 15:28:24 +0000 Subject: [PATCH 2/2] Fix null check in ui_server.py to prevent SerializeToString error Co-Authored-By: Francisco Javier Arceo --- sdk/python/feast/ui_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/ui_server.py b/sdk/python/feast/ui_server.py index 1d115920c3a..d852bb279cc 100644 --- a/sdk/python/feast/ui_server.py +++ b/sdk/python/feast/ui_server.py @@ -69,6 +69,8 @@ def shutdown_event(): @app.get("/registry") def read_registry(): + if registry_proto is None: + return Response(status_code=503) # Service Unavailable return Response( content=registry_proto.SerializeToString(), media_type="application/octet-stream",