Skip to content

Commit

Permalink
MINOR: Code cleanup in metadata module (apache#16065)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <mickael.maison@gmail.com>
  • Loading branch information
sjhajharia committed Jun 6, 2024
1 parent ebe1e96 commit 226f3c5
Show file tree
Hide file tree
Showing 43 changed files with 390 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ boolean check() {
*/
private final boolean zkMigrationEnabled;

private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;
private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;

/**
* Maps controller IDs to controller registrations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
if (record.removingReplicas() != null) return false;
if (record.addingReplicas() != null) return false;
if (record.leaderRecoveryState() != LeaderRecoveryState.NO_CHANGE) return false;
if (record.directories() != null) return false;
return true;
return record.directories() == null;
}

/**
Expand Down Expand Up @@ -515,7 +514,7 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 ||
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader when the first time the partition becomes leaderless.
record.setLastKnownElr(Arrays.asList(partition.leader));
record.setLastKnownElr(Collections.singletonList(partition.leader));
} else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))
&& partition.lastKnownElr.length > 0) {
// Clear the LastKnownElr field if the partition will have or continues to have a valid leader.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -1405,7 +1404,7 @@ private void maybeScheduleNextWriteNoOpRecord() {
maybeScheduleNextWriteNoOpRecord();

return ControllerResult.of(
Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
Collections.singletonList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
null
);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1904,7 +1904,7 @@ void generateLeaderAndIsrUpdates(String context,
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
if (brokerWithUncleanShutdown != NO_LEADER) {
builder.setUncleanShutdownReplicas(Arrays.asList(brokerWithUncleanShutdown));
builder.setUncleanShutdownReplicas(Collections.singletonList(brokerWithUncleanShutdown));
}

// Note: if brokerToRemove and brokerWithUncleanShutdown were passed as NO_LEADER, this is a no-op (the new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public static boolean isTimeoutException(Throwable exception) {
exception = exception.getCause();
if (exception == null) return false;
}
if (!(exception instanceof TimeoutException)) return false;
return true;
return exception instanceof TimeoutException;
}

/**
Expand All @@ -53,8 +52,7 @@ public static boolean isNotControllerException(Throwable exception) {
exception = exception.getCause();
if (exception == null) return false;
}
if (!(exception instanceof NotControllerException)) return false;
return true;
return exception instanceof NotControllerException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ static boolean exceptionClassesAndMessagesMatch(Throwable a, Throwable b) {
if (a == null) return b == null;
if (b == null) return false;
if (!a.getClass().equals(b.getClass())) return false;
if (!Objects.equals(a.getMessage(), b.getMessage())) return false;
return true;
return Objects.equals(a.getMessage(), b.getMessage());
}

EventHandlerExceptionInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,26 +366,24 @@ public boolean equals(Object o) {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("BrokerRegistration(id=").append(id);
bld.append(", epoch=").append(epoch);
bld.append(", incarnationId=").append(incarnationId);
bld.append(", listeners=[").append(
listeners.keySet().stream().sorted().
map(n -> listeners.get(n).toString()).
collect(Collectors.joining(", ")));
bld.append("], supportedFeatures={").append(
supportedFeatures.keySet().stream().sorted().
map(k -> k + ": " + supportedFeatures.get(k)).
collect(Collectors.joining(", ")));
bld.append("}");
bld.append(", rack=").append(rack);
bld.append(", fenced=").append(fenced);
bld.append(", inControlledShutdown=").append(inControlledShutdown);
bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
bld.append(", directories=").append(directories);
bld.append(")");
return bld.toString();
return "BrokerRegistration(id=" + id +
", epoch=" + epoch +
", incarnationId=" + incarnationId +
", listeners=[" +
listeners.keySet().stream().sorted().
map(n -> listeners.get(n).toString()).
collect(Collectors.joining(", ")) +
"], supportedFeatures={" +
supportedFeatures.keySet().stream().sorted().
map(k -> k + ": " + supportedFeatures.get(k)).
collect(Collectors.joining(", ")) +
"}" +
", rack=" + rack +
", fenced=" + fenced +
", inControlledShutdown=" + inControlledShutdown +
", isMigratingZkBroker=" + isMigratingZkBroker +
", directories=" + directories +
")";
}

public BrokerRegistration cloneWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,18 @@ public boolean equals(Object o) {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("ControllerRegistration(id=").append(id);
bld.append(", incarnationId=").append(incarnationId);
bld.append(", zkMigrationReady=").append(zkMigrationReady);
bld.append(", listeners=[").append(
listeners.keySet().stream().sorted().
map(n -> listeners.get(n).toString()).
collect(Collectors.joining(", ")));
bld.append("], supportedFeatures={").append(
supportedFeatures.keySet().stream().sorted().
map(k -> k + ": " + supportedFeatures.get(k)).
collect(Collectors.joining(", ")));
bld.append("}");
bld.append(")");
return bld.toString();
return "ControllerRegistration(id=" + id +
", incarnationId=" + incarnationId +
", zkMigrationReady=" + zkMigrationReady +
", listeners=[" +
listeners.keySet().stream().sorted().
map(n -> listeners.get(n).toString()).
collect(Collectors.joining(", ")) +
"], supportedFeatures={" +
supportedFeatures.keySet().stream().sorted().
map(k -> k + ": " + supportedFeatures.get(k)).
collect(Collectors.joining(", ")) +
"}" +
")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ public boolean equals(Object o) {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("{");
bld.append("featureMap=").append(featureMap.toString());
bld.append(", epoch=").append(epoch);
bld.append("}");
return bld.toString();
return "FinalizedControllerFeatures(" +
"featureMap=" + featureMap.toString() +
", epoch=" + epoch +
")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -448,20 +448,18 @@ public boolean equals(Object o) {

@Override
public String toString() {
StringBuilder builder = new StringBuilder("PartitionRegistration(");
builder.append("replicas=").append(Arrays.toString(replicas));
builder.append(", directories=").append(Arrays.toString(directories));
builder.append(", isr=").append(Arrays.toString(isr));
builder.append(", removingReplicas=").append(Arrays.toString(removingReplicas));
builder.append(", addingReplicas=").append(Arrays.toString(addingReplicas));
builder.append(", elr=").append(Arrays.toString(elr));
builder.append(", lastKnownElr=").append(Arrays.toString(lastKnownElr));
builder.append(", leader=").append(leader);
builder.append(", leaderRecoveryState=").append(leaderRecoveryState);
builder.append(", leaderEpoch=").append(leaderEpoch);
builder.append(", partitionEpoch=").append(partitionEpoch);
builder.append(")");
return builder.toString();
return "PartitionRegistration(" + "replicas=" + Arrays.toString(replicas) +
", directories=" + Arrays.toString(directories) +
", isr=" + Arrays.toString(isr) +
", removingReplicas=" + Arrays.toString(removingReplicas) +
", addingReplicas=" + Arrays.toString(addingReplicas) +
", elr=" + Arrays.toString(elr) +
", lastKnownElr=" + Arrays.toString(lastKnownElr) +
", leader=" + leader +
", leaderRecoveryState=" + leaderRecoveryState +
", leaderEpoch=" + leaderEpoch +
", partitionEpoch=" + partitionEpoch +
")";
}

public boolean hasSameAssignment(PartitionRegistration registration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,16 @@ public void testDeleteDedupe() {
AclBinding aclBinding = new AclBinding(new ResourcePattern(TOPIC, "topic-1", LITERAL),
new AccessControlEntry("User:user", "10.0.0.1", AclOperation.ALL, ALLOW));

ControllerResult<List<AclCreateResult>> createResult = manager.createAcls(Arrays.asList(aclBinding));
ControllerResult<List<AclCreateResult>> createResult = manager.createAcls(Collections.singletonList(aclBinding));
Uuid id = ((AccessControlEntryRecord) createResult.records().get(0).message()).id();
assertEquals(1, createResult.records().size());

ControllerResult<List<AclDeleteResult>> deleteAclResultsAnyFilter = manager.deleteAcls(Arrays.asList(AclBindingFilter.ANY));
ControllerResult<List<AclDeleteResult>> deleteAclResultsAnyFilter = manager.deleteAcls(Collections.singletonList(AclBindingFilter.ANY));
assertEquals(1, deleteAclResultsAnyFilter.records().size());
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsAnyFilter.records().get(0).message()).id());
assertEquals(1, deleteAclResultsAnyFilter.response().size());

ControllerResult<List<AclDeleteResult>> deleteAclResultsSpecificFilter = manager.deleteAcls(Arrays.asList(aclBinding.toFilter()));
ControllerResult<List<AclDeleteResult>> deleteAclResultsSpecificFilter = manager.deleteAcls(Collections.singletonList(aclBinding.toFilter()));
assertEquals(1, deleteAclResultsSpecificFilter.records().size());
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsSpecificFilter.records().get(0).message()).id());
assertEquals(1, deleteAclResultsSpecificFilter.response().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,19 @@ public void testEntityTypes() throws Exception {
new EntityData().setEntityType("user").setEntityName("user-3"),
new EntityData().setEntityType("client-id").setEntityName(null))).
setKey("request_percentage").setValue(55.55).setRemove(false), (short) 0),
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
new EntityData().setEntityType("user").setEntityName("user-1"))).
setKey("request_percentage").setValue(56.56).setRemove(false), (short) 0),
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
new EntityData().setEntityType("user").setEntityName("user-2"))).
setKey("request_percentage").setValue(57.57).setRemove(false), (short) 0),
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
new EntityData().setEntityType("user").setEntityName("user-3"))).
setKey("request_percentage").setValue(58.58).setRemove(false), (short) 0),
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
new EntityData().setEntityType("user").setEntityName(null))).
setKey("request_percentage").setValue(59.59).setRemove(false), (short) 0),
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
new EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0));
records = new ArrayList<>(records);
Expand Down Expand Up @@ -323,7 +323,7 @@ public void testIsValidIpEntityWithLocalhost() {

@Test
public void testConfigKeysForEntityTypeWithUser() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.USER),
testConfigKeysForEntityType(Collections.singletonList(ClientQuotaEntity.USER),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
Expand All @@ -334,7 +334,7 @@ public void testConfigKeysForEntityTypeWithUser() {

@Test
public void testConfigKeysForEntityTypeWithClientId() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID),
testConfigKeysForEntityType(Collections.singletonList(ClientQuotaEntity.CLIENT_ID),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
Expand All @@ -356,8 +356,8 @@ public void testConfigKeysForEntityTypeWithUserAndClientId() {

@Test
public void testConfigKeysForEntityTypeWithIp() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.IP),
Arrays.asList(
testConfigKeysForEntityType(Collections.singletonList(ClientQuotaEntity.IP),
Collections.singletonList(
"connection_creation_rate"
));
}
Expand Down Expand Up @@ -386,7 +386,7 @@ private static void testConfigKeysForEntityType(

@Test
public void testConfigKeysForEmptyEntity() {
testConfigKeysError(Arrays.asList(),
testConfigKeysError(Collections.emptyList(),
new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity"));
}

Expand Down Expand Up @@ -427,7 +427,7 @@ private static void testConfigKeysError(
static {
VALID_CLIENT_ID_QUOTA_KEYS = new HashMap<>();
assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType(
keysToEntity(Arrays.asList(ClientQuotaEntity.CLIENT_ID)), VALID_CLIENT_ID_QUOTA_KEYS));
keysToEntity(Collections.singletonList(ClientQuotaEntity.CLIENT_ID)), VALID_CLIENT_ID_QUOTA_KEYS));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,14 @@ public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
short expectedVersion = metadataVersion.registerBrokerRecordVersion();

assertEquals(
asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
Collections.singletonList(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerEpoch(123L).
setBrokerId(0).
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
setFenced(true).
setLogDirs(logDirs).
setFeatures(new RegisterBrokerRecord.BrokerFeatureCollection(asList(
setFeatures(new RegisterBrokerRecord.BrokerFeatureCollection(Collections.singletonList(
new RegisterBrokerRecord.BrokerFeature().
setName(MetadataVersion.FEATURE_NAME).
setMinSupportedVersion((short) 1).
Expand Down Expand Up @@ -673,7 +673,7 @@ public void testDefaultDir() {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1).setLogDirs(Collections.emptyList());
brokerRecord.endPoints().add(new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).setPort((short) 9092).setName("PLAINTEXT").setHost("127.0.0.1"));
clusterControl.replay(brokerRecord, 100L);
registerNewBrokerWithDirs(clusterControl, 2, asList(Uuid.fromString("singleOnlineDirectoryA")));
registerNewBrokerWithDirs(clusterControl, 2, Collections.singletonList(Uuid.fromString("singleOnlineDirectoryA")));
registerNewBrokerWithDirs(clusterControl, 3, asList(Uuid.fromString("s4fRmyNFSH6J0vI8AVA5ew"), Uuid.fromString("UbtxBcqYSnKUEMcnTyZFWw")));
assertEquals(DirectoryId.MIGRATING, clusterControl.defaultDir(1));
assertEquals(Uuid.fromString("singleOnlineDirectoryA"), clusterControl.defaultDir(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand All @@ -57,7 +56,7 @@
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNull;


@Timeout(value = 40)
Expand All @@ -80,9 +79,9 @@ public class ConfigurationControlManagerTest {
public static final Map<String, List<ConfigSynonym>> SYNONYMS = new HashMap<>();

static {
SYNONYMS.put("abc", Arrays.asList(new ConfigSynonym("foo.bar")));
SYNONYMS.put("def", Arrays.asList(new ConfigSynonym("baz")));
SYNONYMS.put("quuux", Arrays.asList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
SYNONYMS.put("abc", Collections.singletonList(new ConfigSynonym("foo.bar")));
SYNONYMS.put("def", Collections.singletonList(new ConfigSynonym("baz")));
SYNONYMS.put("quuux", Collections.singletonList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
}

static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS);
Expand Down Expand Up @@ -138,7 +137,7 @@ public void testReplay() throws Exception {
assertEquals(toMap(entry("abc", "x,y,z"), entry("def", "blah")),
manager.getConfigs(MYTOPIC));
assertEquals("x,y,z", manager.getTopicConfig(MYTOPIC.name(), "abc"));
assertTrue(manager.getTopicConfig(MYTOPIC.name(), "none-exists") == null);
assertNull(manager.getTopicConfig(MYTOPIC.name(), "none-exists"));
}

@Test
Expand Down Expand Up @@ -382,7 +381,7 @@ expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))),
for (ApiMessageAndVersion message : expectedRecords1) {
manager.replay((ConfigRecord) message.message());
}
assertEquals(ControllerResult.atomicOf(asList(
assertEquals(ControllerResult.atomicOf(Collections.singletonList(
new ApiMessageAndVersion(
new ConfigRecord()
.setResourceType(TOPIC.id())
Expand Down
Loading

0 comments on commit 226f3c5

Please sign in to comment.