Skip to content

Commit

Permalink
MINOR: Remove Struct from Request/Response classes
Browse files Browse the repository at this point in the history
More details:
* Replaced `struct` field in Request/Response with a `toStruct` method. This
makes the performance model (including memory usage) easier to understand.
Note that requests have `toStruct()` while responses have `toStruct(version)`.
* Replaced mutable `version` field in `Request.Builder` with an immutable
field `desiredVersion` and a `version` parameter passed to the `build` method.
* Optimised `handleFetchRequest` to avoid unnecessary creation of `Struct`
instances (from 4 to 2 in the worst case and 2 to 1 in the best case).
* Various clean-ups in request/response classes and their test. In particular,
it is now clear what we are testing. Previously, it looked like we were testing
more than we really were.

With this in place, we could remove `AbstractRequest.Builder` in the future by
doing the following:
* Change `AbstractRequest.toStruct` to accept a version (like responses).
* Change `AbstractRequest.version` to be `desiredVersion` (like `Builder`).
* Change `ClientRequest` to take `AbstractRequest`.
* Move validation from the `build` methods to the request constructors or
static factory methods.
* Anything else required for the code to compile again.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #2513 from ijuma/separate-struct
  • Loading branch information
ijuma authored and hachikuji committed Feb 17, 2017
1 parent 1f2ee5f commit fc1cfe4
Show file tree
Hide file tree
Showing 83 changed files with 1,681 additions and 1,819 deletions.
Expand Up @@ -74,9 +74,8 @@ public ApiKeys apiKey() {
return requestBuilder.apiKey();
}

public RequestHeader makeHeader() {
return new RequestHeader(requestBuilder.apiKey().id,
requestBuilder.version(), clientId, correlationId);
public RequestHeader makeHeader(short version) {
return new RequestHeader(requestBuilder.apiKey().id, version, clientId, correlationId);
}

public AbstractRequest.Builder<?> requestBuilder() {
Expand Down
21 changes: 12 additions & 9 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Expand Up @@ -280,43 +280,46 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest request = null;
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
short version;
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
version = builder.desiredOrLatestVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending message of type {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version());
"Assuming version {}.", clientRequest.apiKey(), nodeId, version);
} else {
short version = versionInfo.usableVersion(clientRequest.apiKey());
builder.setVersion(version);
version = versionInfo.usableVersion(clientRequest.apiKey());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
request = builder.build();
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException e) {
// If the version is not supported, skip sending the request over the wire.
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} to {}",
clientRequest.toString(), clientRequest.destination(), e);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, e, null);
abortedSends.add(clientResponse);
return;
}
RequestHeader header = clientRequest.makeHeader();
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String nodeId = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
int latestClientVersion = ProtoUtils.latestVersion(clientRequest.apiKey().id);
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} to node {}.", request, nodeId);
} else {
log.debug("Using older server API v{} to send {} to node {}.",
header.apiVersion(), request, nodeId);
header.apiVersion(), request, nodeId);
}
}
Send send = request.toSend(nodeId, header);
Expand Down
Expand Up @@ -697,11 +697,10 @@ private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Of
if (generation == null)
return RequestFuture.failure(new CommitFailedException());

OffsetCommitRequest.Builder builder =
new OffsetCommitRequest.Builder(this.groupId, offsetData).
setGenerationId(generation.generationId).
setMemberId(generation.memberId).
setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
setGenerationId(generation.generationId).
setMemberId(generation.memberId).
setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);

log.trace("Sending OffsetCommit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

Expand Down
Expand Up @@ -343,8 +343,9 @@ private void checkDisconnects(long now) {
iterator.remove();
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
handler.onComplete(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
request.createdTimeMs(), now, true, null, null));
handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
request.callback(), request.destination(), request.createdTimeMs(), now, true,
null, null));
}
}
}
Expand Down
Expand Up @@ -200,11 +200,11 @@ public void onSuccess(ClientResponse resp) {
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
request.version()));
resp.requestHeader().apiVersion()));
}

sensors.fetchLatency.record(resp.requestLatencyMs());
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
sensors.fetchThrottleTimeSensor.record(response.throttleTimeMs());
}

@Override
Expand Down Expand Up @@ -603,13 +603,12 @@ public void onFailure(RuntimeException e) {
* @return A response which can be polled to obtain the corresponding timestamps and offsets.
*/
private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node,
final Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamp) {
ListOffsetRequest.Builder builder = new ListOffsetRequest.Builder().setTargetTimes(timestampsToSearch);

// If we need a timestamp in the response, the minimum RPC version we can send is v1.
// Otherwise, v0 is OK.
builder.setMinVersion(requireTimestamp ? (short) 1 : (short) 0);
final Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamp) {
// If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
short minVersion = requireTimestamp ? (short) 1 : (short) 0;
ListOffsetRequest.Builder builder = ListOffsetRequest.Builder.forConsumer(minVersion)
.setTargetTimes(timestampsToSearch);

log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
return client.send(node, builder)
Expand Down Expand Up @@ -733,7 +732,7 @@ private Map<Node, FetchRequest.Builder> createFetchRequests() {
Map<Node, FetchRequest.Builder> requests = new HashMap<>();
for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, entry.getValue()).
setMaxBytes(this.maxBytes);
requests.put(node, fetch);
}
Expand Down
Expand Up @@ -25,40 +25,39 @@
import java.nio.ByteBuffer;

public abstract class AbstractRequest extends AbstractRequestResponse {
private final short version;

public static abstract class Builder<T extends AbstractRequest> {
private final ApiKeys apiKey;
private short version;
private final Short desiredVersion;

public Builder(ApiKeys apiKey) {
this(apiKey, null);
}

public Builder(ApiKeys apiKey, Short desiredVersion) {
this.apiKey = apiKey;
this.version = ProtoUtils.latestVersion(apiKey.id);
this.desiredVersion = desiredVersion;
}

public ApiKeys apiKey() {
return apiKey;
}

public Builder<T> setVersion(short version) {
this.version = version;
return this;
public short desiredOrLatestVersion() {
return desiredVersion == null ? ProtoUtils.latestVersion(apiKey.id) : desiredVersion;
}

public short version() {
return version;
public T build() {
return build(desiredOrLatestVersion());
}

public abstract T build();
public abstract T build(short version);
}

public AbstractRequest(Struct struct, short version) {
super(struct);
this.version = version;
}
private final short version;

public Send toSend(String destination, RequestHeader header) {
return new NetworkSend(destination, serialize(header, this));
public AbstractRequest(short version) {
this.version = version;
}

/**
Expand All @@ -68,6 +67,19 @@ public short version() {
return version;
}

public Send toSend(String destination, RequestHeader header) {
return new NetworkSend(destination, serialize(header));
}

/**
* Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
*/
public ByteBuffer serialize(RequestHeader header) {
return serialize(header.toStruct(), toStruct());
}

protected abstract Struct toStruct();

/**
* Get an error response for a request
*/
Expand All @@ -76,54 +88,78 @@ public short version() {
/**
* Factory method for getting a request object based on ApiKey ID and a buffer
*/
public static AbstractRequest getRequest(int requestId, short versionId, ByteBuffer buffer) {
public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) {
ApiKeys apiKey = ApiKeys.forId(requestId);
Struct struct = ProtoUtils.parseRequest(apiKey.id, version, buffer);
AbstractRequest request;
switch (apiKey) {
case PRODUCE:
return ProduceRequest.parse(buffer, versionId);
request = new ProduceRequest(struct, version);
break;
case FETCH:
return FetchRequest.parse(buffer, versionId);
request = new FetchRequest(struct, version);
break;
case LIST_OFFSETS:
return ListOffsetRequest.parse(buffer, versionId);
request = new ListOffsetRequest(struct, version);
break;
case METADATA:
return MetadataRequest.parse(buffer, versionId);
request = new MetadataRequest(struct, version);
break;
case OFFSET_COMMIT:
return OffsetCommitRequest.parse(buffer, versionId);
request = new OffsetCommitRequest(struct, version);
break;
case OFFSET_FETCH:
return OffsetFetchRequest.parse(buffer, versionId);
request = new OffsetFetchRequest(struct, version);
break;
case GROUP_COORDINATOR:
return GroupCoordinatorRequest.parse(buffer, versionId);
request = new GroupCoordinatorRequest(struct, version);
break;
case JOIN_GROUP:
return JoinGroupRequest.parse(buffer, versionId);
request = new JoinGroupRequest(struct, version);
break;
case HEARTBEAT:
return HeartbeatRequest.parse(buffer, versionId);
request = new HeartbeatRequest(struct, version);
break;
case LEAVE_GROUP:
return LeaveGroupRequest.parse(buffer, versionId);
request = new LeaveGroupRequest(struct, version);
break;
case SYNC_GROUP:
return SyncGroupRequest.parse(buffer, versionId);
request = new SyncGroupRequest(struct, version);
break;
case STOP_REPLICA:
return StopReplicaRequest.parse(buffer, versionId);
request = new StopReplicaRequest(struct, version);
break;
case CONTROLLED_SHUTDOWN_KEY:
return ControlledShutdownRequest.parse(buffer, versionId);
request = new ControlledShutdownRequest(struct, version);
break;
case UPDATE_METADATA_KEY:
return UpdateMetadataRequest.parse(buffer, versionId);
request = new UpdateMetadataRequest(struct, version);
break;
case LEADER_AND_ISR:
return LeaderAndIsrRequest.parse(buffer, versionId);
request = new LeaderAndIsrRequest(struct, version);
break;
case DESCRIBE_GROUPS:
return DescribeGroupsRequest.parse(buffer, versionId);
request = new DescribeGroupsRequest(struct, version);
break;
case LIST_GROUPS:
return ListGroupsRequest.parse(buffer, versionId);
request = new ListGroupsRequest(struct, version);
break;
case SASL_HANDSHAKE:
return SaslHandshakeRequest.parse(buffer, versionId);
request = new SaslHandshakeRequest(struct, version);
break;
case API_VERSIONS:
return ApiVersionsRequest.parse(buffer, versionId);
request = new ApiVersionsRequest(struct, version);
break;
case CREATE_TOPICS:
return CreateTopicsRequest.parse(buffer, versionId);
request = new CreateTopicsRequest(struct, version);
break;
case DELETE_TOPICS:
return DeleteTopicsRequest.parse(buffer, versionId);
request = new DeleteTopicsRequest(struct, version);
break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
}
return new RequestAndSize(request, struct.sizeOf());
}
}
Expand Up @@ -17,56 +17,13 @@
import java.nio.ByteBuffer;

public abstract class AbstractRequestResponse {
protected final Struct struct;

public AbstractRequestResponse(Struct struct) {
this.struct = struct;
}

public Struct toStruct() {
return struct;
}

/**
* Get the serialized size of this object
*/
public int sizeOf() {
return struct.sizeOf();
}

/**
* Write this object to a buffer
* Visible for testing.
*/
public void writeTo(ByteBuffer buffer) {
struct.writeTo(buffer);
}

@Override
public String toString() {
return struct.toString();
}

@Override
public int hashCode() {
return struct.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AbstractRequestResponse other = (AbstractRequestResponse) obj;
return struct.equals(other.struct);
}

public static ByteBuffer serialize(AbstractRequestResponse header, AbstractRequestResponse body) {
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
header.writeTo(buffer);
body.writeTo(buffer);
public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
headerStruct.writeTo(buffer);
bodyStruct.writeTo(buffer);
buffer.rewind();
return buffer;
}
Expand Down

0 comments on commit fc1cfe4

Please sign in to comment.