fixes #3045 remove stale compactions from coordinator (#3059)
keith-turner committed Nov 2, 2022
1 parent 1d43b5b commit 5e4e30b
Showing 3 changed files with 131 additions and 29 deletions.
pom.xml: 2 changes (1 addition, 1 deletion)
@@ -1701,7 +1701,7 @@
<jdk>[17,)</jdk>
</activation>
<properties>
<extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED</extraTestArgs>
<extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED</extraTestArgs>
</properties>
</profile>
</profiles>
@@ -32,6 +32,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
import org.apache.accumulo.core.Constants;
@@ -55,7 +56,9 @@
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
@@ -90,6 +93,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Sets;

public class CompactionCoordinator extends AbstractServer
implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
@@ -100,8 +104,14 @@ public class CompactionCoordinator extends AbstractServer

protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();

/* Map of compactionId to RunningCompactions */
protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
/*
* Map of compactionId to RunningCompactions. This is an informational cache of what external
* compactions may be running. It may contain external compactions that are not actually
* running, and it may not contain compactions that are actually running. The metadata table
* is the most authoritative source of what external compactions are currently running, but it
* does not have the stats that this map has.
*/
protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
new ConcurrentHashMap<>();

private static final Cache<ExternalCompactionId,RunningCompaction> COMPLETED =
@@ -137,6 +147,7 @@ protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfigur
startGCLogger(schedExecutor);
printStartupMsg();
startCompactionCleaner(schedExecutor);
startRunningCleaner(schedExecutor);
}

@Override
@@ -170,6 +181,12 @@ protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor)
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
ScheduledFuture<?> future =
schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}

protected void printStartupMsg() {
LOG.info("Version " + Constants.VERSION);
LOG.info("Instance " + getContext().getInstanceID());
@@ -277,7 +294,7 @@ public void run() {
update.setState(TCompactionState.IN_PROGRESS);
update.setMessage("Coordinator restarted, compaction found in progress");
rc.addUpdate(System.currentTimeMillis(), update);
RUNNING.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
});
}

@@ -446,7 +463,10 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent
prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
continue;
}
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
// It is possible that by the time this is added, the tablet has already canceled the
// compaction or the compactor that made this request is dead. In these cases the compaction
// is not actually running.
RUNNING_CACHE.put(ExternalCompactionId.of(job.getExternalCompactionId()),
new RunningCompaction(job, compactorAddress, queue));
LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress);
result = job;
@@ -523,15 +543,7 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
// It's possible that RUNNING might not have an entry for this ecid in the case
// of a coordinator restart when the Coordinator can't find the TServer for the
// corresponding external compaction.
final RunningCompaction rc = RUNNING.get(ecid);
if (null != rc) {
RUNNING.remove(ecid, rc);
COMPLETED.put(ecid, rc);
} else {
LOG.warn(
"Compaction completed called by Compactor for {}, but no running compaction for that id.",
externalCompactionId);
}
recordCompletion(ecid);
}

@Override
@@ -549,17 +561,7 @@ public void compactionFailed(TInfo tinfo, TCredentials credentials, String exter

void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
compactionFinalizer.failCompactions(compactions);
compactions.forEach((k, v) -> {
final RunningCompaction rc = RUNNING.get(k);
if (null != rc) {
RUNNING.remove(k, rc);
COMPLETED.put(k, rc);
} else {
LOG.warn(
"Compaction failed called by Compactor for {}, but no running compaction for that id.",
k);
}
});
compactions.forEach((k, v) -> recordCompletion(k));
}

/**
@@ -589,12 +591,49 @@ public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
}
LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId,
timestamp, update);
final RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
if (null != rc) {
rc.addUpdate(timestamp, update);
}
}

private void recordCompletion(ExternalCompactionId ecid) {
var rc = RUNNING_CACHE.remove(ecid);
if (rc != null) {
COMPLETED.put(ecid, rc);
}
}

protected Set<ExternalCompactionId> readExternalCompactionIds() {
return getContext().getAmple().readTablets().forLevel(Ample.DataLevel.USER)
.fetch(TabletMetadata.ColumnType.ECOMP).build().stream()
.flatMap(tm -> tm.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
}

/**
* The RUNNING_CACHE map may contain external compactions that are not actually running. This
* method periodically cleans those up.
*/
protected void cleanUpRunning() {

// grab a snapshot of the ids in the set before reading the metadata table. This is done to
// avoid removing things that are added while reading the metadata.
Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());

// grab the ids that are listed as running in the metadata table. It is important that this is
// done after getting the snapshot.
Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();

var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);

// remove ids that are in the running set but not in the metadata table
idsToRemove.forEach(ecid -> recordCompletion(ecid));

if (idsToRemove.size() > 0) {
LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
}
}
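The ordering in cleanUpRunning (snapshot the cache keys first, read the metadata table second) is what makes the removal safe against concurrent additions: an id added to RUNNING_CACHE after the snapshot cannot appear in the snapshot, so this pass cannot remove it. A minimal standalone sketch of the same pattern, with hypothetical names and a Supplier standing in for the metadata read:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import com.google.common.collect.Sets;

class StaleEntryCleaner<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();
  private final Supplier<Set<K>> authority; // stands in for reading the metadata table

  StaleEntryCleaner(Supplier<Set<K>> authority) {
    this.authority = authority;
  }

  void track(K key, V value) {
    cache.put(key, value);
  }

  void cleanUp() {
    // Snapshot the cache keys BEFORE consulting the authority; a key added
    // concurrently after this point is absent from the snapshot, so this
    // pass cannot remove it even if the authority has not recorded it yet.
    Set<K> snapshot = Set.copyOf(cache.keySet());
    Set<K> authoritative = authority.get();
    // Anything the cache believes is running that the authority does not
    // know about is stale and can be dropped.
    Sets.difference(snapshot, authoritative).forEach(cache::remove);
  }
}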

/**
* Return information about running compactions
*
@@ -614,8 +653,9 @@ public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials c
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}

final TExternalCompactionList result = new TExternalCompactionList();
RUNNING.forEach((ecid, rc) -> {
RUNNING_CACHE.forEach((ecid, rc) -> {
TExternalCompaction trc = new TExternalCompaction();
trc.setQueueName(rc.getQueueName());
trc.setCompactor(rc.getCompactorAddress());
@@ -660,7 +700,7 @@ public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials
@Override
public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
throws TException {
var runningCompaction = RUNNING.get(ExternalCompactionId.of(externalCompactionId));
var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
try {
NamespaceId nsId = getContext().getNamespaceId(extent.tableId());
@@ -38,6 +38,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
@@ -87,6 +88,8 @@ public class TestCoordinator extends CompactionCoordinator {
private final ServerAddress client;
private final TabletClientService.Client tabletServerClient;

private Set<ExternalCompactionId> metadataCompactionIds = null;

protected TestCoordinator(CompactionFinalizer finalizer, LiveTServerSet tservers,
ServerAddress client, TabletClientService.Client tabletServerClient, ServerContext context,
AuditedSecurityOperation security) {
@@ -158,6 +161,18 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
TKeyExtent extent) throws ThriftSecurityException {}

void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
metadataCompactionIds = mci;
}

protected Set<ExternalCompactionId> readExternalCompactionIds() {
if (metadataCompactionIds == null) {
return RUNNING_CACHE.keySet();
} else {
return metadataCompactionIds;
}
}
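Overriding readExternalCompactionIds() above is a test seam: the production method reads the metadata table, while the test either injects a fixed set or falls back to the cache's own keys (meaning nothing looks stale). A minimal sketch of that pattern with hypothetical class names:

import java.util.Set;

class Coordinator {
  protected Set<String> readAuthority() {
    // production path: query the external store
    throw new UnsupportedOperationException("not reachable from unit tests");
  }
}

class FakeCoordinator extends Coordinator {
  private Set<String> injected = Set.of();

  void inject(Set<String> ids) {
    injected = ids;
  }

  @Override
  protected Set<String> readAuthority() {
    return injected; // the unit test controls exactly what the "metadata" contains
  }
}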

public Map<String,TreeMap<Short,TreeSet<TServerInstance>>> getQueues() {
return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES;
}
@@ -167,13 +182,14 @@ public Map<TServerInstance,Set<QueueAndPriority>> getIndex() {
}

public Map<ExternalCompactionId,RunningCompaction> getRunning() {
return RUNNING;
return RUNNING_CACHE;
}

public void resetInternals() {
getQueues().clear();
getIndex().clear();
getRunning().clear();
metadataCompactionIds = null;
}

}
@@ -586,4 +602,50 @@ public void testGetCompactionJobNoJobs() throws Exception {
coordinator.close();
}

@Test
public void testCleanUpRunning() throws Exception {
PowerMock.resetAll();
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));

ServerContext context = PowerMock.createNiceMock(ServerContext.class);
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();

TCredentials creds = PowerMock.createNiceMock(TCredentials.class);

CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class);
LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);

ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
HostAndPort address = HostAndPort.fromString("localhost:10240");
expect(client.getAddress()).andReturn(address).anyTimes();

TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class);

AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class);
expect(security.canPerformSystemActions(creds)).andReturn(true);

PowerMock.replayAll();

var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security);
coordinator.resetInternals();

var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());

coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));

coordinator.cleanUpRunning();

assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());

coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));

coordinator.cleanUpRunning();

assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());

}
}
