Skip to content

PubSub Subscriber throws StatusRuntimeException when holding many leases #1126

@relud

Description

@relud

Environment details

  1. OS type and version: mac os x 10.16; linux 5.4.170+ (osImage: Container-Optimized OS from Google)
  2. Java version: Oracle Java 11.0.10 on mac os; openjdk version "11.0.15" 2022-04-19 in docker/gke
  3. version(s): google-cloud-pubsub 1.116.0, 1.116.1, 1.116.2, 1.116.3, 1.116.4, 1.117.0

Steps to reproduce

  1. create pubsub topic and subscription
  2. publish at least 2775 messages to the topic
  3. use com.google.cloud.pubsub.v1.Subscriber to consume at least 2775 messages and hold their lease for at least 15 seconds.
  4. observe stack trace below

Code example

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Reproduction for the "Request payload size exceeds the limit: 524288 bytes" warning that the
 * Pub/Sub {@code Subscriber} logs when it holds leases on many messages at once.
 *
 * <p>The program creates a throwaway topic and subscription, publishes {@link #MESSAGE_COUNT}
 * messages, pulls them all with a {@code Subscriber} whose flow control allows every message to
 * be outstanding simultaneously, and keeps the leases open for {@link #HOLD_SECONDS} seconds
 * before releasing them. All created resources are removed and all clients are closed on exit,
 * including when an earlier step fails.
 */
public class HoldLeases {

  /** Number of messages published and held; enough to exceed a single ModifyAckDeadline request. */
  private static final int MESSAGE_COUNT = 2775;

  /** How long the leases are held before the messages are released and the subscriber stops. */
  private static final long HOLD_SECONDS = 15L;

  /**
   * Runs the reproduction end to end.
   *
   * @param args unused
   * @throws Exception if resource creation, publishing, or cleanup fails
   */
  public static void main(String[] args) throws Exception {
    final String projectId = ServiceOptions.getDefaultProjectId();
    final Subscription subscription = Subscription.newBuilder()
        .setName("projects/" + projectId + "/subscriptions/test-subscription-" + UUID.randomUUID())
        .setTopic("projects/" + projectId + "/topics/test-topic-" + UUID.randomUUID())
        .setAckDeadlineSeconds(10).build();

    // try-with-resources closes both admin clients even when a later step throws.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create();
        SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      topicAdminClient.createTopic(subscription.getTopic());
      try {
        subscriptionAdminClient.createSubscription(subscription.getName(),
            subscription.getTopic(), PushConfig.getDefaultInstance(), 0);
        try {
          publishMessages(subscription.getTopic());
          holdLeases(subscription);
        } finally {
          subscriptionAdminClient.deleteSubscription(subscription.getName());
        }
      } finally {
        // Nested finally blocks: the topic is deleted even if subscription cleanup fails.
        topicAdminClient.deleteTopic(subscription.getTopic());
      }
    }
  }

  /**
   * Publishes {@link #MESSAGE_COUNT} small messages to the topic and waits until every publish
   * has completed, then shuts the publisher down.
   *
   * @param topic fully qualified topic name ({@code projects/<project>/topics/<topic>})
   * @throws Exception if the publisher cannot be built or any publish fails
   */
  private static void publishMessages(String topic) throws Exception {
    final Publisher publisher = Publisher.newBuilder(topic).build();
    try {
      final List<ApiFuture<String>> published = new ArrayList<>(MESSAGE_COUNT);
      for (int i = 0; i < MESSAGE_COUNT; i++) {
        published.add(publisher
            .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("" + i)).build()));
      }
      publisher.publishAllOutstanding();
      for (ApiFuture<String> future : published) {
        future.get();
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }

  /**
   * Pulls every message from the subscription and holds the leases for {@link #HOLD_SECONDS}
   * seconds, then releases the messages and stops the subscriber. Blocks until the subscriber
   * has terminated.
   *
   * @param subscription the subscription to consume from
   */
  private static void holdLeases(Subscription subscription) {
    // Nothing completes this future normally, so it finishes via the timeout; the receiver
    // callbacks below then run with a non-null exception and nack each held message.
    final CompletableFuture<Void> done =
        new CompletableFuture<Void>().orTimeout(HOLD_SECONDS, TimeUnit.SECONDS);
    final MessageReceiver receiver = (message, consumer) -> {
      // Defer the ack/nack until `done` completes so the lease stays outstanding meanwhile.
      done.whenComplete((result, exception) -> {
        if (exception == null) {
          consumer.ack();
        } else {
          consumer.nack();
        }
      });
    };

    // Flow control must admit all messages at once, otherwise fewer leases are held.
    final Subscriber subscriber = Subscriber
        .newBuilder(ProjectSubscriptionName.parse(subscription.getName()), receiver)
        .setFlowControlSettings(
            FlowControlSettings.newBuilder().setMaxOutstandingElementCount((long) MESSAGE_COUNT)
                .setMaxOutstandingRequestBytes(30_000_000L).build())
        .build();
    done.whenComplete((v, e) -> subscriber.stopAsync());
    try {
      subscriber.startAsync();
      subscriber.awaitTerminated();
    } finally {
      subscriber.stopAsync();
    }
  }
}

Stack trace

May 11, 2022 11:56:01 AM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:535)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
	at io.grpc.Status.asRuntimeException(Status.java:535)
	... 17 more

Any additional information below

Issue does not appear with library version 1.115.5 and below.

I don't see the issue with fewer than 2772 messages in flight, but it didn't start showing up reliably until 2775 messages were in flight.

Metadata

Metadata

Assignees

Labels

🚨 — This issue needs some love. · api: pubsub — Issues related to the googleapis/java-pubsub API. · triage me — I really want to be triaged.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions