AbstractRequest.java
@@ -26,6 +26,14 @@

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.requests.s3.CloseStreamRequest;
import org.apache.kafka.common.requests.s3.CommitStreamObjectRequest;
import org.apache.kafka.common.requests.s3.CommitWALObjectRequest;
import org.apache.kafka.common.requests.s3.CreateStreamRequest;
import org.apache.kafka.common.requests.s3.DeleteStreamRequest;
import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest;
import org.apache.kafka.common.requests.s3.OpenStreamRequest;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;

public abstract class AbstractRequest implements AbstractRequestResponse {

@@ -165,6 +173,7 @@ public static RequestAndSize parseRequest(ApiKeys apiKey, short apiVersion, Byte
return new RequestAndSize(doParseRequest(apiKey, apiVersion, buffer), bufferSize);
}

@SuppressWarnings("all")
private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, ByteBuffer buffer) {
switch (apiKey) {
case PRODUCE:
@@ -303,6 +312,25 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return ListTransactionsRequest.parse(buffer, apiVersion);
case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsRequest.parse(buffer, apiVersion);

// Kafka on S3 inject start
case CREATE_STREAM:
return CreateStreamRequest.parse(buffer, apiVersion);
case OPEN_STREAM:
return OpenStreamRequest.parse(buffer, apiVersion);
case CLOSE_STREAM:
return CloseStreamRequest.parse(buffer, apiVersion);
case DELETE_STREAM:
return DeleteStreamRequest.parse(buffer, apiVersion);
case PREPARE_S3_OBJECT:
return PrepareS3ObjectRequest.parse(buffer, apiVersion);
case COMMIT_WALOBJECT:
return CommitWALObjectRequest.parse(buffer, apiVersion);
case COMMIT_STREAM_OBJECT:
return CommitStreamObjectRequest.parse(buffer, apiVersion);
case GET_STREAMS_OFFSET:
return GetStreamsOffsetRequest.parse(buffer, apiVersion);
// Kafka on S3 inject end
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
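Not part of the diff: a minimal round-trip sketch of how the new CREATE_STREAM branch in doParseRequest could be exercised. It assumes the generated CreateStreamRequestData has the usual no-arg constructor and that MessageUtil.toByteBuffer and the RequestAndSize.request field behave as in upstream Kafka; the class name below is illustrative only.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.message.CreateStreamRequestData;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.MessageUtil;
    import org.apache.kafka.common.requests.AbstractRequest;
    import org.apache.kafka.common.requests.s3.CreateStreamRequest;

    public class CreateStreamParseSketch {
        public static void main(String[] args) {
            short version = ApiKeys.CREATE_STREAM.latestVersion();
            // Serialize an empty request body, then feed it back through parseRequest,
            // which now dispatches to CreateStreamRequest.parse via the injected case.
            ByteBuffer buffer = MessageUtil.toByteBuffer(new CreateStreamRequestData(), version);
            AbstractRequest parsed = AbstractRequest.parseRequest(ApiKeys.CREATE_STREAM, version, buffer).request;
            System.out.println(((CreateStreamRequest) parsed).data());
        }
    }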
AbstractResponse.java
@@ -29,6 +29,14 @@
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.requests.s3.CloseStreamResponse;
import org.apache.kafka.common.requests.s3.CommitStreamObjectResponse;
import org.apache.kafka.common.requests.s3.CommitWALObjectResponse;
import org.apache.kafka.common.requests.s3.CreateStreamResponse;
import org.apache.kafka.common.requests.s3.DeleteStreamResponse;
import org.apache.kafka.common.requests.s3.GetStreamsOffsetResponse;
import org.apache.kafka.common.requests.s3.OpenStreamResponse;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse;

public abstract class AbstractResponse implements AbstractRequestResponse {
public static final int DEFAULT_THROTTLE_TIME = 0;
@@ -109,6 +117,7 @@ public static AbstractResponse parseResponse(ByteBuffer buffer, RequestHeader re
return AbstractResponse.parseResponse(apiKey, buffer, apiVersion);
}

@SuppressWarnings("all")
public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer responseBuffer, short version) {
switch (apiKey) {
case PRODUCE:
@@ -247,6 +256,25 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return ListTransactionsResponse.parse(responseBuffer, version);
case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsResponse.parse(responseBuffer, version);

// Kafka on S3 inject start
case CREATE_STREAM:
return CreateStreamResponse.parse(responseBuffer, version);
case OPEN_STREAM:
return OpenStreamResponse.parse(responseBuffer, version);
case DELETE_STREAM:
return DeleteStreamResponse.parse(responseBuffer, version);
case CLOSE_STREAM:
return CloseStreamResponse.parse(responseBuffer, version);
case PREPARE_S3_OBJECT:
return PrepareS3ObjectResponse.parse(responseBuffer, version);
case COMMIT_STREAM_OBJECT:
return CommitStreamObjectResponse.parse(responseBuffer, version);
case COMMIT_WALOBJECT:
return CommitWALObjectResponse.parse(responseBuffer, version);
case GET_STREAMS_OFFSET:
return GetStreamsOffsetResponse.parse(responseBuffer, version);
// Kafka on S3 inject end
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
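A mirror-image sketch for the response path (again illustrative, not part of the diff): note that AbstractResponse.parseResponse takes the buffer before the version, unlike parseRequest. Same assumptions about upstream helpers as the request-side sketch above.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.message.CreateStreamResponseData;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.MessageUtil;
    import org.apache.kafka.common.requests.AbstractResponse;
    import org.apache.kafka.common.requests.s3.CreateStreamResponse;

    public class CreateStreamResponseParseSketch {
        public static void main(String[] args) {
            short version = ApiKeys.CREATE_STREAM.latestVersion();
            // Serialize an empty response body and parse it through the injected CREATE_STREAM case.
            ByteBuffer buffer = MessageUtil.toByteBuffer(new CreateStreamResponseData(), version);
            CreateStreamResponse response =
                (CreateStreamResponse) AbstractResponse.parseResponse(ApiKeys.CREATE_STREAM, buffer, version);
            System.out.println(response.throttleTimeMs());
        }
    }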
CloseStreamRequest.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CloseStreamRequestData;
import org.apache.kafka.common.message.CloseStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

@@ -63,4 +65,9 @@ public CloseStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public CloseStreamRequestData data() {
return data;
}

public static CloseStreamRequest parse(ByteBuffer buffer, short version) {
return new CloseStreamRequest(new CloseStreamRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
CloseStreamResponse.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.CloseStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

@@ -51,4 +53,9 @@ public int throttleTimeMs() {
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static CloseStreamResponse parse(ByteBuffer buffer, short version) {
return new CloseStreamResponse(new CloseStreamResponseData(
new ByteBufferAccessor(buffer), version));
}
}
CommitStreamObjectRequest.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CommitStreamObjectRequestData;
import org.apache.kafka.common.message.CommitStreamObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

@@ -65,4 +67,9 @@ public CommitStreamObjectResponse getErrorResponse(int throttleTimeMs, Throwable
public CommitStreamObjectRequestData data() {
return data;
}

public static CommitStreamObjectRequest parse(ByteBuffer buffer, short version) {
return new CommitStreamObjectRequest(new CommitStreamObjectRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
CommitStreamObjectResponse.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.CommitStreamObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

@@ -52,4 +54,9 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static CommitStreamObjectResponse parse(ByteBuffer buffer, short version) {
return new CommitStreamObjectResponse(new CommitStreamObjectResponseData(
new ByteBufferAccessor(buffer), version));
}

}
CommitWALObjectRequest.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CommitWALObjectRequestData;
import org.apache.kafka.common.message.CommitWALObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

@@ -64,4 +66,9 @@ public CommitWALObjectRequestData data() {
return data;
}

public static CommitWALObjectRequest parse(ByteBuffer buffer, short version) {
return new CommitWALObjectRequest(new CommitWALObjectRequestData(
new ByteBufferAccessor(buffer), version), version);
}

}
CommitWALObjectResponse.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.CommitWALObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

@@ -52,5 +54,10 @@ public int throttleTimeMs() {
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static CommitWALObjectResponse parse(ByteBuffer buffer, short version) {
return new CommitWALObjectResponse(new CommitWALObjectResponseData(
new ByteBufferAccessor(buffer), version));
}

}
CreateStreamRequest.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CreateStreamRequestData;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

@@ -64,4 +66,9 @@ public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public CreateStreamRequestData data() {
return data;
}

public static CreateStreamRequest parse(ByteBuffer buffer, short version) {
return new CreateStreamRequest(new CreateStreamRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
CreateStreamResponse.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

@@ -51,4 +53,9 @@ public int throttleTimeMs() {
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static CreateStreamResponse parse(ByteBuffer buffer, short version) {
return new CreateStreamResponse(new CreateStreamResponseData(
new ByteBufferAccessor(buffer), version));
}
}
DeleteStreamRequest.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.DeleteStreamRequestData;
import org.apache.kafka.common.message.DeleteStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

@@ -62,4 +64,9 @@ public DeleteStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public DeleteStreamRequestData data() {
return data;
}

public static DeleteStreamRequest parse(ByteBuffer buffer, short version) {
return new DeleteStreamRequest(new DeleteStreamRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
DeleteStreamResponse.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.DeleteStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

@@ -50,4 +52,9 @@ public int throttleTimeMs() {
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static DeleteStreamResponse parse(ByteBuffer buffer, short version) {
return new DeleteStreamResponse(new DeleteStreamResponseData(
new ByteBufferAccessor(buffer), version));
}
}
GetStreamsOffsetRequest.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.GetStreamsOffsetRequestData;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

@@ -64,5 +66,10 @@ public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public GetStreamsOffsetRequestData data() {
return data;
}

public static GetStreamsOffsetRequest parse(ByteBuffer buffer, short version) {
return new GetStreamsOffsetRequest(new GetStreamsOffsetRequestData(
new ByteBufferAccessor(buffer), version), version);
}

}
GetStreamsOffsetResponse.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.GetStreamsOffsetResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

@@ -50,4 +52,9 @@ public int throttleTimeMs() {
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static GetStreamsOffsetResponse parse(ByteBuffer buffer, short version) {
return new GetStreamsOffsetResponse(new GetStreamsOffsetResponseData(
new ByteBufferAccessor(buffer), version));
}
}
OpenStreamRequest.java
@@ -17,9 +17,11 @@

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

@@ -29,7 +31,7 @@ public static class Builder extends AbstractRequest.Builder<OpenStreamRequest> {

private final OpenStreamRequestData data;
public Builder(OpenStreamRequestData data) {
-            super(ApiKeys.CREATE_STREAM);
+            super(ApiKeys.OPEN_STREAM);
this.data = data;
}

@@ -63,4 +65,9 @@ public OpenStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public OpenStreamRequestData data() {
return data;
}

public static OpenStreamRequest parse(ByteBuffer buffer, short version) {
return new OpenStreamRequest(new OpenStreamRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
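The one-line Builder fix above matters beyond cosmetics: AbstractRequest.Builder exposes the ApiKeys value it was constructed with, so an OPEN_STREAM builder previously advertised CREATE_STREAM. A quick check (illustrative only, assuming the generated no-arg OpenStreamRequestData constructor and the upstream Builder.apiKey() accessor):

    import org.apache.kafka.common.message.OpenStreamRequestData;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.s3.OpenStreamRequest;

    public class OpenStreamBuilderCheck {
        public static void main(String[] args) {
            OpenStreamRequest.Builder builder = new OpenStreamRequest.Builder(new OpenStreamRequestData());
            // Before this change the builder reported CREATE_STREAM; it should now report OPEN_STREAM.
            System.out.println(builder.apiKey() == ApiKeys.OPEN_STREAM);
        }
    }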