Apache Iceberg version
1.10.1 (latest release)
Query engine
None
Please describe the bug 🐞
There is a performance problem in the Coordinator's commit-readiness check that significantly degrades system performance when a backlog of unprocessed messages builds up on the control topic.
During each commit cycle, the Coordinator reads each DATA_COMPLETE message from the control topic and calls the commitState.isCommitReady() method to check whether all topic partitions are represented in the messages received so far. However, that check loops over all previously received messages. If there are n DATA_COMPLETE messages, this is an O(n^2) calculation.
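To make the cost concrete, here is a simplified model of the pattern described above. It is not the actual Iceberg code: the class name, fields, and the visit counter are hypothetical and exist only to show how rescanning the buffer on every new message adds up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a readiness check that rescans
// every buffered message each time it is called.
final class RescanningReadinessCheck {
  // Number of topic partitions reported by each buffered DATA_COMPLETE message.
  private final List<Integer> partitionsPerMessage = new ArrayList<>();
  // Instrumentation only: counts how many buffered messages were visited in total.
  private long messagesVisited = 0;

  void add(int partitionCount) {
    partitionsPerMessage.add(partitionCount);
  }

  boolean isCommitReady(int expectedPartitionCount) {
    int received = 0;
    // Full pass over the buffer on every call.
    for (int count : partitionsPerMessage) {
      messagesVisited++;
      received += count;
    }
    return received >= expectedPartitionCount;
  }

  long messagesVisited() {
    return messagesVisited;
  }
}
```

If the check runs once after each of n messages, the buffer is visited 1 + 2 + ... + n = n(n + 1)/2 times in total, which is where the quadratic growth comes from.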
When everything runs smoothly, n is usually bounded by the number of workers in the Kafka Connect cluster and the number of source topic partitions each worker needs to process. But when a backlog builds up on the control topic, things spiral downward. The backlog often starts with a networking or Hive Metastore (HMS) availability issue that prevents the Coordinator from committing entries to HMS. The commit fails, and the retry on the next commit cycle needs to process 2n messages from the control topic (because the workers keep generating them). The inefficient processing in CommitState.isCommitReady(), coupled with the increased number of Kafka messages to process from the control topic, makes the next commit cycle more prone to failure.
The fix for this performance issue is simple: use a map in CommitState to cache the topic partition names seen so far. Once the size of the map reaches the expected count, the commit is ready. This should be an O(n) calculation.
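A minimal sketch of the proposed approach, assuming readiness only depends on which topic partitions have been seen. The class and method names are hypothetical and do not come from the Iceberg codebase, and a HashSet stands in for the map because only membership matters here.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: track seen topic partitions incrementally so each
// incoming message costs O(1) on average instead of a rescan of the buffer.
final class CommitReadinessTracker {
  private final int expectedPartitionCount;
  // Keys are "<topic>-<partition>"; duplicates are ignored by the set.
  private final Set<String> seenPartitions = new HashSet<>();

  CommitReadinessTracker(int expectedPartitionCount) {
    this.expectedPartitionCount = expectedPartitionCount;
  }

  // Records one topic partition and reports whether the commit is now ready.
  boolean record(String topic, int partition) {
    seenPartitions.add(topic + "-" + partition);
    return isCommitReady();
  }

  boolean isCommitReady() {
    return seenPartitions.size() >= expectedPartitionCount;
  }

  // Clears the state when a new commit cycle starts.
  void reset() {
    seenPartitions.clear();
  }
}
```

With this shape, processing n messages costs O(n) overall, and a partition reported more than once (for example after a retry) is counted a single time.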
Willingness to contribute