diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index 7dd20491..029017e6 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -18,7 +18,7 @@ from google.api_core.client_options import ClientOptions from google.api_core.operation import Operation from google.auth.credentials import Credentials -from google.protobuf.field_mask_pb2 import FieldMask +from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error from google.cloud.pubsublite.admin_client_interface import AdminClientInterface from google.cloud.pubsublite.internal.constructable_from_service_account import ( diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index becf9ea7..dd7ad3e6 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -27,7 +27,7 @@ ) from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation -from google.protobuf.field_mask_pb2 import FieldMask +from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error class AdminClientInterface(ABC): diff --git a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py index 67268deb..01f0157f 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager -class AckSetTracker(AsyncContextManager): +class AckSetTracker(AsyncContextManager, metaclass=ABCMeta): """ An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets is aggregated. diff --git a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py index eee816e2..e1ffff55 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py @@ -88,6 +88,9 @@ def _make_dynamic_assigner( credentials: Optional[Credentials], base_metadata: Optional[Mapping[str, str]], ) -> Assigner: + if base_metadata is None: + base_metadata = {} + def assignment_connection_factory( requests: AsyncIterator[PartitionAssignmentRequest], ): diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py index e0df2bfb..85cb864a 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py @@ -51,7 +51,7 @@ def str(self) -> str: return json.dumps({"generation": self.generation, "offset": self.offset}) @staticmethod - def parse(payload: str) -> "_AckId": + def parse(payload: str) -> "_AckId": # pytype: disable=invalid-annotation loaded = json.loads(payload) return _AckId( generation=int(loaded["generation"]), offset=int(loaded["offset"]), diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py index 25bd15b5..14dd08a9 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager, Mapping, ContextManager from concurrent import futures -class AsyncSinglePublisher(AsyncContextManager): +class AsyncSinglePublisher(AsyncContextManager, metaclass=ABCMeta): """ An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an async context. Any publish failures are permanent. @@ -43,9 +43,10 @@ async def publish( Raises: GoogleApiCallError: On a permanent failure. """ + raise NotImplementedError() -class SinglePublisher(ContextManager): +class SinglePublisher(ContextManager, metaclass=ABCMeta): """ A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. @@ -70,3 +71,4 @@ def publish( Raises: GoogleApiCallError: On a permanent failure. """ + raise NotImplementedError() diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py index a2eddbc6..ab787bf3 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager, Callable, Set, Optional from google.cloud.pubsub_v1.subscriber.message import Message @@ -24,7 +24,7 @@ ) -class AsyncSingleSubscriber(AsyncContextManager): +class AsyncSingleSubscriber(AsyncContextManager, metaclass=ABCMeta): """ A Cloud Pub/Sub asynchronous subscriber. diff --git a/google/cloud/pubsublite/cloudpubsub/message_transforms.py b/google/cloud/pubsublite/cloudpubsub/message_transforms.py index 3eeed468..381cf5cd 100644 --- a/google/cloud/pubsublite/cloudpubsub/message_transforms.py +++ b/google/cloud/pubsublite/cloudpubsub/message_transforms.py @@ -15,7 +15,7 @@ import datetime from google.api_core.exceptions import InvalidArgument -from google.protobuf.timestamp_pb2 import Timestamp +from google.protobuf.timestamp_pb2 import Timestamp # pytype: disable=pyi-error from google.pubsub_v1 import PubsubMessage from google.cloud.pubsublite.cloudpubsub import MessageTransformer diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py index 47d6783a..0dafa16b 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py @@ -12,14 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from concurrent.futures import Future from typing import ContextManager, Mapping, Union, AsyncContextManager from google.cloud.pubsublite.types import TopicPath -class AsyncPublisherClientInterface(AsyncContextManager): +class AsyncPublisherClientInterface(AsyncContextManager, metaclass=ABCMeta): """ An AsyncPublisherClientInterface publishes messages similar to Google Pub/Sub, but must be used in an async context. Any publish failures are unlikely to succeed if retried. @@ -50,9 +50,10 @@ async def publish( Raises: GoogleApiCallError: On a permanent failure. """ + raise NotImplementedError() -class PublisherClientInterface(ContextManager): +class PublisherClientInterface(ContextManager, metaclass=ABCMeta): """ A PublisherClientInterface publishes messages similar to Google Pub/Sub. Any publish failures are unlikely to succeed if retried. @@ -84,3 +85,4 @@ def publish( Raises: GoogleApiCallError: On a permanent failure. """ + raise NotImplementedError() diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py index d1586661..d50b383c 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import ( ContextManager, Union, @@ -33,7 +33,7 @@ ) -class AsyncSubscriberClientInterface(AsyncContextManager): +class AsyncSubscriberClientInterface(AsyncContextManager, metaclass=ABCMeta): """ An AsyncSubscriberClientInterface reads messages similar to Google Pub/Sub, but must be used in an async context. @@ -64,12 +64,13 @@ async def subscribe( Raises: GoogleApiCallError: On a permanent failure. """ + raise NotImplementedError() MessageCallback = Callable[[Message], None] -class SubscriberClientInterface(ContextManager): +class SubscriberClientInterface(ContextManager, metaclass=ABCMeta): """ A SubscriberClientInterface reads messages similar to Google Pub/Sub. Any subscribe failures are unlikely to succeed if retried. @@ -103,3 +104,4 @@ def subscribe( Raises: GoogleApiCallError: On a permanent failure. """ + raise NotImplementedError() diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index a5a9366c..ee269f43 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -16,7 +16,7 @@ from google.api_core.exceptions import InvalidArgument from google.api_core.operation import Operation -from google.protobuf.field_mask_pb2 import FieldMask +from google.protobuf.field_mask_pb2 import FieldMask # pytype: disable=pyi-error from google.cloud.pubsublite.admin_client_interface import AdminClientInterface from google.cloud.pubsublite.types import ( @@ -37,6 +37,7 @@ Reservation, TimeTarget, SeekSubscriptionRequest, + CreateSubscriptionRequest, ) @@ -88,12 +89,12 @@ def create_subscription( ) -> Subscription: path = SubscriptionPath.parse(subscription.name) return self._underlying.create_subscription( - request={ - "parent": str(path.to_location_path()), - "subscription": subscription, - "subscription_id": path.name, - "skip_backlog": (starting_offset == BacklogLocation.END), - } + request=CreateSubscriptionRequest( + parent=str(path.to_location_path()), + subscription=subscription, + subscription_id=path.name, + skip_backlog=(starting_offset == BacklogLocation.END), + ) ) def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: diff --git a/google/cloud/pubsublite/internal/wire/assigner.py b/google/cloud/pubsublite/internal/wire/assigner.py index c9d01930..dd1f472b 100644 --- a/google/cloud/pubsublite/internal/wire/assigner.py +++ b/google/cloud/pubsublite/internal/wire/assigner.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager, Set -from google.cloud.pubsublite.types import Partition +from google.cloud.pubsublite.types.partition import Partition -class Assigner(AsyncContextManager): +class Assigner(AsyncContextManager, metaclass=ABCMeta): """ An assigner will deliver a continuous stream of assignments when called into. Perform all necessary work with the assignment before attempting to get the next one. diff --git a/google/cloud/pubsublite/internal/wire/assigner_impl.py b/google/cloud/pubsublite/internal/wire/assigner_impl.py index 4816ea92..17ee55ea 100644 --- a/google/cloud/pubsublite/internal/wire/assigner_impl.py +++ b/google/cloud/pubsublite/internal/wire/assigner_impl.py @@ -28,7 +28,7 @@ ConnectionReinitializer, ) from google.cloud.pubsublite.internal.wire.connection import Connection -from google.cloud.pubsublite.types import Partition +from google.cloud.pubsublite.types.partition import Partition from google.cloud.pubsublite_v1.types import ( PartitionAssignmentRequest, PartitionAssignment, diff --git a/google/cloud/pubsublite/internal/wire/committer.py b/google/cloud/pubsublite/internal/wire/committer.py index f095a96b..f2485f9e 100644 --- a/google/cloud/pubsublite/internal/wire/committer.py +++ b/google/cloud/pubsublite/internal/wire/committer.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager from google.cloud.pubsublite_v1 import Cursor -class Committer(AsyncContextManager): +class Committer(AsyncContextManager, metaclass=ABCMeta): """ A Committer is able to commit subscribers' completed offsets. """ diff --git a/google/cloud/pubsublite/internal/wire/connection.py b/google/cloud/pubsublite/internal/wire/connection.py index 2816fa5f..6e34324e 100644 --- a/google/cloud/pubsublite/internal/wire/connection.py +++ b/google/cloud/pubsublite/internal/wire/connection.py @@ -13,13 +13,15 @@ # limitations under the License. from typing import Generic, TypeVar, AsyncContextManager -from abc import abstractmethod +from abc import abstractmethod, ABCMeta Request = TypeVar("Request") Response = TypeVar("Response") -class Connection(Generic[Request, Response], AsyncContextManager["Connection"]): +class Connection( + AsyncContextManager["Connection"], Generic[Request, Response], metaclass=ABCMeta +): """ A connection to an underlying stream. Only one call to 'read' may be outstanding at a time. """ @@ -45,8 +47,9 @@ async def read(self) -> Response: raise NotImplementedError() -class ConnectionFactory(Generic[Request, Response]): +class ConnectionFactory(Generic[Request, Response], metaclass=ABCMeta): """A factory for producing Connections.""" + @abstractmethod async def new(self) -> Connection[Request, Response]: raise NotImplementedError() diff --git a/google/cloud/pubsublite/internal/wire/default_routing_policy.py b/google/cloud/pubsublite/internal/wire/default_routing_policy.py index d1bb34a2..9d042806 100644 --- a/google/cloud/pubsublite/internal/wire/default_routing_policy.py +++ b/google/cloud/pubsublite/internal/wire/default_routing_policy.py @@ -16,7 +16,7 @@ import random from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy -from google.cloud.pubsublite.types import Partition +from google.cloud.pubsublite.types.partition import Partition from google.cloud.pubsublite_v1.types import PubSubMessage diff --git a/google/cloud/pubsublite/internal/wire/fixed_set_assigner.py b/google/cloud/pubsublite/internal/wire/fixed_set_assigner.py index 86565dea..29d99da9 100644 --- a/google/cloud/pubsublite/internal/wire/fixed_set_assigner.py +++ b/google/cloud/pubsublite/internal/wire/fixed_set_assigner.py @@ -16,7 +16,7 @@ from typing import Set from google.cloud.pubsublite.internal.wire.assigner import Assigner -from google.cloud.pubsublite.types import Partition +from google.cloud.pubsublite.types.partition import Partition class FixedSetAssigner(Assigner): diff --git a/google/cloud/pubsublite/internal/wire/gapic_connection.py b/google/cloud/pubsublite/internal/wire/gapic_connection.py index 27657799..004bcff5 100644 --- a/google/cloud/pubsublite/internal/wire/gapic_connection.py +++ b/google/cloud/pubsublite/internal/wire/gapic_connection.py @@ -12,7 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import AsyncIterator, TypeVar, Optional, Callable, AsyncIterable, Awaitable +from typing import ( + cast, + AsyncIterator, + TypeVar, + Optional, + Callable, + AsyncIterable, + Awaitable, +) import asyncio from google.api_core.exceptions import GoogleAPICallError, FailedPrecondition @@ -34,7 +42,7 @@ class GapicConnection( ): """A Connection wrapping a gapic AsyncIterator[Request/Response] pair.""" - _write_queue: "asyncio.Queue[WorkItem[Request]]" + _write_queue: "asyncio.Queue[WorkItem[Request, None]]" _response_it: Optional[AsyncIterator[Response]] def __init__(self): @@ -50,8 +58,12 @@ async def write(self, request: Request) -> None: await self.await_unless_failed(item.response_future) async def read(self) -> Response: + if self._response_it is None: + self.fail(FailedPrecondition("GapicConnection not initialized.")) + raise self.error() try: - return await self.await_unless_failed(self._response_it.__anext__()) + response_it = cast(AsyncIterator[Response], self._response_it) + return await self.await_unless_failed(response_it.__anext__()) except StopAsyncIteration: self.fail(FailedPrecondition("Server sent unprompted half close.")) except GoogleAPICallError as e: @@ -65,7 +77,7 @@ async def __aexit__(self, exc_type, exc_value, traceback) -> None: pass async def __anext__(self) -> Request: - item: WorkItem[Request] = await self.await_unless_failed( + item: WorkItem[Request, None] = await self.await_unless_failed( self._write_queue.get() ) item.response_future.set_result(None) diff --git a/google/cloud/pubsublite/internal/wire/partition_count_watcher.py b/google/cloud/pubsublite/internal/wire/partition_count_watcher.py index f0527755..157053c6 100644 --- a/google/cloud/pubsublite/internal/wire/partition_count_watcher.py +++ b/google/cloud/pubsublite/internal/wire/partition_count_watcher.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager -class PartitionCountWatcher(AsyncContextManager): +class PartitionCountWatcher(AsyncContextManager, metaclass=ABCMeta): @abstractmethod async def get_partition_count(self) -> int: raise NotImplementedError() diff --git a/google/cloud/pubsublite/internal/wire/partition_count_watcher_impl.py b/google/cloud/pubsublite/internal/wire/partition_count_watcher_impl.py index 1b0796ae..6a27e84d 100644 --- a/google/cloud/pubsublite/internal/wire/partition_count_watcher_impl.py +++ b/google/cloud/pubsublite/internal/wire/partition_count_watcher_impl.py @@ -32,7 +32,7 @@ class PartitionCountWatcherImpl(PartitionCountWatcher, PermanentFailable): _any_success: bool _thread: ThreadPoolExecutor _queue: asyncio.Queue - _poll_partition_loop: asyncio.Future + _partition_loop_poller: asyncio.Future def __init__( self, admin: AdminClientInterface, topic_path: TopicPath, duration: float @@ -46,13 +46,13 @@ def __init__( async def __aenter__(self): self._thread = ThreadPoolExecutor(max_workers=1) self._queue = asyncio.Queue(maxsize=1) - self._poll_partition_loop = asyncio.ensure_future( + self._partition_loop_poller = asyncio.ensure_future( self.run_poller(self._poll_partition_loop) ) async def __aexit__(self, exc_type, exc_val, exc_tb): - self._poll_partition_loop.cancel() - await wait_ignore_cancelled(self._poll_partition_loop) + self._partition_loop_poller.cancel() + await wait_ignore_cancelled(self._partition_loop_poller) self._thread.shutdown(wait=False) def _get_partition_count_sync(self) -> int: diff --git a/google/cloud/pubsublite/internal/wire/publisher.py b/google/cloud/pubsublite/internal/wire/publisher.py index c7352afe..b83e8bf7 100644 --- a/google/cloud/pubsublite/internal/wire/publisher.py +++ b/google/cloud/pubsublite/internal/wire/publisher.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager from google.cloud.pubsublite_v1.types import PubSubMessage from google.cloud.pubsublite.types import MessageMetadata -class Publisher(AsyncContextManager): +class Publisher(AsyncContextManager, metaclass=ABCMeta): """ A Pub/Sub Lite asynchronous wire protocol publisher. """ diff --git a/google/cloud/pubsublite/internal/wire/pubsub_context.py b/google/cloud/pubsublite/internal/wire/pubsub_context.py index 96740187..802f067c 100644 --- a/google/cloud/pubsublite/internal/wire/pubsub_context.py +++ b/google/cloud/pubsublite/internal/wire/pubsub_context.py @@ -17,7 +17,7 @@ import logging import pkg_resources -from google.protobuf import struct_pb2 +from google.protobuf import struct_pb2 # pytype: disable=pyi-error _LOGGER = logging.getLogger(__name__) diff --git a/google/cloud/pubsublite/internal/wire/retrying_connection.py b/google/cloud/pubsublite/internal/wire/retrying_connection.py index 634f8fa3..ae35930a 100644 --- a/google/cloud/pubsublite/internal/wire/retrying_connection.py +++ b/google/cloud/pubsublite/internal/wire/retrying_connection.py @@ -94,10 +94,14 @@ async def _run_loop(self): ) self._read_queue = asyncio.Queue(maxsize=1) self._write_queue = asyncio.Queue(maxsize=1) - await self._reinitializer.reinitialize(connection, last_failure) + await self._reinitializer.reinitialize( + connection, last_failure # pytype: disable=name-error + ) self._initialized_once.set() bad_retries = 0 - await self._loop_connection(connection) + await self._loop_connection( + connection # pytype: disable=name-error + ) except GoogleAPICallError as e: last_failure = e if not is_retryable(e): @@ -118,7 +122,7 @@ async def _run_loop(self): async def _loop_connection(self, connection: Connection[Request, Response]): read_task: "Future[Response]" = asyncio.ensure_future(connection.read()) - write_task: "Future[WorkItem[Request]]" = asyncio.ensure_future( + write_task: "Future[WorkItem[Request, None]]" = asyncio.ensure_future( self._write_queue.get() ) try: diff --git a/google/cloud/pubsublite/internal/wire/routing_policy.py b/google/cloud/pubsublite/internal/wire/routing_policy.py index 9d7bb40b..c9108838 100644 --- a/google/cloud/pubsublite/internal/wire/routing_policy.py +++ b/google/cloud/pubsublite/internal/wire/routing_policy.py @@ -14,7 +14,7 @@ from abc import ABC, abstractmethod -from google.cloud.pubsublite.types import Partition +from google.cloud.pubsublite.types.partition import Partition from google.cloud.pubsublite_v1.types.common import PubSubMessage diff --git a/google/cloud/pubsublite/internal/wire/serial_batcher.py b/google/cloud/pubsublite/internal/wire/serial_batcher.py index 90554d60..abc19773 100644 --- a/google/cloud/pubsublite/internal/wire/serial_batcher.py +++ b/google/cloud/pubsublite/internal/wire/serial_batcher.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import Generic, List, NamedTuple import asyncio from overrides import overrides @@ -31,7 +31,7 @@ def __add__(self, other: "BatchSize") -> "BatchSize": ) -class RequestSizer(Generic[Request]): +class RequestSizer(Generic[Request], metaclass=ABCMeta): """A RequestSizer determines the size of a request.""" @abstractmethod @@ -47,7 +47,7 @@ def get_size(self, request: Request) -> BatchSize: class IgnoredRequestSizer(RequestSizer[Request]): @overrides - def get_size(self, request: Request) -> BatchSize: + def get_size(self, request) -> BatchSize: return BatchSize(0, 0) diff --git a/google/cloud/pubsublite/internal/wire/subscriber.py b/google/cloud/pubsublite/internal/wire/subscriber.py index 86305366..610ba9d1 100644 --- a/google/cloud/pubsublite/internal/wire/subscriber.py +++ b/google/cloud/pubsublite/internal/wire/subscriber.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from typing import AsyncContextManager from google.cloud.pubsublite_v1.types import SequencedMessage, FlowControlRequest -class Subscriber(AsyncContextManager): +class Subscriber(AsyncContextManager, metaclass=ABCMeta): """ A Pub/Sub Lite asynchronous wire protocol subscriber. """ diff --git a/google/cloud/pubsublite/testing/test_reset_signal.py b/google/cloud/pubsublite/testing/test_reset_signal.py index d0d989da..e71bf4f9 100644 --- a/google/cloud/pubsublite/testing/test_reset_signal.py +++ b/google/cloud/pubsublite/testing/test_reset_signal.py @@ -14,7 +14,7 @@ from asynctest.mock import MagicMock from google.api_core.exceptions import Aborted, GoogleAPICallError -from google.protobuf.any_pb2 import Any +from google.protobuf.any_pb2 import Any # pytype: disable=pyi-error from google.rpc.error_details_pb2 import ErrorInfo from google.rpc.status_pb2 import Status import grpc diff --git a/google/cloud/pubsublite/testing/test_utils.py b/google/cloud/pubsublite/testing/test_utils.py index 766a5c01..cbf9a7b3 100644 --- a/google/cloud/pubsublite/testing/test_utils.py +++ b/google/cloud/pubsublite/testing/test_utils.py @@ -14,7 +14,7 @@ import asyncio import threading -from typing import List, Union, Any, TypeVar, Generic, Optional, Callable +from typing import List, Union, Any, TypeVar, Generic, Optional, Callable, Awaitable from asynctest import CoroutineMock @@ -30,7 +30,7 @@ async def async_iterable(elts: List[Union[Any, Exception]]): def make_queue_waiter( started_q: "asyncio.Queue[None]", result_q: "asyncio.Queue[Union[T, Exception]]" -): +) -> Callable[[], Awaitable[T]]: """ Given a queue to notify when started and a queue to get results from, return a waiter which notifies started_q when started and returns from result_q when done. diff --git a/google/cloud/pubsublite_v1/services/admin_service/pagers.py b/google/cloud/pubsublite_v1/services/admin_service/pagers.py index 893e9189..c2673200 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/pagers.py +++ b/google/cloud/pubsublite_v1/services/admin_service/pagers.py @@ -15,10 +15,10 @@ # from typing import ( Any, - AsyncIterable, + AsyncIterator, Awaitable, Callable, - Iterable, + Iterator, Sequence, Tuple, Optional, @@ -75,14 +75,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - def pages(self) -> Iterable[admin.ListTopicsResponse]: + def pages(self) -> Iterator[admin.ListTopicsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = self._method(self._request, metadata=self._metadata) yield self._response - def __iter__(self) -> Iterable[common.Topic]: + def __iter__(self) -> Iterator[common.Topic]: for page in self.pages: yield from page.topics @@ -137,14 +137,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - async def pages(self) -> AsyncIterable[admin.ListTopicsResponse]: + async def pages(self) -> AsyncIterator[admin.ListTopicsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = await self._method(self._request, metadata=self._metadata) yield self._response - def __aiter__(self) -> AsyncIterable[common.Topic]: + def __aiter__(self) -> AsyncIterator[common.Topic]: async def async_generator(): async for page in self.pages: for response in page.topics: @@ -203,14 +203,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - def pages(self) -> Iterable[admin.ListTopicSubscriptionsResponse]: + def pages(self) -> Iterator[admin.ListTopicSubscriptionsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = self._method(self._request, metadata=self._metadata) yield self._response - def __iter__(self) -> Iterable[str]: + def __iter__(self) -> Iterator[str]: for page in self.pages: yield from page.subscriptions @@ -265,14 +265,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - async def pages(self) -> AsyncIterable[admin.ListTopicSubscriptionsResponse]: + async def pages(self) -> AsyncIterator[admin.ListTopicSubscriptionsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = await self._method(self._request, metadata=self._metadata) yield self._response - def __aiter__(self) -> AsyncIterable[str]: + def __aiter__(self) -> AsyncIterator[str]: async def async_generator(): async for page in self.pages: for response in page.subscriptions: @@ -331,14 +331,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - def pages(self) -> Iterable[admin.ListSubscriptionsResponse]: + def pages(self) -> Iterator[admin.ListSubscriptionsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = self._method(self._request, metadata=self._metadata) yield self._response - def __iter__(self) -> Iterable[common.Subscription]: + def __iter__(self) -> Iterator[common.Subscription]: for page in self.pages: yield from page.subscriptions @@ -393,14 +393,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - async def pages(self) -> AsyncIterable[admin.ListSubscriptionsResponse]: + async def pages(self) -> AsyncIterator[admin.ListSubscriptionsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = await self._method(self._request, metadata=self._metadata) yield self._response - def __aiter__(self) -> AsyncIterable[common.Subscription]: + def __aiter__(self) -> AsyncIterator[common.Subscription]: async def async_generator(): async for page in self.pages: for response in page.subscriptions: @@ -459,14 +459,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - def pages(self) -> Iterable[admin.ListReservationsResponse]: + def pages(self) -> Iterator[admin.ListReservationsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = self._method(self._request, metadata=self._metadata) yield self._response - def __iter__(self) -> Iterable[common.Reservation]: + def __iter__(self) -> Iterator[common.Reservation]: for page in self.pages: yield from page.reservations @@ -521,14 +521,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - async def pages(self) -> AsyncIterable[admin.ListReservationsResponse]: + async def pages(self) -> AsyncIterator[admin.ListReservationsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = await self._method(self._request, metadata=self._metadata) yield self._response - def __aiter__(self) -> AsyncIterable[common.Reservation]: + def __aiter__(self) -> AsyncIterator[common.Reservation]: async def async_generator(): async for page in self.pages: for response in page.reservations: @@ -587,14 +587,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - def pages(self) -> Iterable[admin.ListReservationTopicsResponse]: + def pages(self) -> Iterator[admin.ListReservationTopicsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = self._method(self._request, metadata=self._metadata) yield self._response - def __iter__(self) -> Iterable[str]: + def __iter__(self) -> Iterator[str]: for page in self.pages: yield from page.topics @@ -649,14 +649,14 @@ def __getattr__(self, name: str) -> Any: return getattr(self._response, name) @property - async def pages(self) -> AsyncIterable[admin.ListReservationTopicsResponse]: + async def pages(self) -> AsyncIterator[admin.ListReservationTopicsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = await self._method(self._request, metadata=self._metadata) yield self._response - def __aiter__(self) -> AsyncIterable[str]: + def __aiter__(self) -> AsyncIterator[str]: async def async_generator(): async for page in self.pages: for response in page.topics: diff --git a/noxfile.py b/noxfile.py index 1e883ef9..ff06f0e6 100644 --- a/noxfile.py +++ b/noxfile.py @@ -42,6 +42,7 @@ "lint_setup_py", "blacken", "docs", + "pytype", # Custom pytype session ] # Error if a python version is missing @@ -78,9 +79,7 @@ def lint_setup_py(session): session.run("python", "setup.py", "check", "--restructuredtext", "--strict") -def default(session): - # Install all test dependencies, then install this package in-place. - +def install_test_deps(session): constraints_path = str( CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" ) @@ -97,6 +96,11 @@ def default(session): session.install("-e", ".", "-c", constraints_path) + +def default(session): + # Install all test dependencies, then install this package in-place. + install_test_deps(session) + # Run py.test against the unit tests. session.run( "py.test", @@ -238,3 +242,11 @@ def docfx(session): os.path.join("docs", ""), os.path.join("docs", "_build", "html", ""), ) + + +@nox.session(python=DEFAULT_PYTHON_VERSION) +def pytype(session): + """Run type checks.""" + install_test_deps(session) + session.install("pytype") + session.run("pytype", "google/cloud/pubsublite") diff --git a/owlbot.py b/owlbot.py index ec361f90..0aadbdcf 100644 --- a/owlbot.py +++ b/owlbot.py @@ -75,6 +75,7 @@ excludes=[ ".coveragerc", # the microgenerator has a good coveragerc file "docs/multiprocessing.rst", # exclude multiprocessing note + "noxfile.py", # exclude to opt-in to pytype ] )