Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@
files="(ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
files="(QuorumController|ReplicationControlManager).java"/>
<suppress checks="ParameterNumber"
files="(QuorumController).java"/>
<suppress checks="CyclomaticComplexity"
files="(ReplicationControlManager).java"/>
<suppress checks="NPathComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public long backoff(long attempts) {
}
double exp = Math.min(attempts, this.expMax);
double term = initialInterval * Math.pow(multiplier, exp);
double randomFactor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 :
ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
return (long) (randomFactor * term);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,13 @@ public void testExponentialBackoff() {
}
}
}

@Test
public void testExponentialBackoffWithoutJitter() {
ExponentialBackoff exponentialBackoff = new ExponentialBackoff(100, 2, 400, 0.0);
assertEquals(100, exponentialBackoff.backoff(0));
assertEquals(200, exponentialBackoff.backoff(1));
assertEquals(400, exponentialBackoff.backoff(2));
assertEquals(400, exponentialBackoff.backoff(3));
}
}
5 changes: 5 additions & 0 deletions core/src/test/java/kafka/test/MockController.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<Long> beginWritingSnapshot() {
throw new UnsupportedOperationException();
}

@Override
public void beginShutdown() {
this.active = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.config.internals.QuotaConfigs;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.metadata.QuotaRecord;
import org.apache.kafka.common.metadata.QuotaRecord.EntityData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
Expand All @@ -35,18 +36,20 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;


public class ClientQuotaControlManager {

private final SnapshotRegistry snapshotRegistry;

final TimelineHashMap<ClientQuotaEntity, Map<String, Double>> clientQuotaData;
final TimelineHashMap<ClientQuotaEntity, TimelineHashMap<String, Double>> clientQuotaData;

ClientQuotaControlManager(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
Expand Down Expand Up @@ -98,7 +101,7 @@ public void replay(QuotaRecord record) {
Map<String, String> entityMap = new HashMap<>(2);
record.entity().forEach(entityData -> entityMap.put(entityData.entityType(), entityData.entityName()));
ClientQuotaEntity entity = new ClientQuotaEntity(entityMap);
Map<String, Double> quotas = clientQuotaData.get(entity);
TimelineHashMap<String, Double> quotas = clientQuotaData.get(entity);
if (quotas == null) {
quotas = new TimelineHashMap<>(snapshotRegistry, 0);
clientQuotaData.put(entity, quotas);
Expand Down Expand Up @@ -136,14 +139,15 @@ private void alterClientQuotaEntity(
}

// Don't share objects between different records
Supplier<List<QuotaRecord.EntityData>> recordEntitySupplier = () ->
validatedEntityMap.entrySet().stream().map(mapEntry -> new QuotaRecord.EntityData()
Supplier<List<EntityData>> recordEntitySupplier = () ->
validatedEntityMap.entrySet().stream().map(mapEntry -> new EntityData()
.setEntityType(mapEntry.getKey())
.setEntityName(mapEntry.getValue()))
.collect(Collectors.toList());

List<ApiMessageAndVersion> newRecords = new ArrayList<>(newQuotaConfigs.size());
Map<String, Double> currentQuotas = clientQuotaData.getOrDefault(entity, Collections.emptyMap());
Map<String, Double> currentQuotas = clientQuotaData.containsKey(entity) ?
clientQuotaData.get(entity) : Collections.emptyMap();
newQuotaConfigs.forEach((key, newValue) -> {
if (newValue == null) {
if (currentQuotas.containsKey(key)) {
Expand Down Expand Up @@ -249,7 +253,7 @@ private ApiError validateEntity(ClientQuotaEntity entity, Map<String, String> va
return new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity");
}

for (Map.Entry<String, String> entityEntry : entity.entries().entrySet()) {
for (Entry<String, String> entityEntry : entity.entries().entrySet()) {
String entityType = entityEntry.getKey();
String entityName = entityEntry.getValue();
if (validatedEntityMap.containsKey(entityType)) {
Expand All @@ -272,4 +276,44 @@ private ApiError validateEntity(ClientQuotaEntity entity, Map<String, String> va

return ApiError.NONE;
}

class ClientQuotaControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<Entry<ClientQuotaEntity, TimelineHashMap<String, Double>>> iterator;

ClientQuotaControlIterator(long epoch) {
this.epoch = epoch;
this.iterator = clientQuotaData.entrySet(epoch).iterator();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're getting the entry set of a particular epoch, does this mean we'll be getting consistent snapshots?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, snapshots are consistent.

}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public List<ApiMessageAndVersion> next() {
if (!hasNext()) throw new NoSuchElementException();
Entry<ClientQuotaEntity, TimelineHashMap<String, Double>> entry = iterator.next();
ClientQuotaEntity entity = entry.getKey();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Entry<String, Double> quotaEntry : entry.getValue().entrySet(epoch)) {
QuotaRecord record = new QuotaRecord();
for (Entry<String, String> entityEntry : entity.entries().entrySet()) {
record.entity().add(new EntityData().
setEntityType(entityEntry.getKey()).
setEntityName(entityEntry.getValue()));
}
record.setKey(quotaEntry.getKey());
record.setValue(quotaEntry.getValue());
record.setRemove(false);
records.add(new ApiMessageAndVersion(record, (short) 0));
}
return records;
}
}

ClientQuotaControlIterator iterator(long epoch) {
return new ClientQuotaControlIterator(epoch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
Expand All @@ -40,8 +44,11 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -184,7 +191,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
setBrokerEpoch(brokerEpoch).
setRack(request.rack());
for (BrokerRegistrationRequestData.Listener listener : request.listeners()) {
record.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
record.endPoints().add(new BrokerEndpoint().
setHost(listener.host()).
setName(listener.name()).
setPort(listener.port()).
Expand All @@ -199,7 +206,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
"the broker has an unsupported version of " + feature.name());
}
}
record.features().add(new RegisterBrokerRecord.BrokerFeature().
record.features().add(new BrokerFeature().
setName(feature.name()).
setMinSupportedVersion(feature.minSupportedVersion()).
setMaxSupportedVersion(feature.maxSupportedVersion()));
Expand All @@ -219,13 +226,13 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
public void replay(RegisterBrokerRecord record) {
int brokerId = record.brokerId();
List<Endpoint> listeners = new ArrayList<>();
for (RegisterBrokerRecord.BrokerEndpoint endpoint : record.endPoints()) {
for (BrokerEndpoint endpoint : record.endPoints()) {
listeners.add(new Endpoint(endpoint.name(),
SecurityProtocol.forId(endpoint.securityProtocol()),
endpoint.host(), endpoint.port()));
}
Map<String, VersionRange> features = new HashMap<>();
for (RegisterBrokerRecord.BrokerFeature feature : record.features()) {
for (BrokerFeature feature : record.features()) {
features.put(feature.name(), new VersionRange(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
Expand Down Expand Up @@ -343,4 +350,56 @@ public void addReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers
readyBrokersFuture = Optional.empty();
}
}

class ClusterControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final Iterator<Entry<Integer, BrokerRegistration>> iterator;

ClusterControlIterator(long epoch) {
this.iterator = brokerRegistrations.entrySet(epoch).iterator();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public List<ApiMessageAndVersion> next() {
if (!hasNext()) throw new NoSuchElementException();
Entry<Integer, BrokerRegistration> entry = iterator.next();
int brokerId = entry.getKey();
BrokerRegistration registration = entry.getValue();
BrokerEndpointCollection endpoints = new BrokerEndpointCollection();
for (Entry<String, Endpoint> endpointEntry : registration.listeners().entrySet()) {
endpoints.add(new BrokerEndpoint().setName(endpointEntry.getKey()).
setHost(endpointEntry.getValue().host()).
setPort(endpointEntry.getValue().port()).
setSecurityProtocol(endpointEntry.getValue().securityProtocol().id));
}
BrokerFeatureCollection features = new BrokerFeatureCollection();
for (Entry<String, VersionRange> featureEntry : registration.supportedFeatures().entrySet()) {
features.add(new BrokerFeature().setName(featureEntry.getKey()).
setMaxSupportedVersion(featureEntry.getValue().max()).
setMinSupportedVersion(featureEntry.getValue().min()));
}
List<ApiMessageAndVersion> batch = new ArrayList<>();
batch.add(new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(brokerId).
setIncarnationId(registration.incarnationId()).
setBrokerEpoch(registration.epoch()).
setEndPoints(endpoints).
setFeatures(features).
setRack(registration.rack().orElse(null)), (short) 0));
if (!registration.fenced()) {
batch.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
setId(brokerId).
setEpoch(registration.epoch()), (short) 0));
}
return batch;
}
}

ClusterControlIterator iterator(long epoch) {
return new ClusterControlIterator(epoch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;


public class ConfigurationControlManager {
private final Logger log;
private final SnapshotRegistry snapshotRegistry;
Expand Down Expand Up @@ -316,6 +318,9 @@ public void replay(ConfigRecord record) {
} else {
configs.put(record.name(), record.value());
}
if (configs.isEmpty()) {
configData.remove(configResource);
}
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
}

Expand Down Expand Up @@ -368,4 +373,39 @@ public Map<ConfigResource, ResultOrError<Map<String, String>>> describeConfigs(
void deleteTopicConfigs(String name) {
configData.remove(new ConfigResource(Type.TOPIC, name));
}

class ConfigurationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<Entry<ConfigResource, TimelineHashMap<String, String>>> iterator;

ConfigurationControlIterator(long epoch) {
this.epoch = epoch;
this.iterator = configData.entrySet(epoch).iterator();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public List<ApiMessageAndVersion> next() {
if (!hasNext()) throw new NoSuchElementException();
List<ApiMessageAndVersion> records = new ArrayList<>();
Entry<ConfigResource, TimelineHashMap<String, String>> entry = iterator.next();
ConfigResource resource = entry.getKey();
for (Entry<String, String> configEntry : entry.getValue().entrySet(epoch)) {
records.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceName(resource.name()).
setResourceType(resource.type().id()).
setName(configEntry.getKey()).
setValue(configEntry.getValue()), (short) 0));
}
return records;
}
}

ConfigurationControlIterator iterator(long epoch) {
return new ConfigurationControlIterator(epoch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
);

/**
* Begin writing a controller snapshot. If there was already an ongoing snapshot, it
* simply returns information about that snapshot rather than starting a new one.
*
* @return A future yielding the epoch of the snapshot.
*/
CompletableFuture<Long> beginWritingSnapshot();

/**
* Begin shutting down, but don't block. You must still call close to clean up all
* resources.
Expand Down
Loading