diff --git a/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py index 73cafb9cde13..64186b130e94 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py @@ -20,6 +20,7 @@ import six +import google.api_core.exceptions from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher import futures @@ -199,10 +200,24 @@ def _commit(self): # Begin the request to publish these messages. # Log how long the underlying request takes. start = time.time() - response = self._client.api.publish( - self._topic, - self._messages, - ) + + try: + response = self._client.api.publish( + self._topic, + self._messages, + ) + except google.api_core.exceptions.GoogleAPICallError as exc: + # We failed to publish, set the exception on all futures and + # exit. + self._status = base.BatchStatus.ERROR + + for future in self._futures: + future.set_exception(exc) + + _LOGGER.exception( + 'Failed to publish %s messages.', len(self._futures)) + return + end = time.time() _LOGGER.debug('gRPC Publish took %s seconds.', end - start) @@ -220,9 +235,14 @@ def _commit(self): self._status = base.BatchStatus.ERROR exception = exceptions.PublishError( 'Some messages were not successfully published.') + for future in self._futures: future.set_exception(exception) + _LOGGER.error( + 'Only %s of %s messages were published.', + len(response.message_ids), len(self._futures)) + def monitor(self): """Commit this batch after sufficient time has elapsed. diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 1c08e1b3843a..fb62dbc6e550 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -17,6 +17,7 @@ import mock +import google.api_core.exceptions from google.auth import credentials from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types @@ -201,6 +202,26 @@ def test_blocking__commit_wrong_messageid_length(): assert isinstance(future.exception(), exceptions.PublishError) +def test_block__commmit_api_error(): + batch = create_batch() + futures = ( + batch.publish({'data': b'blah blah blah'}), + batch.publish({'data': b'blah blah blah blah'}), + ) + + # Make the API throw an error when publishing. + error = google.api_core.exceptions.InternalServerError('uh oh') + patch = mock.patch.object( + type(batch.client.api), 'publish', side_effect=error) + + with patch: + batch._commit() + + for future in futures: + assert future.done() + assert future.exception() == error + + def test_monitor(): batch = create_batch(max_latency=5.0) with mock.patch.object(time, 'sleep') as sleep: