Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop tracking last compactor check-in for non-existent groups #4403

Open
wants to merge 11 commits into
base: elasticity
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ public class CompactionCoordinator
new ConcurrentHashMap<>();

/* Map of group name to last time compactor called to get a compaction job */
// ELASTICITY_TODO need to clean out groups that are no longer configured..
private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();

private final ServerContext ctx;
Expand Down Expand Up @@ -251,6 +250,18 @@ protected void startIdleCompactionWatcher() {
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startGroupCheckThread() {
ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(() -> {
Set<String> compactorGroups = ExternalCompactionUtil.getCompactorAddrs(ctx).keySet();
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
for (CompactorGroupId groupId : TIME_COMPACTOR_LAST_CHECKED.keySet()) {
if (!compactorGroups.contains(groupId.canonical())) {
TIME_COMPACTOR_LAST_CHECKED.remove(groupId);
}
}
}, 5, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

private void idleCompactionWarning() {

long now = System.currentTimeMillis();
Expand Down Expand Up @@ -295,6 +306,8 @@ public void run() {

startIdleCompactionWatcher();

startGroupCheckThread();

try {
shutdown.await();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public TestCoordinator(ServerContext ctx, SecurityOperation security,
@Override
protected void startDeadCompactionDetector() {}

@Override
protected void startGroupCheckThread() {}

@Override
protected long getTServerCheckInterval() {
return 5000L;
Expand Down