Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop tracking last compactor check-in for non-existent groups #4403

Open
wants to merge 12 commits into
base: elasticity
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,57 +231,22 @@ public void shutdown() {
}
}

protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future = schedExecutor
.scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startIdleCompactionWatcher() {

ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning,
getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS);
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startGroupCheckThread() {
ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(() -> {
Set<String> compactorGroups = ExternalCompactionUtil.getCompactorAddrs(ctx).keySet();
for (CompactorGroupId groupId : TIME_COMPACTOR_LAST_CHECKED.keySet()) {
if (!compactorGroups.contains(groupId.canonical())) {
TIME_COMPACTOR_LAST_CHECKED.remove(groupId);
}
}
}, 5, 5, TimeUnit.MINUTES);
schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

private void idleCompactionWarning() {

long now = System.currentTimeMillis();
Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {
if ((now - lastCheckTime) > getMissingCompactorWarningTime()
&& jobQueues.getQueuedJobs(groupName) > 0
&& idleCompactors.containsKey(groupName.canonical())) {
LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName,
getMissingCompactorWarningTime());
}
});

}

@Override
public void run() {

startCompactionCleaner(schedExecutor);
startRunningCleaner(schedExecutor);
startCompactorZKCleaner(schedExecutor);

// On a re-start of the coordinator it's possible that external compactions are in-progress.
// Attempt to get the running compactions on the compactors and then resolve which tserver
Expand All @@ -303,10 +268,7 @@ public void run() {
}

startDeadCompactionDetector();

startIdleCompactionWatcher();

startGroupCheckThread();
startInternalStateCleaner(schedExecutor);

try {
shutdown.await();
Expand All @@ -320,7 +282,7 @@ public void run() {
private Map<String,Set<HostAndPort>> getIdleCompactors() {

Map<String,Set<HostAndPort>> allCompactors = new HashMap<>();
ExternalCompactionUtil.getCompactorAddrs(ctx)
getRunningCompactors()
.forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList)));

Set<String> emptyQueues = new HashSet<>();
Expand Down Expand Up @@ -903,30 +865,6 @@ protected Set<ExternalCompactionId> readExternalCompactionIds() {
}
}

/**
* The RUNNING_CACHE set may contain external compactions that are not actually running. This
* method periodically cleans those up.
*/
public void cleanUpRunning() {

// grab a snapshot of the ids in the set before reading the metadata table. This is done to
// avoid removing things that are added while reading the metadata.
Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());

// grab the ids that are listed as running in the metadata table. It important that this is done
// after getting the snapshot.
Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();

var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);

// remove ids that are in the running set but not in the metadata table
idsToRemove.forEach(this::recordCompletion);

if (idsToRemove.size() > 0) {
LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
}
}

/**
* Return information about running compactions
*
Expand Down Expand Up @@ -1013,6 +951,11 @@ protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
}

/* Method exists to be overridden in test to hide static method */
protected Map<String,List<HostAndPort>> getRunningCompactors() {
return ExternalCompactionUtil.getCompactorAddrs(this.ctx);
}

/* Method exists to be overridden in test to hide static method */
protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
HostAndPort hostPort = HostAndPort.fromString(address);
Expand All @@ -1029,7 +972,7 @@ private void deleteEmpty(ZooReaderWriter zoorw, String path)
}
}

private void cleanUpCompactors() {
private void cleanUpEmptyCompactorPathInZK() {
final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;

var zoorw = this.ctx.getZooReaderWriter();
Expand Down Expand Up @@ -1063,4 +1006,91 @@ private void cleanUpCompactors() {
}
}

public void cleanUpInternalState() {

// This method does the following:
//
// 1. Removes entries from RUNNING_CACHE that are not really running
// 2. Cancels running compactions for groups that are not in the current configuration
// 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
// 4. Log groups with no compactors
// 5. Log compactors with no groups
// 6. Log groups with compactors that have not checked in

// grab a snapshot of the ids in the set before reading the metadata table. This is done to
// avoid removing things that are added while reading the metadata.
Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());

// grab the ids that are listed as running in the metadata table. It important that this is done
// after getting the snapshot.
Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();

var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);

// remove ids that are in the running set but not in the metadata table
idsToRemove.forEach(this::recordCompletion);

if (idsToRemove.size() > 0) {
LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
}

// Get the set of groups being referenced in the current configuration
// Needs Dan's changes for this
Set<CompactorGroupId> groupsInConfiguration = new HashSet<>();

// Compaction jobs are created in the TabletGroupWatcher and added to the Coordinator
// via the addJobs method which adds the job to the CompactionJobQueues object.
Set<CompactorGroupId> groupsWithJobs = jobQueues.getQueueIds();

Set<CompactorGroupId> jobGroupsNotInConfiguration =
Sets.difference(groupsWithJobs, groupsInConfiguration);

if (jobGroupsNotInConfiguration != null && !jobGroupsNotInConfiguration.isEmpty()) {
RUNNING_CACHE.values().forEach(rc -> {
if (jobGroupsNotInConfiguration.contains(CompactorGroupId.of(rc.getGroupName()))) {
LOG.warn(
"External compaction {} running in group {} on compactor {},"
+ " but group not found in current configuration. Failing compaction...",
rc.getJob().getExternalCompactionId(), rc.getGroupName(), rc.getCompactorAddress());
cancelCompactionOnCompactor(rc.getCompactorAddress(),
rc.getJob().getExternalCompactionId());
}
});

// Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
LOG.debug("No longer tracking compactor check-in times for groups: {}",
jobGroupsNotInConfiguration);
jobGroupsNotInConfiguration.forEach(TIME_COMPACTOR_LAST_CHECKED::remove);
}
dlmarion marked this conversation as resolved.
Show resolved Hide resolved

final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>();
getRunningCompactors().keySet()
.forEach(group -> runningCompactorGroups.add(CompactorGroupId.of(group)));

Set<CompactorGroupId> groupsWithNoCompactors =
Sets.difference(groupsInConfiguration, runningCompactorGroups);
if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) {
LOG.warn("The following groups have no running compactors: {}", groupsWithNoCompactors);
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
}

Set<CompactorGroupId> compactorsWithNoGroups =
Sets.difference(runningCompactorGroups, groupsInConfiguration);
if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) {
LOG.warn(
"The following groups have running compactors, but are not in the current configuration: {}",
compactorsWithNoGroups);
}

long now = System.currentTimeMillis();
Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
if ((now - lastCheckTime) > getMissingCompactorWarningTime()
&& jobQueues.getQueuedJobs(groupName) > 0
&& idleCompactors.containsKey(groupName.canonical())) {
LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName,
getMissingCompactorWarningTime());
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
}
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ private void detectDeadCompactions() {
this.deadCompactions.keySet().removeAll(toFail);
}

// Find and delete any known tables that have unreferenced
// compaction tmp files.
// Find and delete compaction tmp files that are unreferenced
if (!tablesWithUnreferencedTmpFiles.isEmpty()) {

Set<TableId> copy = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,16 @@ public TestCoordinator(ServerContext ctx, SecurityOperation security,
@Override
protected void startDeadCompactionDetector() {}

@Override
protected void startGroupCheckThread() {}

@Override
protected long getTServerCheckInterval() {
return 5000L;
}

@Override
protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}

@Override
protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {}

@Override
protected void startIdleCompactionWatcher() {
protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) {
// This is called from CompactionCoordinator.run(). Counting down
// the latch will exit the run method
this.shutdown.countDown();
Expand Down Expand Up @@ -172,6 +166,11 @@ protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
return runningCompactions;
}

@Override
protected Map<String,List<HostAndPort>> getRunningCompactors() {
return Map.of();
}

@Override
protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress,
ExternalCompactionId externalCompactionId) {
Expand Down Expand Up @@ -382,13 +381,13 @@ public void testCleanUpRunning() throws Exception {
coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));

coordinator.cleanUpRunning();
coordinator.cleanUpInternalState();

assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());

coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));

coordinator.cleanUpRunning();
coordinator.cleanUpInternalState();

assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());

Expand Down