New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15087 Move/rewrite InterBrokerSendThread to server-commons #13856
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dimitarndimitrov Thanks for the PR. I made a pass on it and it looks pretty good. I left a bunch of small comments for consideration.
awaitShutdown(); | ||
try { | ||
networkClient.close(); | ||
} catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was a bit surprised by this one. What's the reasoning here? I was wondering if we should just use closeQuietly
. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I didn't know about closeQuietly
- I'll just integrate it here.
As for the reasoning - I wasn't sure whether it's OK to suppress the exception (as in theory it's not currently guaranteed that it strictly can't be thrown) so I decided to be overly conservative.
true, | ||
requestTimeoutMs, | ||
request.handler | ||
))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You mix two code styles here. If you put the closing parenthesis on a new line like here, you should do it for all of them.
generateRequests().forEach(request ->
unsentRequests.put(
request.destination,
networkClient.newClientRequest(
request.destination.idString(),
request.request,
request.creationTimeMs,
true,
requestTimeoutMs,
request.handler
)
)
);
Alternatively, you can keep all the closing parenthesis after request.handler
. I personally prefer the former style but I leave it up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done (using the former style).
void completeWithDisconnect(ClientRequest request, | ||
long now, | ||
AuthenticationException authenticationException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We never format method like this. The following would be better.
void completeWithDisconnect(
ClientRequest request,
long now,
AuthenticationException authenticationException
) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
true /* disconnected */, | ||
null /* versionMismatch */, | ||
authenticationException, | ||
null)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: On closing parenthesis, whatever style you decide to use, let's try to be consistent in the file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
- Interestingly while trying to address all the occurrences I noticed that the style you've suggested, with each closing parentheses on its own new line, is also consistent with the method signature formatting you've suggested.
} | ||
} | ||
|
||
void completeWithDisconnect(ClientRequest request, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we make it private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done (also made it static
).
same(handler.handler))) | ||
.thenReturn(clientRequest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: format.
|
||
when(networkClient.connectionDelay(any(), anyLong())).thenReturn(0L); | ||
|
||
when(networkClient.poll(anyLong(), anyLong())).thenReturn(new ArrayList<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: emptyList()
?
)) | ||
} | ||
} | ||
} | ||
None | ||
util.Collections.emptyList() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is there a reason why we need util.
prefix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the exact same thought - getting rid of the util
prefixes in the various places in Scala classes where Java collection types are now used.
Unfortunately IntelliJ flagged the Collection
return type of generateRequests()
with a Reference must be prefixed
warning, so as I didn't find a nice way to avoid the prefix there, I decided it's preferable to just keep the prefixes for all the related usages of Java collection types.
@@ -37,7 +39,8 @@ class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransac | |||
|
|||
|
|||
class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) | |||
extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { | |||
extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) | |||
with Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to set the log prefix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch, thanks!
isInterruptible = false | ||
) { | ||
false | ||
) with Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto?
The Java rewrite is kept relatively close to the Scala original to minimize potential newly introduced bugs and to make reviewing simpler. The following details might be of note: - The `Logging` trait moved to InterBrokerSendThread with the rewrite of ShutdownableThread has been similarly moved to any subclasses that currently use it. InterBrokerSendThread's own logging has been made to use ShutdownableThread's logger which mimics the prefix/log identifier that the trait provided. - The case RequestAndCompletionHandler class has been made a separate POJO class and the internal-use UnsentRequests class has been kept as a static nested class. - The relatively commonly used but internal (not part of the public API) clients classes that InterBrokerSendThread relies on have been allowlisted in the server-common import control. - The accompanying test class has also been moved and rewritten with one new test added and most of the pre-existing tests made stricter.
73ab878
to
827af58
Compare
Rebased and force-pushed to address a trivial conflict in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks. I will wait on the CI to complete and merge it if it looks good.
The Java rewrite is kept relatively close to the Scala original to minimize potential newly introduced bugs and to make reviewing simpler. The following details might be of note:
Logging
trait moved toInterBrokerSendThread
with the rewrite ofShutdownableThread
has been similarly moved to any subclasses that currently use it.InterBrokerSendThread
's own logging has been made to useShutdownableThread
's logger which mimics the prefix/log identifier that the trait provided.RequestAndCompletionHandler
class has been made a separate POJO class and the internal-useUnsentRequests
class has been kept as a static nested class.InterBrokerSendThread
relies on have been allowlisted in the server-common import control.InterBrokerSendThread
-generateRequests()
- now returns a Java collection soasScala
has been used at the various call-sites in Scala classes.The local test run after the changes ended with 7661 tests completed, 2 failed, 3 skipped and the failures being seemingly unrelated and not in server-common or core.
Committer Checklist (excluded from commit message)