Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-6147. Add ability in OM to get limited delta updates #2956

Merged
merged 5 commits into from Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -187,4 +187,14 @@ <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException;

/**
* Get limited data written to DB since a specific sequence number.
* @param sequenceNumber
* @param limitCount
* @return
* @throws SequenceNumberNotFoundException
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException;
}
Expand Up @@ -382,7 +382,15 @@ public CodecRegistry getCodecRegistry() {
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {
return getUpdatesSince(sequenceNumber, Long.MAX_VALUE);
}

@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException {
if (limitCount <= 0) {
throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
}
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try {
TransactionLogIterator transactionLogIterator =
Expand Down Expand Up @@ -415,6 +423,9 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
if (currSequenceNumber - sequenceNumber >= limitCount) {
break;
}
transactionLogIterator.next();
}
} catch (RocksDBException e) {
Expand Down
Expand Up @@ -348,6 +348,30 @@ public void testGetDBUpdatesSince() throws Exception {
}
}

@Test
public void testGetDBUpdatesSinceWithLimitCount() throws Exception {

try (RDBStore newStore =
new RDBStore(folder.newFolder(), options, configSet)) {

try (Table firstTable = newStore.getTable(families.get(1))) {
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key1"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value1"));
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value2"));
}
Assert.assertTrue(
newStore.getDb().getLatestSequenceNumber() == 2);

DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0, 1);
Assert.assertEquals(1, dbUpdatesSince.getData().size());
}
}

@Test
public void testDowngrade() throws Exception {

Expand Down
Expand Up @@ -1098,6 +1098,7 @@ message ServiceListRequest {

message DBUpdatesRequest {
required uint64 sequenceNumber = 1;
optional uint64 limitCount = 2;
}

message ServiceListResponse {
Expand Down
Expand Up @@ -3489,8 +3489,12 @@ public boolean isRatisEnabled() {
public DBUpdates getDBUpdates(
DBUpdatesRequest dbUpdatesRequest)
throws SequenceNumberNotFoundException {
long limitCount = Long.MAX_VALUE;
if (dbUpdatesRequest.hasLimitCount()) {
limitCount = dbUpdatesRequest.getLimitCount();
}
DBUpdatesWrapper updatesSince = metadataManager.getStore()
.getUpdatesSince(dbUpdatesRequest.getSequenceNumber());
.getUpdatesSince(dbUpdatesRequest.getSequenceNumber(), limitCount);
DBUpdates dbUpdates = new DBUpdates(updatesSince.getData());
dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber());
return dbUpdates;
Expand Down