Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make public API more similar to generated clients #56

Merged
merged 8 commits into from
Nov 5, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@

from google.cloud.pubsub_v1.subscriber.message import Message

from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
AsyncSingleSubscriber,
)
from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_cancelled
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.types import Partition

PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]
PartitionSubscriberFactory = Callable[[Partition], AsyncSingleSubscriber]


class _RunningSubscriber(NamedTuple):
subscriber: AsyncSubscriber
subscriber: AsyncSingleSubscriber
poller: Future


class AssigningSubscriber(AsyncSubscriber, PermanentFailable):
class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable):
_assigner_factory: Callable[[], Assigner]
_subscriber_factory: PartitionSubscriberFactory

Expand Down Expand Up @@ -47,7 +49,7 @@ def __init__(
async def read(self) -> Message:
return await self.await_unless_failed(self._messages.get())

async def _subscribe_action(self, subscriber: AsyncSubscriber):
async def _subscribe_action(self, subscriber: AsyncSingleSubscriber):
message = await subscriber.read()
await self._messages.put(message)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from google.cloud.pubsublite.cloudpubsub.message_transforms import (
from_cps_publish_message,
)
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher
from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import (
AsyncSinglePublisher,
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher


class AsyncPublisherImpl(AsyncPublisher):
class AsyncSinglePublisherImpl(AsyncSinglePublisher):
_publisher_factory: Callable[[], Publisher]
_publisher: Optional[Publisher]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
from google.cloud.pubsub_v1.types import BatchSettings

from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import (
AsyncPublisherImpl,
AsyncSinglePublisherImpl,
)
from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import (
SinglePublisherImpl,
)
from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import (
AsyncSinglePublisher,
SinglePublisher,
)
from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher
from google.cloud.pubsublite.internal.wire.make_publisher import (
make_publisher as make_wire_publisher,
DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING,
Expand All @@ -23,16 +28,18 @@

def make_async_publisher(
topic: TopicPath,
transport: str,
per_partition_batching_settings: Optional[BatchSettings] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None,
) -> AsyncPublisher:
) -> AsyncSinglePublisher:
"""
Make a new publisher for the given topic.

Args:
topic: The topic to publish to.
transport: The transport type to use.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
Expand All @@ -55,21 +62,23 @@ def underlying_factory():
metadata,
)

return AsyncPublisherImpl(underlying_factory)
return AsyncSinglePublisherImpl(underlying_factory)


def make_publisher(
topic: TopicPath,
transport: str,
per_partition_batching_settings: Optional[BatchSettings] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None,
) -> Publisher:
) -> SinglePublisher:
"""
Make a new publisher for the given topic.

Args:
topic: The topic to publish to.
transport: The transport type to use.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
Expand All @@ -81,7 +90,7 @@ def make_publisher(
Throws:
GoogleApiCallException on any error determining topic structure.
"""
return PublisherImpl(
return SinglePublisherImpl(
make_async_publisher(
topic,
per_partition_batching_settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials
from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture

from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import (
MessageCallback,
)
from google.cloud.pubsublite.types import FlowControlSettings
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import (
AckSetTrackerImpl,
)
from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import (
PartitionSubscriberFactory,
AssigningSubscriber,
AssigningSingleSubscriber,
)
from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import (
SinglePartitionSubscriber,
SinglePartitionSingleSubscriber,
)
import google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl as cps_subscriber
from google.cloud.pubsublite.cloudpubsub.message_transformer import (
Expand All @@ -25,9 +29,8 @@
NackHandler,
DefaultNackHandler,
)
from google.cloud.pubsublite.cloudpubsub.subscriber import (
AsyncSubscriber,
MessageCallback,
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
AsyncSingleSubscriber,
)
from google.cloud.pubsublite.internal.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.assigner import Assigner
Expand Down Expand Up @@ -67,12 +70,15 @@

def _make_dynamic_assigner(
subscription: SubscriptionPath,
assignment_client: PartitionAssignmentServiceAsyncClient,
transport: str,
client_options: ClientOptions,
credentials: Optional[Credentials],
base_metadata: Optional[Mapping[str, str]],
) -> Assigner:
def assignment_connection_factory(
requests: AsyncIterator[PartitionAssignmentRequest],
):
assignment_client = PartitionAssignmentServiceAsyncClient(credentials=credentials, transport=transport, client_options=client_options) # type: ignore
return assignment_client.assign_partitions(
requests, metadata=list(base_metadata.items())
)
Expand All @@ -87,18 +93,19 @@ def assignment_connection_factory(

def _make_partition_subscriber_factory(
subscription: SubscriptionPath,
transport: str,
client_options: ClientOptions,
credentials: Optional[Credentials],
base_metadata: Optional[Mapping[str, str]],
flow_control_settings: FlowControlSettings,
nack_handler: NackHandler,
message_transformer: MessageTransformer,
) -> PartitionSubscriberFactory:
def factory(partition: Partition) -> AsyncSubscriber:
def factory(partition: Partition) -> AsyncSingleSubscriber:
subscribe_client = SubscriberServiceAsyncClient(
credentials=credentials, client_options=client_options
credentials=credentials, client_options=client_options, transport=transport
) # type: ignore
cursor_client = CursorServiceAsyncClient(credentials=credentials, client_options=client_options) # type: ignore
cursor_client = CursorServiceAsyncClient(credentials=credentials, client_options=client_options, transport=transport) # type: ignore
final_metadata = merge_metadata(
base_metadata, subscription_routing_metadata(subscription, partition)
)
Expand Down Expand Up @@ -130,7 +137,7 @@ def cursor_connection_factory(
GapicConnectionFactory(cursor_connection_factory),
)
ack_set_tracker = AckSetTrackerImpl(committer)
return SinglePartitionSubscriber(
return SinglePartitionSingleSubscriber(
subscriber,
flow_control_settings,
ack_set_tracker,
Expand All @@ -143,19 +150,21 @@ def cursor_connection_factory(

def make_async_subscriber(
subscription: SubscriptionPath,
transport: str,
per_partition_flow_control_settings: FlowControlSettings,
nack_handler: Optional[NackHandler] = None,
message_transformer: Optional[MessageTransformer] = None,
fixed_partitions: Optional[Set[Partition]] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None,
) -> AsyncSubscriber:
) -> AsyncSingleSubscriber:
"""
Make a Pub/Sub Lite AsyncSubscriber.

Args:
subscription: The subscription to subscribe to.
transport: The transport type to use.
per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these
settings apply to each partition individually, not in aggregate.
nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client.
Expand All @@ -178,11 +187,7 @@ def make_async_subscriber(
assigner_factory = lambda: FixedSetAssigner(fixed_partitions) # noqa: E731
else:
assigner_factory = lambda: _make_dynamic_assigner( # noqa: E731
subscription,
PartitionAssignmentServiceAsyncClient(
credentials=credentials, client_options=client_options
),
metadata,
subscription, transport, client_options, credentials, metadata,
)

if nack_handler is None:
Expand All @@ -191,18 +196,20 @@ def make_async_subscriber(
message_transformer = DefaultMessageTransformer()
partition_subscriber_factory = _make_partition_subscriber_factory(
subscription,
transport,
client_options,
credentials,
metadata,
per_partition_flow_control_settings,
nack_handler,
message_transformer,
)
return AssigningSubscriber(assigner_factory, partition_subscriber_factory)
return AssigningSingleSubscriber(assigner_factory, partition_subscriber_factory)


def make_subscriber(
subscription: SubscriptionPath,
transport: str,
per_partition_flow_control_settings: FlowControlSettings,
callback: MessageCallback,
nack_handler: Optional[NackHandler] = None,
Expand All @@ -218,6 +225,7 @@ def make_subscriber(

Args:
subscription: The subscription to subscribe to.
transport: The transport type to use.
per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these
settings apply to each partition individually, not in aggregate.
callback: The callback to call with each message.
Expand All @@ -236,6 +244,7 @@ def make_subscriber(
"""
underlying = make_async_subscriber(
subscription,
transport,
per_partition_flow_control_settings,
nack_handler,
message_transformer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import Callable, Dict, Union, Mapping

from google.api_core.exceptions import GoogleAPICallError

from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import (
AsyncSinglePublisher,
)
from google.cloud.pubsublite.cloudpubsub.publisher_client_interface import (
AsyncPublisherClientInterface,
)
from google.cloud.pubsublite.types import TopicPath


AsyncPublisherFactory = Callable[[TopicPath], AsyncSinglePublisher]


class MultiplexedAsyncPublisherClient(AsyncPublisherClientInterface):
_publisher_factory: AsyncPublisherFactory
_live_publishers: Dict[TopicPath, AsyncSinglePublisher]

def __init__(self, publisher_factory: AsyncPublisherFactory):
self._publisher_factory = publisher_factory
self._live_publishers = {}

async def publish(
self,
topic: Union[TopicPath, str],
data: bytes,
ordering_key: str = "",
**attrs: Mapping[str, str]
) -> str:
if isinstance(topic, str):
topic = TopicPath.parse(topic)
publisher: AsyncSinglePublisher
if topic not in self._live_publishers:
publisher = self._publisher_factory(topic)
self._live_publishers[topic] = publisher
await publisher.__aenter__()
publisher = self._live_publishers[topic]
try:
return await publisher.publish(
data=data, ordering_key=ordering_key, **attrs
)
except GoogleAPICallError as e:
self._on_failure(topic, publisher)
raise e

def _on_failure(self, topic: TopicPath, publisher: AsyncSinglePublisher):
if topic not in self._live_publishers:
return
current_publisher = self._live_publishers[topic]
if current_publisher is not publisher:
return
del self._live_publishers[topic]

await publisher.__aexit__(None, None, None)
dpcollins-google marked this conversation as resolved.
Show resolved Hide resolved

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
live_publishers = self._live_publishers
self._live_publishers = {}
for topic, pub in live_publishers.items():
await pub.__aexit__(exc_type, exc_value, traceback)