Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
// the writer#write maybe slow, so we use a copy to avoid holding the lock for a long time
List<S3Object> copy = registryRef.inLock(() -> new ArrayList<>(objects.values(registryRef.epoch())));

copy.forEach(v -> writer.write(v.toRecord()));
copy.forEach(v -> writer.write(v.toRecord(options.metadataVersion().autoMQVersion())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.automq.AutoMQVersion;

/**
* S3Object is the base class of object in S3. Manages the lifecycle of S3Object.
Expand Down Expand Up @@ -78,15 +79,19 @@ public S3Object(
this.attributes = attributes;
}

public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new S3ObjectRecord()
public ApiMessageAndVersion toRecord(AutoMQVersion version) {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(objectSize)
.setObjectState(s3ObjectState.toByte())
.setPreparedTimeInMs(preparedTimeInMs)
.setExpiredTimeInMs(expiredTimeInMs)
.setCommittedTimeInMs(committedTimeInMs)
.setMarkDestroyedTimeInMs(markDestroyedTimeInMs), (short) 0);
.setMarkDestroyedTimeInMs(markDestroyedTimeInMs);
if (version.isObjectAttributesSupported()) {
record.setAttributes(attributes);
}
return new ApiMessageAndVersion(record, version.objectRecordVersion());
}

public static S3Object of(S3ObjectRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,15 @@ public Optional<StreamOffsetRange> find(long streamId) {
}

public ApiMessageAndVersion toRecord(AutoMQVersion version) {
return new ApiMessageAndVersion(new S3StreamSetObjectRecord()
S3StreamSetObjectRecord record = new S3StreamSetObjectRecord()
.setObjectId(objectId)
.setNodeId(nodeId)
.setOrderId(orderId)
.setDataTimeInMs(dataTimeInMs)
.setRanges(ranges), version.streamSetObjectRecordVersion());
.setDataTimeInMs(dataTimeInMs);
if (version.streamSetObjectRecordVersion() < (short) 1) {
record.setRanges(ranges);
}
return new ApiMessageAndVersion(record, version.streamSetObjectRecordVersion());
}

public static S3StreamSetObject of(S3StreamSetObjectRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) {
public static BasicObjectInfo parse(ByteBuf objectTailBuf,
S3ObjectMetadata s3ObjectMetadata) throws ObjectParseException {
objectTailBuf = objectTailBuf.slice();
long footerMagic = objectTailBuf.getLong(objectTailBuf.readableBytes() - 8);
if (footerMagic != ObjectWriter.Footer.MAGIC) {
throw new ObjectParseException("Invalid footer magic: " + footerMagic);
}

long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - FOOTER_SIZE);
int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
if (indexBlockPosition + objectTailBuf.readableBytes() < s3ObjectMetadata.objectSize()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public ByteBuf buffer() {

class Footer {
public static final int FOOTER_SIZE = 48;
private static final long MAGIC = 0x88e241b785f4cff7L;
public static final long MAGIC = 0x88e241b785f4cff7L;
private final ByteBuf buf;

public Footer(long indexStartPosition, int indexBlockLength) {
Expand Down