Skip to content

KAFKA-14895: [1/N] Move AddPartitionsToTxnManager files to java #19879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 7, 2025

Conversation

brandboat
Copy link
Member

@brandboat brandboat commented Jun 2, 2025

Move AddPartitionsToTxnManager to server module and convert to Java.
This patch moves AddPartitionsToTxnManager from the core module to the
server module, with its package updated from kafka.server to
org.apache.kafka.server.transaction. Additionally, several
configuration used by AddPartitionsToTxnManager are moved from
KafkaConfig.scala to AbstractKafkaConfig.java.

  • brokerId
  • requestTimeoutMs
  • controllerListenerNames
  • interBrokerListenerName
  • interBrokerSecurityProtocol
  • effectiveListenerSecurityProtocolMap

The next PR will move AddPartitionsToTxnManagerTest.scala to java

Reviewers: Justine Olshan jolshan@confluent.io, Chia-Ping Tsai
chia7712@gmail.com

@github-actions github-actions bot added the core Kafka Broker label Jun 2, 2025
@brandboat brandboat changed the title KAKFA-14895: (WIP) [1/N] Move AddPartitionsToTxnManager files to java KAFKA-14895: (WIP) [1/N] Move AddPartitionsToTxnManager files to java Jun 2, 2025
@brandboat
Copy link
Member Author

brandboat commented Jun 2, 2025

The next PR will move AddPartitionsToTxnManagerTest.scala to java

@brandboat brandboat changed the title KAFKA-14895: (WIP) [1/N] Move AddPartitionsToTxnManager files to java KAFKA-14895: [1/N] Move AddPartitionsToTxnManager files to java Jun 2, 2025
@brandboat
Copy link
Member Author

AC, gentle ping @chia7712, @jolshan. Could you please take a look when you are available? Thank you!

// Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread.
inflightNodes.remove(node);
if (response.authenticationException() != null) {
log.error(String.format("AddPartitionsToTxnRequest failed for node %s with an authentication exception.", response.destination()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.error("AddPartitionsToTxnRequest failed for node {} with an authentication exception.", response.destination(), response.authenticationException());

@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
// build and add requests to the queue
List<RequestAndCompletionHandler> list = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider using Iterator.remove to optimize it.

    public Collection<RequestAndCompletionHandler> generateRequests() {
        // build and add requests to the queue
        List<RequestAndCompletionHandler> list = new ArrayList<>();
        long currentTimeMs = time.milliseconds();
        synchronized (nodesToTransactions) {
            var iter = nodesToTransactions.entrySet().iterator();
            while (iter.hasNext()) {
                var entry = iter.next();
                var node = entry.getKey(); // 第一次 next()
                var transactionDataAndCallbacks = entry.getValue(); // 第二次 next(),這裡有問題
                if (!inflightNodes.contains(node)) {
                    list.add(new RequestAndCompletionHandler(
                            currentTimeMs,
                            node,
                            AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData()),
                            new AddPartitionsToTxnHandler(node, transactionDataAndCallbacks)
                    ));
                    inflightNodes.add(node);
                    iter.remove();
                }
            }
        }
        return list;
    }

}
}

private Map.Entry<ListenerName, SecurityProtocol> getInterBrokerListenerNameAndSecurityProtocol() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interBrokerListenerNameAndSecurityProtocol

}
}

private static SecurityProtocol getSecurityProtocol(String protocolName, String configName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

securityProtocol

@@ -275,7 +275,7 @@ abstract class QuorumTestHarness extends Logging {
formatter.addDirectory(metadataDir.getAbsolutePath)
formatter.setReleaseVersion(metadataVersion)
formatter.setUnstableFeatureVersionsEnabled(true)
formatter.setControllerListenerName(config.controllerListenerNames.head)
formatter.setControllerListenerName(config.controllerListenerNames.stream.findFirst.orElseThrow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatter.setControllerListenerName(config.controllerListenerNames.get(0))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with this approach at first just to keep things consistent—since .head throws a NoSuchElementException when the list is empty, and stream.findFirst().orElseThrow() behaves the same way. get(0) would throw an IndexOutOfBoundsException instead, so it felt a bit different. But perhaps I'm overthinking.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using Objects.requireNonNull?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ummm - you want to have consistent exception - that is not required I think - get(0) is sufficient enough

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment! I've applied the get(0) approach in this PR.

@@ -137,7 +137,9 @@ public ListenerName controllerListenerName() {
.next()
.config()
.controllerListenerNames()
.head()
.stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            return new ListenerName(
                Objects.requireNonNull(controllers()
                    .values()
                    .iterator()
                    .next()
                    .config()
                    .controllerListenerNames()
                    .get(0))
            );

@@ -216,7 +208,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def quotaConfig: QuotaConfig = _quotaConfig

/** ********* General Configuration ***********/
var brokerId: Int = getInt(ServerConfigs.BROKER_ID_CONFIG)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the only remaining usage of these configs AddPartitionsToTxnManager?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, this config is used in quite a few places, e.g. Partition.scala. But yeah, moving it to AbstractKafkaConfig is totally fine since KafkaConfig.scala extends AbstractKafkaConfig.java.

@@ -126,7 +126,7 @@ object StorageTool extends Logging {
setClusterId(namespace.getString("cluster_id")).
setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled).
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
setControllerListenerName(config.controllerListenerNames.head).
setControllerListenerName(config.controllerListenerNames.stream.findFirst.orElseThrow).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we do get(0) here like some of the other suggestions?

@jolshan
Copy link
Member

jolshan commented Jun 2, 2025

I reviewed the main bulk of the change. It looks pretty good! Just some small nits already mentioned. 👍

@chia7712
Copy link
Member

chia7712 commented Jun 5, 2025

> Task :core:compileTestScala
[Error] /home/chia7712/project/kafka/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2450:32: type mismatch;
 found   : scala.collection.Seq[org.apache.kafka.common.TopicPartition]
 required: java.util.Collection[org.apache.kafka.common.TopicPartition]
[Error] /home/chia7712/project/kafka/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2460:15: org.apache.kafka.server.transaction.AddPartitionsToTxnManager.AppendCallback does not take parameters
two errors found

@brandboat please fix the build error

* An interface which handles the Partition Response based on the Request Version and the exact operation.
*/
@FunctionalInterface
public interface TransactionSupportedOperation {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please consider using java enum instead?

    /**
     * handles the Partition Response based on the Request Version and the exact operation.
     */
    enum TransactionSupportedOperation {
        /**
         * This is the default workflow which maps to cases when the Produce Request Version or the
         * Txn_offset_commit request was lower than the first version supporting the new Error Class.
         */
        DEFAULT_ERROR(false),
        /**
         * This maps to the case when the clients are updated to handle the TransactionAbortableException.
         */
        GENERIC_ERROR_SUPPORTED(false),
        /**
         * This allows the partition to be added to the transactions inflight with the Produce and TxnOffsetCommit requests.
         * Plus the behaviors in genericErrorSupported.
         */
        ADD_PARTITION(true);
        private final boolean supportsEpochBump;
        TransactionSupportedOperation(boolean supportsEpochBump) {
            this.supportsEpochBump = supportsEpochBump;
        }
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good call.

@chia7712 chia7712 merged commit 83fb40d into apache:trunk Jun 7, 2025
21 of 23 checks passed
@brandboat brandboat deleted the KAFKA-14895 branch June 7, 2025 16:54
@brandboat brandboat restored the KAFKA-14895 branch June 7, 2025 16:54
@brandboat brandboat deleted the KAFKA-14895 branch June 7, 2025 16:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Kafka Broker
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants