diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 92bc94f4ab9..9b31ca33f17 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -70,6 +70,7 @@
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -98,6 +99,8 @@
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
@@ -107,6 +110,8 @@
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.cache.Caches.CacheName;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -120,6 +125,7 @@
 import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 import org.apache.accumulo.server.security.SecurityOperation;
@@ -163,7 +169,6 @@ public class CompactionCoordinator
       new ConcurrentHashMap<>();
 
   /* Map of group name to last time compactor called to get a compaction job */
-  // ELASTICITY_TODO #4403 need to clean out groups that are no longer configured..
   private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED =
       new ConcurrentHashMap<>();
 
   private final ServerContext ctx;
 
@@ -186,6 +191,8 @@ public class CompactionCoordinator
   private final LoadingCache<String,Integer> compactorCounts;
   private final int jobQueueInitialSize;
 
+  private volatile long coordinatorStartTime;
+
   public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
       AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, Manager manager) {
     this.ctx = ctx;
@@ -253,44 +260,23 @@ public void shutdown() {
     }
   }
 
-  protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
+  protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {
+    ScheduledFuture<?> future = schedExecutor
+        .scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5, TimeUnit.MINUTES);
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
-  protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
+  protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) {
     ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
-    ThreadPools.watchNonCriticalScheduledTask(future);
-  }
-
-  protected void startIdleCompactionWatcher() {
-
-    ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning,
-        getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS);
+        schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 5, TimeUnit.MINUTES);
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
-  private void idleCompactionWarning() {
-
-    long now = System.currentTimeMillis();
-    Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
-    TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {
-      if ((now - lastCheckTime) > getMissingCompactorWarningTime()
-          && jobQueues.getQueuedJobs(groupName) > 0
-          && idleCompactors.containsKey(groupName.canonical())) {
-        LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName,
-            getMissingCompactorWarningTime());
-      }
-    });
-
-  }
-
   @Override
   public void run() {
-    startCompactionCleaner(schedExecutor);
-    startRunningCleaner(schedExecutor);
+
+    this.coordinatorStartTime = System.currentTimeMillis();
+    startCompactorZKCleaner(schedExecutor);
 
     // On a re-start of the coordinator it's possible that external compactions are in-progress.
     // Attempt to get the running compactions on the compactors and then resolve which tserver
@@ -312,8 +298,7 @@ public void run() {
     }
 
     startDeadCompactionDetector();
-
-    startIdleCompactionWatcher();
+    startInternalStateCleaner(schedExecutor);
 
     try {
       shutdown.await();
@@ -324,13 +309,14 @@ public void run() {
     LOG.info("Shutting down");
   }
 
-  private Map<String,Set<HostAndPort>> getIdleCompactors() {
+  private Map<String,Set<HostAndPort>>
+      getIdleCompactors(Map<String,Set<HostAndPort>> runningCompactors) {
 
-    Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
-    ExternalCompactionUtil.getCompactorAddrs(ctx)
+    final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
+    runningCompactors
         .forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList)));
 
-    Set<String> emptyQueues = new HashSet<>();
+    final Set<String> emptyQueues = new HashSet<>();
 
     // Remove all of the compactors that are running a compaction
     RUNNING_CACHE.values().forEach(rc -> {
@@ -939,30 +925,6 @@ protected Set<ExternalCompactionId> readExternalCompactionIds() {
     }
   }
 
-  /**
-   * The RUNNING_CACHE set may contain external compactions that are not actually running. This
-   * method periodically cleans those up.
-   */
-  public void cleanUpRunning() {
-
-    // grab a snapshot of the ids in the set before reading the metadata table. This is done to
-    // avoid removing things that are added while reading the metadata.
-    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
-
-    // grab the ids that are listed as running in the metadata table. It important that this is done
-    // after getting the snapshot.
-    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
-
-    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
-
-    // remove ids that are in the running set but not in the metadata table
-    idsToRemove.forEach(this::recordCompletion);
-
-    if (idsToRemove.size() > 0) {
-      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
-    }
-  }
-
   /**
    * Return information about running compactions
    *
@@ -1049,6 +1011,11 @@ protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
     return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
   }
 
+  /* Method exists to be overridden in test to hide static method */
+  protected Map<String,Set<HostAndPort>> getRunningCompactors() {
+    return ExternalCompactionUtil.getCompactorAddrs(this.ctx);
+  }
+
   /* Method exists to be overridden in test to hide static method */
   protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
     HostAndPort hostPort = HostAndPort.fromString(address);
@@ -1065,7 +1032,7 @@ private void deleteEmpty(ZooReaderWriter zoorw, String path)
     }
   }
 
-  private void cleanUpCompactors() {
+  private void cleanUpEmptyCompactorPathInZK() {
     final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;
     final var zoorw = this.ctx.getZooReaderWriter();
 
@@ -1118,6 +1085,137 @@ private void cleanUpCompactors() {
     }
   }
 
+  private Set<CompactorGroupId> getCompactionServicesConfigurationGroups()
+      throws ReflectiveOperationException, IllegalArgumentException, SecurityException {
+
+    Set<CompactorGroupId> groups = new HashSet<>();
+    AccumuloConfiguration config = ctx.getConfiguration();
+    CompactionServicesConfig servicesConfig = new CompactionServicesConfig(config);
+
+    for (var entry : servicesConfig.getPlanners().entrySet()) {
+      String serviceId = entry.getKey();
+      String plannerClassName = entry.getValue();
+
+      Class<? extends CompactionPlanner> plannerClass =
+          Class.forName(plannerClassName).asSubclass(CompactionPlanner.class);
+      CompactionPlanner planner = plannerClass.getDeclaredConstructor().newInstance();
+
+      var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId),
+          servicesConfig.getPlannerPrefix(serviceId), servicesConfig.getOptions().get(serviceId),
+          new ServiceEnvironmentImpl(ctx));
+
+      planner.init(initParams);
+
+      groups.addAll(initParams.getRequestedGroups());
+    }
+    return groups;
+  }
+
+  public void cleanUpInternalState() {
+
+    // This method does the following:
+    //
+    // 1. Removes entries from RUNNING_CACHE that are not really running
+    // 2. Cancels running compactions for groups that are not in the current configuration
+    // 3. Removes groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
+    // 4. Logs groups with no compactors
+    // 5. Logs compactors with no groups
+    // 6. Logs groups with compactors and queued jobs that have not checked in
+
+    // grab a snapshot of the ids in the set before reading the metadata table. This is done to
+    // avoid removing things that are added while reading the metadata.
+    final Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
+
+    // grab the ids that are listed as running in the metadata table. It is important that this
+    // is done after getting the snapshot.
+    final Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
+
+    final Set<ExternalCompactionId> idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
+
+    // remove ids that are in the running set but not in the metadata table
+    idsToRemove.forEach(this::recordCompletion);
+    if (idsToRemove.size() > 0) {
+      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
+    }
+
+    // Get the set of groups being referenced in the current configuration
+    Set<CompactorGroupId> groupsInConfiguration = null;
+    try {
+      groupsInConfiguration = getCompactionServicesConfigurationGroups();
+    } catch (RuntimeException | ReflectiveOperationException e) {
+      LOG.error(
+          "Error getting groups from the compaction services configuration. Unable to clean up internal state.",
+          e);
+      return;
+    }
+
+    // Compaction jobs are created in the TabletGroupWatcher and added to the Coordinator
+    // via the addJobs method which adds the job to the CompactionJobQueues object.
+    final Set<CompactorGroupId> groupsWithJobs = jobQueues.getQueueIds();
+
+    final Set<CompactorGroupId> jobGroupsNotInConfiguration =
+        Sets.difference(groupsWithJobs, groupsInConfiguration);
+
+    if (jobGroupsNotInConfiguration != null && !jobGroupsNotInConfiguration.isEmpty()) {
+      RUNNING_CACHE.values().forEach(rc -> {
+        if (jobGroupsNotInConfiguration.contains(CompactorGroupId.of(rc.getGroupName()))) {
+          LOG.warn(
+              "External compaction {} running in group {} on compactor {},"
+                  + " but group not found in current configuration. Failing compaction...",
+              rc.getJob().getExternalCompactionId(), rc.getGroupName(), rc.getCompactorAddress());
+          cancelCompactionOnCompactor(rc.getCompactorAddress(),
+              rc.getJob().getExternalCompactionId());
+        }
+      });
+
+      final Set<CompactorGroupId> trackedGroups = Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet());
+      TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration);
+      LOG.debug("No longer tracking compactor check-in times for groups: {}",
+          Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet()));
+    }
+
+    final Map<String,Set<HostAndPort>> runningCompactors = getRunningCompactors();
+
+    final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>();
+    runningCompactors.keySet()
+        .forEach(group -> runningCompactorGroups.add(CompactorGroupId.of(group)));
+
+    final Set<CompactorGroupId> groupsWithNoCompactors =
+        Sets.difference(groupsInConfiguration, runningCompactorGroups);
+    if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) {
+      for (CompactorGroupId group : groupsWithNoCompactors) {
+        long queuedJobCount = jobQueues.getQueuedJobs(group);
+        if (queuedJobCount > 0) {
+          LOG.warn("Compactor group {} has {} queued compactions but no running compactors", group,
+              queuedJobCount);
+        }
+      }
+    }
+
+    final Set<CompactorGroupId> compactorsWithNoGroups =
+        Sets.difference(runningCompactorGroups, groupsInConfiguration);
+    if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) {
+      LOG.warn(
+          "The following groups have running compactors, but are not in the current configuration: {}",
+          compactorsWithNoGroups);
+    }
+
+    final long now = System.currentTimeMillis();
+    final long warningTime = getMissingCompactorWarningTime();
+    Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(runningCompactors);
+    for (CompactorGroupId groupName : groupsInConfiguration) {
+      long lastCheckTime =
+          TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, coordinatorStartTime);
+      if ((now - lastCheckTime) > warningTime && jobQueues.getQueuedJobs(groupName) > 0
+          && idleCompactors.containsKey(groupName.canonical())) {
+        LOG.warn(
+            "The group {} has queued jobs and {} idle compactors, however none have checked in "
                + "with coordinator for {}ms",
+            groupName, idleCompactors.get(groupName.canonical()).size(), warningTime);
+      }
+    }
+  }
+
   private static Set<StoredTabletFile> getFilesReservedBySelection(TabletMetadata tabletMetadata,
       SteadyTime steadyTime, ServerContext ctx) {
     if (tabletMetadata.getSelectedFiles() == null) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index bf702b0db76..7dbf45a3d6e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -230,8 +230,7 @@ private void detectDeadCompactions() {
       this.deadCompactions.keySet().removeAll(toFail);
     }
 
-    // Find and delete any known tables that have unreferenced
-    // compaction tmp files.
+    // Find and delete compaction tmp files that are unreferenced
     if (!tablesWithUnreferencedTmpFiles.isEmpty()) {
 
       Set<TableId> copy = new HashSet<>();
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 58a592f0363..90c100aa729 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -148,13 +148,10 @@ protected long getTServerCheckInterval() {
     }
 
     @Override
-    protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
+    protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
 
     @Override
-    protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
-
-    @Override
-    protected void startIdleCompactionWatcher() {
+    protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) {
       // This is called from CompactionCoordinator.run(). Counting down
      // the latch will exit the run method
       this.shutdown.countDown();
@@ -196,6 +193,11 @@ protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
       return runningCompactions;
     }
 
+    @Override
+    protected Map<String,Set<HostAndPort>> getRunningCompactors() {
+      return Map.of();
+    }
+
     @Override
     protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress,
         ExternalCompactionId externalCompactionId) {
@@ -434,13 +436,13 @@ public void testCleanUpRunning() throws Exception {
 
     coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
     coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));
 
-    coordinator.cleanUpRunning();
+    coordinator.cleanUpInternalState();
 
     assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());
 
     coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
 
-    coordinator.cleanUpRunning();
+    coordinator.cleanUpInternalState();
 
     assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
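
The RUNNING_CACHE cleanup carried over from the old cleanUpRunning() into cleanUpInternalState() depends on ordering: the in-memory ids are snapshotted before the metadata table is read, so a compaction registered while the read is in progress can never be treated as stale. Below is a minimal standalone sketch of that snapshot-then-difference pattern using only JDK collections; the class name, the String ids, and readIdsFromMetadata() are illustrative stand-ins, not Accumulo APIs.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RunningCacheCleanupSketch {

  // Stand-in for RUNNING_CACHE: external compaction id -> running state
  private final Map<String,Object> runningCache = new ConcurrentHashMap<>();

  // Stand-in for readExternalCompactionIds(), which would scan the metadata table
  private Set<String> readIdsFromMetadata() {
    return Set.of("ECID-1", "ECID-2");
  }

  public void cleanUpStaleEntries() {
    // Snapshot the in-memory ids BEFORE reading the metadata table. Anything added to the
    // cache after this point is absent from the snapshot and therefore never removed below.
    Set<String> idsSnapshot = Set.copyOf(runningCache.keySet());

    Set<String> idsInMetadata = readIdsFromMetadata();

    // Ids present in the snapshot but missing from the metadata table are stale.
    Set<String> idsToRemove = new HashSet<>(idsSnapshot);
    idsToRemove.removeAll(idsInMetadata);

    idsToRemove.forEach(runningCache::remove);
  }

  public static void main(String[] args) {
    RunningCacheCleanupSketch sketch = new RunningCacheCleanupSketch();
    sketch.runningCache.put("ECID-1", new Object());
    sketch.runningCache.put("ECID-3", new Object()); // stale: not in the metadata result
    sketch.cleanUpStaleEntries();
    System.out.println(sketch.runningCache.keySet()); // prints [ECID-1]
  }
}

If the metadata read happened before the snapshot, an id added in between would be present in memory but absent from the read result and would be removed incorrectly; taking the snapshot first is what makes the Sets.difference in cleanUpInternalState() safe.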