Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop tracking last compactor check-in for non-existent groups #4403

Open
wants to merge 11 commits into
base: elasticity
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -239,44 +239,22 @@ public void shutdown() {
}
}

protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future = schedExecutor
.scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startIdleCompactionWatcher() {

ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning,
getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS);
schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

/*
 * Warn for each compaction group that has queued jobs and idle compactors available, but where
 * no compactor has checked in with the coordinator within the configured warning window.
 */
private void idleCompactionWarning() {

  final long currentTime = System.currentTimeMillis();
  final Map<String,Set<HostAndPort>> idle = getIdleCompactors();
  TIME_COMPACTOR_LAST_CHECKED.forEach((group, lastCheckIn) -> {
    // first condition evaluated first, matching the original short-circuit order
    final boolean checkInOverdue = (currentTime - lastCheckIn) > getMissingCompactorWarningTime();
    if (checkInOverdue && jobQueues.getQueuedJobs(group) > 0
        && idle.containsKey(group.canonical())) {
      LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", group,
          getMissingCompactorWarningTime());
    }
  });

}

@Override
public void run() {
startCompactionCleaner(schedExecutor);
startRunningCleaner(schedExecutor);

startCompactorZKCleaner(schedExecutor);

// On a re-start of the coordinator it's possible that external compactions are in-progress.
// Attempt to get the running compactions on the compactors and then resolve which tserver
Expand All @@ -298,8 +276,7 @@ public void run() {
}

startDeadCompactionDetector();

startIdleCompactionWatcher();
startInternalStateCleaner(schedExecutor);

try {
shutdown.await();
Expand All @@ -310,13 +287,14 @@ public void run() {
LOG.info("Shutting down");
}

private Map<String,Set<HostAndPort>> getIdleCompactors() {
private Map<String,Set<HostAndPort>>
getIdleCompactors(Map<String,Set<HostAndPort>> runningCompactors) {

Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
ExternalCompactionUtil.getCompactorAddrs(ctx)
final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
runningCompactors
.forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList)));

Set<String> emptyQueues = new HashSet<>();
final Set<String> emptyQueues = new HashSet<>();

// Remove all of the compactors that are running a compaction
RUNNING_CACHE.values().forEach(rc -> {
Expand Down Expand Up @@ -926,30 +904,6 @@ protected Set<ExternalCompactionId> readExternalCompactionIds() {
}
}

/**
 * The RUNNING_CACHE set may contain external compactions that are not actually running. This
 * method periodically cleans those up.
 */
public void cleanUpRunning() {

  // grab a snapshot of the ids in the set before reading the metadata table. This is done to
  // avoid removing things that are added while reading the metadata.
  Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());

  // grab the ids that are listed as running in the metadata table. It is important that this is
  // done after getting the snapshot.
  Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();

  // Sets.difference returns a (possibly empty) live view, never null
  var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);

  // remove ids that are in the running set but not in the metadata table
  idsToRemove.forEach(this::recordCompletion);

  if (!idsToRemove.isEmpty()) {
    LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
  }
}

/**
* Return information about running compactions
*
Expand Down Expand Up @@ -1036,6 +990,11 @@ protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
}

/**
 * Method exists to be overridden in test to hide static method.
 *
 * @return the currently registered compactors — presumably keyed by compaction group name with
 *         the addresses of the compactors in that group; confirm against
 *         {@code ExternalCompactionUtil.getCompactorAddrs}
 */
protected Map<String,Set<HostAndPort>> getRunningCompactors() {
return ExternalCompactionUtil.getCompactorAddrs(this.ctx);
}

/* Method exists to be overridden in test to hide static method */
protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
HostAndPort hostPort = HostAndPort.fromString(address);
Expand All @@ -1052,7 +1011,7 @@ private void deleteEmpty(ZooReaderWriter zoorw, String path)
}
}

private void cleanUpCompactors() {
private void cleanUpEmptyCompactorPathInZK() {
final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;

var zoorw = this.ctx.getZooReaderWriter();
Expand Down Expand Up @@ -1086,6 +1045,105 @@ private void cleanUpCompactors() {
}
}

/**
 * Periodic cleanup of the coordinator's internal state. This method does the following:
 *
 * 1. Removes entries from RUNNING_CACHE that are not really running
 * 2. Cancels running compactions for groups that are not in the current configuration
 * 3. Removes groups not in the configuration from TIME_COMPACTOR_LAST_CHECKED
 * 4. Logs groups with no compactors
 * 5. Logs compactors with no groups
 * 6. Logs groups with compactors and queued jobs that have not checked in
 */
public void cleanUpInternalState() {

  // grab a snapshot of the ids in the set before reading the metadata table. This is done to
  // avoid removing things that are added while reading the metadata.
  final Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());

  // grab the ids that are listed as running in the metadata table. It is important that this is
  // done after getting the snapshot.
  final Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();

  // Sets.difference returns a (possibly empty) live view, never null — no null check needed on
  // this or any difference below
  final var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);

  // remove ids that are in the running set but not in the metadata table
  idsToRemove.forEach(this::recordCompletion);
  if (!idsToRemove.isEmpty()) {
    LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
  }

  // Get the set of groups being referenced in the current configuration
  // TODO: Needs Dan's changes to get Compactor configuration for this
  // to work.
  final Set<CompactorGroupId> groupsInConfiguration = new HashSet<>();

  // Compaction jobs are created in the TabletGroupWatcher and added to the Coordinator
  // via the addJobs method which adds the job to the CompactionJobQueues object.
  final Set<CompactorGroupId> groupsWithJobs = jobQueues.getQueueIds();

  final Set<CompactorGroupId> jobGroupsNotInConfiguration =
      Sets.difference(groupsWithJobs, groupsInConfiguration);

  if (!jobGroupsNotInConfiguration.isEmpty()) {
    // Fail any compaction currently running in a group the configuration no longer defines.
    RUNNING_CACHE.values().forEach(rc -> {
      if (jobGroupsNotInConfiguration.contains(CompactorGroupId.of(rc.getGroupName()))) {
        LOG.warn(
            "External compaction {} running in group {} on compactor {},"
                + " but group not found in current configuration. Failing compaction...",
            rc.getJob().getExternalCompactionId(), rc.getGroupName(), rc.getCompactorAddress());
        cancelCompactionOnCompactor(rc.getCompactorAddress(),
            rc.getJob().getExternalCompactionId());
      }
    });

    // NOTE(review): this pruning only runs when jobGroupsNotInConfiguration is non-empty; a
    // tracked group that was removed from the configuration but has no queued jobs would not
    // be pruned here — confirm this nesting is intended.
    final Set<CompactorGroupId> trackedGroups = Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet());
    TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration);
    LOG.debug("No longer tracking compactor check-in times for groups: {}",
        Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet()));
  }

  final Map<String,Set<HostAndPort>> runningCompactors = getRunningCompactors();

  final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>();
  runningCompactors.keySet()
      .forEach(group -> runningCompactorGroups.add(CompactorGroupId.of(group)));

  // Warn about configured groups that have queued work but no compactor processes at all.
  final Set<CompactorGroupId> groupsWithNoCompactors =
      Sets.difference(groupsInConfiguration, runningCompactorGroups);
  for (CompactorGroupId group : groupsWithNoCompactors) {
    long queuedJobCount = jobQueues.getQueuedJobs(group);
    if (queuedJobCount > 0) {
      LOG.warn("Compactor group {} has {} queued compactions but no running compactors", group,
          queuedJobCount);
    }
  }

  // Warn about compactor processes registered for groups the configuration no longer defines.
  final Set<CompactorGroupId> compactorsWithNoGroups =
      Sets.difference(runningCompactorGroups, groupsInConfiguration);
  if (!compactorsWithNoGroups.isEmpty()) {
    LOG.warn(
        "The following groups have running compactors, but are not in the current configuration: {}",
        compactorsWithNoGroups);
  }

  // Warn about groups that have queued jobs and idle compactors, but where no compactor has
  // checked in within the configured warning window.
  long now = System.currentTimeMillis();
  Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(runningCompactors);
  for (CompactorGroupId groupName : groupsInConfiguration) {
    long lastCheckTime = TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, 0L);
    if ((now - lastCheckTime) > getMissingCompactorWarningTime()
        && jobQueues.getQueuedJobs(groupName) > 0
        && idleCompactors.containsKey(groupName.canonical())) {
      LOG.warn(
          "The group {} has queued jobs and {} idle compactors, however none have checked in "
              + "with coordinator for {}ms",
          groupName, idleCompactors.get(groupName.canonical()).size(),
          getMissingCompactorWarningTime());
    }
  }
}

private static Set<StoredTabletFile> getFilesReservedBySelection(TabletMetadata tabletMetadata,
SteadyTime steadyTime, ServerContext ctx) {
if (tabletMetadata.getSelectedFiles() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ private void detectDeadCompactions() {
this.deadCompactions.keySet().removeAll(toFail);
}

// Find and delete any known tables that have unreferenced
// compaction tmp files.
// Find and delete compaction tmp files that are unreferenced
if (!tablesWithUnreferencedTmpFiles.isEmpty()) {

Set<TableId> copy = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,10 @@ protected long getTServerCheckInterval() {
}

@Override
protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {}

@Override
protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {}

@Override
protected void startIdleCompactionWatcher() {
protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) {
// This is called from CompactionCoordinator.run(). Counting down
// the latch will exit the run method
this.shutdown.countDown();
Expand Down Expand Up @@ -190,6 +187,11 @@ protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
return runningCompactions;
}

@Override
protected Map<String,Set<HostAndPort>> getRunningCompactors() {
  // Test environment: report that no compactor processes are registered.
  final Map<String,Set<HostAndPort>> noRunningCompactors = Map.of();
  return noRunningCompactors;
}

@Override
protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress,
ExternalCompactionId externalCompactionId) {
Expand Down Expand Up @@ -425,13 +427,13 @@ public void testCleanUpRunning() throws Exception {
coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));

coordinator.cleanUpRunning();
coordinator.cleanUpInternalState();

assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());

coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));

coordinator.cleanUpRunning();
coordinator.cleanUpInternalState();

assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());

Expand Down