Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor client classes for safer type checking #552

Merged
merged 11 commits into from Jan 19, 2022
1 change: 0 additions & 1 deletion .gitignore
Expand Up @@ -29,7 +29,6 @@ pip-log.txt
.nox
.cache
.pytest_cache
.pytype


# Mac
Expand Down
74 changes: 0 additions & 74 deletions google/cloud/pubsub_v1/_gapic.py

This file was deleted.

2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -271,7 +271,7 @@ def _commit(self) -> None:
batch_transport_succeeded = True
try:
# Performs retries for errors defined by the retry configuration.
response = self._client.api.publish(
response = self._client._gapic_publish(
topic=self._topic,
messages=self._messages,
retry=self._commit_retry,
Expand Down
47 changes: 33 additions & 14 deletions google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -22,12 +22,12 @@
import time
import typing
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
import warnings

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
Expand All @@ -49,15 +49,11 @@
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher import _batch
from google.pubsub_v1.services.publisher.client import OptionalRetry
from google.pubsub_v1.types import pubsub as pubsub_types


_LOGGER = logging.getLogger(__name__)

_DENYLISTED_METHODS = (
"publish",
"from_service_account_file",
"from_service_account_json",
)

_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

Expand All @@ -66,8 +62,7 @@
]


@_gapic.add_methods(publisher_client.PublisherClient, denylist=_DENYLISTED_METHODS)
class Client(object):
class Client(publisher_client.PublisherClient):
"""A publisher client for Google Cloud Pub/Sub.

This creates an object that is capable of publishing messages.
Expand Down Expand Up @@ -146,8 +141,8 @@ def __init__(

# Add the metrics headers, and instantiate the underlying GAPIC
# client.
self.api = publisher_client.PublisherClient(**kwargs)
plamut marked this conversation as resolved.
Show resolved Hide resolved
self._target = self.api._transport._host
super().__init__(**kwargs)
self._target = self._transport._host
self._batch_class = thread.Batch
self.batch_settings = types.BatchSettings(*batch_settings)

Expand All @@ -164,7 +159,7 @@ def __init__(
self._flow_controller = FlowController(self.publisher_options.flow_control)

@classmethod
def from_service_account_file(
def from_service_account_file( # type: ignore[override]
cls,
filename: str,
batch_settings: Union[types.BatchSettings, Sequence] = (),
Expand All @@ -188,7 +183,7 @@ def from_service_account_file(
kwargs["credentials"] = credentials
return cls(batch_settings, **kwargs)

from_service_account_json = from_service_account_file
from_service_account_json = from_service_account_file # type: ignore[assignment]

@property
def target(self) -> str:
Expand All @@ -199,6 +194,26 @@ def target(self) -> str:
"""
return self._target

@property
def api(self):
"""The underlying gapic API client.

.. versionchanged:: 2.10.0
Instead of a GAPIC ``PublisherClient`` client instance, this property is a
proxy object to it with the same interface.

.. deprecated:: 2.10.0
Use the GAPIC methods and properties on the client instance directly
instead of through the :attr:`api` attribute.
"""
msg = (
'The "api" property only exists for backward compatibility, access its '
'attributes directly thorugh the client instance (e.g. "client.foo" '
'instead of "client.api.foo").'
)
warnings.warn(msg, category=DeprecationWarning)
return super()

def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
""" Get an existing sequencer or create a new one given the (topic,
ordering_key) pair.
Expand Down Expand Up @@ -252,7 +267,11 @@ def resume_publish(self, topic: str, ordering_key: str) -> None:
else:
sequencer.unpause()

def publish(
def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
"""Call the GAPIC public API directly."""
return super().publish(*args, **kwargs)

def publish( # type: ignore[override]
self,
topic: str,
data: bytes,
Expand Down Expand Up @@ -382,7 +401,7 @@ def on_publish_done(future):
if self._enable_message_ordering:
if retry is gapic_v1.method.DEFAULT:
# use the default retry for the publish GRPC method as a base
transport = self.api._transport
transport = self._transport
base_retry = transport._wrapped_methods[transport.publish]._retry
retry = base_retry.with_deadline(2.0 ** 32)
else:
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/publisher/flow_controller.py
Expand Up @@ -25,7 +25,7 @@
_LOGGER = logging.getLogger(__name__)


MessageType = Type[types.PubsubMessage] # type: ignore # pytype: disable=module-attr
MessageType = Type[types.PubsubMessage] # type: ignore


class _QuantityReservation:
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Expand Up @@ -76,7 +76,7 @@ def message_count(self) -> int:
return len(self._leased_messages)

@property
def ack_ids(self) -> KeysView[str]: # pytype: disable=invalid-annotation
def ack_ids(self) -> KeysView[str]:
"""The ack IDs of all leased messages."""
return self._leased_messages.keys()

Expand Down
Expand Up @@ -532,7 +532,7 @@ def open(
self._get_initial_request, stream_ack_deadline_seconds
)
self._rpc = bidi.ResumableBidiRpc(
start_rpc=self._client.api.streaming_pull,
start_rpc=self._client.streaming_pull,
initial_request=get_initial_request,
should_recover=self._should_recover,
should_terminate=self._should_terminate,
Expand All @@ -548,14 +548,11 @@ def open(

# Create references to threads
assert self._scheduler is not None
# pytype: disable=wrong-arg-types
# (pytype incorrectly complains about "self" not being the right argument type)
scheduler_queue = self._scheduler.queue
self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
self._leaser = leaser.Leaser(self)
self._heartbeater = heartbeater.Heartbeater(self)
# pytype: enable=wrong-arg-types

# Start the thread to pass the requests.
self._dispatcher.start()
Expand Down
48 changes: 29 additions & 19 deletions google/cloud/pubsub_v1/subscriber/client.py
Expand Up @@ -18,11 +18,11 @@
import pkg_resources
import typing
from typing import cast, Any, Callable, Optional, Sequence, Union
import warnings

from google.auth.credentials import AnonymousCredentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import futures
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
Expand All @@ -42,15 +42,8 @@
# a PIP package.
__version__ = "0.0"

_DENYLISTED_METHODS = (
"publish",
"from_service_account_file",
"from_service_account_json",
)


@_gapic.add_methods(subscriber_client.SubscriberClient, denylist=_DENYLISTED_METHODS)
class Client(object):
class Client(subscriber_client.SubscriberClient):
"""A subscriber client for Google Cloud Pub/Sub.

This creates an object that is capable of subscribing to messages.
Expand Down Expand Up @@ -91,12 +84,14 @@ def __init__(self, **kwargs: Any):
kwargs["credentials"] = AnonymousCredentials()

# Instantiate the underlying GAPIC client.
self._api = subscriber_client.SubscriberClient(**kwargs)
self._target = self._api._transport._host
super().__init__(**kwargs)
self._target = self._transport._host
self._closed = False

@classmethod
def from_service_account_file(cls, filename: str, **kwargs: Any) -> "Client":
def from_service_account_file( # type: ignore[override]
cls, filename: str, **kwargs: Any
) -> "Client":
"""Creates an instance of this client using the provided credentials
file.

Expand All @@ -112,7 +107,7 @@ def from_service_account_file(cls, filename: str, **kwargs: Any) -> "Client":
kwargs["credentials"] = credentials
return cls(**kwargs)

from_service_account_json = from_service_account_file
from_service_account_json = from_service_account_file # type: ignore[assignment]

@property
def target(self) -> str:
Expand All @@ -123,11 +118,6 @@ def target(self) -> str:
"""
return self._target

@property
def api(self) -> subscriber_client.SubscriberClient:
"""The underlying gapic API client."""
return self._api

plamut marked this conversation as resolved.
Show resolved Hide resolved
@property
def closed(self) -> bool:
"""Return whether the client has been closed and cannot be used anymore.
Expand All @@ -136,6 +126,26 @@ def closed(self) -> bool:
"""
return self._closed

@property
def api(self):
"""The underlying gapic API client.

.. versionchanged:: 2.10.0
Instead of a GAPIC ``SubscriberClient`` client instance, this property is a
proxy object to it with the same interface.

.. deprecated:: 2.10.0
Use the GAPIC methods and properties on the client instance directly
instead of through the :attr:`api` attribute.
"""
msg = (
'The "api" property only exists for backward compatibility, access its '
'attributes directly thorugh the client instance (e.g. "client.foo" '
'instead of "client.api.foo").'
)
warnings.warn(msg, category=DeprecationWarning)
return super()

def subscribe(
self,
subscription: str,
Expand Down Expand Up @@ -266,7 +276,7 @@ def close(self) -> None:

This method is idempotent.
"""
transport = cast("SubscriberGrpcTransport", self.api._transport)
transport = cast("SubscriberGrpcTransport", self._transport)
transport.grpc_channel.close()
self._closed = True

Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/message.py
Expand Up @@ -81,7 +81,7 @@ class Message(object):
The time that this message was originally published.
"""

def __init__( # pytype: disable=module-attr
def __init__(
self,
message: "types.PubsubMessage._meta._pb", # type: ignore
ack_id: str,
Expand Down
2 changes: 0 additions & 2 deletions google/cloud/pubsub_v1/types.py
Expand Up @@ -127,13 +127,11 @@ class PublisherOptions(NamedTuple):
"an instance of :class:`google.api_core.retry.Retry`."
)

# pytype: disable=invalid-annotation
timeout: "OptionalTimeout" = gapic_v1.method.DEFAULT # use api_core default
(
"Timeout settings for message publishing by the client. It should be "
"compatible with :class:`~.pubsub_v1.types.TimeoutType`."
)
# pytype: enable=invalid-annotation


# Define the type class and default values for flow control settings.
Expand Down