From ee2fb7470288db3495ba62c90ab81bb997e91d8c Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 3 Jun 2021 18:22:00 +0000 Subject: [PATCH 1/2] Fix external compaction properties names and keys Modified the external compaction properties to be in line with other properties in the codebase. Specifically, I changed 1. the enum names starting with `COORDINATOR` to `COMPACTION_COORDINATOR` 2. the keys starting with `coordinator` to `compaction_coordinator` 3. properties having to with compaction of files to include the term `compaction` in the key Closes #2142 --- .../apache/accumulo/core/conf/Property.java | 38 ++++++++++--------- .../coordinator/CompactionCoordinator.java | 17 +++++---- .../coordinator/CompactionFinalizer.java | 4 +- .../coordinator/DeadCompactionDetector.java | 2 +- .../accumulo/test/ExternalCompactionIT.java | 4 +- 5 files changed, 36 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 10e90c7c1d2..68e13094840 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1045,40 +1045,44 @@ public enum Property { "The maximum size of a message that can be sent to a tablet server."), // CompactionCoordinator properties @Experimental - COORDINATOR_PREFIX("coordinator.", null, PropertyType.PREFIX, + COMPACTION_COORDINATOR_PREFIX("compaction_coordinator.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the accumulo compaction coordinator server."), @Experimental - COORDINATOR_THRIFTCLIENT_PORTSEARCH("coordinator.port.search", "false", PropertyType.BOOLEAN, + COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH("compaction_coordinator.port.search", "false", + PropertyType.BOOLEAN, "If the ports above are in use, search higher ports until one is available"), @Experimental - COORDINATOR_CLIENTPORT("coordinator.port.client", "9132", PropertyType.PORT, + COMPACTION_COORDINATOR_CLIENTPORT("compaction_coordinator.port.client", "9132", PropertyType.PORT, "The port used for handling Thrift client connections on the compaction coordinator server"), @Experimental - COORDINATOR_MINTHREADS("coordinator.threads.minimum", "1", PropertyType.COUNT, - "The minimum number of threads to use to handle incoming requests."), + COMPACTION_COORDINATOR_MINTHREADS("compaction_coordinator.threads.minimum", "1", + PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."), @Experimental - COORDINATOR_MINTHREADS_TIMEOUT("coordinator.threads.timeout", "0s", PropertyType.TIMEDURATION, + COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT("compaction_coordinator.threads.timeout", "0s", + PropertyType.TIMEDURATION, "The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely."), @Experimental - COORDINATOR_THREADCHECK("coordinator.threadcheck.time", "1s", PropertyType.TIMEDURATION, - "The time between adjustments of the server thread pool."), + COMPACTION_COORDINATOR_THREADCHECK("compaction_coordinator.threadcheck.time", "1s", + PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."), @Experimental - COORDINATOR_MAX_MESSAGE_SIZE("coordinator.message.size.max", "10M", PropertyType.BYTES, - "The maximum size of a message that can be sent to a tablet server."), + COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE("compaction_coordinator.message.size.max", "10M", + PropertyType.BYTES, "The maximum size of a message that can be sent to a tablet server."), @Experimental - COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL("coordinator.compactor.dead.check.interval", "5m", - PropertyType.TIMEDURATION, "The interval at which to check for dead compactors."), + COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL( + "compaction_coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION, + "The interval at which to check for dead compactors."), @Experimental - COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS("coordinator.finalizer.threads.maximum", "5", - PropertyType.COUNT, + COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS( + "compaction_coordinator.compaction.finalizer.threads.maximum", "5", PropertyType.COUNT, "The maximum number of threads to use for notifying tablet servers that an external compaction has completed."), @Experimental - COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL("coordinator.finalizer.check.interval", "60s", + COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL( + "compaction_coordinator.compaction.finalizer.check.interval", "60s", PropertyType.TIMEDURATION, "The interval at which to check for external compaction final state markers in the metadata table."), @Experimental - COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL("coordinator.tserver.compaction.check.interval", - "1m", PropertyType.TIMEDURATION, + COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL( + "compaction_coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION, "The interval at which to check the tservers for external compactions."), // deprecated properties grouped at the end to reference property that replaces them @Deprecated(since = "1.6.0") diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 9594829111c..aee379cac3f 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -217,13 +217,15 @@ protected ServerAddress startCoordinatorClientService() throws UnknownHostExcept } final CompactionCoordinatorService.Processor processor = new CompactionCoordinatorService.Processor<>(rpcProxy); - Property maxMessageSizeProperty = (aconf.get(Property.COORDINATOR_MAX_MESSAGE_SIZE) != null - ? Property.COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + Property maxMessageSizeProperty = + (aconf.get(Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE) != null + ? Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(), - Property.COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(), - "Thrift Client Server", Property.COORDINATOR_THRIFTCLIENT_PORTSEARCH, - Property.COORDINATOR_MINTHREADS, Property.COORDINATOR_MINTHREADS_TIMEOUT, - Property.COORDINATOR_THREADCHECK, maxMessageSizeProperty); + Property.COMPACTION_COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(), + "Thrift Client Server", Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH, + Property.COMPACTION_COORDINATOR_MINTHREADS, + Property.COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT, + Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty); LOG.info("address = {}", sp.address); return sp; } @@ -354,7 +356,8 @@ protected long getMissingCompactorWarningTime() { } protected long getTServerCheckInterval() { - return this.aconf.getTimeInMillis(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL); + return this.aconf + .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL); } protected TabletMetadata getMetadataEntryForExtent(KeyExtent extent) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index bf8c6d66e8b..991f98fe459 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -64,9 +64,9 @@ protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor this.pendingNotifications = new ArrayBlockingQueue<>(1000); tserverCheckInterval = this.context.getConfiguration() - .getTimeInMillis(Property.COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL); + .getTimeInMillis(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL); int max = this.context.getConfiguration() - .getCount(Property.COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS); + .getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS); this.ntfyExecutor = ThreadPools.createThreadPool(3, max, 1, TimeUnit.MINUTES, "Compaction Finalizer Notifier", false); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index 1b5bd2850af..8996acd2c5f 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -103,7 +103,7 @@ private void detectDeadCompactions() { public void start() { long interval = this.context.getConfiguration() - .getTimeInMillis(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL); + .getTimeInMillis(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL); schedExecutor.scheduleWithFixedDelay(() -> { try { diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index 128c8ecf962..621ecd8269a 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -144,8 +144,8 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit DefaultCompactionPlanner.class.getName()); cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors", "[{'name':'all', 'type': 'external','queue': 'DCQ2'}]"); - cfg.setProperty(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL.getKey(), "30s"); - cfg.setProperty(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "10s"); + cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL.getKey(), "30s"); + cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "10s"); cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true"); // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); From cce4e6c190e6929ad54338c27afd1bd9c66a24a6 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 3 Jun 2021 19:49:48 +0000 Subject: [PATCH 2/2] Modified compaction_coordinator key prefixes to compaction.coordinator --- .../apache/accumulo/core/conf/Property.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 68e13094840..a1bee5922e2 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1045,44 +1045,44 @@ public enum Property { "The maximum size of a message that can be sent to a tablet server."), // CompactionCoordinator properties @Experimental - COMPACTION_COORDINATOR_PREFIX("compaction_coordinator.", null, PropertyType.PREFIX, + COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the accumulo compaction coordinator server."), @Experimental - COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH("compaction_coordinator.port.search", "false", + COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH("compaction.coordinator.port.search", "false", PropertyType.BOOLEAN, "If the ports above are in use, search higher ports until one is available"), @Experimental - COMPACTION_COORDINATOR_CLIENTPORT("compaction_coordinator.port.client", "9132", PropertyType.PORT, + COMPACTION_COORDINATOR_CLIENTPORT("compaction.coordinator.port.client", "9132", PropertyType.PORT, "The port used for handling Thrift client connections on the compaction coordinator server"), @Experimental - COMPACTION_COORDINATOR_MINTHREADS("compaction_coordinator.threads.minimum", "1", + COMPACTION_COORDINATOR_MINTHREADS("compaction.coordinator.threads.minimum", "1", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."), @Experimental - COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT("compaction_coordinator.threads.timeout", "0s", + COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT("compaction.coordinator.threads.timeout", "0s", PropertyType.TIMEDURATION, "The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely."), @Experimental - COMPACTION_COORDINATOR_THREADCHECK("compaction_coordinator.threadcheck.time", "1s", + COMPACTION_COORDINATOR_THREADCHECK("compaction.coordinator.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."), @Experimental - COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE("compaction_coordinator.message.size.max", "10M", + COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE("compaction.coordinator.message.size.max", "10M", PropertyType.BYTES, "The maximum size of a message that can be sent to a tablet server."), @Experimental COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL( - "compaction_coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION, + "compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION, "The interval at which to check for dead compactors."), @Experimental COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS( - "compaction_coordinator.compaction.finalizer.threads.maximum", "5", PropertyType.COUNT, + "compaction.coordinator.compaction.finalizer.threads.maximum", "5", PropertyType.COUNT, "The maximum number of threads to use for notifying tablet servers that an external compaction has completed."), @Experimental COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL( - "compaction_coordinator.compaction.finalizer.check.interval", "60s", + "compaction.coordinator.compaction.finalizer.check.interval", "60s", PropertyType.TIMEDURATION, "The interval at which to check for external compaction final state markers in the metadata table."), @Experimental COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL( - "compaction_coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION, + "compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION, "The interval at which to check the tservers for external compactions."), // deprecated properties grouped at the end to reference property that replaces them @Deprecated(since = "1.6.0")