Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.next.InformationFetcher.InstanceSummary;
import org.apache.accumulo.monitor.next.SystemInformation.ProcessSummary;
import org.apache.accumulo.monitor.next.SystemInformation.TableSummary;
import org.apache.accumulo.monitor.next.SystemInformation.TimeOrderedRunningCompactionSet;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
import org.apache.accumulo.monitor.next.ec.CompactorsSummary;
import org.apache.accumulo.monitor.next.ec.CoordinatorSummary;
import org.apache.accumulo.monitor.next.sservers.ScanServerView;
Expand Down Expand Up @@ -408,10 +407,10 @@ public List<TabletInformation> getTablets(@PathParam(TABLEID_PARAM_KEY) String t
@GET
@Path("deployment")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns a map of resource group to server type to process summary."
+ " The process summary contains the number of configured, responding, and not responding servers")
public Map<ResourceGroupId,Map<String,ProcessSummary>> getDeploymentOverview() {
return monitor.getInformationFetcher().getSummaryForEndpoint().getDeploymentOverview();
@Description("Returns a UI-ready deployment overview grouped by resource group. Each process row"
+ " contains the total, responding, and not responding server counts.")
public DeploymentOverview getDeploymentOverview() {
return monitor.getInformationFetcher().getSummaryForEndpoint().getDeploymentView();
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void run() {
}
} catch (Exception e) {
LOG.warn("Error trying to get metrics from server: {}", server, e);
summary.processError(server);
summary.processMetricsError(server);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
import org.apache.accumulo.monitor.next.sservers.ScanServerView;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
Expand Down Expand Up @@ -272,36 +273,29 @@ public void addTablet(TabletInformation info) {
}

public static class ProcessSummary {
private final AtomicLong configured = new AtomicLong(0);
private final AtomicLong responded = new AtomicLong();
private final Set<String> notResponded = ConcurrentHashMap.newKeySet();
private final Set<ServerId> responded = ConcurrentHashMap.newKeySet();
private final Set<ServerId> notResponded = ConcurrentHashMap.newKeySet();

public void addResponded() {
configured.incrementAndGet();
responded.incrementAndGet();
public void addResponded(ServerId server) {
responded.add(server);
}

public void addNotResponded(ServerId server) {
configured.incrementAndGet();
notResponded.add(server.getHost() + ":" + server.getPort());
responded.remove(server);
notResponded.add(server);
}

public long getConfigured() {
return this.configured.get();
public long getTotal() {
return this.responded.size() + this.notResponded.size();
}

public long getResponded() {
return this.responded.get();
return this.responded.size();
}

public long getNotResponded() {
return this.notResponded.size();
}

public Set<String> getNotRespondedHosts() {
return this.notResponded;
}

}

// Object that serves as a TopN view of the RunningCompactions, ordered by
Expand Down Expand Up @@ -368,6 +362,7 @@ public Stream<RunningCompactionInfo> stream() {

private final Set<String> resourceGroups = ConcurrentHashMap.newKeySet();
private final Set<ServerId> problemHosts = ConcurrentHashMap.newKeySet();
private final Set<ServerId> metricProblemHosts = ConcurrentHashMap.newKeySet();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a new collection here. Now we collect servers who didnt respond to the metrics poll into this new metricProblemHosts and that number is what is used in the table for the "Not Responding" count. Made this separation since problemHosts could contain hosts that are still responding but just have other problems.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's possible that a Compactor doesn't respond to the call to get the Metrics, but does respond to the call to get the currently running compaction. Seems odd, but could happen.

private final AtomicReference<ServerId> manager = new AtomicReference<>();
private final AtomicReference<ServerId> gc = new AtomicReference<>();

Expand Down Expand Up @@ -410,15 +405,17 @@ public Stream<RunningCompactionInfo> stream() {
private final Map<TableId,List<TabletInformation>> tablets = new ConcurrentHashMap<>();

// Deployment Overview
private final Map<ResourceGroupId,Map<String,ProcessSummary>> deployment =
private final Map<ResourceGroupId,Map<ServerId.Type,ProcessSummary>> deployment =
new ConcurrentHashMap<>();

private final Set<String> suggestions = new ConcurrentSkipListSet<>();

private final Set<String> configuredCompactionResourceGroups = ConcurrentHashMap.newKeySet();

private long timestamp = 0;
private ScanServerView scanServerView;
private ScanServerView scanServerView = new ScanServerView(0L, List.of(),
new ScanServerView.Status(false, false, false, 0, 0, 0L, "OK", null));
private DeploymentOverview deploymentOverview = new DeploymentOverview(0, List.of());
private final int rgLongRunningCompactionSize;

public SystemInformation(Cache<ServerId,MetricResponse> allMetrics, ServerContext ctx) {
Expand All @@ -431,6 +428,7 @@ public SystemInformation(Cache<ServerId,MetricResponse> allMetrics, ServerContex
public void clear() {
resourceGroups.clear();
problemHosts.clear();
metricProblemHosts.clear();
compactors.clear();
sservers.clear();
tservers.clear();
Expand All @@ -451,7 +449,6 @@ public void clear() {
runningCompactionsPerGroup.clear();
runningCompactionsPerTable.clear();
configuredCompactionResourceGroups.clear();
scanServerView = null;
}

private void updateAggregates(final MetricResponse response,
Expand Down Expand Up @@ -509,10 +506,11 @@ private void createCompactionSummary(MetricResponse response) {

public void processResponse(final ServerId server, final MetricResponse response) {
problemHosts.remove(server);
metricProblemHosts.remove(server);
allMetrics.put(server, response);
resourceGroups.add(response.getResourceGroup());
deployment.computeIfAbsent(server.getResourceGroup(), g -> new ConcurrentHashMap<>())
.computeIfAbsent(server.getType().name(), t -> new ProcessSummary()).addResponded();
.computeIfAbsent(server.getType(), t -> new ProcessSummary()).addResponded(server);
switch (response.serverType) {
case COMPACTOR:
compactors
Expand Down Expand Up @@ -583,17 +581,21 @@ public void processError(ServerId server) {
problemHosts.add(server);
}

public void processMetricsError(ServerId server) {
problemHosts.add(server);
metricProblemHosts.add(server);
}

public void addConfiguredCompactionGroups(Set<String> groups) {
configuredCompactionResourceGroups.addAll(groups);
}

public void finish() {
// Update the deployment not-responded numbers based
// on the problem hosts.
problemHosts.forEach(serverId -> {
// on metric fetch failures for this refresh.
metricProblemHosts.forEach(serverId -> {
deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new ConcurrentHashMap<>())
.computeIfAbsent(serverId.getType().name(), t -> new ProcessSummary())
.addNotResponded(serverId);
.computeIfAbsent(serverId.getType(), t -> new ProcessSummary()).addNotResponded(serverId);
});
for (SystemTables table : SystemTables.values()) {
TableConfiguration tconf = this.ctx.getTableConfiguration(table.tableId());
Expand Down Expand Up @@ -648,6 +650,7 @@ public void finish() {
.filter(serverId -> serverId.getType() == ServerId.Type.SCAN_SERVER).count();
var responses = allMetrics.getAllPresent(scanServers).values();
timestamp = System.currentTimeMillis();
deploymentOverview = DeploymentOverview.fromSummary(deployment, timestamp);
scanServerView = ScanServerView.fromMetrics(responses, scanServers.size(),
problemScanServerCount, timestamp);
}
Expand Down Expand Up @@ -731,10 +734,14 @@ public List<TabletInformation> getTablets(TableId tableId) {
return this.tablets.get(tableId);
}

public Map<ResourceGroupId,Map<String,ProcessSummary>> getDeploymentOverview() {
public Map<ResourceGroupId,Map<ServerId.Type,ProcessSummary>> getDeploymentSummary() {
return this.deployment;
}

public DeploymentOverview getDeploymentView() {
return this.deploymentOverview;
}

public Set<String> getSuggestions() {
return this.suggestions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.monitor.next.deployment;

import java.util.Comparator;
import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.monitor.next.SystemInformation.ProcessSummary;

/**
* Data Transfer Object for the Monitor Overview page deployment tables. It packages the total,
* responding, and not responding server counts for each process in each resource group into a
* UI-ready JSON response.
*/
public record DeploymentOverview(long lastUpdate, List<ResourceGroupDeployment> groups) {

/**
* Data Transfer Object for a resource group and its associated server counts
*/
public record ResourceGroupDeployment(String resourceGroup,
List<ServerTypeDeployment> processes) {
}

/**
* Data Transfer Object for a server type and its associated counts
*/
public record ServerTypeDeployment(String serverType, long total, long responding,
long notResponding) {
}

public static DeploymentOverview fromSummary(
Map<ResourceGroupId,Map<ServerId.Type,ProcessSummary>> deployment, long lastUpdate) {
if (deployment == null || deployment.isEmpty()) {
return new DeploymentOverview(lastUpdate, List.of());
}

var groups = deployment.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(entry -> {
String resourceGroup = entry.getKey().canonical();
List<ServerTypeDeployment> processes = buildProcesses(entry.getValue());
return new ResourceGroupDeployment(resourceGroup, processes);
}).toList();

return new DeploymentOverview(lastUpdate, groups);
}

private static List<ServerTypeDeployment>
buildProcesses(Map<ServerId.Type,ProcessSummary> processes) {
if (processes == null || processes.isEmpty()) {
return List.of();
}

return processes.entrySet().stream()
.sorted(Map.Entry.comparingByKey(Comparator.comparingInt(Enum::ordinal))).map(entry -> {
ServerId.Type key = entry.getKey();
ProcessSummary value = entry.getValue();
return new ServerTypeDeployment(processLabel(key), value.getTotal(), value.getResponded(),
value.getNotResponded());
}).toList();
}

private static String processLabel(ServerId.Type process) {
return switch (process) {
case COMPACTOR -> "Compactor";
case GARBAGE_COLLECTOR -> "Garbage Collector";
case MANAGER -> "Manager";
case MONITOR -> "Monitor";
case SCAN_SERVER -> "Scan Server";
case TABLET_SERVER -> "Tablet Server";
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ function refreshOverview() {
refreshManagerTable();
}
});

refreshDeploymentTables();
}

/**
Expand All @@ -63,3 +65,61 @@ function refreshManagerTable() {
table.eq(3).html('<a href="' + contextPath + 'rest-v2/manager/metrics">Metrics</a>');
});
}

/**
* Refreshes the deployment overview tables
*/
function refreshDeploymentTables() {
getDeployment().then(function () {
var data = JSON.parse(sessionStorage.deployment);
var groups = Array.isArray(data.groups) ? data.groups : [];
if (groups.length === 0) {
$('#deploymentTables').html('<div class="alert alert-warning" role="alert">' +
'No deployment data is currently available.</div>');
return;
}

var tables = groups.map(renderDeploymentTable).join('');

$('#deploymentTables').html(tables);
});
}

/**
* Renders the deployment table for one resource group
*
* @param {object} group deployment data for one resource group
* @returns {string} html for the resource group deployment table
*/
function renderDeploymentTable(group) {
var processes = Array.isArray(group.processes) ? group.processes : [];
var rows = processes.map(function (process) {
return '<tr>' +
'<td>' + process.serverType + '</td>' +
'<td>' + process.total + '</td>' +
'<td>' + process.responding + '</td>' +
'<td>' + process.notResponding + '</td>' +
'</tr>';
}).join('');

return '<div class="mb-4">' +
'<table class="table table-bordered table-striped table-condensed" style="table-layout: fixed; width: 100%;">' +
'<colgroup>' +
'<col style="width: 25%;">' +
'<col style="width: 25%;">' +
'<col style="width: 25%;">' +
'<col style="width: 25%;">' +
'</colgroup>' +
'<thead>' +
'<tr><th colspan="4" class="center">' + group.resourceGroup + '</th></tr>' +
'<tr>' +
'<th>Process</th>' +
'<th>Total</th>' +
'<th>Responding</th>' +
'<th>Not Responding</th>' +
'</tr>' +
'</thead>' +
'<tbody>' + rows + '</tbody>' +
'</table>' +
'</div>';
}
Loading