Re-processing of records on restart because of conflicting offsets ?? #20
Comments
Could be because of tmp files as stated here -
This is due to confluentinc/kafka-connect-hdfs#224. I'm looking into the code to see whether there is a fix.
But in our case, I don't think this is the issue. Take a look at this function, which fetches the max offset - https://github.com/confluentinc/kafka-connect-hdfs/blob/18e5149859867fdb1e6002ad063be27dd91f5180/src/main/java/io/confluent/connect/hdfs/FileUtils.java#L106
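The recovery logic works off committed file names, which (as far as I can tell from the upstream code) follow the pattern `<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<extension>`. Here is a minimal sketch of the idea behind the linked function - a reconstruction, not the actual upstream code, and the regex is my assumption:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of offset recovery from a committed file name (reconstruction, not
// the upstream FileUtils code). The max end offset found this way across all
// committed files determines where the connector resumes.
public class OffsetFromFileNameSketch {
  private static final Pattern COMMITTED =
      Pattern.compile("(.+)\\+(\\d+)\\+(\\d+)\\+(\\d+)\\.(\\w+)");

  /** Returns the end offset encoded in a committed file name, or -1 if none. */
  static long endOffset(String fileName) {
    Matcher m = COMMITTED.matcher(fileName);
    return m.matches() ? Long.parseLong(m.group(4)) : -1L;
  }

  public static void main(String[] args) {
    // A committed file covering offsets 11..23 of partition 1:
    System.out.println(endOffset("questionnaire_phq8+1+0000000011+0000000023.avro")); // 23
  }
}
```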
I don't think it is. The TopicPartitionWriter will use the partition path, and that is

```java
public String generatePartitionedPath(String topic, String encodedPartition) {
  return topic + "/" + encodedPartition;
}
```

which in turn uses our schema-based partitioning. I've done a test, and it seems our partitioner is no longer needed; the bug we needed it for is fixed upstream. I'm creating an integration test as well to show this.
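For context on why one topic fans out into several directories: the custom partitioner encodes the Kafka partition plus the key and value schema names and versions into `encodedPartition`. A hypothetical reconstruction to illustrate the shape of the output - the real class in this project may differ:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical reconstruction of a schema-version-based partitioner;
// the exact code is an assumption for illustration.
public class SchemaVersionPartitionerSketch {
  public String encodePartition(SinkRecord record) {
    Schema key = record.keySchema();
    Schema value = record.valueSchema();
    // Yields e.g. "partition=1_key_schema=org.radarcns.kafka.ObservationKey-1
    //              _value_schema=org.radarcns.active.questionnaire.Questionnaire-6"
    return "partition=" + record.kafkaPartition()
        + "_key_schema=" + key.name() + "-" + key.version()
        + "_value_schema=" + value.name() + "-" + value.version();
  }
}
```

Since `generatePartitionedPath` is just `topic + "/" + encodedPartition`, every new schema version opens a fresh directory under the topic, which matches the listing in the report below.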
This was fixed by using the default partitioner, as in #21, and by deleting the old stale temp files.
I get a lot of messages like the following on restart of the container -

This causes records to be processed again. Since the HDFS connector reads offsets from the file names, here is a snapshot of the files in HDFS for the above case.
```sh
hdfs dfs -ls /topicAndroidNew/questionnaire_phq8
hdfs dfs -ls /topicAndroidNew/questionnaire_phq8/partition=1
hdfs dfs -ls /topicAndroidNew/questionnaire_phq8/partition=1_key_schema=org.radarcns.kafka.ObservationKey-1_value_schema=org.radarcns.active.questionnaire.Questionnaire-1
hdfs dfs -ls /topicAndroidNew/questionnaire_phq8/partition=1_key_schema=org.radarcns.kafka.ObservationKey-1_value_schema=org.radarcns.active.questionnaire.Questionnaire-2
hdfs dfs -ls /topicAndroidNew/questionnaire_phq8/partition=1_key_schema=org.radarcns.kafka.ObservationKey-1_value_schema=org.radarcns.active.questionnaire.Questionnaire-3
hdfs dfs -ls /topicAndroidNew/questionnaire_phq8/partition=1_key_schema=org.radarcns.kafka.ObservationKey-1_value_schema=org.radarcns.active.questionnaire.Questionnaire-4
hdfs dfs -ls /topicAndroidNew/questionnaire_phq8/partition=1_key_schema=org.radarcns.kafka.ObservationKey-1_value_schema=org.radarcns.active.questionnaire.Questionnaire-6
```
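If offset recovery takes the maximum committed end offset per directory, each of these schema-version directories can report a different "latest" offset, which is presumably the conflict suspected in the title. A sketch of such a per-directory scan, assuming the Hadoop FileSystem API and the committed-file naming sketched earlier:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: the max committed end offset found in one encodedPartition directory.
// With several schema-version directories per Kafka partition (as listed above),
// each directory can yield a different answer.
public class MaxOffsetScanSketch {
  private static final Pattern COMMITTED = Pattern.compile(".+\\+\\d+\\+\\d+\\+(\\d+)\\.\\w+");

  static long maxCommittedOffset(FileSystem fs, Path dir) throws java.io.IOException {
    long max = -1L;
    for (FileStatus status : fs.listStatus(dir)) {
      Matcher m = COMMITTED.matcher(status.getPath().getName());
      if (m.matches()) {
        max = Math.max(max, Long.parseLong(m.group(1)));
      }
    }
    return max;
  }
}
```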
I know these folders exist because of the custom partitioner, which uses the schema version. The current latest schema version for phq8-value is 6. As you can see in the files above for version 6, the offset is 23, so it gets the correct next offset, 24, but then it says

```
Fetch offset 24 is out of range
```

and resets it to 11. Where is it getting this offset 11, and why does it reset to 11? This causes records from offset 11 to 23 to be re-processed.
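On the reset itself: when the broker rejects a fetch offset as out of range, the Kafka consumer falls back to its auto.offset.reset policy. A minimal illustration - that offsets below 11 were removed by retention, making 11 the log start offset, is my interpretation and not confirmed by the logs here:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class OffsetResetSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // "earliest": on an out-of-range fetch, jump to the log start offset
    // (which would be 11 here if offsets below 11 were deleted by retention);
    // "latest": skip to the log end; "none": raise an error instead of resetting.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  }
}
```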