Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ public enum ApiKeys {
PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true),
COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true),
COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true),
GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true);
GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true),
GET_KV(ApiMessageType.GET_KV, false, true),
PUT_KV(ApiMessageType.PUT_KV, false, true),
DELETE_KV(ApiMessageType.DELETE_KV, false, true);
// Kafka on S3 inject end

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import org.apache.kafka.common.requests.s3.CommitStreamObjectRequest;
import org.apache.kafka.common.requests.s3.CommitWALObjectRequest;
import org.apache.kafka.common.requests.s3.CreateStreamRequest;
import org.apache.kafka.common.requests.s3.DeleteKVRequest;
import org.apache.kafka.common.requests.s3.DeleteStreamRequest;
import org.apache.kafka.common.requests.s3.GetKVRequest;
import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest;
import org.apache.kafka.common.requests.s3.OpenStreamRequest;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;
import org.apache.kafka.common.requests.s3.PutKVRequest;

public abstract class AbstractRequest implements AbstractRequestResponse {

Expand Down Expand Up @@ -330,6 +333,12 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return CommitStreamObjectRequest.parse(buffer, apiVersion);
case GET_STREAMS_OFFSET:
return GetStreamsOffsetRequest.parse(buffer, apiVersion);
case GET_KV:
return GetKVRequest.parse(buffer, apiVersion);
case PUT_KV:
return PutKVRequest.parse(buffer, apiVersion);
case DELETE_KV:
return DeleteKVRequest.parse(buffer, apiVersion);
// Kafka on S3 inject end
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
import org.apache.kafka.common.requests.s3.CommitStreamObjectResponse;
import org.apache.kafka.common.requests.s3.CommitWALObjectResponse;
import org.apache.kafka.common.requests.s3.CreateStreamResponse;
import org.apache.kafka.common.requests.s3.DeleteKVResponse;
import org.apache.kafka.common.requests.s3.DeleteStreamResponse;
import org.apache.kafka.common.requests.s3.GetKVResponse;
import org.apache.kafka.common.requests.s3.GetStreamsOffsetResponse;
import org.apache.kafka.common.requests.s3.OpenStreamResponse;
import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse;
import org.apache.kafka.common.requests.s3.PutKVResponse;

public abstract class AbstractResponse implements AbstractRequestResponse {
public static final int DEFAULT_THROTTLE_TIME = 0;
Expand Down Expand Up @@ -274,6 +277,12 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return CommitWALObjectResponse.parse(responseBuffer, version);
case GET_STREAMS_OFFSET:
return GetStreamsOffsetResponse.parse(responseBuffer, version);
case GET_KV:
return GetKVResponse.parse(responseBuffer, version);
case PUT_KV:
return PutKVResponse.parse(responseBuffer, version);
case DELETE_KV:
return DeleteKVResponse.parse(responseBuffer, version);
// Kafka on S3 inject end
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.message.DeleteKVRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class DeleteKVRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DeleteKVRequest> {

private final DeleteKVRequestData data;
public Builder(DeleteKVRequestData data) {
super(ApiKeys.DELETE_KV);
this.data = data;
}

@Override
public DeleteKVRequest build(short version) {
return new DeleteKVRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final DeleteKVRequestData data;

public DeleteKVRequest(DeleteKVRequestData data, short version) {
super(ApiKeys.DELETE_KV, version);
this.data = data;
}

@Override
public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
CreateStreamResponseData response = new CreateStreamResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new CreateStreamResponse(response);
}

@Override
public DeleteKVRequestData data() {
return data;
}

public static DeleteKVRequest parse(ByteBuffer buffer, short version) {
return new DeleteKVRequest(new DeleteKVRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.DeleteKVResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class DeleteKVResponse extends AbstractResponse {
private final DeleteKVResponseData data;

public DeleteKVResponse(DeleteKVResponseData data) {
super(ApiKeys.DELETE_KV);
this.data = data;
}

@Override
public DeleteKVResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(Errors.forCode(data.errorCode()));
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static DeleteKVResponse parse(ByteBuffer buffer, short version) {
return new DeleteKVResponse(new DeleteKVResponseData(
new ByteBufferAccessor(buffer), version));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.message.GetKVRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class GetKVRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<GetKVRequest> {

private final GetKVRequestData data;
public Builder(GetKVRequestData data) {
super(ApiKeys.GET_KV);
this.data = data;
}

@Override
public GetKVRequest build(short version) {
return new GetKVRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final GetKVRequestData data;

public GetKVRequest(GetKVRequestData data, short version) {
super(ApiKeys.GET_KV, version);
this.data = data;
}

@Override
public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
CreateStreamResponseData response = new CreateStreamResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new CreateStreamResponse(response);
}

@Override
public GetKVRequestData data() {
return data;
}

public static GetKVRequest parse(ByteBuffer buffer, short version) {
return new GetKVRequest(new GetKVRequestData(
new ByteBufferAccessor(buffer), version), version);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests.s3;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.GetKVResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class GetKVResponse extends AbstractResponse {

private final GetKVResponseData data;

public GetKVResponse(GetKVResponseData data) {
super(ApiKeys.GET_KV);
this.data = data;
}

@Override
public GetKVResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(Errors.forCode(data.errorCode()));
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static GetKVResponse parse(ByteBuffer buffer, short version) {
return new GetKVResponse(new GetKVResponseData(
new ByteBufferAccessor(buffer), version));
}

}
Loading