Skip to content
Permalink
Browse files
[NO ISSUE][OTH] Update storage options names
- user model changes: yes
- storage format changes: no
- interface changes: no

Details:

- Update storage options names and descriptions to better
  reflect their usage.

Change-Id: I436c6327fb6b6af432e5cf74962beca38660b9c9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10723
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
  • Loading branch information
mhubail committed Mar 26, 2021
1 parent 8b5b5ce commit 690da336312dacb2a42f4c21e71a8ad1bb7417f5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
@@ -209,15 +209,15 @@ public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptio
}
localResourceRepository.deleteStorageData();
}
int maxConcurrentFlushes = storageProperties.getMaxConcurrentFlushes();
if (maxConcurrentFlushes <= 0) {
maxConcurrentFlushes = ioManager.getIODevices().size();
int maxScheduledFlushes = storageProperties.getMaxScheduledFlushes();
if (maxScheduledFlushes <= 0) {
maxScheduledFlushes = ioManager.getIODevices().size();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The value of maxConcurrentFlushes is not provided. Setting maxConcurrentFlushes = {}.",
maxConcurrentFlushes);
LOGGER.info("The value of maxScheduledFlushes is not provided. Setting maxConcurrentFlushes = {}.",
maxScheduledFlushes);
}
}
virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties, maxConcurrentFlushes);
virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties, maxScheduledFlushes);
// Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
// the metadata bootstrap task
((ILifeCycleComponent) virtualBufferCache).start();
@@ -584,24 +584,24 @@ private ILSMIOOperationScheduler createIoScheduler(StorageProperties properties)
String schedulerName = storageProperties.getIoScheduler();
int numPartitions = ioManager.getIODevices().size();

int maxRunningFlushes = storageProperties.getMaxRunningFlushes(numPartitions);
int maxConcurrentFlushes = storageProperties.geMaxConcurrentFlushes(numPartitions);
int maxScheduledMerges = storageProperties.getMaxScheduledMerges(numPartitions);
int maxRunningMerges = storageProperties.getMaxRunningMerges(numPartitions);
int maxConcurrentMerges = storageProperties.getMaxConcurrentMerges(numPartitions);

ILSMIOOperationScheduler ioScheduler = null;
if (AsynchronousScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
ioScheduler = AsynchronousScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
HaltCallback.INSTANCE, maxConcurrentFlushes, maxScheduledMerges, maxConcurrentMerges);
} else if (GreedyScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
HaltCallback.INSTANCE, maxConcurrentFlushes, maxScheduledMerges, maxConcurrentMerges);
} else {
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN,
"Unknown storage I/O scheduler: " + schedulerName + "; defaulting to greedy I/O scheduler.");
}
ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
HaltCallback.INSTANCE, maxConcurrentFlushes, maxScheduledMerges, maxConcurrentMerges);
}
return ioScheduler;
}
@@ -19,7 +19,6 @@
package org.apache.asterix.common.config;

import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
@@ -49,16 +48,16 @@ public enum Option implements IOption {
STORAGE_MEMORYCOMPONENT_PAGESIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(128, KILOBYTE)),
STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2),
STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD(DOUBLE, 0.9d),
STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES(INTEGER, 0),
STORAGE_MEMORYCOMPONENT_MAX_SCHEDULED_FLUSHES(NONNEGATIVE_INTEGER, 0),
STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE(LONG_BYTE_UNIT, 0L),
STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
STORAGE_IO_SCHEDULER(STRING, "greedy"),
STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l),
STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2),
STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2),
STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 8),
STORAGE_MAX_RUNNING_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2);
STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2);

private final IOptionType interpreter;
private final Object defaultValue;
@@ -96,8 +95,8 @@ public String description() {
return "The page size in bytes for pages allocated to memory components";
case STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS:
return "The number of memory components to be used per lsm index";
case STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES:
return "The maximum number of concurrent flush operations. 0 means that the value will be "
case STORAGE_MEMORYCOMPONENT_MAX_SCHEDULED_FLUSHES:
return "The maximum number of scheduled flush operations. 0 means that the value will be "
+ "calculated as the number of partitions";
case STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD:
return "The memory usage threshold when memory components should be flushed";
@@ -114,12 +113,12 @@ public String description() {
return "The number of bytes before each disk force (fsync)";
case STORAGE_IO_SCHEDULER:
return "The I/O scheduler for LSM flush and merge operations";
case STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION:
return "The maximum number of running flushes per partition (0 means unlimited)";
case STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION:
return "The maximum number of concurrently executed flushes per partition (0 means unlimited)";
case STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION:
return "The maximum number of scheduled merges per partition (0 means unlimited)";
case STORAGE_MAX_RUNNING_MERGES_PER_PARTITION:
return "The maximum number of running merges per partition (0 means unlimited)";
case STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION:
return "The maximum number of concurrently executed merges per partition (0 means unlimited)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -189,8 +188,8 @@ public int getBufferCacheNumPages() {
return (int) (getBufferCacheSize() / (getBufferCachePageSize() + IBufferCache.RESERVED_HEADER_BYTES));
}

public int getMaxConcurrentFlushes() {
return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES);
public int getMaxScheduledFlushes() {
return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_SCHEDULED_FLUSHES);
}

public long getJobExecutionMemoryBudget() {
@@ -213,8 +212,8 @@ public String getIoScheduler() {
return accessor.getString(Option.STORAGE_IO_SCHEDULER);
}

public int getMaxRunningFlushes(int numPartitions) {
int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION);
public int geMaxConcurrentFlushes(int numPartitions) {
int value = accessor.getInt(Option.STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION);
return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
}

@@ -223,8 +222,8 @@ public int getMaxScheduledMerges(int numPartitions) {
return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
}

public int getMaxRunningMerges(int numPartitions) {
int value = accessor.getInt(Option.STORAGE_MAX_RUNNING_MERGES_PER_PARTITION);
public int getMaxConcurrentMerges(int numPartitions) {
int value = accessor.getInt(Option.STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION);
return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
}

0 comments on commit 690da33

Please sign in to comment.