@@ -37,6 +37,12 @@
"versions": "0+",
"about": "The ID of the WAL S3 object to commit"
},
{
"name": "OrderId",
"type": "int64",
"versions": "0+",
"about": "The order ID of the WAL S3 object"
},
{
"name": "ObjectSize",
"type": "int64",
@@ -86,13 +92,13 @@
"name": "ObjectId",
"type": "int64",
"versions": "0+",
"about": "The ID of the WAL S3 object to commit"
"about": "The ID of the Stream object to commit"
},
{
"name": "ObjectSize",
"type": "int64",
"versions": "0+",
"about": "The size of the WAL S3 object to commit"
"about": "The size of the Stream object to commit"
},
{
"name": "StreamId",
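The schema changes above add an OrderId to the WAL-object commit request and re-describe the stream-object commit fields. Below is a minimal, self-contained sketch of what a WAL-object commit request now carries, using a plain-Java stand-in rather than the generated Kafka message class (the real class and setter names are not shown in this diff and are assumptions here):

// Hedged sketch: plain-Java stand-in for the generated WAL-object commit request.
// Field names mirror the JSON schema above; the class itself is hypothetical.
public class CommitWalObjectRequestSketch {
    private long objectId;   // "ObjectId": the ID of the WAL S3 object to commit
    private long orderId;    // "OrderId": new field, the order ID of the WAL S3 object
    private long objectSize; // "ObjectSize": the size of the WAL S3 object

    public CommitWalObjectRequestSketch setObjectId(long v) { this.objectId = v; return this; }
    public CommitWalObjectRequestSketch setOrderId(long v) { this.orderId = v; return this; }
    public CommitWalObjectRequestSketch setObjectSize(long v) { this.objectSize = v; return this; }

    public static void main(String[] args) {
        CommitWalObjectRequestSketch req = new CommitWalObjectRequestSketch()
                .setObjectId(42L)
                .setOrderId(7L)                  // lets the controller keep WAL objects ordered
                .setObjectSize(64L * 1024 * 1024);
        System.out.println("commit objectId=42 orderId=7 size=" + req.objectSize);
    }
}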
@@ -65,7 +65,10 @@
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -75,12 +78,23 @@
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveBrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.WALObjectRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -1451,6 +1465,53 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
case ZK_MIGRATION_STATE_RECORD:
// TODO handle this
break;

// Kafka on S3 inject start

case S3_STREAM_RECORD:
streamControlManager.replay((S3StreamRecord) message);
break;
case REMOVE_S3_STREAM_RECORD:
streamControlManager.replay((RemoveS3StreamRecord) message);
break;
case RANGE_RECORD:
streamControlManager.replay((RangeRecord) message);
break;
case REMOVE_RANGE_RECORD:
streamControlManager.replay((RemoveRangeRecord) message);
break;
case S3_STREAM_OBJECT_RECORD:
streamControlManager.replay((S3StreamObjectRecord) message);
break;
case REMOVE_S3_STREAM_OBJECT_RECORD:
streamControlManager.replay((RemoveS3StreamObjectRecord) message);
break;
case WALOBJECT_RECORD:
streamControlManager.replay((WALObjectRecord) message);
break;
case REMOVE_WALOBJECT_RECORD:
streamControlManager.replay((RemoveWALObjectRecord) message);
break;
case S3_OBJECT_RECORD:
s3ObjectControlManager.replay((S3ObjectRecord) message);
break;
case REMOVE_S3_OBJECT_RECORD:
s3ObjectControlManager.replay((RemoveS3ObjectRecord) message);
break;
case ASSIGNED_STREAM_ID_RECORD:
streamControlManager.replay((AssignedStreamIdRecord) message);
break;
case ASSIGNED_S3_OBJECT_ID_RECORD:
s3ObjectControlManager.replay((AssignedS3ObjectIdRecord) message);
break;
case BROKER_WALMETADATA_RECORD:
streamControlManager.replay((BrokerWALMetadataRecord) message);
break;
case REMOVE_BROKER_WALMETADATA_RECORD:
streamControlManager.replay((RemoveBrokerWALMetadataRecord) message);
break;

// Kafka on S3 inject end
default:
throw new RuntimeException("Unhandled record type " + type);
}
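The new cases above only dispatch each injected record type to its control manager; the replay methods themselves live in StreamControlManager and S3ObjectControlManager and are not part of this diff. Below is a rough, self-contained sketch of the replay pattern they follow (apply a record to in-memory metadata), with simplified stand-in types that are assumptions rather than the real implementation:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the replay pattern: each replayed record mutates in-memory metadata.
// The real StreamControlManager uses timeline-aware structures; these types are
// simplified stand-ins for illustration only.
public class StreamReplaySketch {
    static final class StreamMetadata {
        final long streamId;
        final long epoch;
        final long startOffset;
        StreamMetadata(long streamId, long epoch, long startOffset) {
            this.streamId = streamId;
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    private final Map<Long, StreamMetadata> streams = new ConcurrentHashMap<>();

    // Analogue of replay(S3StreamRecord): create or update the stream's metadata.
    public void replayStreamRecord(long streamId, long epoch, long startOffset) {
        streams.put(streamId, new StreamMetadata(streamId, epoch, startOffset));
    }

    // Analogue of replay(RemoveS3StreamRecord): drop the stream's metadata.
    public void replayRemoveStreamRecord(long streamId) {
        streams.remove(streamId);
    }

    public static void main(String[] args) {
        StreamReplaySketch sketch = new StreamReplaySketch();
        sketch.replayStreamRecord(1L, 0L, 0L);   // stream 1 created at epoch 0
        sketch.replayRemoveStreamRecord(1L);     // stream 1 removed again
    }
}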
@@ -172,6 +172,27 @@ public ControllerResult<Boolean> commitObject(long objectId, long objectSize) {
new ApiMessageAndVersion(record, (short) 0)), true);
}

public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Long objectId : objects) {
S3Object object = this.objectsMetadata.get(objectId);
if (object == null) {
log.error("object {} not exist when mark destroy object", objectId);
// TODO: Maybe we can ignore this situation, because this object is already destroyed ?
return ControllerResult.of(Collections.emptyList(), false);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
records.add(new ApiMessageAndVersion(record, (short) 0));
}
return ControllerResult.atomicOf(records, true);
}
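markDestroyObjects is all-or-nothing: if any requested object is unknown, it returns an empty record list and false; otherwise it emits one MARK_DESTROYED record per object, wrapped in an atomic result. Below is a small, self-contained sketch of that contract, using simplified stand-in types rather than the real ControllerResult or record classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of the all-or-nothing contract of markDestroyObjects above.
public class MarkDestroySketch {
    // objectId -> state, a simplified stand-in for objectsMetadata
    private final Map<Long, String> objectsMetadata = new HashMap<>();

    public MarkDestroySketch prepare(long objectId) {
        objectsMetadata.put(objectId, "PREPARED");
        return this;
    }

    // Returns the records to append, or an empty list if any object is unknown.
    public List<String> markDestroy(List<Long> objectIds) {
        List<String> records = new ArrayList<>();
        for (Long id : objectIds) {
            if (!objectsMetadata.containsKey(id)) {
                return List.of();   // one unknown object aborts the whole batch
            }
            records.add("S3ObjectRecord{objectId=" + id + ", state=MARK_DESTROYED}");
        }
        return records;             // all known: one record per object, applied atomically
    }

    public static void main(String[] args) {
        MarkDestroySketch m = new MarkDestroySketch().prepare(10L).prepare(11L);
        System.out.println(m.markDestroy(List.of(10L, 11L)));  // two records
        System.out.println(m.markDestroy(List.of(10L, 99L)));  // [] since 99 is unknown
    }
}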

public void replay(AssignedS3ObjectIdRecord record) {
nextAssignedObjectId.set(record.assignedS3ObjectId() + 1);
}
@@ -180,7 +201,7 @@ public void replay(S3ObjectRecord record) {
GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
// TODO: recover the prepared objects and mark-destroyed objects when the controller restarts
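The replay path above derives the S3 object key from the cluster id and object id through a versioned generator. The exact key layout is defined by S3ObjectKeyGeneratorManager and is not shown in this diff; the sketch below assumes a simple "clusterId/objectId" layout purely for illustration:

// Hedged sketch of a version-0 key generator; the real format lives in
// S3ObjectKeyGeneratorManager and may differ. The layout below is an assumption.
public class ObjectKeySketch {
    static String generateV0(String clusterId, long objectId) {
        return clusterId + "/" + objectId;   // assumed layout: <clusterId>/<objectId>
    }

    public static void main(String[] args) {
        System.out.println(generateV0("my-cluster", 123L));  // my-cluster/123
    }
}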
@@ -215,7 +236,7 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
.setPreparedTimeInMs(obj.getPreparedTimeInMs())
.setExpiredTimeInMs(obj.getExpiredTimeInMs())
.setCommittedTimeInMs(obj.getCommittedTimeInMs())
.setDestroyedTimeInMs(obj.getDestroyedTimeInMs());
.setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
// generate the records which mark the expired objects as destroyed
records.add(new ApiMessageAndVersion(record, (short) 0));
// generate the records that the listeners produce in response to the object-destroy events
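checkS3ObjectsLifecycle, of which only the renamed setter is touched in this diff, periodically sweeps prepared objects and rewrites expired ones as MARK_DESTROYED. Below is a rough, self-contained sketch of that sweep, with stand-in types that are assumptions rather than the actual controller classes:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hedged sketch of the lifecycle sweep: prepared objects past their expiration are
// rewritten with state MARK_DESTROYED and a markDestroyedTimeInMs timestamp
// (the field renamed from destroyedTimeInMs in this diff). Types are simplified.
public class LifecycleSweepSketch {
    record PreparedObject(long objectId, long preparedTimeInMs, long expiredTimeInMs) { }

    static List<String> checkLifecycle(Map<Long, PreparedObject> preparedObjects, long nowMs) {
        List<String> records = new ArrayList<>();
        for (PreparedObject obj : preparedObjects.values()) {
            if (obj.expiredTimeInMs() <= nowMs) {
                records.add("S3ObjectRecord{objectId=" + obj.objectId()
                        + ", state=MARK_DESTROYED, markDestroyedTimeInMs=" + nowMs + "}");
            }
        }
        return records;
    }

    public static void main(String[] args) {
        Map<Long, PreparedObject> prepared = Map.of(1L, new PreparedObject(1L, 0L, 1_000L));
        System.out.println(checkLifecycle(prepared, 2_000L));  // object 1 is expired -> one record
    }
}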