Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public class S3ObjectControlManager {
private final S3Config config;

/**
* The objectId of the next object to be applied. (start from 0)
* The objectId of the next object to be prepared. (start from 0)
*/
private Long nextApplyObjectId = 0L;
private Long nextAssignedObjectId = 0L;

// TODO: add timer task to periodically check if there are objects to be destroyed or expired
private final Queue<Long/*objectId*/> appliedObjects;
private final Queue<Long/*objectId*/> preparedObjects;
private final Queue<Long/*objectId*/> markDestroyedObjects;

public S3ObjectControlManager(
Expand All @@ -62,12 +62,12 @@ public S3ObjectControlManager(
this.clusterId = clusterId;
this.config = config;
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.appliedObjects = new LinkedList<>();
this.preparedObjects = new LinkedList<>();
this.markDestroyedObjects = new LinkedList<>();
}
public Long appliedObjectNum() {
return nextApplyObjectId;

public Long nextAssignedObjectId() {
return nextAssignedObjectId;
}

public void replay(S3ObjectRecord record) {
Expand All @@ -76,6 +76,7 @@ public void replay(S3ObjectRecord record) {
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1);
}

public void replay(RemoveS3ObjectRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ public void onApply() {
if (this.s3ObjectState != S3ObjectState.UNINITIALIZED) {
throw new IllegalStateException("Object is not in UNINITIALIZED state");
}
this.s3ObjectState = S3ObjectState.APPLIED;
this.s3ObjectState = S3ObjectState.PREPARED;
this.appliedTimeInMs = Optional.of(System.currentTimeMillis());
}

public void onCreate(S3ObjectCommitContext createContext) {
// TODO: decide fetch object metadata from S3 or let broker send it to controller
if (this.s3ObjectState != S3ObjectState.APPLIED) {
if (this.s3ObjectState != S3ObjectState.PREPARED) {
throw new IllegalStateException("Object is not in APPLIED state");
}
this.s3ObjectState = S3ObjectState.COMMITTED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public enum S3ObjectState {
UNINITIALIZED,
APPLIED,
PREPARED,
COMMITTED,
MARK_DESTROYED,
DESTROYED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class S3ObjectsImageTest {
static {
Map<Long/*objectId*/, SimplifiedS3Object> map = new HashMap<>();
for (int i = 0; i < 4; i++) {
SimplifiedS3Object object = new SimplifiedS3Object(i, S3ObjectState.APPLIED);
SimplifiedS3Object object = new SimplifiedS3Object(i, S3ObjectState.PREPARED);
map.put(object.objectId(), object);
}
IMAGE1 = new S3ObjectsImage(map);
Expand All @@ -69,15 +69,15 @@ public class S3ObjectsImageTest {
.setObjectId(3L), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
setObjectId(4L).
setObjectState((byte) S3ObjectState.APPLIED.ordinal()), (short) 0));
setObjectState((byte) S3ObjectState.PREPARED.ordinal()), (short) 0));
DELTA1 = new S3ObjectsDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);

Map<Long/*objectId*/, SimplifiedS3Object> map2 = new HashMap<>();
map2.put(0L, new SimplifiedS3Object(0L, S3ObjectState.COMMITTED));
map2.put(1L, new SimplifiedS3Object(1L, S3ObjectState.COMMITTED));
map2.put(2L, new SimplifiedS3Object(2L, S3ObjectState.MARK_DESTROYED));
map2.put(4L, new SimplifiedS3Object(4L, S3ObjectState.APPLIED));
map2.put(4L, new SimplifiedS3Object(4L, S3ObjectState.PREPARED));

IMAGE2 = new S3ObjectsImage(map2);
}
Expand Down