Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,4 @@ public StreamControlManager(
this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,25 @@ public abstract class S3Object implements Comparable<S3Object> {

protected Optional<String> objectAddress = Optional.empty();

protected Optional<Long> applyTimeInMs = Optional.empty();

protected Optional<Long> createTimeInMs = Optional.empty();

protected Optional<Long> destroyTimeInMs = Optional.empty();
/**
* The time when broker apply the object
*/
protected Optional<Long> appliedTimeInMs = Optional.empty();

/**
* The time when the object will be expired if it is not be committed
*/
protected Optional<Long> expiredTimeInMs = Optional.empty();

/**
* The time when the object is committed
*/
protected Optional<Long> committedTimeInMs = Optional.empty();

/**
* The time when the object is destroyed
*/
protected Optional<Long> destroyedTimeInMs = Optional.empty();

protected S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED;

Expand All @@ -49,17 +63,19 @@ protected S3Object(
final Long objectId,
final Long objectSize,
final String objectAddress,
final Long applyTimeInMs,
final Long createTimeInMs,
final Long destroyTimeInMs,
final Long appliedTimeInMs,
final Long expiredTimeInMs,
final Long committedTimeInMs,
final Long destroyedTimeInMs,
final S3ObjectState s3ObjectState,
final S3ObjectType objectType) {
this.objectId = objectId;
this.objectSize = Optional.of(objectSize);
this.objectAddress = Optional.of(objectAddress);
this.applyTimeInMs = Optional.of(applyTimeInMs);
this.createTimeInMs = Optional.of(createTimeInMs);
this.destroyTimeInMs = Optional.of(destroyTimeInMs);
this.appliedTimeInMs = Optional.of(appliedTimeInMs);
this.expiredTimeInMs = Optional.of(expiredTimeInMs);
this.committedTimeInMs = Optional.of(committedTimeInMs);
this.destroyedTimeInMs = Optional.of(destroyedTimeInMs);
this.objectType = objectType;
this.s3ObjectState = s3ObjectState;
}
Expand All @@ -69,16 +85,16 @@ public void onApply() {
throw new IllegalStateException("Object is not in UNINITIALIZED state");
}
this.s3ObjectState = S3ObjectState.APPLIED;
this.applyTimeInMs = Optional.of(System.currentTimeMillis());
this.appliedTimeInMs = Optional.of(System.currentTimeMillis());
}

public void onCreate(S3ObjectCreateContext createContext) {
public void onCreate(S3ObjectCommitContext createContext) {
// TODO: decide fetch object metadata from S3 or let broker send it to controller
if (this.s3ObjectState != S3ObjectState.APPLIED) {
throw new IllegalStateException("Object is not in APPLIED state");
}
this.s3ObjectState = S3ObjectState.CREATED;
this.createTimeInMs = Optional.of(createContext.createTimeInMs);
this.committedTimeInMs = Optional.of(createContext.committedTimeInMs);
this.objectSize = Optional.of(createContext.objectSize);
this.objectAddress = Optional.of(createContext.objectAddress);
this.objectType = createContext.objectType;
Expand All @@ -103,19 +119,19 @@ public S3ObjectType getObjectType() {
return objectType;
}

public class S3ObjectCreateContext {
public class S3ObjectCommitContext {

private final Long createTimeInMs;
private final Long committedTimeInMs;
private final Long objectSize;
private final String objectAddress;
private final S3ObjectType objectType;

public S3ObjectCreateContext(
final Long createTimeInMs,
public S3ObjectCommitContext(
final Long committedTimeInMs,
final Long objectSize,
final String objectAddress,
final S3ObjectType objectType) {
this.createTimeInMs = createTimeInMs;
this.committedTimeInMs = committedTimeInMs;
this.objectSize = objectSize;
this.objectAddress = objectAddress;
this.objectType = objectType;
Expand Down Expand Up @@ -156,16 +172,16 @@ public Optional<String> getObjectAddress() {
return objectAddress;
}

public Optional<Long> getApplyTimeInMs() {
return applyTimeInMs;
public Optional<Long> getAppliedTimeInMs() {
return appliedTimeInMs;
}

public Optional<Long> getCreateTimeInMs() {
return createTimeInMs;
public Optional<Long> getCommittedTimeInMs() {
return committedTimeInMs;
}

public Optional<Long> getDestroyTimeInMs() {
return destroyTimeInMs;
public Optional<Long> getDestroyedTimeInMs() {
return destroyedTimeInMs;
}

public S3ObjectState getS3ObjectState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public S3StreamObject(final Long objectId) {
}

@Override
public void onCreate(S3ObjectCreateContext createContext) {
public void onCreate(S3ObjectCommitContext createContext) {
super.onCreate(createContext);
if (!(createContext instanceof StreamObjectCreateContext)) {
if (!(createContext instanceof StreamObjectCommitContext)) {
throw new IllegalArgumentException();
}
this.streamIndex = ((StreamObjectCreateContext) createContext).streamIndex;
this.streamIndex = ((StreamObjectCommitContext) createContext).streamIndex;
}

@Override
Expand All @@ -49,11 +49,11 @@ public int compareTo(S3Object o) {
return res == 0 ? this.streamIndex.getStartOffset().compareTo(s3StreamObject.streamIndex.getStartOffset()) : res;
}

class StreamObjectCreateContext extends S3ObjectCreateContext {
class StreamObjectCommitContext extends S3ObjectCommitContext {

private final S3ObjectStreamIndex streamIndex;

public StreamObjectCreateContext(
public StreamObjectCommitContext(
final Long createTimeInMs,
final Long objectSize,
final String objectAddress,
Expand All @@ -74,9 +74,10 @@ public ApiMessageAndVersion toRecord() {
.setStreamId(streamIndex.getStreamId())
.setObjectState((byte) s3ObjectState.ordinal())
.setObjectType((byte) objectType.ordinal())
.setApplyTimeInMs(applyTimeInMs.get())
.setCreateTimeInMs(createTimeInMs.get())
.setDestroyTimeInMs(destroyTimeInMs.get())
.setAppliedTimeInMs(appliedTimeInMs.get())
.setExpiredTimeInMs(expiredTimeInMs.get())
.setCommittedTimeInMs(committedTimeInMs.get())
.setDestroyedTimeInMs(destroyedTimeInMs.get())
.setObjectSize(objectSize.get())
.setStartOffset(streamIndex.getStartOffset())
.setEndOffset(streamIndex.getEndOffset()), (short) 0);
Expand All @@ -86,9 +87,10 @@ public static S3StreamObject of(S3StreamObjectRecord record) {
S3StreamObject s3StreamObject = new S3StreamObject(record.objectId());
s3StreamObject.objectType = S3ObjectType.fromByte(record.objectType());
s3StreamObject.s3ObjectState = S3ObjectState.fromByte(record.objectState());
s3StreamObject.applyTimeInMs = Optional.of(record.applyTimeInMs());
s3StreamObject.createTimeInMs = Optional.of(record.createTimeInMs());
s3StreamObject.destroyTimeInMs = Optional.of(record.destroyTimeInMs());
s3StreamObject.appliedTimeInMs = Optional.of(record.appliedTimeInMs());
s3StreamObject.expiredTimeInMs = Optional.of(record.expiredTimeInMs());
s3StreamObject.committedTimeInMs = Optional.of(record.committedTimeInMs());
s3StreamObject.destroyedTimeInMs = Optional.of(record.destroyedTimeInMs());
s3StreamObject.objectSize = Optional.of(record.objectSize());
s3StreamObject.streamIndex = new S3ObjectStreamIndex(record.streamId(), record.startOffset(), record.endOffset());
return s3StreamObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,37 @@ private S3WALObject(
final Long objectSize,
final String objectAddress,
final Long applyTimeInMs,
final Long createTimeInMs,
final Long expiredTimeImMs,
final Long commitTimeInMs,
final Long destroyTimeInMs,
final S3ObjectState s3ObjectState,
final S3ObjectType objectType,
final Integer brokerId,
final List<S3ObjectStreamIndex> streamsIndex) {
super(objectId, objectSize, objectAddress, applyTimeInMs, createTimeInMs, destroyTimeInMs, s3ObjectState, objectType);
super(objectId, objectSize, objectAddress, applyTimeInMs, expiredTimeImMs, commitTimeInMs, destroyTimeInMs, s3ObjectState, objectType);
this.objectType = objectType;
this.brokerId = brokerId;
this.streamsIndex = streamsIndex.stream().collect(
Collectors.toMap(S3ObjectStreamIndex::getStreamId, index -> index));
}

@Override
public void onCreate(S3ObjectCreateContext createContext) {
public void onCreate(S3ObjectCommitContext createContext) {
super.onCreate(createContext);
if (!(createContext instanceof WALObjectCreateContext)) {
if (!(createContext instanceof WALObjectCommitContext)) {
throw new IllegalArgumentException();
}
WALObjectCreateContext walCreateContext = (WALObjectCreateContext) createContext;
WALObjectCommitContext walCreateContext = (WALObjectCommitContext) createContext;
this.streamsIndex = walCreateContext.streamIndexList.stream().collect(Collectors.toMap(S3ObjectStreamIndex::getStreamId, index -> index));
this.brokerId = walCreateContext.brokerId;
}

class WALObjectCreateContext extends S3ObjectCreateContext {
class WALObjectCommitContext extends S3ObjectCommitContext {

private final List<S3ObjectStreamIndex> streamIndexList;
private final Integer brokerId;

public WALObjectCreateContext(
public WALObjectCommitContext(
final Long createTimeInMs,
final Long objectSize,
final String objectAddress,
Expand All @@ -86,9 +87,10 @@ public ApiMessageAndVersion toRecord() {
.setObjectId(objectId)
.setObjectState((byte) s3ObjectState.ordinal())
.setObjectType((byte) objectType.ordinal())
.setApplyTimeInMs(applyTimeInMs.get())
.setCreateTimeInMs(createTimeInMs.get())
.setDestroyTimeInMs(destroyTimeInMs.get())
.setAppliedTimeInMs(appliedTimeInMs.get())
.setExpiredTimeInMs(expiredTimeInMs.get())
.setCommittedTimeInMs(committedTimeInMs.get())
.setDestroyedTimeInMs(destroyedTimeInMs.get())
.setObjectSize(objectSize.get())
.setStreamsIndex(
streamsIndex.values().stream()
Expand All @@ -99,7 +101,7 @@ public ApiMessageAndVersion toRecord() {
public static S3WALObject of(WALObjectRecord record) {
S3WALObject s3WalObject = new S3WALObject(
record.objectId(), record.objectSize(), null,
record.applyTimeInMs(), record.createTimeInMs(), record.destroyTimeInMs(),
record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()), S3ObjectType.fromByte(record.objectType()),
record.brokerId(), record.streamsIndex().stream().map(S3ObjectStreamIndex::of).collect(Collectors.toList()));
return s3WalObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,25 @@
"about": "The object size of the S3 object"
},
{
"name": "ApplyTimeInMs",
"name": "AppliedTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be applied timestamp"
},
{
"name": "CreateTimeInMs",
"name": "ExpiredTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be created timestamp"
"about": "The object be expired timestamp"
},
{
"name": "DestroyTimeInMs",
"name": "CommittedTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be committed timestamp"
},
{
"name": "DestroyedTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be destroyed timestamp"
Expand Down
14 changes: 10 additions & 4 deletions metadata/src/main/resources/common/metadata/WALObjectRecord.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,25 @@
"about": "The object size of the S3 object"
},
{
"name": "ApplyTimeInMs",
"name": "AppliedTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be applied timestamp"
},
{
"name": "CreateTimeInMs",
"name": "ExpiredTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be created timestamp"
"about": "The object be expired timestamp"
},
{
"name": "DestroyTimeInMs",
"name": "CommittedTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be committed timestamp"
},
{
"name": "DestroyedTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be destroyed timestamp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,19 @@ public void testBasicChange() {
WALObjectRecord walObjectRecord0 = new WALObjectRecord()
.setBrokerId(brokerId0)
.setObjectId(0L)
.setApplyTimeInMs(System.currentTimeMillis())
.setAppliedTimeInMs(System.currentTimeMillis())
.setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
.setObjectState((byte) S3ObjectState.APPLIED.ordinal());
WALObjectRecord walObjectRecord1 = new WALObjectRecord()
.setBrokerId(brokerId1)
.setObjectId(1L)
.setApplyTimeInMs(System.currentTimeMillis())
.setAppliedTimeInMs(System.currentTimeMillis())
.setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
.setObjectState((byte) S3ObjectState.APPLIED.ordinal());
WALObjectRecord walObjectRecord2 = new WALObjectRecord()
.setBrokerId(brokerId1)
.setObjectId(2L)
.setApplyTimeInMs(System.currentTimeMillis())
.setAppliedTimeInMs(System.currentTimeMillis())
.setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
.setObjectState((byte) S3ObjectState.APPLIED.ordinal());
records.clear();
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testBasicChange() {
.setBrokerId(brokerId1)
.setObjectId(1L)
.setObjectSize(WAL_LOOSE_SIZE)
.setCreateTimeInMs(System.currentTimeMillis())
.setCommittedTimeInMs(System.currentTimeMillis())
.setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
.setStreamsIndex(streamIndicesInWALObject1.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect(
Collectors.toList()))
Expand All @@ -211,7 +211,7 @@ public void testBasicChange() {
.setBrokerId(brokerId1)
.setObjectId(2L)
.setObjectSize(WAL_LOOSE_SIZE)
.setCreateTimeInMs(System.currentTimeMillis())
.setCommittedTimeInMs(System.currentTimeMillis())
.setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
.setStreamsIndex(streamIndicesInWALObject2.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect(
Collectors.toList()))
Expand Down Expand Up @@ -284,9 +284,9 @@ public void testBasicChange() {
.setObjectId(3L)
.setBrokerId(brokerId1)
.setObjectType((byte) S3ObjectType.WAL_MINOR.ordinal())
.setCreateTimeInMs(System.currentTimeMillis())
.setCommittedTimeInMs(System.currentTimeMillis())
.setObjectState((byte) S3ObjectState.CREATED.ordinal())
.setApplyTimeInMs(System.currentTimeMillis())
.setAppliedTimeInMs(System.currentTimeMillis())
.setObjectSize(WAL_MINOR_COMPACT_SIZE)
.setStreamsIndex(streamIndicesInWALObject3.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect(
Collectors.toList()));
Expand Down Expand Up @@ -335,15 +335,15 @@ public void testBasicChange() {
.setStreamId(streamId0)
.setObjectSize(STREAM_OBJECT_SIZE)
.setObjectType((byte) S3ObjectType.STREAM.ordinal())
.setCreateTimeInMs(System.currentTimeMillis())
.setCommittedTimeInMs(System.currentTimeMillis())
.setStartOffset(s3ObjectStreamIndex4.getStartOffset())
.setEndOffset(s3ObjectStreamIndex4.getEndOffset());
S3StreamObjectRecord streamObjectRecord5 = new S3StreamObjectRecord()
.setObjectId(5L)
.setStreamId(streamId1)
.setObjectSize(STREAM_OBJECT_SIZE)
.setObjectType((byte) S3ObjectType.STREAM.ordinal())
.setCreateTimeInMs(System.currentTimeMillis())
.setCommittedTimeInMs(System.currentTimeMillis())
.setStartOffset(s3ObjectStreamIndex5.getStartOffset())
.setEndOffset(s3ObjectStreamIndex5.getEndOffset());
RemoveWALObjectRecord removeWALObjectRecord3 = new RemoveWALObjectRecord()
Expand Down