6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -35,6 +35,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static kafka.log.s3.ObjectWriter.Footer.FOOTER_SIZE;
+
 public class ObjectReader {
     private static final Logger LOGGER = LoggerFactory.getLogger(ObjectReader.class);
     private final S3ObjectMetadata metadata;
@@ -110,9 +112,9 @@ public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount
         }
 
         public static BasicObjectInfo parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
-            long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - 48);
+            long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - FOOTER_SIZE);
             int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
-            if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) {
+            if (indexBlockPosition + indexBlockSize + FOOTER_SIZE < objectSize) {
                 throw new IndexBlockParseException(indexBlockPosition);
             } else {
                 int indexRelativePosition = objectTailBuf.readableBytes() - (int) (objectSize - indexBlockPosition);
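A note on the parse() fix: the old guard added objectTailBuf.readableBytes() — the size of whatever tail happened to be fetched — to the index block position, so the check depended on the fetch size rather than on the object layout. The corrected guard encodes the actual invariant: data blocks, then the index block, then the 48-byte footer must account for the whole object. A standalone sketch of that invariant; the field offsets mirror parse() above, while the byte-layout comments are assumptions:

// Sketch only. Footer offsets as read by parse(): position at size - 48,
// size at size - 40; the remaining bytes (assumed reserved + magic) fill out 48.
final class FooterCheckSketch {
    static final int FOOTER_SIZE = 48;

    static void validate(long indexBlockPosition, int indexBlockSize, long objectSize) {
        // Expected layout: [data blocks][index block][footer], i.e.
        // indexBlockPosition + indexBlockSize + FOOTER_SIZE == objectSize.
        // A smaller sum means the recorded position/size cannot describe this object.
        if (indexBlockPosition + indexBlockSize + FOOTER_SIZE < objectSize) {
            throw new IllegalArgumentException("index block [" + indexBlockPosition + ", "
                    + (indexBlockPosition + indexBlockSize) + ") does not reach the footer of a "
                    + objectSize + "-byte object");
        }
    }
}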
17 changes: 15 additions & 2 deletions core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -31,7 +31,13 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static kafka.log.s3.operator.Writer.MIN_PART_SIZE;
+
 // TODO: memory optimization
+
+/**
+ * Write stream records to a single object.
+ */
 public class ObjectWriter {
 
     private static final byte DATA_BLOCK_MAGIC = 0x01;
@@ -47,12 +53,19 @@ public class ObjectWriter {
 
     private long size;
 
+    /**
+     * Create a new object writer.
+     * @param objectId object id
+     * @param s3Operator S3 operator
+     * @param blockSizeThreshold the max size of a block
+     * @param partSizeThreshold the max size of a part. If it is smaller than {@link Writer#MIN_PART_SIZE}, it will be set to {@link Writer#MIN_PART_SIZE}.
+     */
     public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
         this.objectId = objectId;
         // TODO: use a better clusterName
         String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
         this.blockSizeThreshold = blockSizeThreshold;
-        this.partSizeThreshold = partSizeThreshold;
+        this.partSizeThreshold = Math.max(MIN_PART_SIZE, partSizeThreshold);
         waitingUploadBlocks = new LinkedList<>();
         completedBlocks = new LinkedList<>();
         writer = s3Operator.writer(objectKey);
@@ -253,7 +266,7 @@ public int size() {
     }
 
     static class Footer {
-        private static final int FOOTER_SIZE = 48;
+        public static final int FOOTER_SIZE = 48;
         private static final long MAGIC = 0x88e241b785f4cff7L;
         private final ByteBuf buf;
 
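The constructor change makes the part-size threshold self-correcting instead of trusting the caller, as the new javadoc states. A minimal sketch of the clamping, assuming Writer.MIN_PART_SIZE mirrors S3's multipart-upload lower bound of 5 MiB (the real constant lives in kafka.log.s3.operator.Writer):

// Sketch only: the MIN_PART_SIZE value is an assumption based on S3's
// 5 MiB minimum for every multipart part except the last.
final class PartSizeSketch {
    static final int MIN_PART_SIZE = 5 * 1024 * 1024;

    static int effectivePartSizeThreshold(int requested) {
        // Undersized parts would be rejected by S3 at upload time,
        // so the writer raises the threshold up front instead of failing later.
        return Math.max(MIN_PART_SIZE, requested);
    }

    public static void main(String[] args) {
        System.out.println(effectivePartSizeThreshold(1024));             // 5242880
        System.out.println(effectivePartSizeThreshold(16 * 1024 * 1024)); // 16777216
    }
}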
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/s3/S3StreamClient.java
@@ -105,7 +105,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
             thenApply(metadata -> {
                 StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
                         .withCompactedStreamObjectMaxSize(config.s3StreamObjectCompactionMaxSize())
-                        .withCompactableStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeThreshold());
+                        .withEligibleStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeThreshold());
                 S3Stream stream = new S3Stream(
                         metadata.getStreamId(), metadata.getEpoch(),
                         metadata.getStartOffset(), metadata.getNextOffset(),
core/src/main/scala/kafka/log/s3/{StreamObjectCopyer.java → StreamObjectCopier.java}
@@ -19,7 +19,6 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
 import kafka.log.s3.operator.S3Operator;
 import kafka.log.s3.operator.Writer;
 import org.apache.kafka.metadata.stream.ObjectUtils;
@@ -30,47 +29,65 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public class StreamObjectCopyer {
+import static kafka.log.s3.operator.Writer.MAX_PART_SIZE;
+
+public class StreamObjectCopier {
     private final List<StreamObjectIndexData> completedObjects;
     private final S3Operator s3Operator;
    private final Writer writer;
-    private final long objectId;
     private long nextObjectDataStartPosition;
     private int blockCount;
 
     private long size;
 
-    public StreamObjectCopyer(long objectId, S3Operator s3Operator) {
-        // TODO: use a better clusterName
-        this(objectId, s3Operator, s3Operator.writer(ObjectUtils.genKey(0, "todocluster", objectId)));
-    }
-
-    public StreamObjectCopyer(long objectId, S3Operator s3Operator, Writer writer) {
-        this.objectId = objectId;
+    public StreamObjectCopier(long objectId, S3Operator s3Operator) {
         this.s3Operator = s3Operator;
-        this.writer = writer;
+        // TODO: use a better clusterName
+        this.writer = s3Operator.writer(ObjectUtils.genKey(0, "todocluster", objectId));
         this.completedObjects = new LinkedList<>();
         this.nextObjectDataStartPosition = 0;
         this.blockCount = 0;
         this.size = 0;
     }
 
-    public void write(S3ObjectMetadata metadata) {
+    public void copy(S3ObjectMetadata metadata) {
+        splitAndCopy(metadata, 1);
+    }
+
+    public void splitAndCopy(S3ObjectMetadata metadata, int splitCount) {
         if (metadata.getType() != S3ObjectType.STREAM) {
             throw new IllegalArgumentException("Only stream object can be handled.");
         }
+        if (metadata.objectSize() <= 0) {
+            throw new IllegalArgumentException("Object size must be positive.");
+        }
+        if (splitCount <= 0) {
+            throw new IllegalArgumentException("Split count must be positive.");
+        }
         ObjectReader reader = new ObjectReader(metadata, s3Operator);
         ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().join();
 
-        writer.copyWrite(metadata.key(), 0, basicObjectInfo.dataBlockSize());
-        completedObjects.add(new StreamObjectIndexData(basicObjectInfo.indexBlock(), basicObjectInfo.blockCount(), nextObjectDataStartPosition, blockCount, basicObjectInfo.indexBlockSize()));
+        long restBytes = basicObjectInfo.dataBlockSize();
+        // Only copy data blocks for now.
+        for (long i = 0; i < splitCount - 1 && restBytes >= MAX_PART_SIZE; i++) {
+            writer.copyWrite(metadata.key(), i * MAX_PART_SIZE, (i + 1) * MAX_PART_SIZE);
+            restBytes -= MAX_PART_SIZE;
+        }
+        if (restBytes > MAX_PART_SIZE) {
+            throw new IllegalArgumentException("splitCount is too small, resting bytes: " + restBytes + " is larger than MAX_PART_SIZE: " + MAX_PART_SIZE + ".");
+        }
+        if (restBytes > 0) {
+            writer.copyWrite(metadata.key(), (splitCount - 1) * MAX_PART_SIZE, basicObjectInfo.dataBlockSize());
+        }
+
+        completedObjects.add(new StreamObjectIndexData(basicObjectInfo.indexBlock(), nextObjectDataStartPosition, blockCount));
         blockCount += basicObjectInfo.blockCount();
         nextObjectDataStartPosition += basicObjectInfo.dataBlockSize();
         size += basicObjectInfo.dataBlockSize();
     }
 
     public CompletableFuture<Void> close() {
-        CompositeByteBuf buf = Unpooled.compositeBuffer();
+        CompositeByteBuf buf = ByteBufAlloc.ALLOC.compositeBuffer();
         IndexBlock indexBlock = new IndexBlock();
         buf.addComponent(true, indexBlock.buffer());
         ObjectWriter.Footer footer = new ObjectWriter.Footer(indexBlock.position(), indexBlock.size());
@@ -90,9 +107,9 @@ private class IndexBlock {
 
         public IndexBlock() {
             position = nextObjectDataStartPosition;
-            buf = Unpooled.compositeBuffer();
+            buf = ByteBufAlloc.ALLOC.compositeBuffer();
             // block count
-            buf.addComponent(true, Unpooled.buffer(4).writeInt(blockCount));
+            buf.addComponent(true, ByteBufAlloc.ALLOC.buffer(4).writeInt(blockCount));
             // block index
             for (StreamObjectIndexData indexData : completedObjects) {
                 buf.addComponent(true, indexData.blockBuf());
@@ -119,18 +136,8 @@ public int size() {
     static class StreamObjectIndexData {
         private final ByteBuf blockBuf;
         private final ByteBuf rangesBuf;
-        /**
-         * how many data blocks in this object.
-         */
-        private final int dataBlockCount;
-        /**
-         * The total length of the block index.
-         */
-        private final int blockIndexTotalLength;
 
-        public StreamObjectIndexData(ObjectReader.IndexBlock indexBlock, int dataBlockCount, long blockStartPosition, int blockStartId, int blockIndexTotalLength) {
-            this.dataBlockCount = dataBlockCount;
-            this.blockIndexTotalLength = blockIndexTotalLength;
-
+        public StreamObjectIndexData(ObjectReader.IndexBlock indexBlock, long blockStartPosition, int blockStartId) {
             this.blockBuf = indexBlock.blocks().copy();
             this.rangesBuf = indexBlock.streamRanges().copy();
 
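splitAndCopy() is the substantive change behind the rename: it copies the source object's data blocks as up to splitCount ranges of at most MAX_PART_SIZE bytes, one copyWrite call per range. A standalone sketch of the range arithmetic; the MAX_PART_SIZE value here is illustrative (the real constant lives in kafka.log.s3.operator.Writer), and since the last range starts at (splitCount - 1) * MAX_PART_SIZE rather than at the loop's final offset, splitCount == ceil(dataBlockSize / MAX_PART_SIZE) is assumed:

import java.util.ArrayList;
import java.util.List;

// Sketch of the [start, end) ranges splitAndCopy hands to copyWrite.
final class SplitRangesSketch {
    static final long MAX_PART_SIZE = 32L * 1024 * 1024; // illustrative value

    static List<long[]> splitRanges(long dataBlockSize, int splitCount) {
        List<long[]> ranges = new ArrayList<>();
        long rest = dataBlockSize;
        for (long i = 0; i < splitCount - 1 && rest >= MAX_PART_SIZE; i++) {
            ranges.add(new long[]{i * MAX_PART_SIZE, (i + 1) * MAX_PART_SIZE});
            rest -= MAX_PART_SIZE;
        }
        if (rest > MAX_PART_SIZE) {
            // splitCount was too small to cover the object in MAX_PART_SIZE ranges.
            throw new IllegalArgumentException("splitCount too small, resting bytes: " + rest);
        }
        if (rest > 0) {
            // Assumes the loop ran all splitCount - 1 iterations (see note above).
            ranges.add(new long[]{(splitCount - 1) * MAX_PART_SIZE, dataBlockSize});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 80 MiB object, ceil(80 / 32) = 3 ranges: [0, 32M), [32M, 64M), [64M, 80M).
        for (long[] r : splitRanges(80L * 1024 * 1024, 3)) {
            System.out.println(r[0] + " .. " + r[1]);
        }
    }
}

The Unpooled → ByteBufAlloc.ALLOC switch in close() and IndexBlock is independent of the copy logic; it presumably routes these composite buffers through the project's shared (likely pooled) allocator.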