Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 29 additions & 49 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,11 @@ public void run() {
while (keepRunning.get()) {
try {
// The reason for calling transfer instead of queueing is to avoid rescanning the
// storage layer and adding the same thing over and over. For example if all threads
// storage layer and adding the same thing over and over. For example if all
// threads
// were busy, the queue size was 100, and there are three runnable things in the
// store. Do not want to keep scanning the store adding those same 3 runnable things
// store. Do not want to keep scanning the store adding those same 3 runnable
// things
// until the queue is full.
if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) {
break;
Expand All @@ -186,8 +188,10 @@ public void run() {
}

private class TransactionRunner implements Runnable {
// used to signal a TransactionRunner to stop in the case where there are too many running
// i.e., the property for the pool size decreased and we have excess TransactionRunners
// used to signal a TransactionRunner to stop in the case where there are too
// many running
// i.e., the property for the pool size decreased and we have excess
// TransactionRunners
private final AtomicBoolean stop = new AtomicBoolean(false);

private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
Expand Down Expand Up @@ -232,7 +236,8 @@ public void run() {
continue;
}
} catch (StackOverflowException e) {
// the op that failed to push onto the stack was never executed, so no need to undo
// the op that failed to push onto the stack was never executed, so no need to
// undo
// it. Just transition to failed and undo the ops that executed.
transitionToFailed(txStore, e);
continue;
Expand Down Expand Up @@ -289,7 +294,8 @@ private void execute(final FateTxStore<T> txStore, final ExecutionState state)
state.op = executeCall(txStore.getID(), state.op);

if (state.op != null) {
// persist the completion of this step before starting to run the next so in the case of
// persist the completion of this step before starting to run the next so in the
// case of
// process death the completed steps are not rerun
txStore.push(state.op);
}
Expand All @@ -311,13 +317,15 @@ private void blockIfHadoopShutdown(FateId fateId, Exception e) {
} else if (isIOException(e)) {
log.info("Ignoring exception likely caused by Hadoop Shutdown hook. {} ", fateId, e);
} else {
// sometimes code will catch an IOException caused by the hadoop shutdown hook and throw
// sometimes code will catch an IOException caused by the hadoop shutdown hook
// and throw
// another exception without setting the cause.
log.warn("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", fateId, e);
}

while (true) {
// Nothing is going to work well at this point, so why even try. Just wait for the end,
// Nothing is going to work well at this point, so why even try. Just wait for
// the end,
// preventing this FATE thread from processing further work and likely failing.
sleepUninterruptibly(1, MINUTES);
}
Expand All @@ -326,7 +334,8 @@ private void blockIfHadoopShutdown(FateId fateId, Exception e) {

private void transitionToFailed(FateTxStore<T> txStore, Exception e) {
final String msg = "Failed to execute Repo " + txStore.getID();
// Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor
// Certain FATE ops that throw exceptions don't need to be propagated up to the
// Monitor
// as a warning. They're a normal, handled failure condition.
if (e instanceof AcceptableException) {
var tableOpEx = (AcceptableThriftTableOperationException) e;
Expand Down Expand Up @@ -427,33 +436,31 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
this.fatePoolWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);

ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
// resize the pool if the property changed
ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);

final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
final int needed = configured - runningTxRunners.size();

if (needed > 0) {
// If the pool grew, then ensure that there is a TransactionRunner for each thread
// If the pool grew, then ensure that there is a TransactionRunner for each
// thread
for (int i = 0; i < needed; i++) {
try {
pool.execute(new TransactionRunner());
} catch (RejectedExecutionException e) {
// A RejectedExecutionException here may just mean the pool is shutting down
if (pool.isShutdown()) {
// The exception is expected in this case, no need to spam the logs.
log.trace("Error adding transaction runner to FaTE executor pool.", e);
} else {
// This is bad, FaTE may no longer work!
log.error("Error adding transaction runner to FaTE executor pool.", e);
}
break;
}
}
idleCountHistory.clear();
} else if (needed < 0) {
// If we need the pool to shrink, then ensure excess TransactionRunners are safely stopped.
// Flag the necessary number of TransactionRunners to safely stop when they are done working
// on a transaction.
int numFlagged =
(int) runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
int numToStop = -1 * (numFlagged + needed);
Expand All @@ -467,46 +474,19 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
}
}
} else {
// The property did not change, but should it based on idle Fate threads? Maintain
// count of the last X minutes of idle Fate threads. If zero 95% of the time, then suggest
// that the MANAGER_FATE_THREADPOOL_SIZE be increased.
final long interval = Math.min(60, TimeUnit.MILLISECONDS
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
if (interval == 0) {
idleCountHistory.clear();
} else {
if (idleCountHistory.size() >= interval * 2) { // this task runs every 30s
int zeroFateThreadsIdleCount = 0;
for (Integer idleConsumerCount : idleCountHistory) {
if (idleConsumerCount == 0) {
zeroFateThreadsIdleCount++;
}
}
boolean needMoreThreads =
(zeroFateThreadsIdleCount / (double) idleCountHistory.size()) >= 0.95;
if (needMoreThreads) {
log.warn(
"All Fate threads appear to be busy for the last {} minutes,"
+ " consider increasing property: {}",
interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
// Clear the history so that we don't log for interval minutes.
idleCountHistory.clear();
} else {
while (idleCountHistory.size() >= interval * 2) {
idleCountHistory.remove();
}
}
}
idleCountHistory.add(workQueue.getWaitingConsumerCount());
// Warn when the work queue has no waiting consumers, i.e. every Fate thread
// appears to be busy
if (workQueue.getWaitingConsumerCount() == 0) {
log.warn("All Fate threads appear to be busy, consider increasing property: {}",
Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
}
}
}, INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS));

this.transactionExecutor = pool;

ScheduledExecutorService deadResCleanerExecutor = null;
if (runDeadResCleaner) {
// Create a dead reservation cleaner for this store that will periodically clean up
// reservations held by dead processes, if they exist.
deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
store.type() + "-dead-reservation-cleaner-pool");
ScheduledFuture<?> deadReservationCleaner =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.accumulo.server.util;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
Expand All @@ -46,6 +48,7 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
import com.google.common.net.HostAndPort;

Expand All @@ -69,6 +72,10 @@ static class RunningCommand {
@Parameter(names = {"-d", "--details"},
description = "display details about the running compactions")
boolean details = false;

@Parameter(names = {"-f", "--format"},
description = "output format: json, csv (default: human-readable)")
String format = "human"; // Default format
}

@Parameters(commandDescription = "list all compactors in zookeeper")
Expand Down Expand Up @@ -123,7 +130,7 @@ public void execute(final String[] args) {
} else if (cl.getParsedCommand().equals("cancel")) {
cancelCompaction(context, cancelOps.ecid);
} else if (cl.getParsedCommand().equals("running")) {
runningCompactions(context, runningOpts.details);
runningCompactions(context, runningOpts.details, runningOpts.format);
} else {
log.error("Unknown command {}", cl.getParsedCommand());
cl.usage();
Expand Down Expand Up @@ -164,41 +171,84 @@ private void listCompactorsByQueue(ServerContext context) {
}
}

private void runningCompactions(ServerContext context, boolean details) {
private void runningCompactions(ServerContext context, boolean details, String format) {
CompactionCoordinatorService.Client coordinatorClient = null;
TExternalCompactionMap running;
TExternalCompactionList running;

try {
coordinatorClient = getCoordinatorClient(context);
running = coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
if (running == null) {

if (running == null || running.getCompactions() == null) {
System.out.println("No running compactions found.");
return;
}

var ecidMap = running.getCompactions();
if (ecidMap == null) {
System.out.println("No running compactions found.");
return;
}
List<Map<String,Object>> compactionData = new ArrayList<>();

ecidMap.forEach((ecid, ec) -> {
if (ec != null) {
var runningCompaction = new RunningCompaction(ec);
var addr = runningCompaction.getCompactorAddress();
var kind = runningCompaction.getJob().kind;
var group = runningCompaction.getGroupName();
var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, group, ke.tableId());

Map<String,Object> entry = new LinkedHashMap<>();
entry.put("ecid", ecid);
entry.put("address", addr);
entry.put("kind", kind);
entry.put("group", group);
entry.put("tableId", ke.tableId());

if (details) {
var runningCompactionInfo = new RunningCompactionInfo(ec);
var status = runningCompactionInfo.status;
var last = runningCompactionInfo.lastUpdate;
var duration = runningCompactionInfo.duration;
var numFiles = runningCompactionInfo.numFiles;
var progress = runningCompactionInfo.progress;
System.out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n",
status, last, duration, numFiles, progress);
entry.put("status", runningCompactionInfo.status);
entry.put("lastUpdateMs", runningCompactionInfo.lastUpdate);
entry.put("durationMs", runningCompactionInfo.duration);
entry.put("numFiles", runningCompactionInfo.numFiles);
entry.put("progress", runningCompactionInfo.progress);
}

compactionData.add(entry);
}
});

// Handle output format
switch (format.toLowerCase()) {
case "json":
ObjectMapper objectMapper = new ObjectMapper();
System.out.println(
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(compactionData));
break;
case "csv":
if (compactionData.isEmpty()) {
System.out.println("No running compactions found.");
return;
}
// Convert to CSV format
List<String> csvLines = new ArrayList<>();
var headers = String.join(",", compactionData.get(0).keySet());
csvLines.add(headers);
csvLines.addAll(compactionData.stream().map(
row -> row.values().stream().map(Object::toString).collect(Collectors.joining(",")))
.collect(Collectors.toList()));
System.out.println(String.join("\n", csvLines));
break;
default:
// Default human-readable output
compactionData.forEach(entry -> {
System.out.printf("%s %s %s %s TableId: %s\n", entry.get("ecid"), entry.get("address"),
entry.get("kind"), entry.get("group"), entry.get("tableId"));
if (details) {
System.out.printf(
" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n",
entry.get("status"), entry.get("lastUpdateMs"), entry.get("durationMs"),
entry.get("numFiles"), entry.get("progress"));
}
});
}
} catch (Exception e) {
throw new IllegalStateException("Unable to get running compactions.", e);
} finally {
Expand Down