KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs #13291
Conversation
…to fix flakey OOMEs Signed-off-by: Greg Harris <greg.harris@aiven.io>
Many thanks for the PR, looks good to me.
I could see that the number of records produced was so large that I was getting an OOM running the tests too.
I just left a couple of small comments.
@@ -81,6 +81,7 @@
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.THROUGHPUT_CONFIG;
This could be THROUGHPUT_MSGS_PER_SEC_CONFIG.
I think that would make sense if the underlying configuration were throughput.msgs.per.sec, but it is currently throughput. I preferred to keep the existing name instead of renaming and aliasing the configuration, just to keep this PR small.
Do you think renaming the configuration is important here?
I had to go and look up what unit throughput was in, and ThroughputThrottler says it "Can be messages/sec or bytes/sec"; that's why its name is generic.
In the case of this test, it is messages/sec, so for me the longer name I suggested helps readability.
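To make the unit ambiguity concrete, here is a minimal sketch of a rate throttler in the spirit of Kafka's ThroughputThrottler. This is an illustration, not the real class: the constructor and method names mirror the general shape of the utility, but the implementation is simplified. The point is that the "throughput" target is just a number per second, so it can count messages/sec or bytes/sec depending on what the caller passes in.

```java
public class SimpleThrottler {
    private final long targetPerSec; // messages/sec OR bytes/sec -- unit is the caller's choice
    private final long startMs;      // when the run began, in milliseconds

    public SimpleThrottler(long targetPerSec, long startMs) {
        this.targetPerSec = targetPerSec;
        this.startMs = startMs;
    }

    /** True if the observed rate so far already exceeds the target rate. */
    public boolean shouldThrottle(long amountSoFar, long nowMs) {
        float elapsedSec = (nowMs - startMs) / 1000f;
        return elapsedSec > 0 && (amountSoFar / elapsedSec) > targetPerSec;
    }

    public static void main(String[] args) {
        SimpleThrottler t = new SimpleThrottler(50, 0);
        // 100 units in 1 second is 100/sec, above the 50/sec target
        System.out.println(t.shouldThrottle(100, 1000)); // true
        // 40 units in 1 second is under the target
        System.out.println(t.shouldThrottle(40, 1000));  // false
    }
}
```

Because nothing in the throttler itself fixes the unit, only the call site (here, the test producing messages) determines whether the rate is messages/sec, which is the readability concern raised above.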
@@ -266,6 +267,7 @@ public void testPollBoundary() throws Exception {
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
props.put(THROUGHPUT_CONFIG, Integer.toString(recordsProduced));
The config is a Long, so these settings could be Long.toString(100L).
I checked the test that was OOM'ing for me too, and the number of records actually produced with your setting is still much larger than actually required.
I found using the same variable recordsProduced for throughput a bit puzzling; maybe just using another literal would be OK.
Even 50 msgs/sec would be enough.
Since the results of Integer.toString(100) and Long.toString(100L) are the same, I don't think this is necessary.
The reason I re-used the same variable was to keep the runtime of the test constant. If there were two variables, someone could tune one while holding the other constant until the test timed out.
I agree that recordsProduced is a poor name, because this test produces many more records than that under normal conditions. Do you have a better name in mind?
Again, it may not be necessary to use Long instead of Integer, but it helps: the property is a long, and I'd prefer to set a long rather than rely on conversion.
And using two variables instead of one, even with related values, again helps readability. Reusing the same one is something that makes me stop and think "why...?"
So having a second variable, e.g.
long throughput_msgs_sec = recordsProduced / 2L;
would be my preference, along with a short line comment for it, e.g.
// need to limit actual records.count() to avoid OOM
Signed-off-by: Greg Harris <greg.harris@aiven.io>
I pulled these values out into three separate constants with descriptive names, let me know if this is closer to what you had in mind!
Thanks, LGTM.
…tream-trunk-27-Feb-2023

* commit 'dcc179995153c22c6248702976b60755b0b9fda8':
  MINOR: srcJar should depend on processMessages task (apache#13316)
  KAFKA-14659 source-record-write-[rate|total] metrics should exclude filtered records (apache#13193)
  MINOR: ExponentialBackoff Javadoc improvements (apache#13317)
  KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs (apache#13291)
On my local machine, testIntervalBoundary is asserting on nearly 2.5 million records, when it appears that the test is written to need only 100-1000 records to perform assertions. This causes OOMEs in the test assertions which iterate over the set of records and perform memory allocations.
I looked into reducing the assertion's memory overhead, but it didn't seem practical as even the smallest allocations appeared to exceed the memory limit.
Instead, I configured the pre-existing throttle mechanism inside the MonitorableSourceConnector, so that tests now seem to produce ~90k records on my machine, leaving adequate spare memory for the existing assertions to pass without issue.
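The fix described above amounts to adding one property to the connector configuration the test builds, as shown in the diff. Here is a self-contained sketch of that configuration step; the constant names mirror the diff, but the config key strings assigned here are illustrative assumptions, not necessarily the exact values in MonitorableSourceConnector.

```java
import java.util.HashMap;
import java.util.Map;

public class ThrottledConnectorConfigSketch {
    // Key strings are assumed for illustration; the test imports the real
    // constants from MonitorableSourceConnector.
    static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
    static final String THROUGHPUT_CONFIG = "throughput"; // messages/sec in this test

    public static void main(String[] args) {
        int recordsProduced = 100;
        Map<String, String> props = new HashMap<>();
        props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
        // Cap the emit rate at the same value, so the total number of records
        // stays bounded and the assertions that iterate over consumed records
        // no longer run out of memory.
        props.put(THROUGHPUT_CONFIG, Integer.toString(recordsProduced));
        System.out.println(props.get(THROUGHPUT_CONFIG));
    }
}
```

Because the throttle reuses a pre-existing mechanism in the connector, the change is confined to the test's property map rather than touching the framework code.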