diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index b270523334..b48126c021 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -119,7 +119,10 @@ public enum ApiKeys { PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true), COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true), COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true), - GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true); + GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true), + GET_KV(ApiMessageType.GET_KV, false, true), + PUT_KV(ApiMessageType.PUT_KV, false, true), + DELETE_KV(ApiMessageType.DELETE_KV, false, true); // Kafka on S3 inject end private static final Map> APIS_BY_LISTENER = diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 3da7f16225..ec67d03b21 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -30,10 +30,13 @@ import org.apache.kafka.common.requests.s3.CommitStreamObjectRequest; import org.apache.kafka.common.requests.s3.CommitWALObjectRequest; import org.apache.kafka.common.requests.s3.CreateStreamRequest; +import org.apache.kafka.common.requests.s3.DeleteKVRequest; import org.apache.kafka.common.requests.s3.DeleteStreamRequest; +import org.apache.kafka.common.requests.s3.GetKVRequest; import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest; import org.apache.kafka.common.requests.s3.OpenStreamRequest; import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest; +import org.apache.kafka.common.requests.s3.PutKVRequest; public abstract class AbstractRequest 
implements AbstractRequestResponse { @@ -330,6 +333,12 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return CommitStreamObjectRequest.parse(buffer, apiVersion); case GET_STREAMS_OFFSET: return GetStreamsOffsetRequest.parse(buffer, apiVersion); + case GET_KV: + return GetKVRequest.parse(buffer, apiVersion); + case PUT_KV: + return PutKVRequest.parse(buffer, apiVersion); + case DELETE_KV: + return DeleteKVRequest.parse(buffer, apiVersion); // Kafka on S3 inject end default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index d8f1aa008f..086df2aff9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -33,10 +33,13 @@ import org.apache.kafka.common.requests.s3.CommitStreamObjectResponse; import org.apache.kafka.common.requests.s3.CommitWALObjectResponse; import org.apache.kafka.common.requests.s3.CreateStreamResponse; +import org.apache.kafka.common.requests.s3.DeleteKVResponse; import org.apache.kafka.common.requests.s3.DeleteStreamResponse; +import org.apache.kafka.common.requests.s3.GetKVResponse; import org.apache.kafka.common.requests.s3.GetStreamsOffsetResponse; import org.apache.kafka.common.requests.s3.OpenStreamResponse; import org.apache.kafka.common.requests.s3.PrepareS3ObjectResponse; +import org.apache.kafka.common.requests.s3.PutKVResponse; public abstract class AbstractResponse implements AbstractRequestResponse { public static final int DEFAULT_THROTTLE_TIME = 0; @@ -274,6 +277,12 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return CommitWALObjectResponse.parse(responseBuffer, version); case GET_STREAMS_OFFSET: return 
GetStreamsOffsetResponse.parse(responseBuffer, version); + case GET_KV: + return GetKVResponse.parse(responseBuffer, version); + case PUT_KV: + return PutKVResponse.parse(responseBuffer, version); + case DELETE_KV: + return DeleteKVResponse.parse(responseBuffer, version); // Kafka on S3 inject end default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteKVRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteKVRequest.java new file mode 100644 index 0000000000..87b88df794 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteKVRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.common.requests.s3;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.message.DeleteKVRequestData;
+import org.apache.kafka.common.message.DeleteKVResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiError;
+
+public class DeleteKVRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<DeleteKVRequest> {
+
+        private final DeleteKVRequestData data;
+        public Builder(DeleteKVRequestData data) {
+            super(ApiKeys.DELETE_KV);
+            this.data = data;
+        }
+
+        @Override
+        public DeleteKVRequest build(short version) {
+            return new DeleteKVRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final DeleteKVRequestData data;
+
+    public DeleteKVRequest(DeleteKVRequestData data, short version) {
+        super(ApiKeys.DELETE_KV, version);
+        this.data = data;
+    }
+
+    @Override
+    public DeleteKVResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
+        DeleteKVResponseData response = new DeleteKVResponseData()
+            .setErrorCode(apiError.error().code())
+            .setThrottleTimeMs(throttleTimeMs);
+        return new DeleteKVResponse(response);
+    }
+
+    @Override
+    public DeleteKVRequestData data() {
+        return data;
+    }
+
+    public static DeleteKVRequest parse(ByteBuffer buffer, short version) {
+        return new DeleteKVRequest(new DeleteKVRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteKVResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteKVResponse.java new file mode 100644 index 0000000000..14909de5b8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteKVResponse.java @@ -0,0 +1,61
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests.s3; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.kafka.common.message.DeleteKVResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; + +public class DeleteKVResponse extends AbstractResponse { + private final DeleteKVResponseData data; + + public DeleteKVResponse(DeleteKVResponseData data) { + super(ApiKeys.DELETE_KV); + this.data = data; + } + + @Override + public DeleteKVResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static DeleteKVResponse parse(ByteBuffer buffer, short version) { + return new DeleteKVResponse(new DeleteKVResponseData( + new ByteBufferAccessor(buffer), version)); + } + +} diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetKVRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetKVRequest.java new file mode 100644 index 0000000000..45184bd6c8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetKVRequest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.common.requests.s3;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.message.GetKVRequestData;
+import org.apache.kafka.common.message.GetKVResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiError;
+
+public class GetKVRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<GetKVRequest> {
+
+        private final GetKVRequestData data;
+        public Builder(GetKVRequestData data) {
+            super(ApiKeys.GET_KV);
+            this.data = data;
+        }
+
+        @Override
+        public GetKVRequest build(short version) {
+            return new GetKVRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final GetKVRequestData data;
+
+    public GetKVRequest(GetKVRequestData data, short version) {
+        super(ApiKeys.GET_KV, version);
+        this.data = data;
+    }
+
+    @Override
+    public GetKVResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
+        GetKVResponseData response = new GetKVResponseData()
+            .setErrorCode(apiError.error().code())
+            .setThrottleTimeMs(throttleTimeMs);
+        return new GetKVResponse(response);
+    }
+
+    @Override
+    public GetKVRequestData data() {
+        return data;
+    }
+
+    public static GetKVRequest parse(ByteBuffer buffer, short version) {
+        return new GetKVRequest(new GetKVRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/GetKVResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetKVResponse.java new file mode 100644 index 0000000000..248a0a33a8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetKVResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests.s3; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.kafka.common.message.GetKVResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; + +public class GetKVResponse extends AbstractResponse { + + private final GetKVResponseData data; + + public GetKVResponse(GetKVResponseData data) { + super(ApiKeys.GET_KV); + this.data = data; + } + + @Override + public GetKVResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static GetKVResponse parse(ByteBuffer buffer, short version) { + return new GetKVResponse(new GetKVResponseData( + new ByteBufferAccessor(buffer), version)); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/PutKVRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/s3/PutKVRequest.java new file mode 100644 index 0000000000..11496b74c3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/PutKVRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.common.requests.s3;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.message.PutKVRequestData;
+import org.apache.kafka.common.message.PutKVResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiError;
+
+public class PutKVRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<PutKVRequest> {
+
+        private final PutKVRequestData data;
+        public Builder(PutKVRequestData data) {
+            super(ApiKeys.PUT_KV);
+            this.data = data;
+        }
+
+        @Override
+        public PutKVRequest build(short version) {
+            return new PutKVRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final PutKVRequestData data;
+
+    public PutKVRequest(PutKVRequestData data, short version) {
+        super(ApiKeys.PUT_KV, version);
+        this.data = data;
+    }
+
+    @Override
+    public PutKVResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
+        PutKVResponseData response = new PutKVResponseData()
+            .setErrorCode(apiError.error().code())
+            .setThrottleTimeMs(throttleTimeMs);
+        return new PutKVResponse(response);
+    }
+
+    @Override
+    public PutKVRequestData data() {
+        return data;
+    }
+
+    public static PutKVRequest parse(ByteBuffer buffer, short version) {
+        return new PutKVRequest(new PutKVRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/s3/PutKVResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/PutKVResponse.java new file mode 100644 index 0000000000..f0e988c9c5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/PutKVResponse.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests.s3; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.kafka.common.message.PutKVResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; + +public class PutKVResponse extends AbstractResponse { + private final PutKVResponseData data; + + public PutKVResponse(PutKVResponseData data) { + super(ApiKeys.PUT_KV); + this.data = data; + } + + @Override + public PutKVResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static PutKVResponse parse(ByteBuffer buffer, short version) { + return new PutKVResponse(new PutKVResponseData( + new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/resources/common/message/DeleteKVRequest.json 
b/clients/src/main/resources/common/message/DeleteKVRequest.json new file mode 100644 index 0000000000..3f27a9e36e --- /dev/null +++ b/clients/src/main/resources/common/message/DeleteKVRequest.json @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 511, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "DeleteKVRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "Keys", + "type": "[]string", + "versions": "0+", + "about": "Keys" + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/DeleteKVResponse.json b/clients/src/main/resources/common/message/DeleteKVResponse.json new file mode 100644 index 0000000000..366e5e85d2 --- /dev/null +++ b/clients/src/main/resources/common/message/DeleteKVResponse.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 511, + "type": "response", + "name": "DeleteKVResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + }, + { + "name": "ThrottleTimeMs", + "type": "int32", + "versions": "0+", + "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/GetKVRequest.json b/clients/src/main/resources/common/message/GetKVRequest.json new file mode 100644 index 0000000000..1cfa335701 --- /dev/null +++ b/clients/src/main/resources/common/message/GetKVRequest.json @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 509, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "GetKVRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "Keys", + "type": "[]string", + "versions": "0+", + "about": "Keys" + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/GetKVResponse.json b/clients/src/main/resources/common/message/GetKVResponse.json new file mode 100644 index 0000000000..df8768e7bc --- /dev/null +++ b/clients/src/main/resources/common/message/GetKVResponse.json @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 509, + "type": "response", + "name": "GetKVResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + }, + { + "name": "ThrottleTimeMs", + "type": "int32", + "versions": "0+", + "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + }, + { + "name": "KeyValues", + "type": "[]KeyValue", + "versions": "0+", + "about": "The key-values", + "fields": [ + { + "name": "Key", + "type": "string", + "versions": "0+", + "about": "Key" + }, + { + "name": "Value", + "type": "bytes", + "versions": "0+", + "nullableVersions": "0+", + "about": "Value" + } + ] + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/PutKVRequest.json b/clients/src/main/resources/common/message/PutKVRequest.json new file mode 100644 index 0000000000..f24f55a722 --- /dev/null +++ b/clients/src/main/resources/common/message/PutKVRequest.json @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 510, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "PutKVRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "KeyValues", + "type": "[]KeyValue", + "versions": "0+", + "about": "Key-values", + "fields": [ + { + "name": "Key", + "type": "string", + "versions": "0+", + "about": "Key" + }, + { + "name": "Value", + "type": "bytes", + "versions": "0+", + "about": "Value" + } + ] + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/PutKVResponse.json b/clients/src/main/resources/common/message/PutKVResponse.json new file mode 100644 index 0000000000..7665aa0660 --- /dev/null +++ b/clients/src/main/resources/common/message/PutKVResponse.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 510, + "type": "response", + "name": "PutKVResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + }, + { + "name": "ThrottleTimeMs", + "type": "int32", + "versions": "0+", + "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + } + ] +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/s3/ControllerKVClient.java b/core/src/main/scala/kafka/log/s3/ControllerKVClient.java new file mode 100644 index 0000000000..7270ac0f8b --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/ControllerKVClient.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3; + +import com.automq.elasticstream.client.api.KVClient; +import com.automq.elasticstream.client.api.KeyValue; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import kafka.log.s3.network.ControllerRequestSender; +import org.apache.kafka.common.message.DeleteKVRequestData; +import org.apache.kafka.common.message.GetKVRequestData; +import org.apache.kafka.common.message.GetKVResponseData; +import org.apache.kafka.common.message.PutKVRequestData; +import org.apache.kafka.common.message.PutKVResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.s3.DeleteKVRequest; +import org.apache.kafka.common.requests.s3.GetKVRequest; +import org.apache.kafka.common.requests.s3.PutKVRequest; +import org.apache.kafka.common.requests.s3.PutKVRequest.Builder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ControllerKVClient implements KVClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerKVClient.class); + private final ControllerRequestSender requestSender; + + public ControllerKVClient(ControllerRequestSender requestSender) { + this.requestSender = requestSender; + } + + @Override + public CompletableFuture putKV(List list) { + LOGGER.trace("[ControllerKVClient]: Put KV: {}", list); + PutKVRequest.Builder requestBuilder = new Builder( + new PutKVRequestData() + .setKeyValues(list.stream().map(kv -> new PutKVRequestData.KeyValue() + .setKey(kv.key()) + .setValue(kv.value().array()) + ).collect(Collectors.toList())) + ); + return this.requestSender.send(requestBuilder, PutKVResponseData.class) + .thenApply(resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp); + return null; + default: + LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, 
code: {}", list, code);
+                        throw code.exception();
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<List<KeyValue>> getKV(List<String> list) {
+        LOGGER.trace("[ControllerKVClient]: Get KV: {}", list);
+        GetKVRequest.Builder requestBuilder = new GetKVRequest.Builder(
+            new GetKVRequestData()
+                .setKeys(list)
+        );
+        return this.requestSender.send(requestBuilder, GetKVResponseData.class)
+            .thenApply(resp -> {
+                Errors code = Errors.forCode(resp.errorCode());
+                switch (code) {
+                    case NONE:
+                        List<KeyValue> keyValues = resp.keyValues()
+                            .stream()
+                            .map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null))
+                            .collect(Collectors.toList());
+                        LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues);
+                        return keyValues;
+                    default:
+                        LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}", String.join(",", list), code);
+                        throw code.exception();
+                }
+            });
+    }
+
+    @Override
+    public CompletableFuture<Void> delKV(List<String> list) {
+        LOGGER.trace("[ControllerKVClient]: Delete KV: {}", String.join(",", list));
+        DeleteKVRequest.Builder requestBuilder = new DeleteKVRequest.Builder(
+            new DeleteKVRequestData()
+                .setKeys(list)
+        );
+        return this.requestSender.send(requestBuilder, org.apache.kafka.common.message.DeleteKVResponseData.class)
+            .thenApply(resp -> {
+                Errors code = Errors.forCode(resp.errorCode());
+                switch (code) {
+                    case NONE:
+                        LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp);
+                        return null;
+                    default:
+                        LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}", String.join(",", list), code);
+                        throw code.exception();
+                }
+            });
+    }
+}
diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java index 051bd1b0d0..4a427065a1 100644 --- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java @@ -20,7 +20,6 @@ import com.automq.elasticstream.client.api.Client; import com.automq.elasticstream.client.api.KVClient;
import com.automq.elasticstream.client.api.StreamClient; -import kafka.log.es.MemoryClient.KVClientImpl; import kafka.log.s3.cache.DefaultS3BlockCache; import kafka.log.s3.cache.S3BlockCache; import kafka.log.s3.network.ControllerRequestSender; @@ -64,7 +63,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator this.blockCache = new DefaultS3BlockCache(objectManager, operator); this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator); this.streamClient = new S3StreamClient(this.streamManager, this.storage); - this.kvClient = new KVClientImpl(); + this.kvClient = new ControllerKVClient(this.requestSender); } @Override diff --git a/core/src/main/scala/kafka/log/s3/KRaftKVClient.java b/core/src/main/scala/kafka/log/s3/KRaftKVClient.java deleted file mode 100644 index 240bae215a..0000000000 --- a/core/src/main/scala/kafka/log/s3/KRaftKVClient.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log.s3; - -import com.automq.elasticstream.client.api.KVClient; -import com.automq.elasticstream.client.api.KeyValue; - -import java.util.List; -import java.util.concurrent.CompletableFuture; - -public class KRaftKVClient implements KVClient { - - @Override - public CompletableFuture putKV(List list) { - return null; - } - - @Override - public CompletableFuture> getKV(List list) { - return null; - } - - @Override - public CompletableFuture delKV(List list) { - return null; - } -} diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java index 2dceb750af..9f62f442ab 100644 --- a/core/src/main/scala/kafka/log/s3/S3Stream.java +++ b/core/src/main/scala/kafka/log/s3/S3Stream.java @@ -118,7 +118,7 @@ private CompletableFuture fetch0(long startOffset, long endOffset, if (status.isClosed()) { return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed")); } - LOGGER.info("{} stream try fetch, startOffset: {}, endOffset: {}, maxBytes: {}", logIdent, startOffset, endOffset, maxBytes); + LOGGER.trace("{} stream try fetch, startOffset: {}, endOffset: {}, maxBytes: {}", logIdent, startOffset, endOffset, maxBytes); long confirmOffset = this.confirmOffset.get(); if (startOffset < startOffset() || endOffset > confirmOffset) { return FutureUtil.failedFuture( @@ -129,7 +129,7 @@ private CompletableFuture fetch0(long startOffset, long endOffset, } return storage.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> { List records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList()); - LOGGER.info("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size()); + LOGGER.trace("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", 
logIdent, startOffset, endOffset, maxBytes, records.size()); return new DefaultFetchResult(records); }); } @@ -178,7 +178,7 @@ private void updateConfirmOffset(long newOffset) { break; } if (confirmOffset.compareAndSet(oldConfirmOffset, newOffset)) { - LOGGER.info("{} stream update confirm offset from {} to {}", logIdent, oldConfirmOffset, newOffset); + LOGGER.trace("{} stream update confirm offset from {} to {}", logIdent, oldConfirmOffset, newOffset); break; } } diff --git a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java index b5e8983235..9ca3677f0a 100644 --- a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java @@ -92,7 +92,7 @@ public synchronized List getObjects(long streamId, long startO } obj.setObjectSize(metadata.getObjectSize()); }); - LOGGER.info( + LOGGER.trace( "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, endOffset: {}, object count: {}", streamId, startOffset, endOffset, limit, cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size()); @@ -108,7 +108,7 @@ public synchronized List getObjects(long streamId, long startO return objects; } objects.addAll(inflightInRangeObjects.objects()); - LOGGER.info( + LOGGER.trace( "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from inflight: startOffset: {}, endOffset: {}, object count: {}", streamId, startOffset, endOffset, limit, inflightInRangeObjects.startOffset(), inflightInRangeObjects.endOffset(), objects.size()); @@ -131,7 +131,7 @@ public synchronized List getObjects(long streamId, long startO obj.setObjectSize(metadata.getObjectSize()); }); if (objects.size() >= limit) { - LOGGER.info( + LOGGER.trace( "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, 
endOffset: {}, object count: {}", streamId, startOffset, endOffset, limit, cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size()); @@ -145,7 +145,7 @@ public synchronized List getObjects(long streamId, long startO return objects; } objects.addAll(inflightinRangeObjects.objects()); - LOGGER.info( + LOGGER.trace( "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache and inflight: startOffset: {}, endOffset: {}, object count: {}", streamId, startOffset, endOffset, limit, cachedInRangeObjects.startOffset(), inflightinRangeObjects.endOffset(), objects.size()); @@ -237,7 +237,7 @@ public void append(InflightWalObject object) { if (objects.size() == 1) { firstObjectId = object.objectId(); } - log.info("[AppendInflight]: append wal object: {}", object.objectId()); + log.trace("[AppendInflight]: append wal object: {}", object.objectId()); object.streamsIndex().forEach((stream, indexes) -> { // wal object only contains one index for each stream streamOffsets.putIfAbsent(stream, new OffsetRange(indexes.get(0).getStartOffset(), indexes.get(indexes.size() - 1).getEndOffset())); @@ -246,7 +246,7 @@ public void append(InflightWalObject object) { } public void trim(long objectId) { - log.info("[TrimInflight]: trim wal object <= {}", objectId); + log.trace("[TrimInflight]: trim wal object <= {}", objectId); // TODO: speed up by binary search int clearEndIndex = objects.size(); for (int i = 0; i < objects.size(); i++) { @@ -324,12 +324,10 @@ class CatchUpMetadataListener implements StreamMetadataListener { public void onChange(MetadataDelta delta, MetadataImage newImage) { BrokerS3WALMetadataImage walMetadataImage = newImage.streamsMetadata().brokerWALMetadata().get(config.brokerId()); if (walMetadataImage == null) { - LOGGER.warn("[CatchUpMetadataListener]: wal metadata image not exist"); return; } S3WALObject wal = walMetadataImage.getWalObjects().get(walMetadataImage.getWalObjects().size() - 1); if 
(wal == null) { - LOGGER.warn("[CatchUpMetadataListener]: wal object not exist"); return; } if (wal.objectId() < inflightWalObjects.firstObjectId) { diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 31e04152c9..f21f9cc686 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.message.{CreateTopicsRequestData, _} import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ -import org.apache.kafka.common.requests.s3.{CloseStreamRequest, CloseStreamResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamRequest, CreateStreamResponse, DeleteStreamRequest, DeleteStreamResponse, GetStreamsOffsetRequest, GetStreamsOffsetResponse, OpenStreamRequest, OpenStreamResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse} +import org.apache.kafka.common.requests.s3.{CloseStreamRequest, CloseStreamResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamRequest, CreateStreamResponse, DeleteKVRequest, DeleteKVResponse, DeleteStreamRequest, DeleteStreamResponse, GetKVRequest, GetKVResponse, GetStreamsOffsetRequest, GetStreamsOffsetResponse, OpenStreamRequest, OpenStreamResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse, PutKVRequest, PutKVResponse} import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} import org.apache.kafka.common.utils.Time @@ -119,6 +119,9 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.COMMIT_WALOBJECT => handleCommitWALObject(request) case ApiKeys.COMMIT_STREAM_OBJECT => handleCommitStreamObject(request) case 
ApiKeys.GET_STREAMS_OFFSET => handleGetStreamsOffset(request) + case ApiKeys.GET_KV => handleGetKV(request) + case ApiKeys.PUT_KV => handlePutKV(request) + case ApiKeys.DELETE_KV => handleDeleteKV(request) // Kafka on S3 inject end case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } @@ -1005,5 +1008,71 @@ class ControllerApis(val requestChannel: RequestChannel, } } + def handleGetKV(request: RequestChannel.Request): CompletableFuture[Unit] = { + val getKVRequest = request.body[GetKVRequest] + val context = new ControllerRequestContext(request.context.header.data, request.context.principal, + OptionalLong.empty()) + controller.getKV(context, getKVRequest.data) + .handle[Unit] { (result, exception) => + if (exception != null) { + requestHelper.handleError(request, exception) + } else if (result == null) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + new GetKVResponse(new GetKVResponseData(). + setThrottleTimeMs(requestThrottleMs). + setErrorCode(UNKNOWN_SERVER_ERROR.code)) + }) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + new GetKVResponse(result.setThrottleTimeMs(requestThrottleMs)) + }) + } + } + } + + def handlePutKV(request: RequestChannel.Request): CompletableFuture[Unit] = { + val putKVRequest = request.body[PutKVRequest] + val context = new ControllerRequestContext(request.context.header.data, request.context.principal, + OptionalLong.empty()) + controller.putKV(context, putKVRequest.data) + .handle[Unit] { (result, exception) => + if (exception != null) { + requestHelper.handleError(request, exception) + } else if (result == null) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + new PutKVResponse(new PutKVResponseData(). + setThrottleTimeMs(requestThrottleMs). 
+ setErrorCode(UNKNOWN_SERVER_ERROR.code)) + }) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + new PutKVResponse(result.setThrottleTimeMs(requestThrottleMs)) + }) + } + } + } + + def handleDeleteKV(request: RequestChannel.Request): CompletableFuture[Unit] = { + val deleteKVRequest = request.body[DeleteKVRequest] + val context = new ControllerRequestContext(request.context.header.data, request.context.principal, + OptionalLong.empty()) + controller.deleteKV(context, deleteKVRequest.data) + .handle[Unit] { (result, exception) => + if (exception != null) { + requestHelper.handleError(request, exception) + } else if (result == null) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + new DeleteKVResponse(new DeleteKVResponseData(). + setThrottleTimeMs(requestThrottleMs). + setErrorCode(UNKNOWN_SERVER_ERROR.code)) + }) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + new DeleteKVResponse(result.setThrottleTimeMs(requestThrottleMs)) + }) + } + } + } + // Kafka on S3 inject end } diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index f1beba9bd1..71ab0eaa91 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -45,10 +45,14 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.DeleteKVRequestData; +import org.apache.kafka.common.message.DeleteKVResponseData; import org.apache.kafka.common.message.DeleteStreamRequestData; import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; +import 
org.apache.kafka.common.message.GetKVRequestData; +import org.apache.kafka.common.message.GetKVResponseData; import org.apache.kafka.common.message.GetStreamsOffsetRequestData; import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; @@ -57,6 +61,8 @@ import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; +import org.apache.kafka.common.message.PutKVRequestData; +import org.apache.kafka.common.message.PutKVResponseData; import org.apache.kafka.common.message.UpdateFeaturesRequestData; import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.protocol.Errors; @@ -522,5 +528,21 @@ public CompletableFuture commitStreamObject(Cont public CompletableFuture getStreamsOffset(ControllerRequestContext context, GetStreamsOffsetRequestData request) { throw new UnsupportedOperationException(); } + + @Override + public CompletableFuture getKV(ControllerRequestContext context, GetKVRequestData request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture putKV(ControllerRequestContext context, PutKVRequestData request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture deleteKV(ControllerRequestContext context, DeleteKVRequestData request) { + throw new UnsupportedOperationException(); + } + // Kafka on S3 inject end } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 84de4814b8..07c0d3e6e5 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -73,7 +73,8 @@ object MetadataCacheTest { image.producerIds(), image.acls(), image.streamsMetadata(), - 
image.objectsMetadata()) + image.objectsMetadata(), + image.kv()) val delta = new MetadataDelta.Builder().setImage(partialImage).build() def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index c45ec85932..7e0e50a74f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, S3ObjectsImage, S3StreamsMetadataImage, TopicsDelta, TopicsImage} +import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, KVImage, MetadataImage, MetadataProvenance, ProducerIdsImage, S3ObjectsImage, S3StreamsMetadataImage, TopicsDelta, TopicsImage} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 @@ -4131,6 +4131,7 @@ class ReplicaManagerTest { AclsImage.EMPTY, S3StreamsMetadataImage.EMPTY, S3ObjectsImage.EMPTY, + KVImage.EMPTY ) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 9cf248128b..922addb8b7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -40,10 +40,14 @@ import 
org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.DeleteKVRequestData; +import org.apache.kafka.common.message.DeleteKVResponseData; import org.apache.kafka.common.message.DeleteStreamRequestData; import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.message.GetKVRequestData; +import org.apache.kafka.common.message.GetKVResponseData; import org.apache.kafka.common.message.GetStreamsOffsetRequestData; import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; @@ -52,6 +56,8 @@ import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; +import org.apache.kafka.common.message.PutKVRequestData; +import org.apache.kafka.common.message.PutKVResponseData; import org.apache.kafka.common.message.UpdateFeaturesRequestData; import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.quota.ClientQuotaAlteration; @@ -451,5 +457,29 @@ CompletableFuture getStreamsOffset( GetStreamsOffsetRequestData request ); + + /** + * Broker tries to get value from KV store embedded in controller. + */ + CompletableFuture getKV( + ControllerRequestContext context, + GetKVRequestData request + ); + + /** + * Broker tries to put key-value into KV store embedded in controller. + */ + CompletableFuture putKV( + ControllerRequestContext context, + PutKVRequestData request + ); + + /** + * Broker tries to delete key-value from KV store embedded in controller. 
+ */ + CompletableFuture deleteKV( + ControllerRequestContext context, + DeleteKVRequestData request + ); + // Kafka on S3 inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java index 13725c0dfb..2dc2fa35a0 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java @@ -162,6 +162,8 @@ void replay(ApiMessage message) { case BROKER_WALMETADATA_RECORD: case REMOVE_BROKER_WALMETADATA_RECORD: case ADVANCE_RANGE_RECORD: + case KVRECORD: + case REMOVE_KVRECORD: // Kafka on S3 inject end // These record types do not affect metrics break; diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 6bad292a73..10093dc424 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -50,10 +50,14 @@ import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.DeleteKVRequestData; +import org.apache.kafka.common.message.DeleteKVResponseData; import org.apache.kafka.common.message.DeleteStreamRequestData; import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.message.GetKVRequestData; +import org.apache.kafka.common.message.GetKVResponseData; import org.apache.kafka.common.message.GetStreamsOffsetRequestData; import org.apache.kafka.common.message.GetStreamsOffsetResponseData; 
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; @@ -62,6 +66,8 @@ import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; +import org.apache.kafka.common.message.PutKVRequestData; +import org.apache.kafka.common.message.PutKVResponseData; import org.apache.kafka.common.message.UpdateFeaturesRequestData; import org.apache.kafka.common.message.UpdateFeaturesResponseData; import org.apache.kafka.common.metadata.AccessControlEntryRecord; @@ -73,6 +79,7 @@ import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.KVRecord; import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.metadata.NoOpRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; @@ -82,6 +89,7 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord; import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord; +import org.apache.kafka.common.metadata.RemoveKVRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3ObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; @@ -103,6 +111,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.controller.stream.KVControlManager; import org.apache.kafka.controller.stream.MockS3Operator; import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.controller.stream.StreamControlManager; @@ -1409,6 +1418,7 @@ private void handleFeatureControlChange() { * @param batchLastOffset The 
offset of the last record in the log batch, or the lastContainedLogOffset * if this record is from a snapshot, this is used along with RegisterBrokerRecord */ + @SuppressWarnings("all") private void replay(ApiMessage message, Optional snapshotId, long batchLastOffset) { logReplayTracker.replay(message); MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); @@ -1510,7 +1520,12 @@ private void replay(ApiMessage message, Optional snapshotId, lon case REMOVE_BROKER_WALMETADATA_RECORD: streamControlManager.replay((RemoveBrokerWALMetadataRecord) message); break; - + case KVRECORD: + kvControlManager.replay((KVRecord) message); + break; + case REMOVE_KVRECORD: + kvControlManager.replay((RemoveKVRecord) message); + break; // Kafka on S3 inject end default: throw new RuntimeException("Unhandled record type " + type); @@ -1757,6 +1772,12 @@ private enum ImbalanceSchedule { */ private final StreamControlManager streamControlManager; + /** + * An object which stores the controller's view of the KV objects. + * This must be accessed only by the event queue thread. 
+ */ + private final KVControlManager kvControlManager; + // Kafka on S3 inject end private QuorumController( @@ -1866,6 +1887,7 @@ private QuorumController( this.s3ObjectControlManager = new S3ObjectControlManager( this, snapshotRegistry, logContext, clusterId, s3Config, new MockS3Operator()); this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager); + this.kvControlManager = new KVControlManager(snapshotRegistry, logContext); // Kafka on S3 inject end updateWriteOffset(-1); @@ -2296,6 +2318,24 @@ public CompletableFuture getStreamsOffset(Controll () -> streamControlManager.getStreamsOffset(request)); } + @Override + public CompletableFuture getKV(ControllerRequestContext context, GetKVRequestData request) { + return appendReadEvent("getKV", context.deadlineNs(), + () -> kvControlManager.getKV(request)); + } + + @Override + public CompletableFuture putKV(ControllerRequestContext context, PutKVRequestData request) { + return appendWriteEvent("putKV", context.deadlineNs(), + () -> kvControlManager.putKV(request)); + } + + @Override + public CompletableFuture deleteKV(ControllerRequestContext context, DeleteKVRequestData request) { + return appendWriteEvent("deleteKV", context.deadlineNs(), + () -> kvControlManager.deleteKV(request)); + } + // Kafka on S3 inject end // VisibleForTesting diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java new file mode 100644 index 0000000000..96f821a645 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.stream; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.message.DeleteKVRequestData; +import org.apache.kafka.common.message.DeleteKVResponseData; +import org.apache.kafka.common.message.GetKVRequestData; +import org.apache.kafka.common.message.GetKVResponseData; +import org.apache.kafka.common.message.PutKVRequestData; +import org.apache.kafka.common.message.PutKVResponseData; +import org.apache.kafka.common.metadata.KVRecord; +import org.apache.kafka.common.metadata.KVRecord.KeyValue; +import org.apache.kafka.common.metadata.RemoveKVRecord; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.ControllerResult; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +public class KVControlManager { + + private final SnapshotRegistry registry; + private final Logger log; + private final TimelineHashMap kv; + + public KVControlManager(SnapshotRegistry registry, LogContext logContext) { + this.registry = registry; + this.log = logContext.logger(KVControlManager.class); + this.kv = new TimelineHashMap<>(registry, 0); + } + + public GetKVResponseData 
getKV(GetKVRequestData request) { + GetKVResponseData resp = new GetKVResponseData(); + resp.setKeyValues( + request.keys().stream().map(key -> { + byte[] value = kv.containsKey(key) ? kv.get(key).array() : null; + return new GetKVResponseData.KeyValue() + .setKey(key) + .setValue(value); + }).collect(Collectors.toList())); + log.trace("GetKVResponseData: req: {}, resp: {}", request, resp); + // generate kv record + return resp; + } + + public ControllerResult putKV(PutKVRequestData request) { + log.trace("PutKVRequestData: {}", request); + PutKVResponseData resp = new PutKVResponseData(); + // generate kv record + ApiMessageAndVersion record = new ApiMessageAndVersion(new KVRecord() + .setKeyValues(request.keyValues().stream().map(kv -> new KeyValue() + .setKey(kv.key()) + .setValue(kv.value()) + ).collect(Collectors.toList())), (short) 0); + return ControllerResult.of(Collections.singletonList(record), resp); + } + + public ControllerResult deleteKV(DeleteKVRequestData request) { + log.trace("DeleteKVRequestData: {}", request); + DeleteKVResponseData resp = new DeleteKVResponseData(); + // generate remove-kv record + ApiMessageAndVersion record = new ApiMessageAndVersion(new RemoveKVRecord() + .setKeys(request.keys()), (short) 0); + return ControllerResult.of(Collections.singletonList(record), resp); + } + + public void replay(KVRecord record) { + List keyValues = record.keyValues(); + for (KeyValue keyValue : keyValues) { + kv.put(keyValue.key(), ByteBuffer.wrap(keyValue.value())); + } + } + + public void replay(RemoveKVRecord record) { + List keys = record.keys(); + for (String key : keys) { + kv.remove(key); + } + } + + public Map kv() { + return kv; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/KVDelta.java b/metadata/src/main/java/org/apache/kafka/image/KVDelta.java new file mode 100644 index 0000000000..21d83a1f17 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/KVDelta.java @@ -0,0 +1,63 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.metadata.KVRecord; +import org.apache.kafka.common.metadata.RemoveKVRecord; + +public final class KVDelta { + private final KVImage image; + + private final Map changedKV = new HashMap<>(); + private final Set removedKeys = new HashSet<>(); + + public KVDelta(KVImage image) { + this.image = image; + } + + public KVImage image() { + return image; + } + + public void replay(KVRecord record) { + record.keyValues().forEach(keyValue -> { + changedKV.put(keyValue.key(), ByteBuffer.wrap(keyValue.value())); + removedKeys.remove(keyValue.key()); + }); + } + + public void replay(RemoveKVRecord record) { + record.keys().forEach(key -> { + removedKeys.add(key); + changedKV.remove(key); + }); + } + + public KVImage apply() { + Map newKV = new HashMap<>(image.kv()); + newKV.putAll(changedKV); + removedKeys.forEach(newKV::remove); + return new KVImage(newKV); + } + +} diff --git a/metadata/src/main/java/org/apache/kafka/image/KVImage.java b/metadata/src/main/java/org/apache/kafka/image/KVImage.java new file mode 
100644 index 0000000000..5529b175e0 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/KVImage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.kafka.common.metadata.KVRecord; +import org.apache.kafka.common.metadata.KVRecord.KeyValue; +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +public final class KVImage { + + public static final KVImage EMPTY = new KVImage(Collections.emptyMap()); + + private final Map kv; + + public KVImage(final Map kv) { + this.kv = kv; + } + + public Map kv() { + return kv; + } + + public void write(ImageWriter writer, ImageWriterOptions options) { + List kvs = kv.entrySet().stream().map(kv -> { + return new KeyValue() + .setKey(kv.getKey()) + .setValue(kv.getValue().array()); + }).collect(Collectors.toList()); + writer.write(new ApiMessageAndVersion(new KVRecord() + .setKeyValues(kvs), (short) 0)); + } + + 
@Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KVImage kvImage = (KVImage) o; + return kv.equals(kvImage.kv); + } + + public boolean isEmpty() { + return kv.isEmpty(); + } + + @Override + public int hashCode() { + return Objects.hash(kv); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index 81f61eeaed..db97072b87 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.KVRecord; import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; @@ -34,6 +35,7 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord; import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord; +import org.apache.kafka.common.metadata.RemoveKVRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3ObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; @@ -93,6 +95,8 @@ public MetadataDelta build() { private S3ObjectsDelta s3ObjectsDelta = null; + private KVDelta kvDelta = null; + // Kafka on S3 inject end public MetadataDelta(MetadataImage image) { @@ -204,6 +208,13 @@ public S3ObjectsDelta getOrCreateObjectsMetadataDelta() { return s3ObjectsDelta; } + public KVDelta getOrCreateKVDelta() { + if (kvDelta == null) { + kvDelta = new KVDelta(image.kv()); + } + return kvDelta; + 
} + // Kafka on S3 inject end public Optional metadataVersionChanged() { @@ -214,6 +225,7 @@ public Optional metadataVersionChanged() { } } + @SuppressWarnings("all") public void replay(ApiMessage record) { MetadataRecordType type = MetadataRecordType.fromId(record.apiKey()); switch (type) { @@ -313,6 +325,12 @@ public void replay(ApiMessage record) { case REMOVE_BROKER_WALMETADATA_RECORD: replay((RemoveBrokerWALMetadataRecord) record); break; + case KVRECORD: + replay((KVRecord) record); + break; + case REMOVE_KVRECORD: + replay((RemoveKVRecord) record); + break; // Kafka on S3 inject end default: throw new RuntimeException("Unknown metadata record type " + type); @@ -447,6 +465,14 @@ public void replay(RemoveBrokerWALMetadataRecord record) { getOrCreateStreamsMetadataDelta().replay(record); } + public void replay(KVRecord record) { + getOrCreateKVDelta().replay(record); + } + + public void replay(RemoveKVRecord record) { + getOrCreateKVDelta().replay(record); + } + // Kafka on S3 inject end /** @@ -460,6 +486,7 @@ public void finishSnapshot() { getOrCreateClientQuotasDelta().finishSnapshot(); getOrCreateProducerIdsDelta().finishSnapshot(); getOrCreateAclsDelta().finishSnapshot(); + // TODO: fill new added delta } public MetadataImage apply(MetadataProvenance provenance) { @@ -509,6 +536,7 @@ public MetadataImage apply(MetadataProvenance provenance) { // Kafka on S3 inject start S3StreamsMetadataImage newStreamMetadata = getNewS3StreamsMetadataImage(); S3ObjectsImage newS3ObjectsMetadata = getNewS3ObjectsMetadataImage(); + KVImage newKVImage = getNewKVImage(); // Kafka on S3 inject end return new MetadataImage( provenance, @@ -520,7 +548,8 @@ public MetadataImage apply(MetadataProvenance provenance) { newProducerIds, newAcls, newStreamMetadata, - newS3ObjectsMetadata + newS3ObjectsMetadata, + newKVImage ); } @@ -536,6 +565,11 @@ private S3ObjectsImage getNewS3ObjectsMetadataImage() { image.objectsMetadata() : s3ObjectsDelta.apply(); } + private KVImage 
getNewKVImage() { + return kvDelta == null ? + image.kv() : kvDelta.apply(); + } + // Kafka on S3 inject end @Override diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java index 744d6f041a..6cd7e0da88 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java @@ -40,7 +40,8 @@ public final class MetadataImage { ProducerIdsImage.EMPTY, AclsImage.EMPTY, S3StreamsMetadataImage.EMPTY, - S3ObjectsImage.EMPTY); + S3ObjectsImage.EMPTY, + KVImage.EMPTY); private final MetadataProvenance provenance; @@ -64,6 +65,8 @@ public final class MetadataImage { private final S3ObjectsImage objectsMetadata; + private final KVImage kv; + // Kafka on S3 inject end public MetadataImage( @@ -76,7 +79,8 @@ public MetadataImage( ProducerIdsImage producerIds, AclsImage acls, S3StreamsMetadataImage streamMetadata, - S3ObjectsImage s3ObjectsImage + S3ObjectsImage s3ObjectsImage, + KVImage kvImage ) { this.provenance = provenance; this.features = features; @@ -88,6 +92,7 @@ public MetadataImage( this.acls = acls; this.streamMetadata = streamMetadata; this.objectsMetadata = s3ObjectsImage; + this.kv = kvImage; } public boolean isEmpty() { @@ -98,7 +103,9 @@ public boolean isEmpty() { clientQuotas.isEmpty() && producerIds.isEmpty() && acls.isEmpty() && - streamMetadata.isEmpty(); + streamMetadata.isEmpty() && + objectsMetadata.isEmpty() && + kv.isEmpty(); } public MetadataProvenance provenance() { @@ -151,6 +158,10 @@ public S3ObjectsImage objectsMetadata() { return objectsMetadata; } + public KVImage kv() { + return kv; + } + // Kafka on S3 inject end public void write(ImageWriter writer, ImageWriterOptions options) { @@ -166,6 +177,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) { // Kafka on S3 inject start streamMetadata.write(writer, options); objectsMetadata.write(writer, options); + 
kv.write(writer, options); // Kafka on S3 inject end writer.close(true); } @@ -183,7 +195,8 @@ public boolean equals(Object o) { producerIds.equals(other.producerIds) && acls.equals(other.acls) && streamMetadata.equals(other.streamMetadata) && - objectsMetadata.equals(other.objectsMetadata); + objectsMetadata.equals(other.objectsMetadata) && + kv.equals(other.kv); } @Override @@ -198,7 +211,8 @@ public int hashCode() { producerIds, acls, streamMetadata, - objectsMetadata); + objectsMetadata, + kv); } @Override @@ -214,6 +228,7 @@ public String toString() { ", acls=" + acls + ", streamMetadata=" + streamMetadata + ", objectsMetadata=" + objectsMetadata + + ", kv=" + kv + ")"; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java index a27f615189..637dd52a98 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java @@ -77,6 +77,10 @@ public boolean equals(Object o) { objectsMetadata.equals(that.objectsMetadata); } + public boolean isEmpty() { + return objectsMetadata.isEmpty(); + } + @Override public int hashCode() { return Objects.hash(nextAssignedObjectId, objectsMetadata); diff --git a/metadata/src/main/resources/common/metadata/KVRecord.json b/metadata/src/main/resources/common/metadata/KVRecord.json new file mode 100644 index 0000000000..62b70c2ceb --- /dev/null +++ b/metadata/src/main/resources/common/metadata/KVRecord.json @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 516, + "type": "metadata", + "name": "KVRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "KeyValues", + "type": "[]KeyValue", + "versions": "0+", + "about": "Key-values", + "fields": [ + { + "name": "Key", + "type": "string", + "versions": "0+", + "about": "Key" + }, + { + "name": "Value", + "type": "bytes", + "versions": "0+", + "about": "Value" + } + ] + } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/RemoveKVRecord.json b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json new file mode 100644 index 0000000000..049d939be2 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 517, + "type": "metadata", + "name": "RemoveKVRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "Keys", + "type": "[]string", + "versions": "0+", + "about": "Keys" + } + ] +} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java new file mode 100644 index 0000000000..c8af60b892 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.kafka.common.message.DeleteKVRequestData; +import org.apache.kafka.common.message.DeleteKVResponseData; +import org.apache.kafka.common.message.GetKVRequestData; +import org.apache.kafka.common.message.GetKVResponseData; +import org.apache.kafka.common.message.PutKVRequestData; +import org.apache.kafka.common.message.PutKVRequestData.KeyValue; +import org.apache.kafka.common.message.PutKVResponseData; +import org.apache.kafka.common.metadata.KVRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.metadata.RemoveKVRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.stream.KVControlManager; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(40) +@Tag("S3Unit") +public class KVControlManagerTest { + + private KVControlManager manager; + + @BeforeEach + public void setUp() { + LogContext logContext = new LogContext(); + SnapshotRegistry registry = new SnapshotRegistry(logContext); + this.manager = new KVControlManager(registry, logContext); + } + + @Test + public void testBasicReadWrite() { + ControllerResult result0 = manager.putKV(new PutKVRequestData() + .setKeyValues(List.of( + new KeyValue() + .setKey("key1") + .setValue("value1".getBytes()), + new KeyValue() + .setKey("key2") + .setValue("value2".getBytes())))); + assertEquals(1, result0.records().size()); + replay(manager, result0.records()); + assertEquals(2, manager.kv().size()); + + GetKVResponseData 
resp1 = manager.getKV(new GetKVRequestData() + .setKeys(List.of("key1", "key2", "key3"))); + assertEquals(3, resp1.keyValues().size()); + assertEquals("key1", resp1.keyValues().get(0).key()); + assertEquals("value1", new String(resp1.keyValues().get(0).value())); + assertEquals("key2", resp1.keyValues().get(1).key()); + assertEquals("value2", new String(resp1.keyValues().get(1).value())); + assertEquals("key3", resp1.keyValues().get(2).key()); + assertNull(resp1.keyValues().get(2).value()); + + ControllerResult result2 = manager.deleteKV(new DeleteKVRequestData() + .setKeys(List.of("key1", "key3"))); + assertEquals(1, result2.records().size()); + replay(manager, result2.records()); + assertEquals(1, manager.kv().size()); + + GetKVResponseData resp3 = manager.getKV(new GetKVRequestData() + .setKeys(List.of("key1", "key2", "key3"))); + assertEquals(3, resp3.keyValues().size()); + assertEquals("key1", resp3.keyValues().get(0).key()); + assertNull(resp3.keyValues().get(0).value()); + assertEquals("key2", resp3.keyValues().get(1).key()); + assertEquals("value2", new String(resp3.keyValues().get(1).value())); + assertEquals("key3", resp3.keyValues().get(2).key()); + assertNull(resp3.keyValues().get(2).value()); + } + + private void replay(KVControlManager manager, List records) { + List messages = records.stream().map(x -> x.message()) + .collect(Collectors.toList()); + for (ApiMessage message : messages) { + MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); + switch (type) { + case KVRECORD: + manager.replay((KVRecord) message); + break; + case REMOVE_KVRECORD: + manager.replay((RemoveKVRecord) message); + break; + default: + throw new IllegalStateException("Unknown metadata record type " + type); + } + } + } +} diff --git a/metadata/src/test/java/org/apache/kafka/image/KVImageTest.java b/metadata/src/test/java/org/apache/kafka/image/KVImageTest.java new file mode 100644 index 0000000000..818a19cece --- /dev/null +++ 
b/metadata/src/test/java/org/apache/kafka/image/KVImageTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.metadata.KVRecord; +import org.apache.kafka.common.metadata.KVRecord.KeyValue; +import org.apache.kafka.common.metadata.RemoveKVRecord; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.image.writer.RecordListWriter; +import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(40) +@Tag("S3Unit") +public class KVImageTest { + + final static KVImage IMAGE1; + + final static List DELTA1_RECORDS; + + final static KVDelta DELTA1; + + final static KVImage IMAGE2; + + + static { + Map map = Map.of( + "key1", ByteBuffer.wrap(new String("value1").getBytes()), + "key2", ByteBuffer.wrap(new String("value2").getBytes())); + IMAGE1 = new KVImage(map); + 
DELTA1_RECORDS = new ArrayList<>(); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new KVRecord() + .setKeyValues(List.of( + new KeyValue() + .setKey("key3") + .setValue("value3".getBytes()), + new KeyValue() + .setKey("key2") + .setValue("value2".getBytes()))), (short) 0)); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveKVRecord() + .setKeys(List.of("key1")), (short) 0)); + DELTA1 = new KVDelta(IMAGE1); + RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); + + Map map2 = Map.of( + "key2", ByteBuffer.wrap(new String("value2").getBytes()), + "key3", ByteBuffer.wrap(new String("value3").getBytes())); + IMAGE2 = new KVImage(map2); + } + + @Test + public void testEmptyImageRoundTrip() { + testToImageAndBack(KVImage.EMPTY); + } + + @Test + public void testImage1RoundTrip() { + testToImageAndBack(IMAGE1); + } + + @Test + public void testApplyDelta1() { + assertEquals(IMAGE2, DELTA1.apply()); + } + + @Test + public void testImage2RoundTrip() { + testToImageAndBack(IMAGE2); + } + + private void testToImageAndBack(KVImage image) { + RecordListWriter writer = new RecordListWriter(); + ImageWriterOptions options = new ImageWriterOptions.Builder().build(); + image.write(writer, options); + KVDelta delta = new KVDelta(KVImage.EMPTY); + RecordTestUtils.replayAll(delta, writer.records()); + KVImage newImage = delta.apply(); + assertEquals(image, newImage); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index b11ec57d0f..1820f355bb 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -45,7 +45,8 @@ public class MetadataImageTest { ProducerIdsImageTest.IMAGE1, AclsImageTest.IMAGE1, S3StreamsMetadataImageTest.IMAGE1, - S3ObjectsImageTest.IMAGE1); + S3ObjectsImageTest.IMAGE1, + KVImageTest.IMAGE1); DELTA1 = new MetadataDelta.Builder(). setImage(IMAGE1). 
@@ -58,6 +59,8 @@ public class MetadataImageTest { RecordTestUtils.replayAll(DELTA1, ProducerIdsImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, AclsImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, S3StreamsMetadataImageTest.DELTA1_RECORDS); + RecordTestUtils.replayAll(DELTA1, S3ObjectsImageTest.DELTA1_RECORDS); + RecordTestUtils.replayAll(DELTA1, KVImageTest.DELTA1_RECORDS); IMAGE2 = new MetadataImage( new MetadataProvenance(200, 5, 4000), @@ -69,7 +72,8 @@ public class MetadataImageTest { ProducerIdsImageTest.IMAGE2, AclsImageTest.IMAGE2, S3StreamsMetadataImageTest.IMAGE2, - S3ObjectsImageTest.IMAGE2); + S3ObjectsImageTest.IMAGE2, + KVImageTest.IMAGE2); } @Test