Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import kafka.server.BrokerServer;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.image.BrokerS3WALMetadataImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.objects.OpenStreamMetadata;
import org.apache.kafka.common.errors.s3.StreamNotClosedException;
import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import kafka.log.s3.streams.StreamManager;
import org.apache.kafka.metadata.stream.ObjectUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.metadata.stream.StreamState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

package org.apache.kafka.image;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
import org.apache.kafka.common.metadata.WALObjectRecord;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.metadata.stream.SortedWALObjects;
import org.apache.kafka.metadata.stream.SortedWALObjectsList;

public class BrokerS3WALMetadataDelta {

Expand Down Expand Up @@ -58,7 +58,7 @@ public void replay(RemoveWALObjectRecord record) {
}

public BrokerS3WALMetadataImage apply() {
List<S3WALObject> newS3WALObjects = new ArrayList<>(image.getWalObjects());
SortedWALObjects newS3WALObjects = new SortedWALObjectsList(image.getWalObjects());
// add all changed WAL objects
newS3WALObjects.addAll(addedS3WALObjects.values());
// remove all removed WAL objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,26 @@

package org.apache.kafka.image;

import java.util.List;
import java.util.Iterator;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.stream.SortedWALObjects;
import org.apache.kafka.metadata.stream.SortedWALObjectsList;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class BrokerS3WALMetadataImage {

public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(-1, List.of());
public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(S3StreamConstant.INVALID_BROKER_ID, new SortedWALObjectsList());
private final int brokerId;
private final List<S3WALObject> s3WalObjects;
private final SortedWALObjects s3WalObjects;

public BrokerS3WALMetadataImage(int brokerId, List<S3WALObject> s3WalObjects) {
public BrokerS3WALMetadataImage(int brokerId, SortedWALObjects sourceWALObjects) {
this.brokerId = brokerId;
this.s3WalObjects = s3WalObjects;
this.s3WalObjects = new SortedWALObjectsList(sourceWALObjects);
}

@Override
Expand All @@ -58,10 +60,14 @@ public int hashCode() {
public void write(ImageWriter writer, ImageWriterOptions options) {
writer.write(new ApiMessageAndVersion(new BrokerWALMetadataRecord()
.setBrokerId(brokerId), (short) 0));
s3WalObjects.forEach(walObject -> writer.write(walObject.toRecord()));
Iterator<S3WALObject> iterator = s3WalObjects.iterator();
while (iterator.hasNext()) {
S3WALObject s3WALObject = iterator.next();
writer.write(s3WALObject.toRecord());
}
}

public List<S3WALObject> getWalObjects() {
public SortedWALObjects getWalObjects() {
return s3WalObjects;
}

Expand All @@ -73,9 +79,7 @@ public int getBrokerId() {
public String toString() {
return "BrokerS3WALMetadataImage{" +
"brokerId=" + brokerId +
", s3WalObjects=" + s3WalObjects.stream()
.map(wal -> wal.toString())
.collect(Collectors.joining(", ")) +
", s3WalObjects=" + s3WalObjects +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.image.writer.ImageWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public InRangeObjects getObjects(int limit) {
if (wal == null) {
return InRangeObjects.INVALID;
}
List<ObjectStreamRange> walObjects = wal.getWalObjects().stream()
List<ObjectStreamRange> walObjects = wal.getWalObjects().list().stream()
.filter(obj -> obj.streamsIndex().containsKey(streamId) && obj.streamsIndex().get(streamId).size() != 0)
.flatMap(obj -> {
List<S3ObjectStreamIndex> indexes = obj.streamsIndex().get(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.kafka.controller.stream;
package org.apache.kafka.metadata.stream;

public class S3StreamConstant {

Expand All @@ -33,6 +33,8 @@ public class S3StreamConstant {

public static final long INVALID_OFFSET = -1L;

public static final int INVALID_BROKER_ID = -1;

public static final long MAX_OBJECT_ID = Long.MAX_VALUE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ public class S3WALObject implements Comparable<S3WALObject> {

private final S3ObjectType objectType = S3ObjectType.UNKNOWN;

public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex) {
// default orderId is equal to objectId
this(objectId, brokerId, streamsIndex, objectId);
}

public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex, long orderId) {
this.orderId = orderId;
this.objectId = objectId;
Expand All @@ -71,6 +66,7 @@ public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new WALObjectRecord()
.setObjectId(objectId)
.setBrokerId(brokerId)
.setOrderId(orderId)
.setStreamsIndex(
streamsIndex.values().stream().flatMap(List::stream)
.map(S3ObjectStreamIndex::toRecordStreamIndex)
Expand All @@ -82,7 +78,7 @@ public static S3WALObject of(WALObjectRecord record) {
.map(index -> new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset()))
.collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(),
collect);
collect, record.orderId());
return s3WalObject;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata.stream;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public interface SortedWALObjects {

int size();

boolean isEmpty();

Iterator<S3WALObject> iterator();

List<S3WALObject> list();

boolean contains(Object o);

boolean add(S3WALObject s3WALObject);

default boolean addAll(Collection<S3WALObject> walObjects) {
walObjects.forEach(this::add);
return true;
}

boolean remove(Object o);

default boolean removeIf(Predicate<S3WALObject> filter) {
return this.list().removeIf(filter);
}

S3WALObject get(int index);

void clear();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata.stream;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class SortedWALObjectsList implements SortedWALObjects {

private final List<S3WALObject> list;

public SortedWALObjectsList(SortedWALObjects source) {
this.list = new LinkedList<>(source.list());
}

public SortedWALObjectsList() {
this.list = new LinkedList<>();
}

/**
* Construct a SortedWALObjectsList from a list of S3WALObjects.
* @param list the list of S3WALObjects, must guarantee that the list is sorted
*/
public SortedWALObjectsList(List<S3WALObject> list) {
this.list = list;
}

@Override
public int size() {
return this.list.size();
}

@Override
public boolean isEmpty() {
return this.list.isEmpty();
}

@Override
public Iterator<S3WALObject> iterator() {
return this.list.iterator();
}

@Override
public List<S3WALObject> list() {
return list;
}

@Override
public boolean contains(Object o) {
return this.list.contains(o);
}

@Override
public boolean add(S3WALObject s3WALObject) {
// TODO: optimize by binary search
for (int index = 0; index < this.list.size(); index++) {
S3WALObject current = this.list.get(index);
if (s3WALObject.compareTo(current) <= 0) {
this.list.add(index, s3WALObject);
return true;
}
}
this.list.add(s3WALObject);
return true;
}

@Override
public boolean remove(Object o) {
// TODO: optimize by binary search
return this.list.remove(o);
}



@Override
public S3WALObject get(int index) {
return this.list.get(index);
}

@Override
public void clear() {
this.list.clear();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SortedWALObjectsList that = (SortedWALObjectsList) o;
return Objects.equals(list, that.list);
}

@Override
public int hashCode() {
return Objects.hash(list);
}

@Override
public String toString() {
return "SortedWALObjectsList{" +
"list=" + list.stream().map(S3WALObject::toString).collect(Collectors.joining(",")) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.stream.S3ObjectControlManager;
import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.controller.stream.StreamControlManager;
import org.apache.kafka.controller.stream.StreamControlManager.S3StreamMetadata;
import org.apache.kafka.metadata.stream.RangeMetadata;
Expand Down
Loading