KAFKA-8988. Replace CreatePartitions Request/Response with automated protocol #7493

Merged
@@ -71,6 +71,10 @@
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
@@ -127,7 +131,6 @@
import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -2313,35 +2316,55 @@ public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPar
for (String topic : newPartitions.keySet()) {
futures.put(topic, new KafkaFutureImpl<>());
}
final Map<String, PartitionDetails> requestMap = newPartitions.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue())));

final List<CreatePartitionsTopic> topics = newPartitions.entrySet().stream()
.map(partitionsEntry -> {
NewPartitions newPartition = partitionsEntry.getValue();
List<List<Integer>> newAssignments = newPartition.assignments();
List<CreatePartitionsAssignment> assignments = newAssignments == null ? null :
newAssignments.stream()
.map(brokerIds -> new CreatePartitionsAssignment().setBrokerIds(brokerIds))
.collect(Collectors.toList());
return new CreatePartitionsTopic()
.setName(partitionsEntry.getKey())
.setCount(newPartition.totalCount())
.setAssignments(assignments);
})
.collect(Collectors.toList());

final long now = time.milliseconds();
runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {

@Override
public CreatePartitionsRequest.Builder createRequest(int timeoutMs) {
return new CreatePartitionsRequest.Builder(requestMap, timeoutMs, options.validateOnly());
CreatePartitionsRequestData requestData = new CreatePartitionsRequestData()
.setTopics(topics)
.setValidateOnly(options.validateOnly())
Contributor commented:

Leaving this here for lack of a better alternative. I was looking at the defaults used in the schema definitions. We do not override them anywhere, but I wonder if it would make sense to do so for validateOnly. If we do not override it, the default is false, but maybe true is a safer value?

Another thing I was looking at is how we handle the ErrorMessage field in the response. The previous serialization logic used ApiError, which avoids serializing the error message when it matches the standard value. I suspect we may have broken this optimization in other places as well, though I'm not sure how much it matters. Also, the old code used null as the default instead of the empty string. Perhaps we should do the same?

Contributor Author replied:


I agree that it's safer to have validateOnly default to true, but I think the common case is for it to be false. As a user of this API, I would assume the call creates partitions by default, and only if I needed validation alone would I dig into the API for a flag to do that. So in that sense the false default matches user expectations.

Interesting comment about the ErrorMessage. But when creating the CreatePartitionsResponse in CreatePartitionsRequest::getErrorResponse, we use apiError.message to populate the error message, and that method returns null if the message is the default. It seems that if we used apiError.messageWithFallback we would end up in the situation you are describing. I may be missing something here.

Updated the JSON to use null as the default for the ErrorMessage field. Thanks for pointing that out.

.setTimeoutMs(timeoutMs);

return new CreatePartitionsRequest.Builder(requestData);
}

@Override
public void handleResponse(AbstractResponse abstractResponse) {
CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse;
// Check for controller change
for (ApiError error : response.errors().values()) {
if (error.error() == Errors.NOT_CONTROLLER) {
for (CreatePartitionsTopicResult topicResult: response.data().results()) {
Errors error = Errors.forCode(topicResult.errorCode());
if (error == Errors.NOT_CONTROLLER) {
metadataManager.clearController();
metadataManager.requestUpdate();
throw error.exception();
}
}
for (Map.Entry<String, ApiError> result : response.errors().entrySet()) {
KafkaFutureImpl<Void> future = futures.get(result.getKey());
if (result.getValue().isSuccess()) {
for (CreatePartitionsTopicResult topicResult: response.data().results()) {
Errors error = Errors.forCode(topicResult.errorCode());
KafkaFutureImpl<Void> future = futures.get(topicResult.name());
if (error == Errors.NONE) {
future.complete(null);
} else {
future.completeExceptionally(result.getValue().exception());
future.completeExceptionally(error.exception(topicResult.errorMessage()));
}
}
}
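For reference, here is a minimal usage sketch of the Admin API path backed by these futures. The broker address and topic name are hypothetical, not taken from the PR; validateOnly(true) is passed explicitly as a dry run, since the wire-level default discussed in the review thread above is false (actually create the partitions).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.NewPartitions;

public class CreatePartitionsDryRun {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical cluster
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createPartitions(
                    Collections.singletonMap("my-topic", NewPartitions.increaseTo(6)), // hypothetical topic
                    new CreatePartitionsOptions().validateOnly(true)) // dry run; false is the default
                .all()
                .get(); // per-topic failures raised here are built from CreatePartitionsTopicResult above
        }
    }
}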
@@ -2851,10 +2874,6 @@ private boolean handleGroupRequestError(Errors error, KafkaFutureImpl<?> future)
return false;
}

private PartitionDetails partitionDetails(NewPartitions newPartitions) {
return new PartitionDetails(newPartitions.totalCount(), newPartitions.assignments());
}

private final static class ListConsumerGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, ConsumerGroupListing> listings;
@@ -23,6 +23,8 @@
import org.apache.kafka.common.message.ControlledShutdownResponseData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
@@ -94,8 +96,6 @@
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -191,8 +191,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
DescribeLogDirsResponse.schemaVersions()),
SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS,
SaslAuthenticateResponseData.SCHEMAS),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
CreatePartitionsResponse.schemaVersions()),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequestData.SCHEMAS,
CreatePartitionsResponseData.SCHEMAS),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequestData.SCHEMAS, RenewDelegationTokenResponseData.SCHEMAS),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS),
@@ -155,7 +155,7 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, shor
case SASL_AUTHENTICATE:
return new SaslAuthenticateResponse(struct, version);
case CREATE_PARTITIONS:
return new CreatePartitionsResponse(struct);
return new CreatePartitionsResponse(struct, version);
case CREATE_DELEGATION_TOKEN:
return new CreateDelegationTokenResponse(struct, version);
case RENEW_DELEGATION_TOKEN:
@@ -17,221 +17,71 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.INT32;

public class CreatePartitionsRequest extends AbstractRequest {

private static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
private static final String NEW_PARTITIONS_KEY_NAME = "new_partitions";
private static final String COUNT_KEY_NAME = "count";
private static final String ASSIGNMENT_KEY_NAME = "assignment";
private static final String TIMEOUT_KEY_NAME = "timeout";
private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";

private static final Schema CREATE_PARTITIONS_REQUEST_V0 = new Schema(
new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(
new Schema(
TOPIC_NAME,
new Field(NEW_PARTITIONS_KEY_NAME, new Schema(
new Field(COUNT_KEY_NAME, INT32, "The new partition count."),
new Field(ASSIGNMENT_KEY_NAME, ArrayOf.nullable(new ArrayOf(INT32)),
"The assigned brokers.")
)))),
"List of topic and the corresponding new partitions."),
new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for the partitions to be created."),
new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN,
"If true then validate the request, but don't actually increase the number of partitions."));

/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema CREATE_PARTITIONS_REQUEST_V1 = CREATE_PARTITIONS_REQUEST_V0;

public static Schema[] schemaVersions() {
return new Schema[]{CREATE_PARTITIONS_REQUEST_V0, CREATE_PARTITIONS_REQUEST_V1};
}

// It is an error for duplicate topics to be present in the request,
// so track duplicates here to allow KafkaApis to report per-topic errors.
private final Set<String> duplicates;
private final Map<String, PartitionDetails> newPartitions;
private final int timeout;
private final boolean validateOnly;

public static class PartitionDetails {

private final int totalCount;

private final List<List<Integer>> newAssignments;

public PartitionDetails(int totalCount) {
this(totalCount, null);
}

public PartitionDetails(int totalCount, List<List<Integer>> newAssignments) {
this.totalCount = totalCount;
this.newAssignments = newAssignments;
}

public int totalCount() {
return totalCount;
}

public List<List<Integer>> newAssignments() {
return newAssignments;
}

@Override
public String toString() {
return "(totalCount=" + totalCount() + ", newAssignments=" + newAssignments() + ")";
}

}
private final CreatePartitionsRequestData data;

public static class Builder extends AbstractRequest.Builder<CreatePartitionsRequest> {

private final Map<String, PartitionDetails> newPartitions;
private final int timeout;
private final boolean validateOnly;
private final CreatePartitionsRequestData data;

public Builder(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly) {
public Builder(CreatePartitionsRequestData data) {
super(ApiKeys.CREATE_PARTITIONS);
this.newPartitions = newPartitions;
this.timeout = timeout;
this.validateOnly = validateOnly;
this.data = data;
}

@Override
public CreatePartitionsRequest build(short version) {
return new CreatePartitionsRequest(newPartitions, timeout, validateOnly, version);
return new CreatePartitionsRequest(data, version);
}

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=CreatePartitionsRequest").
append(", newPartitions=").append(newPartitions).
append(", timeout=").append(timeout).
append(", validateOnly=").append(validateOnly).
append(")");
return bld.toString();
return data.toString();
}
}

CreatePartitionsRequest(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly, short apiVersion) {
CreatePartitionsRequest(CreatePartitionsRequestData data, short apiVersion) {
super(ApiKeys.CREATE_PARTITIONS, apiVersion);
this.newPartitions = newPartitions;
this.duplicates = Collections.emptySet();
this.timeout = timeout;
this.validateOnly = validateOnly;
this.data = data;
}

public CreatePartitionsRequest(Struct struct, short apiVersion) {
super(ApiKeys.CREATE_PARTITIONS, apiVersion);
Object[] topicCountArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
Map<String, PartitionDetails> counts = new HashMap<>(topicCountArray.length);
Set<String> dupes = new HashSet<>();
for (Object topicPartitionCountObj : topicCountArray) {
Struct topicPartitionCountStruct = (Struct) topicPartitionCountObj;
String topic = topicPartitionCountStruct.get(TOPIC_NAME);
Struct partitionCountStruct = topicPartitionCountStruct.getStruct(NEW_PARTITIONS_KEY_NAME);
int count = partitionCountStruct.getInt(COUNT_KEY_NAME);
Object[] assignmentsArray = partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
PartitionDetails newPartition;
if (assignmentsArray != null) {
List<List<Integer>> assignments = new ArrayList<>(assignmentsArray.length);
for (Object replicas : assignmentsArray) {
Object[] replicasArray = (Object[]) replicas;
List<Integer> replicasList = new ArrayList<>(replicasArray.length);
assignments.add(replicasList);
for (Object broker : replicasArray) {
replicasList.add((Integer) broker);
}
}
newPartition = new PartitionDetails(count, assignments);
} else {
newPartition = new PartitionDetails(count);
}
PartitionDetails dupe = counts.put(topic, newPartition);
if (dupe != null) {
dupes.add(topic);
}
}
this.newPartitions = counts;
this.duplicates = dupes;
this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
this.validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
}

public Set<String> duplicates() {
return duplicates;
}

public Map<String, PartitionDetails> newPartitions() {
return newPartitions;
}

public int timeout() {
return timeout;
}

public boolean validateOnly() {
return validateOnly;
this(new CreatePartitionsRequestData(struct, apiVersion), apiVersion);
}

@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version()));
List<Struct> topicPartitionsList = new ArrayList<>();
for (Map.Entry<String, PartitionDetails> topicPartitionCount : this.newPartitions.entrySet()) {
Struct topicPartitionCountStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
topicPartitionCountStruct.set(TOPIC_NAME, topicPartitionCount.getKey());
PartitionDetails partitionDetails = topicPartitionCount.getValue();
Struct partitionCountStruct = topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME);
partitionCountStruct.set(COUNT_KEY_NAME, partitionDetails.totalCount());
Object[][] assignments = null;
if (partitionDetails.newAssignments() != null) {
assignments = new Object[partitionDetails.newAssignments().size()][];
int i = 0;
for (List<Integer> partitionAssignment : partitionDetails.newAssignments()) {
assignments[i] = partitionAssignment.toArray(new Object[0]);
i++;
}
}
partitionCountStruct.set(ASSIGNMENT_KEY_NAME, assignments);
topicPartitionCountStruct.set(NEW_PARTITIONS_KEY_NAME, partitionCountStruct);
topicPartitionsList.add(topicPartitionCountStruct);
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, topicPartitionsList.toArray(new Object[0]));
struct.set(TIMEOUT_KEY_NAME, this.timeout);
struct.set(VALIDATE_ONLY_KEY_NAME, this.validateOnly);
return struct;
return data.toStruct(version());
}

public CreatePartitionsRequestData data() {
return data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<String, ApiError> topicErrors = new HashMap<>();
for (String topic : newPartitions.keySet()) {
topicErrors.put(topic, ApiError.fromThrowable(e));
CreatePartitionsResponseData response = new CreatePartitionsResponseData();
response.setThrottleTimeMs(throttleTimeMs);

ApiError apiError = ApiError.fromThrowable(e);
for (CreatePartitionsTopic topic : data.topics()) {
response.results().add(new CreatePartitionsTopicResult()
.setName(topic.name())
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
);
}
return new CreatePartitionsResponse(throttleTimeMs, topicErrors);
return new CreatePartitionsResponse(response);
}
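As a footnote to the ErrorMessage discussion in the review thread, the sketch below (not part of the diff; the exception and topic name are hypothetical) assembles a per-topic error the same way getErrorResponse does. Per that discussion, apiError.message() may be null when the message matches the standard error text, and with the JSON default for ErrorMessage now null as well, no fallback text is forced onto the wire.

// Assumes: import org.apache.kafka.common.errors.InvalidPartitionsException;
// ApiError lives in this package and CreatePartitionsTopicResult is imported above.
ApiError apiError = ApiError.fromThrowable(
        new InvalidPartitionsException("Topic my-topic already has 6 partitions")); // hypothetical error
CreatePartitionsTopicResult topicResult = new CreatePartitionsTopicResult()
        .setName("my-topic") // hypothetical topic
        .setErrorCode(apiError.error().code())
        .setErrorMessage(apiError.message()); // may be null; the schema default is now null too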

public static CreatePartitionsRequest parse(ByteBuffer buffer, short version) {