Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
"about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
},
{
"name": "S3ObjectIds",
"type": "[]int64",
"name": "FirstS3ObjectId",
"type": "int64",
"versions": "0+",
"about": "The prepared S3 objects' id"
"about": "The first of the prepared S3 objects' id"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public CompletableFuture<Long> prepareObject(int count, long ttl) {
Errors code = Errors.forCode(resp.errorCode());
switch (code) {
case NONE:
// TODO: simply response's data structure, only return first object id is enough
return resp.s3ObjectIds().stream().findFirst().get();
return resp.firstS3ObjectId();
default:
LOGGER.error("Error while preparing {} object, code: {}", count, code);
throw code.exception();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,15 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
List<ApiMessageAndVersion> records = new ArrayList<>();
PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData();
int count = request.preparedCount();
List<Long> prepareObjectIds = new ArrayList<>(count);

// update assigned stream id
long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));

long firstAssignedObjectId = nextAssignedObjectId.get();
for (int i = 0; i < count; i++) {
Long objectId = nextAssignedObjectId.get() + i;
prepareObjectIds.add(objectId);
long preparedTs = System.currentTimeMillis();
long expiredTs = preparedTs + request.timeToLiveInMs();
S3ObjectRecord record = new S3ObjectRecord()
Expand All @@ -151,7 +150,7 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
.setExpiredTimeInMs(expiredTs);
records.add(new ApiMessageAndVersion(record, (short) 0));
}
response.setS3ObjectIds(prepareObjectIds);
response.setFirstS3ObjectId(firstAssignedObjectId);
return ControllerResult.atomicOf(records, response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ public ControllerResult<DeleteStreamResponseData> deleteStream(DeleteStreamReque
}

public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALObjectRequestData data) {
// TODO: deal with compacted objects, mark delete compacted object
// TODO: deal with stream objects, replay streamObjectRecord to advance stream's end offset
CommitWALObjectResponseData resp = new CommitWALObjectResponseData();
List<ApiMessageAndVersion> records = new ArrayList<>();
long objectId = data.objectId();
Expand Down Expand Up @@ -402,18 +400,17 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
indexes.stream()
.map(S3ObjectStreamIndex::toRecordStreamIndex)
.collect(Collectors.toList())), (short) 0));
// generate compacted objects' remove record
data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
.setObjectId(id), (short) 0)));
// create stream object records
// TODO: deal with the lifecycle of stream object's source objects, when and how to delete them ?
List<StreamObject> streamObjects = data.streamObjects();
streamObjects.stream().forEach(obj -> {
long streamId = obj.streamId();
long startOffset = obj.startOffset();
long endOffset = obj.endOffset();
records.add(new S3StreamObject(obj.objectId(), obj.objectSize(), streamId, startOffset, endOffset).toRecord());
});
// generate compacted objects' remove record
data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
.setObjectId(id), (short) 0)));
log.info("[CommitWALObject]: broker: {} commit wal object {} success", brokerId, objectId);
return ControllerResult.atomicOf(records, resp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ public long endOffset() {
}

public S3ObjectMetadata toS3ObjectMetadata() {
// TODO: fill object size later
return new S3ObjectMetadata(objectId, -1, objectType);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.protocol.ApiMessage;
Expand Down Expand Up @@ -80,6 +82,7 @@ public void testPrepareObject() {
.setPreparedCount(3)
.setTimeToLiveInMs(60 * 1000));
assertEquals(Errors.NONE.code(), result0.response().errorCode());
assertEquals(0L, result0.response().firstS3ObjectId());
assertEquals(4, result0.records().size());
ApiMessage message = result0.records().get(0).message();
assertInstanceOf(AssignedS3ObjectIdRecord.class, message);
Expand All @@ -88,8 +91,7 @@ public void testPrepareObject() {
for (int i = 0; i < 3; i++) {
verifyPrepareObjectRecord(result0.records().get(i + 1), i, 60 * 1000);
}
manager.replay(assignedRecord);
result0.records().stream().skip(1).map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record));
replay(manager, result0.records());

// verify the 3 objects are prepared
assertEquals(3, manager.objectsMetadata().size());
Expand All @@ -99,6 +101,46 @@ public void testPrepareObject() {
assertEquals(60 * 1000, s3Object.getExpiredTimeInMs() - s3Object.getPreparedTimeInMs());
});
assertEquals(3, manager.nextAssignedObjectId());

// 2. prepare 5 objects
ControllerResult<PrepareS3ObjectResponseData> result1 = manager.prepareObject(new PrepareS3ObjectRequestData()
.setBrokerId(BROKER1)
.setPreparedCount(5)
.setTimeToLiveInMs(60 * 1000));
assertEquals(Errors.NONE.code(), result1.response().errorCode());
assertEquals(3L, result1.response().firstS3ObjectId());
assertEquals(6, result1.records().size());
replay(manager, result1.records());

// verify the 5 objects are prepared
assertEquals(8, manager.objectsMetadata().size());
manager.objectsMetadata().forEach((id, s3Object) -> {
assertEquals(S3ObjectState.PREPARED, s3Object.getS3ObjectState());
assertEquals(id, s3Object.getObjectId());
assertEquals(60 * 1000, s3Object.getExpiredTimeInMs() - s3Object.getPreparedTimeInMs());
});
assertEquals(8, manager.nextAssignedObjectId());
}

private void replay(S3ObjectControlManager manager, List<ApiMessageAndVersion> records) {
List<ApiMessage> messages = records.stream().map(x -> x.message())
.collect(Collectors.toList());
for (ApiMessage message : messages) {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case ASSIGNED_S3_OBJECT_ID_RECORD:
manager.replay((AssignedS3ObjectIdRecord) message);
break;
case S3_OBJECT_RECORD:
manager.replay((S3ObjectRecord) message);
break;
case REMOVE_S3_OBJECT_RECORD:
manager.replay((RemoveS3ObjectRecord) message);
break;
default:
throw new IllegalStateException("Unknown metadata record type " + type);
}
}
}

private void verifyPrepareObjectRecord(ApiMessageAndVersion result, long expectedObjectId, long expectedTimeToLiveInMs) {
Expand Down