S3 Sink errors "Commit of offsets threw an unexpected exception for sequence number 1: null" #335

Closed
deuscapturus opened this issue Jun 1, 2020 · 3 comments

Comments

@deuscapturus

deuscapturus commented Jun 1, 2020

The S3 Sink Connector does not work for me on versions 5.5.0 and 5.4.2.

The connector fails with the error "Commit of offsets threw an unexpected exception for sequence number 1: null".

My Dockerfile for kafka-connect is:

FROM confluentinc/cp-kafka-connect:5.5.0

RUN confluent-hub install confluentinc/kafka-connect-s3:5.5.0 --no-prompt

My connector configuration:

curl -X POST http://my-kafka-connect:8083/connectors -H "Content-Type: application/json" -d '
{
  "name": "stack_exchange_posts_to_s3",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "exp.inferences.v1.StackExchangePosts",
    "s3.region": "us-west-2",
    "s3.bucket.name": "my-s3-bucket",
    "s3.part.size": "26214400",
    "flush.size": "100",
    "enhanced.avro.schema.support": "true",
    "partition.duration.ms": "60000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "partition.field.name":"community",
    "schema.compatibility": "FULL_TRANSITIVE",
    "rotate.schedule.interval.ms": "60000",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://my-avro-schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'

kafka-connect output:

[2020-06-01 16:46:05,114] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Finished assignment for group at generation 1: {connector-consumer-stack_exchange_posts_to_s3-0-f755ccf0-d328-43f7-b4b4-39627f5b0f68=Assignment(partitions=[exp.inferences.v1.StackExchangePosts-0, exp.inferences.v1.StackExchangePosts-1, exp.inferences.v1.StackExchangePosts-2])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-06-01 16:46:05,121] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-06-01 16:46:05,121] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Adding newly assigned partitions: exp.inferences.v1.StackExchangePosts-2, exp.inferences.v1.StackExchangePosts-0, exp.inferences.v1.StackExchangePosts-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-06-01 16:46:05,182] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Found no committed offset for partition exp.inferences.v1.StackExchangePosts-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-06-01 16:46:05,182] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Found no committed offset for partition exp.inferences.v1.StackExchangePosts-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-06-01 16:46:05,182] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Found no committed offset for partition exp.inferences.v1.StackExchangePosts-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-06-01 16:46:05,186] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Resetting offset for partition exp.inferences.v1.StackExchangePosts-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-06-01 16:46:05,188] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Resetting offset for partition exp.inferences.v1.StackExchangePosts-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-06-01 16:46:05,189] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Resetting offset for partition exp.inferences.v1.StackExchangePosts-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-06-01 16:46:05,220] WARN WorkerSinkTask{id=stack_exchange_posts_to_s3-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-06-01 16:46:05,220] ERROR WorkerSinkTask{id=stack_exchange_posts_to_s3-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:216)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:383)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:598)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-06-01 16:46:05,221] ERROR WorkerSinkTask{id=stack_exchange_posts_to_s3-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:401)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:598)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-06-01 16:46:05,221] ERROR WorkerSinkTask{id=stack_exchange_posts_to_s3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2020-06-01 16:46:05,221] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Revoke previously assigned partitions exp.inferences.v1.StackExchangePosts-2, exp.inferences.v1.StackExchangePosts-0, exp.inferences.v1.StackExchangePosts-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-06-01 16:46:05,221] WARN WorkerSinkTask{id=stack_exchange_posts_to_s3-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-06-01 16:46:05,221] ERROR WorkerSinkTask{id=stack_exchange_posts_to_s3-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:216)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:383)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:598)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:69)
	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:674)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:726)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:950)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:872)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2343)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2310)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2260)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:168)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:163)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:190)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-06-01 16:46:05,221] INFO [Consumer clientId=connector-consumer-stack_exchange_posts_to_s3-0, groupId=connect-stack_exchange_posts_to_s3] Member connector-consumer-stack_exchange_posts_to_s3-0-f755ccf0-d328-43f7-b4b4-39627f5b0f68 sending LeaveGroup request to coordinator 10.107.231.59:9092 (id: 2147483642 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
@deuscapturus changed the title from 5.5 S3 Sink errors "Commit of offsets threw an unexpected exception for sequence number 1: null" to S3 Sink errors "Commit of offsets threw an unexpected exception for sequence number 1: null" on Jun 1, 2020
@deuscapturus
Author

I fixed this myself by adding "timestamp": "UTC" to the connector configuration.

@KulykDmytro

KulykDmytro commented Aug 26, 2020

I fixed this myself by adding "timestamp": "UTC" to the connector configuration.

"timezone": "UTC"

@maoljaca

I fixed this myself by adding "timestamp": "UTC" to the connector configuration.

Fixed it for me as well.
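
For anyone landing here with the same NullPointerException: going by the correction above, the key to add is "timezone", not "timestamp". A minimal sketch of the change, showing only the affected keys from the "config" object in the original report, with every other setting left as posted. That "rotate.schedule.interval.ms" is the option that requires "timezone" is an assumption based on the connector documentation and is not confirmed anywhere in this thread.

```json
{
  "rotate.schedule.interval.ms": "60000",
  "timezone": "UTC"
}
```

Any valid timezone ID should work in place of "UTC". Since the log says the task will not recover until manually restarted, the task probably needs a restart after the configuration is updated.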
