Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement make_publisher which creates a routing publisher. #11

Merged
merged 2 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions google/cloud/pubsublite/endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from google.cloud.pubsublite.location import CloudRegion


def regional_endpoint(region: CloudRegion):
return f"{region}-pubsublite.googleapis.com"
5 changes: 4 additions & 1 deletion google/cloud/pubsublite/internal/wire/gapic_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class GapicConnectionFactory(ConnectionFactory[Request, Response]):
"""A ConnectionFactory that produces GapicConnections."""
_producer = Callable[[AsyncIterator[Request]], AsyncIterable[Response]]

def New(self) -> Connection[Request, Response]:
def __init__(self, producer: Callable[[AsyncIterator[Request]], AsyncIterable[Response]]):
self._producer = producer

def new(self) -> Connection[Request, Response]:
conn = GapicConnection[Request, Response]()
response_iterable = self._producer(conn)
conn.set_response_it(response_iterable.__aiter__())
Expand Down
62 changes: 62 additions & 0 deletions google/cloud/pubsublite/internal/wire/make_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import AsyncIterator, Mapping, Optional, MutableMapping

from google.cloud.pubsublite.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.default_routing_policy import DefaultRoutingPolicy
from google.cloud.pubsublite.internal.wire.gapic_connection import GapicConnectionFactory
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_publisher import RoutingPublisher
from google.cloud.pubsublite.internal.wire.single_partition_publisher import SinglePartitionPublisher
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.paths import TopicPath
from google.cloud.pubsublite.routing_metadata import topic_routing_metadata
from google.cloud.pubsublite_v1 import InitialPublishRequest, PublishRequest
from google.cloud.pubsublite_v1.services.publisher_service import async_client
from google.cloud.pubsublite_v1.services.admin_service.client import AdminServiceClient
from google.cloud.pubsublite_v1.types.admin import GetTopicPartitionsRequest
from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials


def make_publisher(
dpcollins-google marked this conversation as resolved.
Show resolved Hide resolved
topic: TopicPath,
batching_delay_secs: float = .05,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None) -> Publisher:
"""
Make a new publisher for the given topic.

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.

Returns:
A new Publisher.

Throws:
GoogleApiCallException on any error determining topic structure.
"""
if client_options is None:
client_options = ClientOptions(api_endpoint=regional_endpoint(topic.location.region))
client = async_client.PublisherServiceAsyncClient(
credentials=credentials, client_options=client_options) # type: ignore

admin_client = AdminServiceClient(credentials=credentials, client_options=client_options)
partitions = admin_client.get_topic_partitions(GetTopicPartitionsRequest(name=str(topic)))

clients: MutableMapping[Partition, Publisher] = {}

for partition in range(partitions.partition_count):
partition = Partition(partition)

def connection_factory(requests: AsyncIterator[PublishRequest]):
final_metadata = merge_metadata(metadata, topic_routing_metadata(topic, partition))
return client.publish(requests, metadata=list(final_metadata.items()))

clients[partition] = SinglePartitionPublisher(InitialPublishRequest(topic=str(topic), partition=partition.value),
batching_delay_secs, GapicConnectionFactory(connection_factory))
return RoutingPublisher(DefaultRoutingPolicy(partitions.partition_count), clients)
15 changes: 15 additions & 0 deletions google/cloud/pubsublite/internal/wire/merge_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Mapping, Optional


def merge_metadata(a: Optional[Mapping[str, str]], b: Optional[Mapping[str, str]]) -> Mapping[str, str]:
dpcollins-google marked this conversation as resolved.
Show resolved Hide resolved
"""
Merge the two sets of metadata if either exists. The second map overwrites the first.
"""
result = {}
if a:
for k, v in a.items():
result[k] = v
if b:
for k, v in b.items():
result[k] = v
return result
3 changes: 3 additions & 0 deletions google/cloud/pubsublite/internal/wire/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@


class Publisher(ABC):
"""
A Pub/Sub Lite asynchronous wire protocol publisher.
"""
@abstractmethod
async def __aenter__(self):
raise NotImplementedError()
Expand Down
34 changes: 34 additions & 0 deletions google/cloud/pubsublite/internal/wire/pubsub_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from base64 import b64encode
from typing import Mapping, Optional, NamedTuple

from absl import logging
import pkg_resources
from google.protobuf import struct_pb2


class _Semver(NamedTuple):
major: int
minor: int


def _version() -> _Semver:
version = pkg_resources.get_distribution("google-cloud-pubsublite").version
splits = version.split(".")
if len(splits) != 3:
logging.info(f"Failed to extract semver from {version}.")
return _Semver(0, 0)
return _Semver(int(splits[0]), int(splits[1]))


def pubsub_context(framework: Optional[str] = None) -> Mapping[str, str]:
"""Construct the pubsub context mapping for the given framework."""
context = struct_pb2.Struct()
context.fields["language"] = struct_pb2.Value(string_value="PYTHON")
if framework:
context.fields["framework"] = struct_pb2.Value(string_value=framework)
version = _version()
context.fields["major_version"] = struct_pb2.Value(number_value=version.major)
context.fields["minor_version"] = struct_pb2.Value(number_value=version.minor)
encoded = b64encode(context.SerializeToString())
return {"x-goog-pubsub-context": encoded}

6 changes: 3 additions & 3 deletions google/cloud/pubsublite/internal/wire/routing_publisher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Mapping

from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
Expand All @@ -9,9 +9,9 @@

class RoutingPublisher(Publisher):
_routing_policy: RoutingPolicy
_publishers: Dict[Partition, Publisher]
_publishers: Mapping[Partition, Publisher]

def __init__(self, routing_policy: RoutingPolicy, publishers: Dict[Partition, Publisher]):
def __init__(self, routing_policy: RoutingPolicy, publishers: Mapping[Partition, Publisher]):
self._routing_policy = routing_policy
self._publishers = publishers

Expand Down
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import NamedTuple


class CloudRegion(NamedTuple):
name: str


class CloudZone(NamedTuple):
region: CloudRegion
zone_id: str

def __str__(self):
return f"{self.region.name}-{self.zone_id}"
21 changes: 21 additions & 0 deletions google/cloud/pubsublite/paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import NamedTuple

from google.cloud.pubsublite.location import CloudZone


class TopicPath(NamedTuple):
project_number: int
location: CloudZone
name: str

def __str__(self):
return f"projects/{self.project_number}/locations/{self.location}/topics/{self.name}"


class SubscriptionPath(NamedTuple):
project_number: int
location: CloudZone
name: str

def __str__(self):
return f"projects/{self.project_number}/locations/{self.location}/subscriptions/{self.name}"
17 changes: 17 additions & 0 deletions google/cloud/pubsublite/routing_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Mapping, Union
from urllib.parse import urlencode

from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.paths import TopicPath, SubscriptionPath

_PARAMS_HEADER = "x-goog-request-params"


def topic_routing_metadata(topic: TopicPath, partition: Partition) -> Mapping[str, str]:
encoded = urlencode(topic)
return {_PARAMS_HEADER: f"partition={partition.value}&topic={encoded}"}


def subscription_routing_metadata(subscription: SubscriptionPath, partition: Partition) -> Mapping[str, str]:
encoded = urlencode(subscription)
return {_PARAMS_HEADER: f"partition={partition.value}&subscription={encoded}"}
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"google-api-core >= 1.22.0",
"absl-py >= 0.9.0",
"proto-plus >= 0.4.0",
"setuptools"
]

setuptools.setup(
Expand Down