Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.accumulo.monitor.next.InformationFetcher.InstanceSummary;
import org.apache.accumulo.monitor.next.SystemInformation.CompactionGroupSummary;
import org.apache.accumulo.monitor.next.SystemInformation.CompactionTableSummary;
import org.apache.accumulo.monitor.next.SystemInformation.FateTransaction;
import org.apache.accumulo.monitor.next.SystemInformation.MessageCategory;
import org.apache.accumulo.monitor.next.SystemInformation.MessagePriority;
import org.apache.accumulo.monitor.next.SystemInformation.RecoveryInformation;
Expand Down Expand Up @@ -387,6 +388,14 @@ public CompactorsSummary getExternalCompactors() {
return new CompactorsSummary(summary.getCompactorServers(), summary.getTimestamp());
}

@GET
@Path("fate")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns a list of fate transaction details")
public List<FateTransaction> getFateTransactions() {
return monitor.getInformationFetcher().getSummaryForEndpoint().getFateTransactions();
}

@GET
@Path("tables")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
Expand All @@ -53,6 +54,12 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.RowRange;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.AdminUtil.FateStatus;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
Expand All @@ -74,6 +81,8 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionPluginUtils;
import org.apache.accumulo.server.util.adminCommand.Fate;
import org.apache.zookeeper.KeeperException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.eclipse.jetty.util.NanoTime;
import org.slf4j.Logger;
Expand Down Expand Up @@ -173,7 +182,7 @@ void add(UpdateTaskFuture f) {
}

enum UpdateType {
COMPACTION, COMPACTION_RGS, METRIC, TABLE;
COMPACTION, COMPACTION_RGS, FATE, METRIC, TABLE;
}

interface UpdateTask<T extends Object> extends Runnable, Comparable<UpdateTask<T>> {
Expand Down Expand Up @@ -412,6 +421,71 @@ public void run() {
}
}

class FateTransactionFetcher implements UpdateTask<Void> {

private final SystemInformation summary;

public FateTransactionFetcher(SystemInformation summary) {
this.summary = summary;
}

@Override
public void run() {
try {
AdminUtil<Fate> admin = new AdminUtil<>();
var zTableLocksPath = ctx.getServerPaths().createTableLocksPath();
var zk = ctx.getZooSession();
FateStatus status = admin.getStatus(stores, zk, zTableLocksPath, null, null, null);
summary.processFateTransactions(status.getTransactions());
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Objects.hash(getType());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
FateTransactionFetcher other = (FateTransactionFetcher) obj;
return Objects.equals(getType(), other.getType());
}

@Override
public int compareTo(UpdateTask<Void> other) {
return this.getType().compareTo(other.getType());
}

@Override
public UpdateType getType() {
return UpdateType.FATE;
}

@Override
public Void getResource() {
return null;
}

@Override
public String getFailureMessage() {
return "Error fetching fate transaction details";
}
}

class ConfiguredCompactionResourceGroupFetcher implements UpdateTask<Void> {

private final SystemInformation summary;
Expand Down Expand Up @@ -486,6 +560,9 @@ public String getFailureMessage() {
private final Cache<ServerId,MetricResponse> allMetrics;
private final Cache<ServerId,Boolean> retainedProblemServers;
private final AtomicReference<SystemInformation> summaryRef = new AtomicReference<>();
private final ReadOnlyFateStore<Fate> readOnlyMFS;
private final ReadOnlyFateStore<Fate> readOnlyUFS;
private final Map<FateInstanceType,ReadOnlyFateStore<Fate>> stores;
private final TabletMetadataFilter noLocation = new NoCurrentLocationFilter();

public InformationFetcher(ServerContext ctx, Supplier<Long> connectionCount) {
Expand All @@ -495,6 +572,13 @@ public InformationFetcher(ServerContext ctx, Supplier<Long> connectionCount) {
.expireAfterWrite(Duration.ofMinutes(10)).evictionListener(this::onRemoval).build();
this.retainedProblemServers = Caffeine.newBuilder().executor(pool)
.scheduler(Scheduler.systemScheduler()).expireAfterWrite(Duration.ofMinutes(10)).build();
try {
this.readOnlyMFS = new MetaFateStore<>(ctx.getZooSession(), null, null);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("Exception creating MetaFateStore", e);
}
this.readOnlyUFS = new UserFateStore<>(ctx, SystemTables.FATE.tableName(), null, null);
this.stores = Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS);
}

public void newConnectionEvent() {
Expand Down Expand Up @@ -659,9 +743,16 @@ public void run() {

final UpdateTasks futures = new UpdateTasks();
final SystemInformation summary = new SystemInformation(allMetrics, this.ctx);

// Fetch set of registered compactors
Set<ServerId> compactors = this.ctx.instanceOperations().getServers(Type.COMPACTOR);
summary.processExternalCompactionInventory(compactors);

// Fetch Fate transaction information
FateTransactionFetcher fateFetcher = new FateTransactionFetcher(summary);
Future<?> fff = this.pool.submit(fateFetcher);
futures.add(new UpdateTaskFuture(fff, fateFetcher));

// Fetch metrics from the other server processes. This
// makes an RPC call to AbstractServer.getMetrics
for (ServerId.Type type : ServerId.Type.values()) {
Expand All @@ -674,7 +765,6 @@ public void run() {
futures.add(new UpdateTaskFuture(mff, mf));
}
}
ThreadPools.resizePool(pool, () -> Math.max(20, (futures.size() / 20)), poolName);

// Fetch external compaction information from the Compactors
RunningCompactionFetcher rcf = new RunningCompactionFetcher(summary, pool);
Expand All @@ -692,6 +782,8 @@ public void run() {
Future<?> f = this.pool.submit(r);
futures.add(new UpdateTaskFuture(f, r));

ThreadPools.resizePool(pool, () -> Math.max(20, (futures.size() / 20)), poolName);

final long monitorFetchTimeout =
ctx.getConfiguration().getTimeInMillis(Property.MONITOR_FETCH_TIMEOUT);
final long allFuturesAdded = NanoTime.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.fate.AdminUtil.TransactionStatus;
import org.apache.accumulo.core.fate.Fate.FateOperation;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
Expand Down Expand Up @@ -457,6 +461,14 @@ public List<TabletNeedingRecovery> getTabletsNeedingRecovery() {
}
}

public enum LockRangeType {
FULL, PARTIAL;
}

public record FateTransaction(FateInstanceType type, FateOperation op, String id, TStatus status,
long created, List<String> heldLocks, List<String> waitingLocks, LockRangeType lockRange) {
}

private static final Logger LOG = LoggerFactory.getLogger(SystemInformation.class);

private final DistributionStatisticConfig DSC =
Expand Down Expand Up @@ -526,6 +538,8 @@ public List<TabletNeedingRecovery> getTabletsNeedingRecovery() {

private final Set<String> configuredCompactionResourceGroups = ConcurrentHashMap.newKeySet();

private final List<FateTransaction> fateTransactions = new ArrayList<>();

private final AtomicLong timestamp = new AtomicLong(0);
private final EnumMap<ServerId.Type,Status> componentStatuses =
new EnumMap<>(ServerId.Type.class);
Expand Down Expand Up @@ -575,6 +589,7 @@ public void clear() {
componentStatuses.clear();
managerGoalState = null;
serverMetricsView.clear();
fateTransactions.clear();
messageCounts.clear();
}

Expand Down Expand Up @@ -909,6 +924,15 @@ public void processTabletInformation(TableId tableId, String tableName, TabletIn
}
}

public void processFateTransactions(List<TransactionStatus> transactions) {
transactions.forEach(t -> {
fateTransactions
.add(new FateTransaction(t.getInstanceType(), t.getFateOp(), t.getFateId().getTxUUIDStr(),
t.getStatus(), t.getTimeCreated(), t.getHeldLocks(), t.getWaitingLocks(),
t.getLockRange().isInfinite() ? LockRangeType.FULL : LockRangeType.PARTIAL));
});
}

public void processError(ServerId server) {
problemHosts.add(server);
}
Expand Down Expand Up @@ -1375,6 +1399,10 @@ public Map<MessagePriority,Map<MessageCategory,Set<String>>> getMessages() {
return this.messages;
}

public List<FateTransaction> getFateTransactions() {
return this.fateTransactions;
}

public Map<MessagePriority,AtomicLong> getMessageCounts() {
return this.messageCounts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,24 @@ public Map<String,Object> getBulkImports() {
return model;
}

/**
* Returns the Fate template
*
* @return Fate model
*/
@GET
@Path("fate")
@Template(name = "/default.ftl")
public Map<String,Object> getFate() {

Map<String,Object> model = getModel();
model.put("title", "Fate Transaction Details");
model.put("template", "fate.ftl");
model.put("js", "fate.js");

return model;
}

/**
* Returns the garbage collector template
*
Expand Down
Loading