Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #3045 remove stale compactions from coordinator #3059

Merged
merged 3 commits into from Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -1701,7 +1701,7 @@
<jdk>[17,)</jdk>
</activation>
<properties>
<extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED</extraTestArgs>
<extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED</extraTestArgs>
</properties>
</profile>
</profiles>
Expand Down
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
import org.apache.accumulo.core.Constants;
Expand All @@ -55,7 +56,9 @@
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
Expand Down Expand Up @@ -90,6 +93,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Sets;

public class CompactionCoordinator extends AbstractServer
implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
Expand All @@ -100,8 +104,14 @@ public class CompactionCoordinator extends AbstractServer

protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();

/* Map of compactionId to RunningCompactions */
protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
/*
* Map of compactionId to RunningCompactions. This is an informational cache of what external
* compactions may be running. Its possible it may contain external compactions that are not
* actually running. It may not contain compactions that are actually running. The metadata table
* is the most authoritative source of what external compactions are currently running, but it
* does not have the stats that this map has.
*/
protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
new ConcurrentHashMap<>();

private static final Cache<ExternalCompactionId,RunningCompaction> COMPLETED =
Expand Down Expand Up @@ -137,6 +147,7 @@ protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfigur
startGCLogger(schedExecutor);
printStartupMsg();
startCompactionCleaner(schedExecutor);
startRunningCleaner(schedExecutor);
}

@Override
Expand Down Expand Up @@ -170,6 +181,12 @@ protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor)
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void printStartupMsg() {
LOG.info("Version " + Constants.VERSION);
LOG.info("Instance " + getContext().getInstanceID());
Expand Down Expand Up @@ -277,7 +294,7 @@ public void run() {
update.setState(TCompactionState.IN_PROGRESS);
update.setMessage("Coordinator restarted, compaction found in progress");
rc.addUpdate(System.currentTimeMillis(), update);
RUNNING.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
});
}

Expand Down Expand Up @@ -446,7 +463,10 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent
prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
continue;
}
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
// It is possible that by the time this added that the tablet has already canceled the
// compaction or the compactor that made this request is dead. In these cases the compaction
// is not actually running.
RUNNING_CACHE.put(ExternalCompactionId.of(job.getExternalCompactionId()),
new RunningCompaction(job, compactorAddress, queue));
LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress);
result = job;
Expand Down Expand Up @@ -523,15 +543,7 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
// It's possible that RUNNING might not have an entry for this ecid in the case
// of a coordinator restart when the Coordinator can't find the TServer for the
// corresponding external compaction.
final RunningCompaction rc = RUNNING.get(ecid);
if (null != rc) {
RUNNING.remove(ecid, rc);
COMPLETED.put(ecid, rc);
} else {
LOG.warn(
"Compaction completed called by Compactor for {}, but no running compaction for that id.",
externalCompactionId);
}
recordCompletion(ecid);
}

@Override
Expand All @@ -549,17 +561,7 @@ public void compactionFailed(TInfo tinfo, TCredentials credentials, String exter

void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
compactionFinalizer.failCompactions(compactions);
compactions.forEach((k, v) -> {
final RunningCompaction rc = RUNNING.get(k);
if (null != rc) {
RUNNING.remove(k, rc);
COMPLETED.put(k, rc);
} else {
LOG.warn(
"Compaction failed called by Compactor for {}, but no running compaction for that id.",
k);
}
});
compactions.forEach((k, v) -> recordCompletion(k));
}

/**
Expand Down Expand Up @@ -589,12 +591,49 @@ public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
}
LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId,
timestamp, update);
final RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
if (null != rc) {
rc.addUpdate(timestamp, update);
}
}

private void recordCompletion(ExternalCompactionId ecid) {
var rc = RUNNING_CACHE.remove(ecid);
if (rc != null) {
COMPLETED.put(ecid, rc);
}
}

protected Set<ExternalCompactionId> readExternalCompactionIds() {
return getContext().getAmple().readTablets().forLevel(Ample.DataLevel.USER)
.fetch(TabletMetadata.ColumnType.ECOMP).build().stream()
.flatMap(tm -> tm.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
}

/**
* The RUNNING_CACHE set may contain external compactions that are not actually running. This
* method periodically cleans those up.
*/
protected void cleanUpRunning() {

// grab a snapshot of the ids in the set before reading the metadata table. This is done to
// avoid removing things that are added while reading the metadata.
Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());

// grab the ids that are listed as running in the metadata table. It important that this is done
// after getting the snapshot.
Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();

var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);

// remove ids that are in the running set but not in the metadata table
idsToRemove.forEach(ecid -> recordCompletion(ecid));

if (idsToRemove.size() > 0) {
LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
}
}

/**
* Return information about running compactions
*
Expand All @@ -614,8 +653,9 @@ public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials c
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}

final TExternalCompactionList result = new TExternalCompactionList();
RUNNING.forEach((ecid, rc) -> {
RUNNING_CACHE.forEach((ecid, rc) -> {
TExternalCompaction trc = new TExternalCompaction();
trc.setQueueName(rc.getQueueName());
trc.setCompactor(rc.getCompactorAddress());
Expand Down Expand Up @@ -660,7 +700,7 @@ public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials
@Override
public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
throws TException {
var runningCompaction = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
try {
NamespaceId nsId = getContext().getNamespaceId(extent.tableId());
Expand Down
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
Expand Down Expand Up @@ -87,6 +88,8 @@ public class TestCoordinator extends CompactionCoordinator {
private final ServerAddress client;
private final TabletClientService.Client tabletServerClient;

private Set<ExternalCompactionId> metadataCompactionIds = null;

protected TestCoordinator(CompactionFinalizer finalizer, LiveTServerSet tservers,
ServerAddress client, TabletClientService.Client tabletServerClient, ServerContext context,
AuditedSecurityOperation security) {
Expand Down Expand Up @@ -158,6 +161,18 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
TKeyExtent extent) throws ThriftSecurityException {}

void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
metadataCompactionIds = mci;
}

protected Set<ExternalCompactionId> readExternalCompactionIds() {
if (metadataCompactionIds == null) {
return RUNNING_CACHE.keySet();
} else {
return metadataCompactionIds;
}
}

public Map<String,TreeMap<Short,TreeSet<TServerInstance>>> getQueues() {
return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES;
}
Expand All @@ -167,13 +182,14 @@ public Map<TServerInstance,Set<QueueAndPriority>> getIndex() {
}

public Map<ExternalCompactionId,RunningCompaction> getRunning() {
return RUNNING;
return RUNNING_CACHE;
}

public void resetInternals() {
getQueues().clear();
getIndex().clear();
getRunning().clear();
metadataCompactionIds = null;
}

}
Expand Down Expand Up @@ -586,4 +602,50 @@ public void testGetCompactionJobNoJobs() throws Exception {
coordinator.close();
}

@Test
public void testCleanUpRunning() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));

ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();

TCredentials creds = PowerMock.createNiceMock(TCredentials.class);

CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);

ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();

TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);

AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
expect(security.canPerformSystemActions(creds)).andReturn(true);

PowerMock.replayAll();

var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();

var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());

coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));

coordinator.cleanUpRunning();

assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());

coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));

coordinator.cleanUpRunning();

assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());

}
}