Skip to content

Commit

Permalink
feat: add grpc endpoint (#2232)
Browse files Browse the repository at this point in the history
Co-authored-by: Xander Song <axiomofjoy@gmail.com>
Co-authored-by: Roger Yang <roger.yang@arize.com>
  • Loading branch information
axiomofjoy and RogerHYang committed Apr 29, 2024
1 parent e5698d7 commit 8bbd136
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 29 deletions.
13 changes: 8 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This Dockerfile is provided for convenience if you wish to run Phoenix in a
# container or sidecar. To build the image, run the following command:
#
#
# > docker build -t phoenix
#
# You can then run the image in the background with:
Expand All @@ -10,7 +10,7 @@
# or in the foreground with:
#
# > docker run -it -p 6006:6006 phoenix
#
#
# How are you using Phoenix in production? Let us know!
#
# To get support or provide feedback, contact the team in the #phoenix-support
Expand All @@ -35,7 +35,7 @@ WORKDIR /phoenix
COPY ./ /phoenix/
COPY --from=frontend-builder /phoenix/src/phoenix/server/static/ /phoenix/src/phoenix/server/static/
# Delete symbolic links used during development.
RUN find src/ -xtype l -delete
RUN find src/ -xtype l -delete
RUN pip install --target ./env ".[container, pg]"

# The production image is distroless, meaning that it is a minimal image that
Expand All @@ -54,9 +54,12 @@ FROM gcr.io/distroless/python3-debian12
WORKDIR /phoenix
COPY --from=backend-builder /phoenix/env/ ./env
ENV PYTHONPATH="/phoenix/env:$PYTHONPATH"
# Export the Phoenix port.
ENV PYTHONUNBUFFERED=1
# Expose the Phoenix port.
EXPOSE 6006
# Export the Prometheus port.
# Expose the Phoenix GRPC port.
EXPOSE 4317
# Expose the Prometheus port.
EXPOSE 9090
# Run the Phoenix server. Note that the ENTRYPOINT of the base image invokes
# Python, so no explicit invocation of Python is needed here. See
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies = [
"wrapt",
"sortedcontainers",
"protobuf>=3.20, <6.0",
"grpcio",
"ddsketch",
"tqdm",
"requests",
Expand Down Expand Up @@ -106,6 +107,7 @@ container = [
"opentelemetry-semantic-conventions",
"opentelemetry-instrumentation-starlette",
"opentelemetry-instrumentation-sqlalchemy",
"opentelemetry-instrumentation-grpc",
"strawberry-graphql[opentelemetry]==0.227.2", # need to pin version because we're monkey-patching
]

Expand Down Expand Up @@ -172,12 +174,14 @@ dependencies = [
"openai>=1.0.0",
"litellm>=1.0.3",
"prometheus_client",
"grpcio",
"opentelemetry-sdk",
"opentelemetry-proto>=1.12.0",
"opentelemetry-exporter-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry-instrumentation-starlette",
"opentelemetry-instrumentation-sqlalchemy",
"opentelemetry-instrumentation-grpc",
"strawberry-graphql[opentelemetry]==0.227.2", # need to pin version because we're monkey-patching
]

Expand Down Expand Up @@ -344,6 +348,7 @@ module = [
"opentelemetry.*",
"pyarrow",
"sqlean",
"grpc.*",
]
ignore_missing_imports = true

Expand Down
35 changes: 27 additions & 8 deletions src/phoenix/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

# Phoenix environment variables
ENV_PHOENIX_PORT = "PHOENIX_PORT"
ENV_PHOENIX_GRPC_PORT = "PHOENIX_GRPC_PORT"
ENV_PHOENIX_HOST = "PHOENIX_HOST"
ENV_NOTEBOOK_ENV = "PHOENIX_NOTEBOOK_ENV"
ENV_PHOENIX_COLLECTOR_ENDPOINT = "PHOENIX_COLLECTOR_ENDPOINT"
Expand Down Expand Up @@ -101,14 +102,18 @@ def get_storage_dir() -> Path:
PHOENIX_DIR = Path(__file__).resolve().parent
# Server config
SERVER_DIR = PHOENIX_DIR / "server"
HOST = "0.0.0.0"
"""The host the server will run on after launch_app is called."""
PORT = 6006
"""The port the server will run on after launch_app is called."""
GRPC_PORT = 4317
"""The port the gRPC server will run on after launch_app is called.
The default network port for OTLP/gRPC is 4317.
See https://opentelemetry.io/docs/specs/otlp/#otlpgrpc-default-port"""
GENERATED_DATASET_NAME_PREFIX = "phoenix_dataset_"
"""The prefix of datasets that are auto-assigned a name."""
WORKING_DIR = get_working_dir()
"""The work directory for saving, loading, and exporting datasets."""

ROOT_DIR = WORKING_DIR
EXPORT_DIR = ROOT_DIR / "exports"
Expand Down Expand Up @@ -161,10 +166,24 @@ def get_exported_files(directory: Path) -> List[Path]:


def get_env_port() -> int:
    """Return the HTTP port for the Phoenix server.

    Reads the PHOENIX_PORT environment variable, falling back to the
    default ``PORT`` when it is unset or empty.

    Raises:
        ValueError: If the environment variable is set to a value that is
            not a valid integer.
    """
    if not (port := os.getenv(ENV_PHOENIX_PORT)):
        return PORT
    try:
        # int() is used instead of str.isnumeric(): isnumeric() accepts
        # characters such as superscripts ("²") that int() cannot parse,
        # which would otherwise escape as an unhelpful ValueError.
        return int(port)
    except ValueError:
        raise ValueError(
            f"Invalid value for environment variable {ENV_PHOENIX_PORT}: "
            f"{port}. Value must be an integer."
        ) from None


def get_env_grpc_port() -> int:
    """Return the OTLP/gRPC collector port for the Phoenix server.

    Reads the PHOENIX_GRPC_PORT environment variable, falling back to the
    default ``GRPC_PORT`` when it is unset or empty.

    Raises:
        ValueError: If the environment variable is set to a value that is
            not a valid integer.
    """
    if not (port := os.getenv(ENV_PHOENIX_GRPC_PORT)):
        return GRPC_PORT
    try:
        # int() is used instead of str.isnumeric(): isnumeric() accepts
        # characters such as superscripts ("²") that int() cannot parse,
        # which would otherwise escape as an unhelpful ValueError.
        return int(port)
    except ValueError:
        raise ValueError(
            f"Invalid value for environment variable {ENV_PHOENIX_GRPC_PORT}: "
            f"{port}. Value must be an integer."
        ) from None


Expand Down
7 changes: 4 additions & 3 deletions src/phoenix/db/bulk_inserter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import (
Any,
AsyncContextManager,
Awaitable,
Callable,
Iterable,
List,
Expand Down Expand Up @@ -70,18 +71,18 @@ def last_inserted_at(self) -> Optional[datetime]:

async def __aenter__(
    self,
) -> Tuple[Callable[[Span, str], Awaitable[None]], Callable[[pb.Evaluation], Awaitable[None]]]:
    """Start the background bulk-insert task.

    Returns:
        A pair of async callables ``(queue_span, queue_evaluation)`` used by
        request handlers to buffer records for insertion.
    """
    self._running = True
    # Insertion happens off the request path, in this background task.
    self._task = asyncio.create_task(self._bulk_insert())
    return self._queue_span, self._queue_evaluation

async def __aexit__(self, *args: Any) -> None:
    """Signal the background bulk-insert loop to stop."""
    self._running = False

async def _queue_span(self, span: Span, project_name: str) -> None:
    """Buffer a span (with its project name) for the next bulk insert."""
    self._spans.append((span, project_name))

async def _queue_evaluation(self, evaluation: pb.Evaluation) -> None:
    """Buffer an evaluation record for the next bulk insert."""
    self._evaluations.append(evaluation)

async def _bulk_insert(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions src/phoenix/server/api/routers/v1/evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async def post_evaluations(request: Request) -> Response:
"Evaluation name must not be blank/empty",
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
)
request.state.queue_evaluation_for_bulk_insert(evaluation)
await request.state.queue_evaluation_for_bulk_insert(evaluation)
return Response()


Expand Down Expand Up @@ -195,7 +195,7 @@ async def _process_pyarrow(request: Request) -> Response:

async def _add_evaluations(state: State, evaluations: Evaluations) -> None:
    """Encode the given evaluations and queue each one for bulk insertion."""
    encoded = encode_evaluations(evaluations)
    for record in encoded:
        await state.queue_evaluation_for_bulk_insert(record)


def _read_sql_trace_evaluations_into_dataframe(
Expand Down
15 changes: 7 additions & 8 deletions src/phoenix/server/api/routers/v1/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceRequest,
)
from starlette.background import BackgroundTask
from starlette.datastructures import State
from starlette.requests import Request
from starlette.responses import Response
from starlette.status import (
Expand Down Expand Up @@ -67,16 +69,13 @@ async def post_traces(request: Request) -> Response:
content="Request body is invalid ExportTraceServiceRequest",
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
)
return Response(background=BackgroundTask(_add_spans, req, request.state))


async def _add_spans(req: ExportTraceServiceRequest, state: State) -> None:
    """Decode every OTLP span in the export request and queue it for bulk
    insertion under its project name.

    NOTE(review): bulk insertion buffers spans in memory before writing, so
    the HTTP 200 already sent to the client is not a genuine confirmation of
    persistence — confirm this trade-off is intended.
    """
    for resource_spans in req.resource_spans:
        project_name = get_project_name(resource_spans.resource.attributes)
        for scope_span in resource_spans.scope_spans:
            for otlp_span in scope_span.spans:
                span = decode_otlp_span(otlp_span)
                await state.queue_span_for_bulk_insert(span, project_name)
25 changes: 23 additions & 2 deletions src/phoenix/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Callable,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Tuple,
Expand Down Expand Up @@ -63,9 +64,13 @@
from phoenix.server.api.dataloaders.span_descendants import SpanDescendantsDataLoader
from phoenix.server.api.routers.v1 import V1_ROUTES
from phoenix.server.api.schema import schema
from phoenix.server.grpc_server import GrpcServer
from phoenix.server.telemetry import initialize_opentelemetry_tracer_provider
from phoenix.trace.schemas import Span

if TYPE_CHECKING:
from opentelemetry.trace import TracerProvider

logger = logging.getLogger(__name__)

templates = Jinja2Templates(directory=SERVER_DIR / "templates")
Expand Down Expand Up @@ -205,15 +210,23 @@ async def factory() -> AsyncIterator[AsyncSession]:


def _lifespan(
    *,
    bulk_inserter: BulkInserter,
    tracer_provider: Optional["TracerProvider"] = None,
    clean_ups: Iterable[Callable[[], None]] = (),
) -> StatefulLifespan[Starlette]:
    """Build the Starlette lifespan context manager.

    Args:
        bulk_inserter: Supplies the span/evaluation queueing callables while
            the app is running.
        tracer_provider: Optional OpenTelemetry provider passed through to the
            gRPC server for self-instrumentation.
        clean_ups: Callables invoked (in order) at application shutdown.
    """

    @contextlib.asynccontextmanager
    async def lifespan(_: Starlette) -> AsyncIterator[Dict[str, Any]]:
        # The gRPC server shares the bulk inserter's span queue, so both are
        # started and stopped together with the app.
        async with bulk_inserter as (queue_span, queue_evaluation), GrpcServer(
            queue_span,
            tracer_provider=tracer_provider,
        ):
            # These keys become attributes on request.state.
            yield {
                "queue_span_for_bulk_insert": queue_span,
                "queue_evaluation_for_bulk_insert": queue_evaluation,
            }
        for clean_up in clean_ups:
            clean_up()

    return lifespan

Expand All @@ -238,6 +251,7 @@ def create_app(
initial_spans: Optional[Iterable[Union[Span, Tuple[Span, str]]]] = None,
initial_evaluations: Optional[Iterable[pb.Evaluation]] = None,
) -> Starlette:
clean_ups: List[Callable[[], None]] = [] # To be called at app shutdown.
initial_batch_of_spans: Iterable[Tuple[Span, str]] = (
()
if initial_spans is None
Expand Down Expand Up @@ -281,6 +295,7 @@ def create_app(
engine=engine.sync_engine,
tracer_provider=tracer_provider,
)
clean_ups.append(SQLAlchemyInstrumentor().uninstrument)
if TYPE_CHECKING:
# Type-check the class before monkey-patching its private attribute.
assert OpenTelemetryExtension._tracer
Expand Down Expand Up @@ -315,7 +330,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
else:
prometheus_middlewares = []
app = Starlette(
lifespan=_lifespan(bulk_inserter),
lifespan=_lifespan(
bulk_inserter=bulk_inserter,
tracer_provider=tracer_provider,
clean_ups=clean_ups,
),
middleware=[
Middleware(HeadersMiddleware),
*prometheus_middlewares,
Expand Down Expand Up @@ -359,5 +378,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
if tracer_provider:
from opentelemetry.instrumentation.starlette import StarletteInstrumentor

StarletteInstrumentor().instrument(tracer_provider=tracer_provider)
StarletteInstrumentor.instrument_app(app, tracer_provider=tracer_provider)
clean_ups.append(StarletteInstrumentor().uninstrument)
return app
77 changes: 77 additions & 0 deletions src/phoenix/server/grpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional

import grpc
from grpc.aio import RpcContext, Server
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceRequest,
ExportTraceServiceResponse,
)
from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import (
TraceServiceServicer,
add_TraceServiceServicer_to_server,
)
from typing_extensions import TypeAlias

from phoenix.config import get_env_grpc_port
from phoenix.trace.otel import decode_otlp_span
from phoenix.trace.schemas import Span
from phoenix.utilities.project import get_project_name

if TYPE_CHECKING:
from opentelemetry.trace import TracerProvider

ProjectName: TypeAlias = str


class Servicer(TraceServiceServicer):
    """OTLP TraceService implementation that forwards each decoded span,
    together with its project name, to an async callback."""

    def __init__(
        self,
        callback: Callable[[Span, ProjectName], Awaitable[None]],
    ) -> None:
        super().__init__()
        self._callback = callback

    async def Export(
        self,
        request: ExportTraceServiceRequest,
        context: RpcContext,
    ) -> ExportTraceServiceResponse:
        """Decode every span in the export request and hand it to the callback."""
        for resource in request.resource_spans:
            project = get_project_name(resource.resource.attributes)
            for scope in resource.scope_spans:
                for raw_span in scope.spans:
                    decoded = decode_otlp_span(raw_span)
                    await self._callback(decoded, project)
        return ExportTraceServiceResponse()


class GrpcServer:
    """Async context manager owning an OTLP/gRPC trace collector server.

    On entry it optionally instruments gRPC with OpenTelemetry, starts an
    insecure ``grpc.aio`` server on the configured port, and registers a
    ``Servicer`` that feeds decoded spans to ``callback``.  On exit it stops
    the server and undoes the instrumentation.
    """

    def __init__(
        self,
        callback: Callable[[Span, ProjectName], Awaitable[None]],
        tracer_provider: Optional["TracerProvider"] = None,
    ) -> None:
        # Invoked for each decoded span together with its project name.
        self._callback = callback
        # The running grpc.aio server; None whenever the server is stopped.
        self._server: Optional[Server] = None
        # When set, the gRPC server is (un)instrumented with OpenTelemetry.
        self._tracer_provider = tracer_provider

    async def __aenter__(self) -> None:
        if self._tracer_provider is not None:
            # Imported lazily so instrumentation stays an optional dependency.
            from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer

            GrpcAioInstrumentorServer().instrument(tracer_provider=self._tracer_provider)  # type: ignore
        # so_reuseport is disabled, so binding an already-used port fails
        # instead of silently sharing it — presumably intentional; confirm.
        server = grpc.aio.server(options=(("grpc.so_reuseport", 0),))
        # "[::]" binds all interfaces (IPv6 and, typically, IPv4-mapped).
        server.add_insecure_port(f"[::]:{get_env_grpc_port()}")
        add_TraceServiceServicer_to_server(Servicer(self._callback), server)  # type: ignore
        await server.start()
        self._server = server

    async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
        # No-op if the server was never started (or was already stopped).
        if self._server is None:
            return
        # Grace period of 5 seconds for in-flight RPCs to complete.
        await self._server.stop(5)
        self._server = None
        if self._tracer_provider is not None:
            from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer

            GrpcAioInstrumentorServer().uninstrument()  # type: ignore

0 comments on commit 8bbd136

Please sign in to comment.