Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous Subscription Begins to Fail After Some Time #294

Closed
tristeybro opened this issue Feb 1, 2022 · 1 comment · Fixed by #300 or #292
Closed

Asynchronous Subscription Begins to Fail After Some Time #294

tristeybro opened this issue Feb 1, 2022 · 1 comment · Fixed by #300 or #292
Assignees
Labels
api: pubsublite Issues related to the googleapis/python-pubsublite API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@tristeybro
Copy link

Hello! I have a long-running script that is continuously pulling messages from a pubsublite topic asynchronously and executing them as they come in. This works fine for a while, but eventually runs into an AioRpcError due to receiving a FlowControlRequest that that tried to remove messages. Once that happens, the subscription continues to fail indefinitely. See below.

Environment details

  • OS type and version: Linux
  • Python version: 3.9.7
  • pip version: 21.2.4
  • google-cloud-pubsublite version: 1.2.0

Code example

import asyncio
from google.cloud.pubsublite.cloudpubsub import AsyncSubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
)
from google.cloud.pubsub_v1.types import PubsubMessage

per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=20,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

async def process_message():
    ...


async def main():
  async with AsyncSubscriberClient() as subscriber_client:
      streaming_pull_future = await subscriber_client.subscribe(
          subscription_path,
          per_partition_flow_control_settings=per_partition_flow_control_settings,
      )
      async for message in streaming_pull_future:
          process_message_coro = process_message(message)
          asyncio.create_task(process_message_coro)

if __name__ == "__main__":
    asyncio.run(main())

Stack trace

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/google/api_core/grpc_helpers_async.py", line 105, in _wrapped_aiter
    async for response in self._call:  # pragma: no branch
  File "/usr/local/lib/python3.9/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
  await self._raise_for_status()
  File "/usr/local/lib/python3.9/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
  raise _create_rpc_error(await self.initial_metadata(), await
  grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Received FlowControlRequest that tried to remove messages"
    debug_error_string = "{"created":"@1643639089.487575412","description":"Error received from peer ipv4:108.177.111.95:443","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"Received FlowControlRequest that tried to remove messages","grpc_status":3}"

Rest of Stack Trace

Screen Shot 2022-01-31 at 7 44 16 PM

Screen Shot 2022-01-31 at 7 44 24 PM

Screen Shot 2022-01-31 at 7 44 34 PM

@product-auto-label product-auto-label bot added the api: pubsublite Issues related to the googleapis/python-pubsublite API. label Feb 1, 2022
@yoshi-automation yoshi-automation added triage me I really want to be triaged. 🚨 This issue needs some love. labels Feb 1, 2022
@meredithslota meredithslota added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed 🚨 This issue needs some love. triage me I really want to be triaged. labels Feb 9, 2022
@dpcollins-google
Copy link
Collaborator

dpcollins-google commented Feb 17, 2022

Hello,

I'm sorry you're having this issue. My guess is that you are hitting an edge case where you are somehow making the flow control count go negative.

EDIT: The message count is negative, not the byte count. It has a value of -1.

I will also fix the client to not send an invalid request when this happens.

-Daniel

dpcollins-google added a commit that referenced this issue Feb 17, 2022
This can happen if the server sends a message that goes over the byte limit, which should never happen, but apparently does in edge cases. This ensures that the client doesn't fail if this happens.

fixes: #294
dpcollins-google added a commit that referenced this issue Feb 18, 2022
…300)

This can happen if the server sends a message that goes over the byte limit, which should never happen, but apparently does in edge cases. This ensures that the client doesn't fail if this happens.

fixes: #294
gcf-merge-on-green bot pushed a commit that referenced this issue Feb 18, 2022
🤖 I have created a release *beep* *boop*
---


## [1.4.0](v1.3.0...v1.4.0) (2022-02-18)


### Features

* add api key support ([#291](#291)) ([f0d65ca](f0d65ca))


### Bug Fixes

* ensure bytes cannot go negative on requests sent to the server ([#300](#300)) ([6d3690a](6d3690a)), closes [#294](#294)
* ensure that _failure_task exception is always retrieved ([#301](#301)) ([579abf3](579abf3))
* resolve DuplicateCredentialArgs error when using credentials_file ([b7ed9ad](b7ed9ad))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsublite Issues related to the googleapis/python-pubsublite API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Projects
None yet
4 participants