Replace BulkProcessor with our own implementation #2
@ewencp Ready for review. This PR replaces the BulkProcessor with our own implementation. The implementation does the following:
ca0d458 to 0e08a15
private static final long LINGER_MS_DEFAULT = 1;
private static final String LINGER_MS_DISPLAY = "Linger (ms)";

public static final String MAX_INFLIGHT_REQUESTS_CONFIG = "max.in.flight.requests";
INFLIGHT -> IN_FLIGHT
9c21f43 to 283f8e9
@ewencp Ready for another round of review. Here are the major changes:
test this please
 * @param maxInflightRequests The max allowed number of inflight requests.
 * @return an instance of ElasticsearchWriter Builder.
 */
public Builder setMaxInflightRequests(int maxInflightRequests) {
nit, and not really that important since this is all internal APIs, but since "in flight" is two words, I'd normally expect maxInFlightRequests instead of maxInflightRequests. (Same capitalization appears elsewhere too.)
606ba13 to 27bc4d4
@ewencp Thanks for the review. Addressed the comments. PTAL.
}

private boolean canSubmit(RecordBatch batch, long now) {
  if (guaranteeOrdering && muted) {
Let's leave this to a follow-up patch since we've already got lots of updates in this one, but here's another idea. Could we just move retries into the execute() method? In the case of max in flight == 1, we just want to block on the first request until all retries are exhausted. With 1 executor, that should happen (as long as we can kill everything before any other submissions are processed). With max in flight > 1, we want to process multiple requests in parallel, but we know we won't have ordering guarantees, so we can just retry each one as many times as needed (and we want each retry to be handled before anything behind it in the queue), and then give up and log an error. I think this approach would significantly simplify things?
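To make the suggestion above concrete, here is a rough sketch of what "retries inside execute()" could look like. It is not code from this PR: the class name RetryingBatchExecutor, the submit/execute signatures, and the backoff parameter are all hypothetical, and a plain fixed thread pool stands in for whatever executor the writer actually uses.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch only: none of these names come from the PR.
class RetryingBatchExecutor {
    private final ExecutorService executor;
    private final int maxRetries;
    private final long retryBackoffMs;

    RetryingBatchExecutor(int maxInFlightRequests, int maxRetries, long retryBackoffMs) {
        // One worker thread per allowed in-flight request. With a single
        // thread, batches are processed strictly in submission order.
        this.executor = Executors.newFixedThreadPool(maxInFlightRequests);
        this.maxRetries = maxRetries;
        this.retryBackoffMs = retryBackoffMs;
    }

    // The retry loop runs inside the submitted task, so a retry is always
    // handled before anything queued behind it on the same worker thread.
    Future<Boolean> submit(Callable<Void> request) {
        return executor.submit(() -> execute(request));
    }

    // Returns true on success, false once all retries are exhausted.
    private boolean execute(Callable<Void> request) throws InterruptedException {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                request.call();
                return true;
            } catch (InterruptedException e) {
                throw e; // do not retry when the worker is being shut down
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    // Give up and log an error, as suggested in the comment.
                    System.err.println("Giving up after " + (attempt + 1) + " attempts: " + e);
                    return false;
                }
                Thread.sleep(retryBackoffMs);
            }
        }
        return false;
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

With maxInFlightRequests set to 1, a failing batch blocks the single worker until it succeeds or runs out of retries, which is the ordering behavior described above. With a larger pool, batches retry independently and ordering is not guaranteed. This sketch does not cover the "kill everything before other submissions are processed" part.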
@Ishiihara Left one final follow-up re: delivery guarantees, but let's address it in a follow-up. LGTM. I do also see this transient failure, but rarely -- after running the tests 6 or 7 times. I think some of the messages are expected, but I'm not sure which (if any) exceptions are related to the real issue:
27bc4d4 to 83be437