Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ public final class OzoneConfigKeys {

public static final boolean OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF_DEFAULT = false;

public static final String OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT =
"ozone.om.delta.update.data.size.max.limit";
public static final String
OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT = "1024MB";

/**
* There is no need to instantiate this class.
*/
Expand Down
12 changes: 12 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2851,6 +2851,18 @@
The actual fetched data might be larger than this limit.
</description>
</property>
<property>
<name>ozone.om.delta.update.data.size.max.limit</name>
<value>1024MB</value>
<tag>OM, MANAGEMENT</tag>
<description>
Recon get a limited delta updates from OM periodically since sequence number.
Based on sequence number passed, OM DB delta update may have large number of
log files and each log batch data may be huge depending on frequent writes and
updates by ozone client, so to avoid increase in heap memory, this config is used
as limiting factor of default 1 GB while preparing DB updates object.
</description>
</property>
<property>
<name>recon.om.delta.update.loop.limit</name>
<value>10</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
Expand Down Expand Up @@ -105,6 +107,9 @@ public final class DBStoreBuilder {
private boolean enableCompactionLog;
private long maxTimeAllowedForSnapshotInDag;
private long pruneCompactionDagDaemonRunInterval;
// this is to track the total size of dbUpdates data since sequence
// number in request to avoid increase in heap memory.
private long maxDbUpdatesSizeThreshold;

/**
* Create DBStoreBuilder from a generic DBDefinition.
Expand Down Expand Up @@ -162,6 +167,10 @@ private DBStoreBuilder(ConfigurationSource configuration,
OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL,
OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);

this.maxDbUpdatesSizeThreshold = (long) configuration.getStorageSize(
OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT,
OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT, StorageUnit.BYTES);
}

private void applyDBDefinition(DBDefinition definition) {
Expand Down Expand Up @@ -219,7 +228,7 @@ public DBStore build() throws IOException {
return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName,
enableCompactionLog, maxTimeAllowedForSnapshotInDag,
pruneCompactionDagDaemonRunInterval);
pruneCompactionDagDaemonRunInterval, maxDbUpdatesSizeThreshold);
} finally {
tableConfigs.forEach(TableConfig::close);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class DBUpdatesWrapper {
private List<byte[]> dataList = new ArrayList<>();
private long currentSequenceNumber = -1;
private long latestSequenceNumber = -1;
private boolean isDBUpdateSuccess = true;

public void addWriteBatch(byte[] data, long sequenceNumber) {
dataList.add(data);
Expand Down Expand Up @@ -57,5 +58,13 @@ public void setLatestSequenceNumber(long sequenceNumber) {
public long getLatestSequenceNumber() {
return latestSequenceNumber;
}

public boolean isDBUpdateSuccess() {
return isDBUpdateSuccess;
}

public void setDBUpdateSuccess(boolean dbUpdateSuccess) {
this.isDBUpdateSuccess = dbUpdateSuccess;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,18 @@ public class RDBStore implements DBStore {
*/
private final String dbCompactionLogDirName = "compaction-log";

// this is to track the total size of dbUpdates data since sequence
// number in request to avoid increase in heap memory.
private long maxDbUpdatesSizeThreshold;

@VisibleForTesting
public RDBStore(File dbFile, ManagedDBOptions options,
Set<TableConfig> families) throws IOException {
Set<TableConfig> families, long maxDbUpdatesSizeThreshold)
throws IOException {
this(dbFile, options, new ManagedWriteOptions(), families,
new CodecRegistry(), false, 1000, null, false,
TimeUnit.DAYS.toMillis(1), TimeUnit.HOURS.toMillis(1));
TimeUnit.DAYS.toMillis(1), TimeUnit.HOURS.toMillis(1),
maxDbUpdatesSizeThreshold);
}

@SuppressWarnings("parameternumber")
Expand All @@ -92,11 +98,13 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions,
CodecRegistry registry, boolean readOnly, int maxFSSnapshots,
String dbJmxBeanNameName, boolean enableCompactionLog,
long maxTimeAllowedForSnapshotInDag,
long compactionDagDaemonInterval)
long compactionDagDaemonInterval,
long maxDbUpdatesSizeThreshold)
throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
Preconditions.checkNotNull(families);
Preconditions.checkArgument(!families.isEmpty());
this.maxDbUpdatesSizeThreshold = maxDbUpdatesSizeThreshold;
codecRegistry = registry;
dbLocation = dbFile;
dbJmxBeanName = dbJmxBeanNameName == null ? dbFile.getName() :
Expand Down Expand Up @@ -363,6 +371,7 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
if (limitCount <= 0) {
throw new IllegalArgumentException("Illegal count for getUpdatesSince.");
}
long cumulativeDBUpdateLogBatchSize = 0L;
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try (ManagedTransactionLogIterator logIterator =
db.getUpdatesSince(sequenceNumber)) {
Expand Down Expand Up @@ -407,6 +416,10 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
if (currSequenceNumber - sequenceNumber >= limitCount) {
break;
}
cumulativeDBUpdateLogBatchSize += result.writeBatch().getDataSize();
if (cumulativeDBUpdateLogBatchSize >= maxDbUpdatesSizeThreshold) {
break;
}
} finally {
result.writeBatch().close();
}
Expand All @@ -416,13 +429,15 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
LOG.warn("Unable to get delta updates since sequenceNumber {}. "
+ "This exception will be thrown to the client",
sequenceNumber, e);
dbUpdatesWrapper.setDBUpdateSuccess(false);
// Throw the exception back to Recon. Expect Recon to fall back to
// full snapshot.
throw e;
} catch (RocksDBException | IOException e) {
LOG.error("Unable to get delta updates since sequenceNumber {}. "
+ "This exception will not be thrown to the client ",
sequenceNumber, e);
dbUpdatesWrapper.setDBUpdateSuccess(false);
}
dbUpdatesWrapper.setLatestSequenceNumber(db.getLatestSequenceNumber());
return dbUpdatesWrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* RDBStore Tests.
*/
public class TestRDBStore {
public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
private final List<String> families =
Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"First", "Second", "Third",
Expand All @@ -74,7 +75,8 @@ public void setUp(@TempDir File tempDir) throws Exception {
new ManagedColumnFamilyOptions());
configSet.add(newConfig);
}
rdbStore = new RDBStore(tempDir, options, configSet);
rdbStore = new RDBStore(tempDir, options, configSet,
MAX_DB_UPDATES_SIZE_THRESHOLD);
}

@AfterEach
Expand Down Expand Up @@ -262,7 +264,7 @@ public void testRocksDBCheckpoint() throws Exception {

RDBStore restoredStoreFromCheckPoint =
new RDBStore(checkpoint.getCheckpointLocation().toFile(),
options, configSet);
options, configSet, MAX_DB_UPDATES_SIZE_THRESHOLD);

// Let us make sure that our estimate is not off by 10%
Assertions.assertTrue(
Expand Down Expand Up @@ -318,11 +320,24 @@ public void testGetDBUpdatesSinceWithLimitCount() throws Exception {
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value2"));
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key3"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value3"));
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key4"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value4"));
firstTable.put(
org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key5"),
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("Value5"));
}
Assertions.assertEquals(2, rdbStore.getDb().getLatestSequenceNumber());
Assertions.assertEquals(5, rdbStore.getDb().getLatestSequenceNumber());

DBUpdatesWrapper dbUpdatesSince = rdbStore.getUpdatesSince(0, 1);
Assertions.assertEquals(1, dbUpdatesSince.getData().size());
DBUpdatesWrapper dbUpdatesSince = rdbStore.getUpdatesSince(0, 5);
Assertions.assertEquals(2, dbUpdatesSince.getData().size());
Assertions.assertEquals(2, dbUpdatesSince.getCurrentSequenceNumber());
}

@Test
Expand Down Expand Up @@ -352,7 +367,8 @@ public void testDowngrade() throws Exception {
new ManagedColumnFamilyOptions());
configSet.add(newConfig);
}
rdbStore = new RDBStore(rdbStore.getDbLocation(), options, configSet);
rdbStore = new RDBStore(rdbStore.getDbLocation(), options, configSet,
MAX_DB_UPDATES_SIZE_THRESHOLD);
for (String family : familiesMinusOne) {
try (Table table = rdbStore.getTable(family)) {
Assertions.assertNotNull(table, family + "is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* Tests for RocksDBTable Store.
*/
public class TestRDBTableStore {
public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
private static int count = 0;
private final List<String> families =
Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
Expand Down Expand Up @@ -113,7 +114,8 @@ public void setUp() throws Exception {
TableConfig newConfig = new TableConfig(name, cfOptions);
configSet.add(newConfig);
}
rdbStore = new RDBStore(tempDir, options, configSet);
rdbStore = new RDBStore(tempDir, options, configSet,
MAX_DB_UPDATES_SIZE_THRESHOLD);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* Tests for RocksDBTable Store.
*/
public class TestTypedRDBTableStore {
public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
private static int count = 0;
private final List<String> families =
Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
Expand Down Expand Up @@ -79,7 +80,8 @@ public void setUp(@TempDir File tempDir) throws Exception {
new ManagedColumnFamilyOptions());
configSet.add(newConfig);
}
rdbStore = new RDBStore(tempDir, options, configSet);
rdbStore = new RDBStore(tempDir, options, configSet,
MAX_DB_UPDATES_SIZE_THRESHOLD);

codecRegistry = new CodecRegistry();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class DBUpdates {

private long latestSequenceNumber = -1L;

private boolean isDBUpdateSuccess = true;

public DBUpdates() {
this.dataList = new ArrayList<>();
}
Expand Down Expand Up @@ -65,4 +67,12 @@ public void setLatestSequenceNumber(long sequenceNumber) {
public long getLatestSequenceNumber() {
return latestSequenceNumber;
}

public boolean isDBUpdateSuccess() {
return isDBUpdateSuccess;
}

public void setDBUpdateSuccess(boolean dbUpdateSuccess) {
this.isDBUpdateSuccess = dbUpdateSuccess;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,7 @@ message DBUpdatesResponse {
required uint64 sequenceNumber = 1;
repeated bytes data = 2;
optional uint64 latestSequenceNumber = 3;
optional bool dbUpdateSuccess = 4;
}

message RangerBGSyncRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3980,6 +3980,7 @@ public DBUpdates getDBUpdates(
DBUpdates dbUpdates = new DBUpdates(updatesSince.getData());
dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber());
dbUpdates.setLatestSequenceNumber(updatesSince.getLatestSequenceNumber());
dbUpdates.setDBUpdateSuccess(updatesSince.isDBUpdateSuccess());
return dbUpdates;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ private DBUpdatesResponse getOMDBUpdates(
}
builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber());
builder.setLatestSequenceNumber(dbUpdatesWrapper.getLatestSequenceNumber());
builder.setDbUpdateSuccess(dbUpdatesWrapper.isDBUpdateSuccess());
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,15 @@ void getAndApplyDeltaUpdatesFromOM(
long inLoopLatestSequenceNumber;
while (loopCount < deltaUpdateLoopLimit &&
deltaUpdateCnt >= deltaUpdateLimit) {
innerGetAndApplyDeltaUpdatesFromOM(
inLoopStartSequenceNumber, omdbUpdatesHandler);
if (!innerGetAndApplyDeltaUpdatesFromOM(
inLoopStartSequenceNumber, omdbUpdatesHandler)) {
LOG.error(
"Retrieve OM DB delta update failed for sequence number : {}, " +
"so falling back to full snapshot.", inLoopStartSequenceNumber);
throw new RocksDBException(
"Unable to get delta updates since sequenceNumber - " +
inLoopStartSequenceNumber);
}
inLoopLatestSequenceNumber = getCurrentOMDBSequenceNumber();
deltaUpdateCnt = inLoopLatestSequenceNumber - inLoopStartSequenceNumber;
inLoopStartSequenceNumber = inLoopLatestSequenceNumber;
Expand All @@ -433,7 +440,7 @@ void getAndApplyDeltaUpdatesFromOM(
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
void innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
boolean innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
Expand Down Expand Up @@ -469,6 +476,7 @@ void innerGetAndApplyDeltaUpdatesFromOM(long fromSequenceNumber,
LOG.info("Number of updates received from OM : {}, " +
"SequenceNumber diff: {}, SequenceNumber Lag from OM {}.",
numUpdates, getCurrentOMDBSequenceNumber() - fromSequenceNumber, lag);
return null != dbUpdates && dbUpdates.isDBUpdateSuccess();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ public void testGetAndApplyDeltaUpdatesFromOMWithLimit() throws Exception {
getMockOzoneManagerClientWith4Updates(dbUpdatesWrapper[0],
dbUpdatesWrapper[1], dbUpdatesWrapper[2], dbUpdatesWrapper[3]));

assertEquals(true, dbUpdatesWrapper[0].isDBUpdateSuccess());
assertEquals(true, dbUpdatesWrapper[1].isDBUpdateSuccess());
assertEquals(true, dbUpdatesWrapper[2].isDBUpdateSuccess());
assertEquals(true, dbUpdatesWrapper[3].isDBUpdateSuccess());

OMDBUpdatesHandler updatesHandler =
new OMDBUpdatesHandler(omMetadataManager);
ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
Expand Down