Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,15 @@ public enum ApiKeys {
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true),

// Kafka on S3 inject start

CREATE_STREAM(ApiMessageType.CREATE_STREAM, false, true),
DELETE_STREAM(ApiMessageType.DELETE_STREAM, false, true),
OPEN_STREAM(ApiMessageType.OPEN_STREAM, false, true),
CLOSE_STREAM(ApiMessageType.CLOSE_STREAM, false, true);
// Kafka on S3 inject end

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
29 changes: 29 additions & 0 deletions clients/src/main/resources/common/message/CloseStreamRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 70,
"type": "request",
"listeners": ["controller", "broker"],
"name": "CloseStreamRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId",
"about": "The id of the requesting stream" }
]
}
26 changes: 26 additions & 0 deletions clients/src/main/resources/common/message/CloseStreamResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 70,
"type": "response",
"name": "CloseStreamResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" }
]
}
27 changes: 27 additions & 0 deletions clients/src/main/resources/common/message/CreateStreamRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 68,
"type": "request",
"listeners": ["controller", "broker"],
"name": "CreateStreamRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId",
"about": "The id of the requesting stream" }
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 68,
"type": "response",
"name": "CreateStreamResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" }
]
}
27 changes: 27 additions & 0 deletions clients/src/main/resources/common/message/DeleteStreamRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 71,
"type": "request",
"listeners": ["controller", "broker"],
"name": "DeleteStreamRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId",
"about": "The id of the requesting stream" }
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 71,
"type": "response",
"name": "DeleteStreamResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" }
]
}
29 changes: 29 additions & 0 deletions clients/src/main/resources/common/message/OpenStreamRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 69,
"type": "request",
"listeners": ["controller", "broker"],
"name": "OpenStreamRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId",
"about": "The id of the requesting stream" }
]
}
26 changes: 26 additions & 0 deletions clients/src/main/resources/common/message/OpenStreamResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 69,
"type": "response",
"name": "OpenStreamResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ object MetadataCacheTest {
image.configs(),
image.clientQuotas(),
image.producerIds(),
image.acls())
image.acls(),
image.streamsMetadata())
val delta = new MetadataDelta.Builder().setImage(partialImage).build()

def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage}
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage, S3StreamsMetadataImage}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
Expand Down Expand Up @@ -4128,7 +4128,8 @@ class ReplicaManagerTest {
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY,
ProducerIdsImage.EMPTY,
AclsImage.EMPTY
AclsImage.EMPTY,
S3StreamsMetadataImage.EMPTY
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.message;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.message.FieldType.Int64FieldType;

public enum EntityType {
@JsonProperty("unknown")
Expand All @@ -36,8 +37,12 @@ public enum EntityType {
TOPIC_NAME(FieldType.StringFieldType.INSTANCE),

@JsonProperty("brokerId")
BROKER_ID(FieldType.Int32FieldType.INSTANCE);
BROKER_ID(FieldType.Int32FieldType.INSTANCE),

// Kafka on S3 inject start
@JsonProperty("streamId")
STREAM_ID(Int64FieldType.INSTANCE);
// Kafka on S3 inject end
private final FieldType baseType;

EntityType(FieldType baseType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.stream;

import java.util.LinkedList;
import java.util.Queue;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;

/**
* The S3ObjectControlManager manages all S3Object's lifecycle, such as apply, create, destroy, etc.
*/
public class S3ObjectControlManager {
private final SnapshotRegistry snapshotRegistry;
private final Logger log;

private final TimelineHashMap<Long/*objectId*/, S3Object> objectsMetadata;

/**
* The objectId of the next object to be applied. (start from 0)
*/
private Long nextApplyObjectId = 0L;

// TODO: add timer task to periodically check if there are objects to be destroyed or created
private final Queue<Long/*objectId*/> appliedObjects;
private final Queue<Long/*objectId*/> markDestroyedObjects;

public S3ObjectControlManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(S3ObjectControlManager.class);
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.appliedObjects = new LinkedList<>();
this.markDestroyedObjects = new LinkedList<>();
}

public Long appliedObjectNum() {
return nextApplyObjectId;
}

}
Loading