Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka-connector not flushing messages (ack) #277

Open
nicodds opened this issue May 14, 2021 · 1 comment
Open

kafka-connector not flushing messages (ack) #277

nicodds opened this issue May 14, 2021 · 1 comment

Comments

@nicodds
Copy link

nicodds commented May 14, 2021

I recently synced the repository with my local copy, in order to test the new advancements. I noticed that the connector became unstable and is incurring in a serious problem, since it is not flushing some messages from the subscription. Consequently I'm experiencing frequent duplicates: the same messages are pulled several times from the PubSub subscription because the ack deadline expires).

The errors in the logs are as follow:

[2021-05-14 16:25:46,674] WARN Failed to acknowledge message: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes. (com.google.pubsub.kafka.source.CloudPu
bSubSourceTask:366)

.......

[2021-05-14 16:26:21,905] ERROR WorkerSourceTask{id=aaaaa_schedule-2} Failed to flush, timed out while waiting for producer to flush outstanding 440 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:492)
[2021-05-14 16:26:21,906] ERROR WorkerSourceTask{id=aaaaa_schedule-2} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:113)
[2021-05-14 16:26:21,906] INFO WorkerSourceTask{id=aaaaa_schedule-3} flushing 1294 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-14 16:26:22,943] INFO WorkerSourceTask{id=aaaaa-settlement-3} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-14 16:26:22,944] ERROR WorkerSourceTask{id=aaaaa-settlement-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)   
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRUCT
        at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:55)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 11 more
May 14, 2021 4:26:22 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=70, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
        at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
        at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
        at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
        at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:518)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:327)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1700(InstantiatingGrpcChannelProvider.java:74)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:220)
        at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:227)
        at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:210)
        at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
        at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:272)
        at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:314)
        at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:157)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
        at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
        at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)


@nicodds
Copy link
Author

nicodds commented May 14, 2021

feel free to write for any further detail

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant