Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncio
from collections.abc import AsyncGenerator

from connect.connect import StreamRequest, UnaryRequest, ensure_single
from connect.connect import StreamRequest, UnaryRequest
from connect.connection_pool import AsyncConnectionPool

from proto.connectrpc.eliza.v1.eliza_pb2 import IntroduceRequest, ReflectRequest, SayRequest
Expand All @@ -21,11 +21,7 @@ async def run_unary(client: ElizaServiceClient) -> None:

async def run_server_streaming(client: ElizaServiceClient) -> None:
"""Run server streaming RPC (Introduce)."""

async def request_generator() -> AsyncGenerator[IntroduceRequest]:
yield IntroduceRequest(name="Alice")

request = StreamRequest(request_generator())
request = StreamRequest(IntroduceRequest(name="Alice"))

message_count = 1
async with client.Introduce(request) as response:
Expand All @@ -43,7 +39,7 @@ async def request_generator() -> AsyncGenerator[ReflectRequest]:

request = StreamRequest(request_generator())
async with client.Reflect(request) as response:
message = await ensure_single(response.messages)
message = await response.single()

print(f"Final response: {message.sentence}")

Expand Down
4 changes: 2 additions & 2 deletions examples/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import hypercorn
import hypercorn.asyncio
from connect.connect import StreamRequest, StreamResponse, UnaryRequest, UnaryResponse, ensure_single
from connect.connect import StreamRequest, StreamResponse, UnaryRequest, UnaryResponse
from connect.handler_context import HandlerContext
from connect.middleware import ConnectMiddleware
from starlette.applications import Starlette
Expand Down Expand Up @@ -36,7 +36,7 @@ async def Introduce(
self, request: StreamRequest[IntroduceRequest], _context: HandlerContext
) -> StreamResponse[IntroduceResponse]:
"""Introduce the Eliza service."""
message = await ensure_single(request.messages)
message = await request.single()
name = message.name
intros = eliza.get_intro_responses(name)

Expand Down
8 changes: 4 additions & 4 deletions src/connect/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
StreamType,
UnaryRequest,
UnaryResponse,
recieve_stream_response,
recieve_unary_response,
receive_stream_response,
receive_unary_response,
)
from connect.connection_pool import AsyncConnectionPool
from connect.error import ConnectError
Expand Down Expand Up @@ -244,7 +244,7 @@ def on_request_send(r: httpcore.Request) -> None:

await conn.send(aiterate([request.message]), call_options.timeout, abort_event=call_options.abort_event)

response = await recieve_unary_response(conn=conn, t=output, abort_event=call_options.abort_event)
response = await receive_unary_response(conn=conn, t=output, abort_event=call_options.abort_event)
return response

unary_func = apply_interceptors(_unary_func, options.interceptors)
Expand Down Expand Up @@ -290,7 +290,7 @@ def on_request_send(r: httpcore.Request) -> None:

await conn.send(request.messages, call_options.timeout, call_options.abort_event)

response = await recieve_stream_response(conn, output, request.spec, call_options.abort_event)
response = await receive_stream_response(conn, output, request.spec, call_options.abort_event)
return response

stream_func = apply_interceptors(_stream_func, options.interceptors)
Expand Down
170 changes: 100 additions & 70 deletions src/connect/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,22 @@ class Peer(BaseModel):


class RequestCommon:
"""RequestCommon is a class that encapsulates common attributes and methods for handling HTTP requests.
"""A common base class for handling request-related functionality.

Attributes:
_spec (Spec): The specification for the request.
_peer (Peer): The peer information.
_headers (Headers): The request headers.
_method (str): The HTTP method used for the request.
This class encapsulates the common properties and behaviors shared across
different types of requests, including specification details, peer information,
headers, and HTTP method configuration.

Attributes:
_spec (Spec): The specification for the request containing procedure details,
descriptor, stream type, and idempotency level.
_peer (Peer): The peer information including address, protocol, and query parameters.
_headers (Headers): The request headers as a collection of key-value pairs.
_method (str): The HTTP method used for the request (defaults to POST).

The class provides property accessors for all attributes with appropriate getters
and setters where modification is allowed. Default values are provided for all
parameters during initialization to ensure the object is always in a valid state.
"""

_spec: Spec
Expand All @@ -73,17 +81,19 @@ def __init__(
headers: Headers | None = None,
method: str | None = None,
) -> None:
"""Initialize a new Request instance.
"""Initialize a Connect request/response context.

Args:
spec (Spec): The specification for the request.
peer (Peer): The peer information.
headers (Mapping[str, str]): The request headers.
method (str): The HTTP method used for the request.
spec: The RPC specification containing procedure name, descriptor, stream type,
and idempotency level. If None, creates a default Spec with empty procedure,
no descriptor, unary stream type, and idempotent level.
peer: The peer information including address, protocol, and query parameters.
If None, creates a default Peer with no address, empty protocol, and empty query.
headers: HTTP headers for the request/response. If None, creates an empty Headers object.
method: HTTP method to use for the request. If None, defaults to POST.

Returns:
None

"""
self._spec = (
spec
Expand Down Expand Up @@ -138,18 +148,12 @@ def method(self, value: str) -> None:
class StreamRequest[T](RequestCommon):
"""StreamRequest class represents a request that can handle streaming messages.

Attributes:
messages (AsyncIterable[T]): An asynchronous iterable of messages.
_spec (Spec): The specification for the request.
_peer (Peer): The peer information.
_headers (Headers): The request headers.
_method (str): The HTTP method used for the request.

This class provides a unified interface for handling both single and multiple
messages in streaming requests. It automatically determines the appropriate
method based on the stream type and usage context.
"""

_messages: AsyncIterable[T]
# timeout: float | None
# abort_event: asyncio.Event | None = None

def __init__(
self,
Expand All @@ -158,50 +162,53 @@ def __init__(
peer: Peer | None = None,
headers: Headers | None = None,
method: str | None = None,
# timeout: float | None = None,
# abort_event: asyncio.Event | None = None,
) -> None:
"""Initialize a new Request instance.
"""Initialize a new instance.

Args:
content (AsyncIterable[T] | T): The request content, which can be an async iterable or a single message.
spec (Spec): The specification for the request.
peer (Peer): The peer information.
headers (Mapping[str, str]): The request headers.
method (str): The HTTP method used for the request.
timeout (float): The timeout for the request.
abort_event (asyncio.Event): An event to signal request abortion.
content: The content to be processed, either a single item of type T or an async iterable of items.
spec: Optional specification object defining the behavior or configuration.
peer: Optional peer object representing the connection endpoint.
headers: Optional headers dictionary for metadata or configuration.
method: Optional string specifying the method or operation type.

Returns:
None

"""
super().__init__(spec, peer, headers, method)
self._messages = content if isinstance(content, AsyncIterable) else aiterate([content])
# self.timeout = timeout
# self.abort_event = abort_event

@property
def messages(self) -> AsyncIterable[T]:
"""Return the request message."""
"""Return the request messages as an async iterable.

Use this when you expect multiple messages (client streaming, bidi streaming).

Example:
async for message in request.messages:
process(message)
"""
return self._messages

async def single(self) -> T:
"""Return a single message from the request.

class UnaryRequest[T](RequestCommon):
"""UnaryRequest is a class that encapsulates a request with a message, specification, peer, headers, and method.
Use this when you expect exactly one message (server-side handlers for client streaming).
Raises ConnectError if there are zero or multiple messages.

Attributes:
message (Req): The request message.
_spec (Spec): The specification of the request.
_peer (Peer): The peer associated with the request.
_headers (Mapping[str, str]): The headers of the request.
_method (str): The method of the request.
Example:
message = await request.single()
process(message)
"""
return await ensure_single(self._messages)

"""

_message: T
# timeout: float | None
# abort_event: asyncio.Event | None = None
class UnaryRequest[T](RequestCommon):
"""A unary request wrapper that extends RequestCommon functionality.

This class encapsulates a single message/content of type T along with common request
metadata such as specifications, peer information, headers, and HTTP method.
"""

def __init__(
self,
Expand All @@ -210,28 +217,21 @@ def __init__(
peer: Peer | None = None,
headers: Headers | None = None,
method: str | None = None,
# timeout: float | None = None,
# abort_event: asyncio.Event | None = None,
) -> None:
"""Initialize a new Request instance.
"""Initialize a new instance with content and optional parameters.

Args:
content (T): The request message.
spec (Spec): The specification for the request.
peer (Peer): The peer information.
headers (Mapping[str, str]): The request headers.
method (str): The HTTP method used for the request.
timeout (float): The timeout for the request.
abort_event (asyncio.Event): An event to signal request abortion.
content (T): The main content/message to be stored in this instance.
spec (Spec | None, optional): Specification object defining behavior or configuration. Defaults to None.
peer (Peer | None, optional): Peer object representing the remote endpoint or connection. Defaults to None.
headers (Headers | None, optional): HTTP headers or metadata associated with the request/response. Defaults to None.
method (str | None, optional): HTTP method or operation type (e.g., 'GET', 'POST'). Defaults to None.

Returns:
None

"""
super().__init__(spec, peer, headers, method)
self._message = content
# self.timeout = timeout
# self.abort_event = abort_event

@property
def message(self) -> T:
Expand Down Expand Up @@ -293,7 +293,12 @@ def message(self) -> T:


class StreamResponse[T](ResponseCommon):
"""Response class for handling responses."""
"""Response class for handling streaming responses.

This class provides a unified interface for handling both single and multiple
messages from streaming responses. It automatically determines the appropriate
method based on the stream type and usage context.
"""

_messages: AsyncIterable[T]

Expand All @@ -303,15 +308,40 @@ def __init__(
headers: Headers | None = None,
trailers: Headers | None = None,
) -> None:
"""Initialize the response with a message."""
"""Initialize the response with content.

Args:
content: Either a single message or an async iterable of messages
headers: Optional response headers
trailers: Optional response trailers
"""
super().__init__(headers, trailers)
self._messages = content if isinstance(content, AsyncIterable) else aiterate([content])

@property
def messages(self) -> AsyncIterable[T]:
"""Return the response message."""
"""Return the response messages as an async iterable.

Use this when you expect multiple messages (server streaming, bidi streaming).

Example:
async for message in response.messages:
print(message)
"""
return self._messages

async def single(self) -> T:
"""Return a single message from the response.

Use this when you expect exactly one message (client streaming results).
Raises ConnectError if there are zero or multiple messages.

Example:
message = await response.single()
print(message)
"""
return await ensure_single(self._messages)

async def aclose(self) -> None:
"""Asynchronously close the response stream."""
aclose = get_acallable_attribute(self._messages, "aclose")
Expand Down Expand Up @@ -475,8 +505,8 @@ async def send_error(self, error: ConnectError) -> None:
raise NotImplementedError()


class UnaryClientConn:
"""Abstract base class for a streaming client connection."""
class UnaryClientConn(abc.ABC):
"""Abstract base class for a unary client connection."""

@property
@abc.abstractmethod
Expand Down Expand Up @@ -529,7 +559,7 @@ async def aclose(self) -> None:
raise NotImplementedError()


class StreamingClientConn:
class StreamingClientConn(abc.ABC):
"""Abstract base class for a streaming client connection."""

@property
Expand Down Expand Up @@ -645,7 +675,7 @@ async def receive_stream_request[T](conn: StreamingHandlerConn, t: type[T]) -> S
)


async def recieve_unary_response[T](
async def receive_unary_response[T](
conn: StreamingClientConn, t: type[T], abort_event: asyncio.Event | None
) -> UnaryResponse[T]:
"""Receives a unary response message from a streaming client connection.
Expand All @@ -672,7 +702,7 @@ async def recieve_unary_response[T](
return UnaryResponse(message, conn.response_headers, conn.response_trailers)


async def recieve_stream_response[T](
async def receive_stream_response[T](
conn: StreamingClientConn, t: type[T], spec: Spec, abort_event: asyncio.Event | None
) -> StreamResponse[T]:
"""Handle receiving a stream response from a streaming client connection.
Expand All @@ -697,10 +727,10 @@ async def recieve_stream_response[T](

"""
if spec.stream_type == StreamType.ClientStream:
single_message = await ensure_single(conn.receive(t, abort_event), conn.aclose)
single_message = await ensure_single(conn.receive(t, abort_event))

return StreamResponse(
AsyncDataStream[T](aiterate([single_message])), conn.response_headers, conn.response_trailers
AsyncDataStream[T](aiterate([single_message]), conn.aclose), conn.response_headers, conn.response_trailers
)
else:
return StreamResponse(
Expand Down
Loading