-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-14895: [1/N] Move AddPartitionsToTxnManager files to java #19879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The next PR will move AddPartitionsToTxnManagerTest.scala to java |
// Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread. | ||
inflightNodes.remove(node); | ||
if (response.authenticationException() != null) { | ||
log.error(String.format("AddPartitionsToTxnRequest failed for node %s with an authentication exception.", response.destination()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.error("AddPartitionsToTxnRequest failed for node {} with an authentication exception.", response.destination(), response.authenticationException());
@Override | ||
public Collection<RequestAndCompletionHandler> generateRequests() { | ||
// build and add requests to the queue | ||
List<RequestAndCompletionHandler> list = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider using Iterator.remove
to optimize it.
public Collection<RequestAndCompletionHandler> generateRequests() {
// build and add requests to the queue
List<RequestAndCompletionHandler> list = new ArrayList<>();
long currentTimeMs = time.milliseconds();
synchronized (nodesToTransactions) {
var iter = nodesToTransactions.entrySet().iterator();
while (iter.hasNext()) {
var entry = iter.next();
var node = entry.getKey(); // 第一次 next()
var transactionDataAndCallbacks = entry.getValue(); // 第二次 next(),這裡有問題
if (!inflightNodes.contains(node)) {
list.add(new RequestAndCompletionHandler(
currentTimeMs,
node,
AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData()),
new AddPartitionsToTxnHandler(node, transactionDataAndCallbacks)
));
inflightNodes.add(node);
iter.remove();
}
}
}
return list;
}
} | ||
} | ||
|
||
private Map.Entry<ListenerName, SecurityProtocol> getInterBrokerListenerNameAndSecurityProtocol() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interBrokerListenerNameAndSecurityProtocol
} | ||
} | ||
|
||
private static SecurityProtocol getSecurityProtocol(String protocolName, String configName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
securityProtocol
@@ -275,7 +275,7 @@ abstract class QuorumTestHarness extends Logging { | |||
formatter.addDirectory(metadataDir.getAbsolutePath) | |||
formatter.setReleaseVersion(metadataVersion) | |||
formatter.setUnstableFeatureVersionsEnabled(true) | |||
formatter.setControllerListenerName(config.controllerListenerNames.head) | |||
formatter.setControllerListenerName(config.controllerListenerNames.stream.findFirst.orElseThrow) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
formatter.setControllerListenerName(config.controllerListenerNames.get(0))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with this approach at first just to keep things consistent—since .head
throws a NoSuchElementException when the list is empty, and stream.findFirst().orElseThrow()
behaves the same way. get(0)
would throw an IndexOutOfBoundsException instead, so it felt a bit different. But perhaps I'm overthinking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using Objects.requireNonNull
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ummm - you want to have consistent exception - that is not required I think - get(0)
is sufficient enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comment! I've applied the get(0)
approach in this PR.
@@ -137,7 +137,9 @@ public ListenerName controllerListenerName() { | |||
.next() | |||
.config() | |||
.controllerListenerNames() | |||
.head() | |||
.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return new ListenerName(
Objects.requireNonNull(controllers()
.values()
.iterator()
.next()
.config()
.controllerListenerNames()
.get(0))
);
@@ -216,7 +208,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) | |||
def quotaConfig: QuotaConfig = _quotaConfig | |||
|
|||
/** ********* General Configuration ***********/ | |||
var brokerId: Int = getInt(ServerConfigs.BROKER_ID_CONFIG) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was the only remaining usage of these configs AddPartitionsToTxnManager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, this config is used in quite a few places, e.g. Partition.scala. But yeah, moving it to AbstractKafkaConfig is totally fine since KafkaConfig.scala extends AbstractKafkaConfig.java.
@@ -126,7 +126,7 @@ object StorageTool extends Logging { | |||
setClusterId(namespace.getString("cluster_id")). | |||
setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled). | |||
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")). | |||
setControllerListenerName(config.controllerListenerNames.head). | |||
setControllerListenerName(config.controllerListenerNames.stream.findFirst.orElseThrow). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we do get(0) here like some of the other suggestions?
I reviewed the main bulk of the change. It looks pretty good! Just some small nits already mentioned. 👍 |
@brandboat please fix the build error |
* An interface which handles the Partition Response based on the Request Version and the exact operation. | ||
*/ | ||
@FunctionalInterface | ||
public interface TransactionSupportedOperation { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please consider using java enum instead?
/**
* handles the Partition Response based on the Request Version and the exact operation.
*/
enum TransactionSupportedOperation {
/**
* This is the default workflow which maps to cases when the Produce Request Version or the
* Txn_offset_commit request was lower than the first version supporting the new Error Class.
*/
DEFAULT_ERROR(false),
/**
* This maps to the case when the clients are updated to handle the TransactionAbortableException.
*/
GENERIC_ERROR_SUPPORTED(false),
/**
* This allows the partition to be added to the transactions inflight with the Produce and TxnOffsetCommit requests.
* Plus the behaviors in genericErrorSupported.
*/
ADD_PARTITION(true);
private final boolean supportsEpochBump;
TransactionSupportedOperation(boolean supportsEpochBump) {
this.supportsEpochBump = supportsEpochBump;
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, good call.
Move AddPartitionsToTxnManager to server module and convert to Java.
This patch moves AddPartitionsToTxnManager from the core module to the
server module, with its package updated from
kafka.server
toorg.apache.kafka.server.transaction
. Additionally, severalconfiguration used by AddPartitionsToTxnManager are moved from
KafkaConfig.scala to AbstractKafkaConfig.java.
The next PR will move AddPartitionsToTxnManagerTest.scala to java
Reviewers: Justine Olshan jolshan@confluent.io, Chia-Ping Tsai
chia7712@gmail.com