Skip to content

Commit

Permalink
KAFKA-14701; Move PartitionAssignor to new group-coordinator-api
Browse files Browse the repository at this point in the history
…module (apache#16198)

This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
dajac committed Jun 6, 2024
1 parent 79ea7d6 commit 7d832cf
Show file tree
Hide file tree
Showing 43 changed files with 561 additions and 405 deletions.
63 changes: 63 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ project(':core') {
api libs.scalaLibrary

implementation project(':server-common')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':group-coordinator')
implementation project(':transaction-coordinator')
implementation project(':metadata')
Expand Down Expand Up @@ -1400,6 +1401,66 @@ project(':metadata') {
}
}

project(':group-coordinator:group-coordinator-api') {
base {
archivesName = "kafka-group-coordinator-api"
}

dependencies {
implementation project(':clients')
}

task createVersionFile() {
def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
inputs.property "commitId", commitId
inputs.property "version", version
outputs.file receiptFile

doLast {
def data = [
commitId: commitId,
version: version,
]

receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}

sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}

jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}

clean.doFirst {
delete "$buildDir/kafka/"
}

javadoc {
include "**/org/apache/kafka/coordinator/group/api/**"
}

checkstyle {
configProperties = checkstyleConfigProperties("import-control-group-coordinator.xml")
}
}

project(':group-coordinator') {
base {
archivesName = "kafka-group-coordinator"
Expand All @@ -1413,6 +1474,7 @@ project(':group-coordinator') {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':storage')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
Expand Down Expand Up @@ -2957,6 +3019,7 @@ project(':jmh-benchmarks') {
implementation project(':raft')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')
implementation project(':storage')
implementation project(':streams')
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server-side partition assignor for consumer groups used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
* The new consumer group protocol is in preview so this interface is considered
* unstable until Apache Kafka 4.0.
*/
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;
import java.util.Objects;

/**
* The partition assignment for a consumer group.
*/
@InterfaceStability.Unstable
public class GroupAssignment {
/**
* The member assignments keyed by member id.
Expand All @@ -31,8 +34,7 @@ public class GroupAssignment {
public GroupAssignment(
Map<String, MemberAssignment> members
) {
Objects.requireNonNull(members);
this.members = members;
this.members = Objects.requireNonNull(members);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
* The group metadata specifications required to compute the target assignment.
*/
@InterfaceStability.Unstable
public interface GroupSpec {
/**
* @return All the member Ids of the consumer group.
Expand All @@ -45,18 +45,18 @@ public interface GroupSpec {
/**
* Gets the member subscription specification for a member.
*
* @param memberId The member Id.
* @param memberId The member Id.
* @return The member's subscription metadata.
* @throws IllegalArgumentException If the member Id isn't found.
*/
MemberSubscriptionSpec memberSubscription(String memberId);
MemberSubscription memberSubscription(String memberId);

/**
* Gets the current assignment of the member.
*
* @param memberId The member Id.
* @return A map of topic Ids to sets of partition numbers.
* An empty map is returned if the member Id isn't found.
* @param memberId The member Id.
* @return The member's assignment or an empty assignment if the
* member does not have one.
*/
Map<Uuid, Set<Integer>> memberAssignment(String memberId);
MemberAssignment memberAssignment(String memberId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;
import java.util.Set;

/**
* The partition assignment for a consumer group member.
*/
@InterfaceStability.Unstable
public interface MemberAssignment {
/**
* @return The assigned partitions keyed by topic Ids.
*/
Map<Uuid, Set<Integer>> partitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Optional;
import java.util.Set;

/**
* Interface representing the subscription metadata for a group member.
*/
public interface MemberSubscriptionSpec {
@InterfaceStability.Unstable
public interface MemberSubscription {
/**
* Gets the rack Id if present.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server-side partition assignor used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
* The new consumer group protocol is in preview so this interface is considered
* unstable until Apache Kafka 4.0.
*/
@InterfaceStability.Unstable
public interface PartitionAssignor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.errors.ApiException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* The subscription type followed by a consumer group.
*/
@InterfaceStability.Unstable
public enum SubscriptionType {
/**
* A homogeneous subscription type means that all the members
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
Expand Down Expand Up @@ -1926,7 +1926,7 @@ private Assignment updateTargetAssignment(

MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
return new Assignment(newMemberAssignment.targetPartitions());
return new Assignment(newMemberAssignment.partitions());
} else {
return Assignment.EMPTY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.server.common.TopicIdPartition;

import java.util.Collection;
Expand All @@ -42,7 +45,7 @@ protected static void addPartitionToAssignment(
int partition
) {
memberAssignments.get(memberId)
.targetPartitions()
.partitions()
.computeIfAbsent(topicId, __ -> new HashSet<>())
.add(partition);
}
Expand Down
Loading

0 comments on commit 7d832cf

Please sign in to comment.