Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -1045,40 +1045,44 @@ public enum Property {
"The maximum size of a message that can be sent to a tablet server."),
// CompactionCoordinator properties
@Experimental
COORDINATOR_PREFIX("coordinator.", null, PropertyType.PREFIX,
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo compaction coordinator server."),
@Experimental
COORDINATOR_THRIFTCLIENT_PORTSEARCH("coordinator.port.search", "false", PropertyType.BOOLEAN,
COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH("compaction.coordinator.port.search", "false",
PropertyType.BOOLEAN,
"If the ports above are in use, search higher ports until one is available"),
@Experimental
COORDINATOR_CLIENTPORT("coordinator.port.client", "9132", PropertyType.PORT,
COMPACTION_COORDINATOR_CLIENTPORT("compaction.coordinator.port.client", "9132", PropertyType.PORT,
"The port used for handling Thrift client connections on the compaction coordinator server"),
@Experimental
COORDINATOR_MINTHREADS("coordinator.threads.minimum", "1", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests."),
COMPACTION_COORDINATOR_MINTHREADS("compaction.coordinator.threads.minimum", "1",
PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
@Experimental
COORDINATOR_MINTHREADS_TIMEOUT("coordinator.threads.timeout", "0s", PropertyType.TIMEDURATION,
COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT("compaction.coordinator.threads.timeout", "0s",
PropertyType.TIMEDURATION,
"The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely."),
@Experimental
COORDINATOR_THREADCHECK("coordinator.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the server thread pool."),
COMPACTION_COORDINATOR_THREADCHECK("compaction.coordinator.threadcheck.time", "1s",
PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
@Experimental
COORDINATOR_MAX_MESSAGE_SIZE("coordinator.message.size.max", "10M", PropertyType.BYTES,
"The maximum size of a message that can be sent to a tablet server."),
COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE("compaction.coordinator.message.size.max", "10M",
PropertyType.BYTES, "The maximum size of a message that can be sent to a tablet server."),
@Experimental
COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL("coordinator.compactor.dead.check.interval", "5m",
PropertyType.TIMEDURATION, "The interval at which to check for dead compactors."),
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION,
"The interval at which to check for dead compactors."),
@Experimental
COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS("coordinator.finalizer.threads.maximum", "5",
PropertyType.COUNT,
COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS(
"compaction.coordinator.compaction.finalizer.threads.maximum", "5", PropertyType.COUNT,
"The maximum number of threads to use for notifying tablet servers that an external compaction has completed."),
@Experimental
COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL("coordinator.finalizer.check.interval", "60s",
COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL(
"compaction.coordinator.compaction.finalizer.check.interval", "60s",
PropertyType.TIMEDURATION,
"The interval at which to check for external compaction final state markers in the metadata table."),
@Experimental
COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL("coordinator.tserver.compaction.check.interval",
"1m", PropertyType.TIMEDURATION,
COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
"compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION,
"The interval at which to check the tservers for external compactions."),
// deprecated properties grouped at the end to reference property that replaces them
@Deprecated(since = "1.6.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,15 @@ protected ServerAddress startCoordinatorClientService() throws UnknownHostExcept
}
final CompactionCoordinatorService.Processor<Iface> processor =
new CompactionCoordinatorService.Processor<>(rpcProxy);
Property maxMessageSizeProperty = (aconf.get(Property.COORDINATOR_MAX_MESSAGE_SIZE) != null
? Property.COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
Property maxMessageSizeProperty =
(aconf.get(Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE) != null
? Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(),
Property.COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
"Thrift Client Server", Property.COORDINATOR_THRIFTCLIENT_PORTSEARCH,
Property.COORDINATOR_MINTHREADS, Property.COORDINATOR_MINTHREADS_TIMEOUT,
Property.COORDINATOR_THREADCHECK, maxMessageSizeProperty);
Property.COMPACTION_COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
"Thrift Client Server", Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH,
Property.COMPACTION_COORDINATOR_MINTHREADS,
Property.COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT,
Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty);
LOG.info("address = {}", sp.address);
return sp;
}
Expand Down Expand Up @@ -354,7 +356,8 @@ protected long getMissingCompactorWarningTime() {
}

protected long getTServerCheckInterval() {
return this.aconf.getTimeInMillis(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
return this.aconf
.getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
}

protected TabletMetadata getMetadataEntryForExtent(KeyExtent extent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor
this.pendingNotifications = new ArrayBlockingQueue<>(1000);

tserverCheckInterval = this.context.getConfiguration()
.getTimeInMillis(Property.COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL);
.getTimeInMillis(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL);
int max = this.context.getConfiguration()
.getCount(Property.COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS);
.getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS);

this.ntfyExecutor = ThreadPools.createThreadPool(3, max, 1, TimeUnit.MINUTES,
"Compaction Finalizer Notifier", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void detectDeadCompactions() {

public void start() {
long interval = this.context.getConfiguration()
.getTimeInMillis(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL);
.getTimeInMillis(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL);

schedExecutor.scheduleWithFixedDelay(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit
DefaultCompactionPlanner.class.getName());
cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors",
"[{'name':'all', 'type': 'external','queue': 'DCQ2'}]");
cfg.setProperty(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL.getKey(), "30s");
cfg.setProperty(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "10s");
cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL.getKey(), "30s");
cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "10s");
cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
Expand Down