8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -64,20 +64,20 @@ public CompletableFuture<DataBlock> read(DataBlockIndex block) {
}

private void asyncGetBasicObjectInfo() {
asyncGetBasicObjectInfo0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
asyncGetBasicObjectInfo0(Math.max(0, metadata.objectSize() - 1024 * 1024));
}

private void asyncGetBasicObjectInfo0(long startPosition) {
CompletableFuture<ByteBuf> cf = s3Operator.rangeRead(objectKey, startPosition, metadata.getObjectSize());
CompletableFuture<ByteBuf> cf = s3Operator.rangeRead(objectKey, startPosition, metadata.objectSize());
cf.thenAccept(buf -> {
try {
BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.getObjectSize());
BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.objectSize());
basicObjectInfoCf.complete(basicObjectInfo);
} catch (IndexBlockParseException ex) {
asyncGetBasicObjectInfo0(ex.indexBlockPosition);
}
}).exceptionally(ex -> {
LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.getObjectSize(), ex);
LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.objectSize(), ex);
// TODO: delay retry.
asyncGetBasicObjectInfo0(startPosition);
return null;
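The only change in ObjectReader.java is the accessor rename (`getObjectSize()` to `objectSize()`), but it sits inside a pattern worth spelling out: the reader optimistically fetches the last 1 MiB of the object, and when the index block turns out to start earlier than that guess, `IndexBlockParseException` carries the true position and the read is retried from there. Below is a minimal, self-contained sketch of that tail-read-and-retry pattern; the types and the simulated parse are hypothetical stand-ins, not the PR's actual API.

```java
import java.util.concurrent.CompletableFuture;

public class TailReadSketch {
    // Hypothetical stand-in for the parsed index; only the start position matters here.
    record IndexBlock(long startPosition) {}

    // Thrown when the tail guess did not cover the whole index block.
    static class IndexBlockParseException extends Exception {
        final long indexBlockPosition;
        IndexBlockParseException(long indexBlockPosition) { this.indexBlockPosition = indexBlockPosition; }
    }

    static final long TAIL_GUESS = 1024 * 1024; // first attempt: last 1 MiB of the object

    private final long objectSize;
    private final long actualIndexStart; // where the index block really begins (simulated)

    TailReadSketch(long objectSize, long actualIndexStart) {
        this.objectSize = objectSize;
        this.actualIndexStart = actualIndexStart;
    }

    CompletableFuture<IndexBlock> readIndex() {
        return readIndexFrom(Math.max(0, objectSize - TAIL_GUESS));
    }

    private CompletableFuture<IndexBlock> readIndexFrom(long startPosition) {
        // A real implementation would issue rangeRead(objectKey, startPosition, objectSize) here.
        try {
            return CompletableFuture.completedFuture(parse(startPosition));
        } catch (IndexBlockParseException ex) {
            // The exception reports where the index actually starts; retry from there.
            return readIndexFrom(ex.indexBlockPosition);
        }
    }

    private IndexBlock parse(long startPosition) throws IndexBlockParseException {
        if (startPosition > actualIndexStart) {
            throw new IndexBlockParseException(actualIndexStart);
        }
        return new IndexBlock(actualIndexStart);
    }

    public static void main(String[] args) throws Exception {
        // A 10 MiB object whose index starts at 6 MiB: the 1 MiB tail guess misses, so one retry happens.
        IndexBlock block = new TailReadSketch(10 << 20, 6 << 20).readIndex().get();
        System.out.println("index starts at " + block.startPosition());
    }
}
```

Retrying from the exact position the parser reports bounds the work at two range reads in the common case.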
45 changes: 20 additions & 25 deletions core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java
@@ -30,8 +30,6 @@
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.apache.kafka.metadata.stream.S3ObjectType;
import org.apache.kafka.metadata.stream.S3StreamObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,7 +39,7 @@
*/
public class StreamObjectsCompactionTask {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamObjectsCompactionTask.class);
private Queue<List<S3StreamObjectMetadata>> compactGroups;
private Queue<List<S3ObjectMetadata>> compactGroups;
private final long compactedStreamObjectMaxSize;
private final long compactableStreamObjectLivingTimeInMs;
private long nextStartSearchingOffset;
@@ -58,16 +56,14 @@ public StreamObjectsCompactionTask(ObjectManager objectManager, S3Operator s3Ope
this.nextStartSearchingOffset = stream.startOffset();
}

private CompletableFuture<Void> doCompaction(List<S3StreamObjectMetadata> streamObjectMetadataList) {
List<S3ObjectMetadata> objectMetadatas = streamObjectMetadataList.stream().map(metadata ->
new S3ObjectMetadata(metadata.objectId(), metadata.objectSize(), S3ObjectType.STREAM)
).collect(Collectors.toList());
private CompletableFuture<Void> doCompaction(List<S3ObjectMetadata> streamObjectMetadataList) {
List<S3ObjectMetadata> objectMetadatas = streamObjectMetadataList;

long startOffset = streamObjectMetadataList.get(0).startOffset();
long endOffset = streamObjectMetadataList.get(streamObjectMetadataList.size() - 1).endOffset();
List<Long> sourceObjectIds = streamObjectMetadataList
.stream()
.map(S3StreamObjectMetadata::objectId)
.map(S3ObjectMetadata::objectId)
.collect(Collectors.toList());

if (stream.isClosed()) {
@@ -99,7 +95,7 @@ private CompletableFuture<Void> doCompaction(List<S3StreamObjectMetadata> stream
public CompletableFuture<Void> doCompactions() {
CompletableFuture<Void> lastCompactionFuture = CompletableFuture.completedFuture(null);
while (!compactGroups.isEmpty()) {
List<S3StreamObjectMetadata> streamObjectMetadataList = compactGroups.poll();
List<S3ObjectMetadata> streamObjectMetadataList = compactGroups.poll();
CompletableFuture<Void> future = new CompletableFuture<>();
lastCompactionFuture.whenComplete((v, ex) -> {
if (ex != null) {
@@ -132,17 +128,16 @@ public void prepare() {
* @param startSearchingOffset start searching offset.
* @return compact groups.
*/
public Queue<List<S3StreamObjectMetadata>> prepareCompactGroups(long startSearchingOffset) {
public Queue<List<S3ObjectMetadata>> prepareCompactGroups(long startSearchingOffset) {
long startOffset = Utils.max(startSearchingOffset, stream.startOffset());
List<S3StreamObjectMetadata> rawFetchedStreamObjects = objectManager
List<S3ObjectMetadata> rawFetchedStreamObjects = objectManager
.getStreamObjects(stream.streamId(), startOffset, stream.nextOffset(), Integer.MAX_VALUE)
.stream()
.sorted()
.collect(Collectors.toList());

this.nextStartSearchingOffset = calculateNextStartSearchingOffset(rawFetchedStreamObjects, startOffset);

List<S3StreamObjectMetadata> streamObjects = rawFetchedStreamObjects
List<S3ObjectMetadata> streamObjects = rawFetchedStreamObjects
.stream()
.filter(streamObject -> streamObject.objectSize() < compactedStreamObjectMaxSize)
.collect(Collectors.toList());
@@ -170,7 +165,7 @@ public void close() {}
* @param rawStartSearchingOffset raw start searching offset.
* @return next start searching offset.
*/
private long calculateNextStartSearchingOffset(List<S3StreamObjectMetadata> streamObjects,
private long calculateNextStartSearchingOffset(List<S3ObjectMetadata> streamObjects,
long rawStartSearchingOffset) {
long lastEndOffset = rawStartSearchingOffset;
if (streamObjects == null || streamObjects.isEmpty()) {
@@ -193,16 +188,16 @@ private long calculateNextStartSearchingOffset(List<S3StreamObjectMetadata> stre
* @param streamObjects stream objects.
* @return object groups.
*/
private List<List<S3StreamObjectMetadata>> groupContinuousObjects(List<S3StreamObjectMetadata> streamObjects) {
private List<List<S3ObjectMetadata>> groupContinuousObjects(List<S3ObjectMetadata> streamObjects) {
if (streamObjects == null || streamObjects.size() <= 1) {
return new LinkedList<>();
}

List<Stack<S3StreamObjectMetadata>> stackList = new LinkedList<>();
Stack<S3StreamObjectMetadata> stack = new Stack<>();
List<Stack<S3ObjectMetadata>> stackList = new LinkedList<>();
Stack<S3ObjectMetadata> stack = new Stack<>();
stackList.add(stack);

for (S3StreamObjectMetadata object : streamObjects) {
for (S3ObjectMetadata object : streamObjects) {
if (stack.isEmpty()) {
stack.push(object);
} else {
@@ -234,19 +229,19 @@ private List<List<S3StreamObjectMetadata>> groupContinuousObjects(List<S3StreamO
* @param streamObjects stream objects.
* @return stream object subgroups.
*/
private Queue<List<S3StreamObjectMetadata>> groupEligibleObjects(List<S3StreamObjectMetadata> streamObjects) {
private Queue<List<S3ObjectMetadata>> groupEligibleObjects(List<S3ObjectMetadata> streamObjects) {
if (streamObjects == null || streamObjects.size() <= 1) {
return new LinkedList<>();
}

Queue<List<S3StreamObjectMetadata>> groups = new LinkedList<>();
Queue<List<S3ObjectMetadata>> groups = new LinkedList<>();

int startIndex = 0;
int endIndex = 0;
while (startIndex < streamObjects.size() - 1) {
endIndex = startIndex + 1;
while (endIndex <= streamObjects.size()) {
List<S3StreamObjectMetadata> subGroup = streamObjects.subList(startIndex, endIndex);
List<S3ObjectMetadata> subGroup = streamObjects.subList(startIndex, endIndex);
// The subgroup is too new or too big; stop extending it.
if (calculateTimePassedInMs(subGroup) < compactableStreamObjectLivingTimeInMs ||
calculateTotalSize(subGroup) > compactedStreamObjectMaxSize) {
@@ -264,12 +259,12 @@ private Queue<List<S3StreamObjectMetadata>> groupEligibleObjects(List<S3StreamOb
return groups;
}

private long calculateTimePassedInMs(List<S3StreamObjectMetadata> streamObjects) {
return System.currentTimeMillis() - streamObjects.stream().mapToLong(S3StreamObjectMetadata::timestamp).max().orElse(0L);
private long calculateTimePassedInMs(List<S3ObjectMetadata> streamObjects) {
return System.currentTimeMillis() - streamObjects.stream().mapToLong(S3ObjectMetadata::committedTimestamp).max().orElse(0L);
}

private long calculateTotalSize(List<S3StreamObjectMetadata> streamObjects) {
return streamObjects.stream().mapToLong(S3StreamObjectMetadata::objectSize).sum();
private long calculateTotalSize(List<S3ObjectMetadata> streamObjects) {
return streamObjects.stream().mapToLong(S3ObjectMetadata::objectSize).sum();
}

public static class HaltException extends RuntimeException {
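StreamObjectsCompactionTask now works directly with `S3ObjectMetadata`, dropping the `S3StreamObjectMetadata` wrapper. Its grouping logic is two-phase: `groupContinuousObjects` collects runs of objects whose offset ranges join end-to-start, and `groupEligibleObjects` then carves each run into subgroups that are old enough (`compactableStreamObjectLivingTimeInMs`) and small enough (`compactedStreamObjectMaxSize`) to compact; `doCompactions` then chains the groups so each compaction starts only after the previous future completes. Here is a minimal sketch of the contiguity phase, using a hypothetical `Range` record in place of `S3ObjectMetadata` (the real code uses stacks, but the invariant is the same):

```java
import java.util.ArrayList;
import java.util.List;

public class GroupingSketch {
    // Hypothetical stand-in for a stream object's offset range: [startOffset, endOffset).
    record Range(long startOffset, long endOffset) {}

    // Split a sorted list of ranges into maximal runs where each range starts
    // exactly where the previous one ends; only runs of 2+ objects are kept,
    // since a lone object has nothing to be compacted with.
    static List<List<Range>> groupContinuous(List<Range> sorted) {
        List<List<Range>> groups = new ArrayList<>();
        List<Range> current = new ArrayList<>();
        for (Range r : sorted) {
            if (!current.isEmpty() && current.get(current.size() - 1).endOffset() != r.startOffset()) {
                if (current.size() > 1) {
                    groups.add(current);
                }
                current = new ArrayList<>();
            }
            current.add(r);
        }
        if (current.size() > 1) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Range> objects = List.of(
                new Range(0, 10), new Range(10, 25),    // contiguous run
                new Range(30, 40),                      // isolated: dropped
                new Range(60, 70), new Range(70, 80));  // contiguous run
        groupContinuous(objects).forEach(System.out::println);
    }
}
```

Running `main` prints two groups and silently drops the isolated `[30, 40)` object, mirroring the task's rule that a single stream object is never rewritten on its own.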
@@ -31,7 +31,6 @@
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.metadata.stream.S3StreamObjectMetadata;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3WALObject;
@@ -159,10 +158,11 @@ public CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjec
throw new RuntimeException("Object " + objectId + " is not in prepared state");
}
// commit object
this.objectsMetadata.put(objectId, new S3Object(
objectId, objectSize, object.getObjectKey(),
object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1,
S3ObjectState.COMMITTED)
S3Object s3Object = new S3Object(
objectId, objectSize, object.getObjectKey(),
object.getPreparedTimeInMs(), object.getExpiredTimeInMs(), System.currentTimeMillis(), -1,
S3ObjectState.COMMITTED);
this.objectsMetadata.put(objectId, s3Object
);
// build metadata
MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID,
@@ -184,8 +184,7 @@ public CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjec
streamMetadata.addStreamObject(s3StreamObject);
streamMetadata.endOffset = Math.max(streamMetadata.endOffset, streamObject.getEndOffset());
});

S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index, request.getOrderId());
S3WALObject walObject = new S3WALObject(objectId, MOCK_BROKER_ID, index, request.getOrderId(), s3Object.getCommittedTimeInMs());
walMetadata.walObjects.add(walObject);
return resp;
});
@@ -231,6 +230,7 @@ public List<S3ObjectMetadata> getServerObjects() {
}
}


@Override
public List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int limit) {
// TODO: support search not only in wal objects
@@ -269,8 +269,8 @@ public List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long e
}

@Override
public List<S3StreamObjectMetadata> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
return Collections.emptyList();
public List<S3ObjectMetadata> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
throw new UnsupportedOperationException("Not supported");
}

@Override
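In the in-memory ObjectManager, the refactor is mechanical but deliberate: the committed `S3Object` is now bound to a local variable so that its commit timestamp can be threaded into the extra `S3WALObject` constructor argument. A pared-down sketch of that flow follows; the record shapes are simplified assumptions, not the real metadata classes.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitSketch {
    // Hypothetical, pared-down stand-ins for the real metadata types.
    record S3Object(long objectId, long objectSize, long committedTimeInMs) {}
    record S3WALObject(long objectId, int brokerId, long orderId, long committedTimeInMs) {}

    static final int MOCK_BROKER_ID = 100;

    private final Map<Long, S3Object> objectsMetadata = new HashMap<>();

    S3WALObject commit(long objectId, long objectSize, long orderId) {
        // Bind the committed object to a local first, so its timestamp can be
        // reused when building the WAL object -- the point of this change.
        S3Object s3Object = new S3Object(objectId, objectSize, System.currentTimeMillis());
        objectsMetadata.put(objectId, s3Object);
        return new S3WALObject(objectId, MOCK_BROKER_ID, orderId, s3Object.committedTimeInMs());
    }

    public static void main(String[] args) {
        S3WALObject wal = new CommitSketch().commit(1L, 4096L, 0L);
        System.out.println(wal);
    }
}
```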
@@ -29,8 +29,8 @@
import org.apache.kafka.metadata.stream.InRangeObjects;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.apache.kafka.metadata.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3StreamObjectMetadata;
import org.apache.kafka.metadata.stream.StreamOffsetRange;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.slf4j.Logger;
@@ -89,6 +89,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) {
// remove all catch up pending tasks
List<GetObjectsTask> retryTasks = removePendingTasks();
// retry all pending tasks
if (retryTasks == null || retryTasks.isEmpty()) {
return;
}
this.pendingExecutorService.submit(() -> {
retryPendingTasks(retryTasks);
});
@@ -166,13 +169,16 @@ public CompletableFuture<InRangeObjects> fetch(long streamId, long startOffset,
}
}

public CompletableFuture<List<S3StreamObjectMetadata>> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
public CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
synchronized (StreamMetadataManager.this) {
try {
List<S3StreamObject> streamObjects = streamsImage.getStreamObjects(streamId, startOffset, endOffset, limit);
List<S3StreamObjectMetadata> s3StreamObjectMetadataList = streamObjects.stream().map(object -> {
long committedTimeInMs = objectsImage.getObjectMetadata(object.objectId()).getCommittedTimeInMs();
return new S3StreamObjectMetadata(object, committedTimeInMs);
List<S3ObjectMetadata> s3StreamObjectMetadataList = streamObjects.stream().map(object -> {
S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId());
long committedTimeInMs = objectMetadata.getCommittedTimeInMs();
long objectSize = objectMetadata.getObjectSize();
return new S3ObjectMetadata(object.objectId(), object.objectType(), List.of(object.streamOffsetRange()), object.dataTimeInMs(),
committedTimeInMs, objectSize, S3StreamConstant.INVALID_ORDER_ID);
}).collect(Collectors.toList());
return CompletableFuture.completedFuture(s3StreamObjectMetadataList);
} catch (Exception e) {
@@ -205,9 +211,9 @@ private CompletableFuture<InRangeObjects> fetch0(long streamId, long startOffset
streamId, startOffset, endOffset, limit);
return CompletableFuture.completedFuture(InRangeObjects.INVALID);
}
// fill the objects' size
// fill the objects' size and committed-timestamp
for (S3ObjectMetadata object : cachedInRangeObjects.objects()) {
S3Object objectMetadata = objectsImage.getObjectMetadata(object.getObjectId());
S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId());
if (objectMetadata == null) {
// should not happen
LOGGER.error(
@@ -216,6 +222,7 @@ private CompletableFuture<InRangeObjects> fetch0(long streamId, long startOffset
return CompletableFuture.completedFuture(InRangeObjects.INVALID);
}
object.setObjectSize(objectMetadata.getObjectSize());
object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs());
}
LOGGER.trace(
"[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}",
Expand All @@ -224,17 +231,18 @@ private CompletableFuture<InRangeObjects> fetch0(long streamId, long startOffset
}

public void retryPendingTasks(List<GetObjectsTask> tasks) {
if (!tasks.isEmpty()) {
LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size());
tasks.forEach(task -> {
long streamId = task.streamId;
long startOffset = task.startOffset;
long endOffset = task.endOffset;
int limit = task.limit;
CompletableFuture<InRangeObjects> newCf = this.fetch(streamId, startOffset, endOffset, limit);
FutureUtil.propagate(newCf, task.cf);
});
if (tasks == null || tasks.isEmpty()) {
return;
}
LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size());
tasks.forEach(task -> {
long streamId = task.streamId;
long startOffset = task.startOffset;
long endOffset = task.endOffset;
int limit = task.limit;
CompletableFuture<InRangeObjects> newCf = this.fetch(streamId, startOffset, endOffset, limit);
FutureUtil.propagate(newCf, task.cf);
});
}

static class GetObjectsTask {
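`StreamMetadataManager.getStreamObjects` now assembles unified `S3ObjectMetadata` by joining two views: offsets come from the stream objects in `streamsImage`, while object size and committed timestamp come from `objectsImage`; `fetch0` applies the same enrichment through `setObjectSize` and the new `setCommittedTimestamp`. A minimal sketch of that join, with hypothetical pared-down types standing in for the image and metadata classes:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MappingSketch {
    // Hypothetical stand-ins: the stream image knows offsets, the objects image
    // knows size and commit time; the unified metadata merges both views.
    record S3StreamObject(long objectId, long startOffset, long endOffset, long dataTimeInMs) {}
    record S3Object(long objectId, long objectSize, long committedTimeInMs) {}
    record S3ObjectMetadata(long objectId, long startOffset, long endOffset,
                            long dataTimeInMs, long committedTimeInMs, long objectSize) {}

    interface ObjectsImage {
        S3Object getObjectMetadata(long objectId);
    }

    static List<S3ObjectMetadata> toUnified(List<S3StreamObject> streamObjects, ObjectsImage image) {
        return streamObjects.stream().map(o -> {
            S3Object meta = image.getObjectMetadata(o.objectId());
            return new S3ObjectMetadata(o.objectId(), o.startOffset(), o.endOffset(),
                    o.dataTimeInMs(), meta.committedTimeInMs(), meta.objectSize());
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ObjectsImage image = id -> new S3Object(id, 1024L, 1_700_000_000_000L);
        System.out.println(toUnified(List.of(new S3StreamObject(1L, 0L, 10L, 0L)), image));
    }
}
```

Folding both views into one `S3ObjectMetadata` is what lets the compaction task above drop `S3StreamObjectMetadata` entirely.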
@@ -30,7 +30,6 @@
import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest.Builder;
import org.apache.kafka.metadata.stream.InRangeObjects;
import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import org.apache.kafka.metadata.stream.S3StreamObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -140,18 +139,19 @@ public List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long e
}

@Override
public List<S3StreamObjectMetadata> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
public List<S3ObjectMetadata> getServerObjects() {
return null;
}

@Override
public List<S3ObjectMetadata> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
try {
return this.metadataManager.getStreamObjects(streamId, startOffset, endOffset, limit).get();
} catch (Exception e) {
LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, limit,
LOGGER.error("Error while get stream objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset,
limit,
e);
return Collections.emptyList();
}
}

@Override
public List<S3ObjectMetadata> getServerObjects() {
return null;
}
}