Skip to content

Commit

Permalink
HDDS-6147. Add ability in OM to get limited delta updates (#2956)
Browse files Browse the repository at this point in the history
  • Loading branch information
symious committed Jan 18, 2022
1 parent 9785941 commit cde7cb7
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 1 deletion.
Expand Up @@ -187,4 +187,14 @@ <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException;

/**
* Get limited data written to DB since a specific sequence number.
* @param sequenceNumber
* @param limitCount
* @return
* @throws SequenceNumberNotFoundException
*/
DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException;
}
Expand Up @@ -382,7 +382,15 @@ public CodecRegistry getCodecRegistry() {
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {
return getUpdatesSince(sequenceNumber, Long.MAX_VALUE);
}

@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException {
if (limitCount <= 0) {
throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
}
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try {
TransactionLogIterator transactionLogIterator =
Expand Down Expand Up @@ -415,6 +423,9 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
if (currSequenceNumber - sequenceNumber >= limitCount) {
break;
}
transactionLogIterator.next();
}
} catch (RocksDBException e) {
Expand Down
Expand Up @@ -348,6 +348,30 @@ public void testGetDBUpdatesSince() throws Exception {
}
}

@Test
public void testGetDBUpdatesSinceWithLimitCount() throws Exception {

try (RDBStore newStore =
new RDBStore(folder.newFolder(), options, configSet)) {

try (Table firstTable = newStore.getTable(families.get(1))) {
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key1"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value1"));
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value2"));
}
Assert.assertTrue(
newStore.getDb().getLatestSequenceNumber() == 2);

DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0, 1);
Assert.assertEquals(1, dbUpdatesSince.getData().size());
}
}

@Test
public void testDowngrade() throws Exception {

Expand Down
Expand Up @@ -1098,6 +1098,7 @@ message ServiceListRequest {

message DBUpdatesRequest {
required uint64 sequenceNumber = 1;
optional uint64 limitCount = 2;
}

message ServiceListResponse {
Expand Down
Expand Up @@ -3489,8 +3489,12 @@ public boolean isRatisEnabled() {
public DBUpdates getDBUpdates(
DBUpdatesRequest dbUpdatesRequest)
throws SequenceNumberNotFoundException {
long limitCount = Long.MAX_VALUE;
if (dbUpdatesRequest.hasLimitCount()) {
limitCount = dbUpdatesRequest.getLimitCount();
}
DBUpdatesWrapper updatesSince = metadataManager.getStore()
.getUpdatesSince(dbUpdatesRequest.getSequenceNumber());
.getUpdatesSince(dbUpdatesRequest.getSequenceNumber(), limitCount);
DBUpdates dbUpdates = new DBUpdates(updatesSince.getData());
dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber());
return dbUpdates;
Expand Down

0 comments on commit cde7cb7

Please sign in to comment.