Offsets are not managed correctly when the connector fails to process some of the records in a batch #68

Closed
byreshb opened this issue Jan 15, 2021 · 5 comments

byreshb commented Jan 15, 2021

The connector was processing 500 records per batch. When one of the records threw an exception, the connector stopped processing (connector task status: FAILED). When the connector task was restarted, the connector resumed processing but skipped the failed record. It appears the connector incorrectly committed the offsets for all records in the batch.

Connector Plugin Version:

  • 2.0.x

Steps to reproduce:

  • Publish records to a topic with some records having invalid data
  • Start the connector to process records in a batch
  • One of the records fails to process and throws an exception because of invalid data (example: [row index 487]: invalid: Timestamp field value is out of range:1441393020755000000; see the sketch after these steps for why this value is out of range)
  • Restart the connector task
  • Connector starts running
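
For context on the example error in the third step: BigQuery's TIMESTAMP type only covers years 0001 through 9999, and a raw value like 1441393020755000000 read as microseconds since the Unix epoch lands tens of thousands of years past that. Read as nanoseconds it would be an ordinary 2015 date, which hints at a possible unit mismatch upstream. Here's a small standalone sketch (mine, not from the issue; the unit interpretations are guesses at the cause):

import java.time.Instant;

public class TimestampRangeCheck {
    public static void main(String[] args) {
        long value = 1441393020755000000L; // value from the error above

        // Read as microseconds since the Unix epoch: tens of thousands of
        // years in the future, far beyond BigQuery's 9999-12-31 maximum.
        Instant asMicros = Instant.ofEpochSecond(value / 1_000_000L);
        System.out.println("as microseconds: " + asMicros);

        // Read as nanoseconds since the Unix epoch: 2015-09-04, in range.
        Instant asNanos = Instant.ofEpochSecond(0, value);
        System.out.println("as nanoseconds:  " + asNanos);
    }
}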

Expected Result:

  • The connector should try to reprocess the failed record after the task restart, and no records in the batch should be skipped

Actual Result:

  • The connector skipped the failed record and continued processing.

Error log snippet:

2021-01-14 23:14:22.882 Z ERROR WorkerSinkTask{id=edw-bigquery-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:567)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
        [row index 0]: stopped:
        [row index 1]: stopped:
        .
        .        
        [row index 5]: stopped:
        [row index 6]: stopped:
        [row index 7]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 8]: stopped:
        [row index 9]: stopped:
        .
        .
        [row index 17]: stopped:
        [row index 18]: stopped:
        [row index 19]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 20]: stopped:
        .
        .
        [row index 24]: stopped:
        [row index 25]: stopped: , com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
        [row index 0]: stopped:
        [row index 1]: stopped:
        .
        .
        [row index 9]: stopped:
        [row index 10]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 11]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 12]: stopped:
        [row index 13]: stopped:
        .
        .
        [row index 26]: stopped:
        [row index 27]: stopped:
        [row index 28]: stopped: ; See logs for more detail
        at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:107)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:218)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:545)
        ... 10 more
2021-01-14 23:14:22.882 Z ERROR WorkerSinkTask{id=edw-bigquery-sink-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
@C0urante

Ugh, sorry about this @byreshb. I introduced this behavior with the guard in

// Return immediately here since the executor will already be shutdown
if (stopped) {
  return;
}
which was meant to prevent unnecessary exceptions from being logged when the framework invoked flush on an already-failed task due to KAFKA-10188.

It does technically prevent those unnecessary exceptions from being rethrown, but it has the unfortunate side effect that the call to flush then completes normally, and the framework goes ahead and commits offsets for a batch. So with the case you're bringing up, I think the sequence of steps is:

  1. Framework reads a batch of records from Kafka and sends them to the task
  2. Task fails in put
  3. Framework calls stop on task
  4. Framework calls flush on the task in a final attempt to commit offsets back to Kafka for the record batch. Because of the stopped check added in the task's flush method, the task returns normally and the offset commit succeeds. Before the guard, an exception would be thrown (the executor is already shut down, so we can't await task completions on it), which caused the framework to not commit offsets for the batch, the desired behavior in this case; see the sketch below.
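
Here's a minimal sketch of the shape a fix can take: keep the early return, but check for write-thread errors first. maybeThrowEncounteredErrors is the method from the stack trace above; the stopped flag, executor field, and awaitCurrentTasks are simplified stand-ins, so treat this as an illustration rather than the connector's actual code:

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor;

public class FlushSketch {
    private volatile boolean stopped;        // set once stop() has run
    private KCBQThreadPoolExecutor executor; // the task's write-thread pool

    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (stopped) {
            // The executor is already shut down, so there is nothing left to
            // await, but we still surface any errors the write threads hit.
            // That way flush does NOT complete normally and the framework
            // does NOT commit offsets for a batch that never reached BigQuery.
            executor.maybeThrowEncounteredErrors();
            return;
        }
        try {
            // Normal path: wait for outstanding writes before offsets commit.
            executor.awaitCurrentTasks();
        } catch (InterruptedException err) {
            throw new ConnectException("Interrupted while waiting for write tasks.", err);
        }
    }
}

With a check like that in place, the final flush on a failed task rethrows instead of returning, and the framework skips the offset commit for the batch.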

This should only affect snapshot 2.0+ builds of the connector running on Connect workers that don't yet have apache/kafka#8910, which removes that final offset commit attempt (since it's not a great idea to invoke anything on a task after stop except maybe start).

I'll file a fix PR that should hopefully work for this case and prevent the obnoxious messages about not being able to await tasks on a closed executor. Thanks for catching this before it made its way into a release!

@C0urante

Opened #70 which should fix this. @byreshb would you be willing to build with that PR branch and give this a shot? I think this should address the issue but it's always nice to have confirmation from the original reporter.


byreshb commented Jan 18, 2021

Looks like the offsets and the number of records in the BQ table don't match. Test details (run on my local setup) are documented below:

max.poll.records = 5
Number of topic partitions = 1
Number of connect tasks = 1
{ 
    "name": "bigquery-sink-connector-test",
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "topic-name",    
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "allowSchemaUnionization": "false",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "allBQFieldsNullable": "true",
    "allowNewBigQueryFields": "true",
    "includeKafkaData": "true",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "schemaRegistryLocation": "http://schema-registry:8081",    
    "project": "********",
    "datasets": ".*=kafka_connect_test",
    "defaultDataset": "kafka_connect_test",
    "keyfile": "********",
    "bootstrap.servers": "broker:29092",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",    
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",

    "transforms": "InsertField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.timestamp.field": "timestamp"    
}

Publish Message 1 = Correct Data, Lag = 0, Current Offset = 1, End Offset = 1 (0 messages in BQ table) – New error? Expected 1 message in the BQ table
Publish Message 2 = Correct Data, Lag = 0, Current Offset = 2, End Offset = 2 (1 message in BQ table)
Publish Message 3 = Correct Data, Lag = 0, Current Offset = 3, End Offset = 3 (2 messages in BQ table)
Publish Message 4 = Correct Data, Lag = 0, Current Offset = 4, End Offset = 4 (3 messages in BQ table)
Publish Message 5 = Correct Data, Lag = 0, Current Offset = 5, End Offset = 5 (4 messages in BQ table)

Connector: Running, Connect Task: Running

Publish Message 6 = Invalid Data, Lag = 1, Current Offset = 5, End Offset = 6 (4 messages in BQ table)
(Invalid data: the Kafka topic accepts it, but BigQuery cannot insert the record)

Connector: Running, Connect Task: Failed

Publish Message 7 = Correct Data, Lag = 2, Current Offset = 5, End Offset = 7 (4 messages in BQ table)
Publish Message 8 = Correct Data, Lag = 3, Current Offset = 5, End Offset = 8 (4 messages in BQ table)
Publish Message 9 = Correct Data, Lag = 4, Current Offset = 5, End Offset = 9 (4 messages in BQ table)
Publish Message 10 = Correct Data, Lag = 5, Current Offset = 5, End Offset = 10 (4 messages in BQ table)
Publish Message 11 = Correct Data, Lag = 6, Current Offset = 5, End Offset = 11 (4 messages in BQ table)
Publish Message 12 = Correct Data, Lag = 7, Current Offset = 5, End Offset = 12 (4 messages in BQ table)
Publish Message 13 = Correct Data, Lag = 8, Current Offset = 5, End Offset = 13 (4 messages in BQ table)

Restart Connect Task:
curl -X POST localhost:8083/connectors/edw-bigquery-sink-connector-test/tasks/0/restart

Lag = 8, Current Offset = 5, End Offset = 13 (7 messages in BQ table) – The offset didn't change, but there are 3 new records in the BQ table, which doesn't look correct
Connector: Running, Connect Task: Failed

Restarted the connect task and the connector service a few more times. No change in the result:
Lag = 8, Current Offset = 5, End Offset = 13 (7 messages in BQ table)
Connector: Running, Connect Task: Failed


C0urante commented Jan 19, 2021

@byreshb extra records in the table are expected in this case. The connector doesn't manually manage offsets unless upsert/delete is turned on, since manual offset management breaks when SMTs that change topic names in sink records are used (the sketch below illustrates why). As a result, it can only commit an entire batch at a time, where a "batch" is all the records sent to the task since the last offset commit. So, if it reads some valid records, sends those to BigQuery successfully, then reads a record that causes it to fail, it's unable to commit offsets for the valid records, and will re-process them when restarted.
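
For anyone wondering why topic-renaming SMTs break manual offset management: a sink task that tracked offsets itself would naturally key them by each record's topic, but after a transform like RegexRouter rewrites topic names, record.topic() no longer matches the Kafka topic the consumer actually read from, so the committed positions would point at partitions the consumer isn't reading. A hypothetical illustration (not the connector's code):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

public class ManualOffsetSketch {
    // Hypothetical helper a sink task might call from put():
    static Map<TopicPartition, OffsetAndMetadata> trackOffsets(Collection<SinkRecord> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (SinkRecord record : records) {
            // If an SMT rewrote the topic name, record.topic() is the renamed
            // topic, so this TopicPartition doesn't correspond to anything the
            // consumer is reading, and committing it has no effect.
            offsets.put(
                new TopicPartition(record.topic(), record.kafkaPartition()),
                new OffsetAndMetadata(record.kafkaOffset() + 1));
        }
        return offsets;
    }
}

That's why the connector leaves offset commits to the framework, which tracks the consumer's own positions; the cost is the batch-granularity commits described above.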

This is obviously not great but it's not a regression; the regression here is that instead of reprocessing failed records, the connector actually skips them and drops them forever, which can lead to data loss. That's all that #70 is meant to address; manual offset management can be added separately (although it would either cause the connector to break on topic-altering SMTs or would require a KIP to add some kind of API for knowing the original topic/partition/offset of each record).

C0urante added a commit that referenced this issue Jan 19, 2021
* GH-68: Continue to check for errors in flush even after stopped

* GH-68: Add unit test
@C0urante

Closing as I believe this has been addressed with #70 (as confirmed by @byreshb's testing above).
