Process data-ingestion messages through a batched async mechanism to improve throughput. #71
Current Problem:

Presently, only a single Listener container is created to consume messages from the Connector's ingestion Topic (across all partitions). This Listener processes each record returned by the internal `poll()` sequentially, which limits the overall throughput of the Connector, because the downstream `TransactionConsumer#listen` makes a blocking call to write transactions that can take a few seconds. For example, if `TransactionConsumer#listen` takes 2 seconds to complete, processing 100 incoming records fetched by a single `poll()` takes around 200 seconds.

Proposed Fix:
Assign a dedicated Listener container to each partition of the Topic, per connector instance (capped at a maximum of 6 Listeners, to avoid spawning a large number of Listeners for highly partitioned Topics).
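The capping rule above can be sketched as follows. This is a minimal illustration, not code from the Connector; `ListenerAllocator` and its method name are hypothetical.

```java
// Hypothetical helper illustrating the proposed cap on Listener containers.
public class ListenerAllocator {
    // Maximum Listener containers per connector instance, to avoid
    // spawning one Listener per partition on highly partitioned Topics.
    static final int MAX_LISTENERS = 6;

    // One dedicated Listener per partition, capped at MAX_LISTENERS.
    public static int listenerCount(int partitionCount) {
        return Math.min(partitionCount, MAX_LISTENERS);
    }
}
```

A Topic with 3 partitions gets 3 Listeners; a Topic with 12 partitions still gets only 6.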
Each Listener fetches a batch of messages from the partition it is assigned to and processes the batch asynchronously, submitting all records to a task executor in one go. The Listener thread defers the next `poll()` until the entire batch has been processed in parallel; once the batch is done, the Listener fetches the next batch from `poll()`.
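A minimal sketch of the dispatch-and-wait step, assuming plain `java.util.concurrent` primitives (the actual Connector presumably uses its own task executor; `BatchDispatcher` and the pool size are illustrative):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Illustrative sketch: submit every record of the polled batch to a task
// executor in one go, then block until the whole batch completes, so the
// Listener thread only issues the next poll() after the batch is done.
public class BatchDispatcher {
    private final ExecutorService executor = Executors.newFixedThreadPool(8);

    public void processBatch(List<String> records, Consumer<String> handler) {
        CompletableFuture<?>[] futures = records.stream()
                .map(r -> CompletableFuture.runAsync(() -> handler.accept(r), executor))
                .toArray(CompletableFuture[]::new);
        // Defer returning (and hence the next poll()) until every record
        // in the batch has been processed in parallel.
        CompletableFuture.allOf(futures).join();
    }

    public void shutdown() { executor.shutdown(); }
}
```

With this shape, a batch of 100 records and a 2-second `listen` call completes in roughly `ceil(100 / poolSize) * 2` seconds instead of 200.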
If one of the records throws an exception during parallel processing, we perform a partial batch commit, and the failed and unprocessed records are delivered again by the next `poll()`.
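The partial-commit rule can be sketched as: after a parallel batch run, commit only the contiguous prefix of successfully processed records, so the failed record and everything after it are redelivered on the next `poll()`. `PartialCommit` is a hypothetical helper, not Connector code.

```java
// Illustrative sketch of the partial batch commit decision.
public class PartialCommit {
    // outcomes[i] is true if record i of the batch processed successfully
    // (in offset order). Returns how many leading records can be safely
    // committed; the failed and unprocessed remainder is re-fetched by
    // the next poll().
    public static int committableCount(boolean[] outcomes) {
        int n = 0;
        while (n < outcomes.length && outcomes[n]) {
            n++;
        }
        return n;
    }
}
```

For a batch whose third record failed, only the first two offsets are committed, even if later records happened to succeed in parallel; those are reprocessed with the failed record on redelivery.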