Skip to content

Commit

Permalink
Introduced 'Pending' state for backend's health
Browse files Browse the repository at this point in the history
This commit adds a 'Pending' state to keep track of backend's health.

Resolves: trinodb#80
  • Loading branch information
Andy Su (Apps) committed Nov 13, 2023
1 parent e155380 commit 9df6315
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 60 deletions.
12 changes: 12 additions & 0 deletions gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-testing</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public HaGatewayLauncher(String... basePackages) {
super(basePackages);
}

/**
 * Creates a launcher without passing any base packages to the parent class.
 *
 * <p>NOTE(review): presumably added so test tooling can instantiate the
 * application reflectively through a no-arg constructor — confirm.
 */
public HaGatewayLauncher() {
    super();
}

@Override
public void initialize(Bootstrap<HaGatewayConfiguration> bootstrap) {
super.initialize(bootstrap);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.trino.gateway.ha.clustermonitor;

/**
 * Health of a backend cluster as tracked by the gateway.
 *
 * <p>Declaration order is kept as-is; do not reorder the constants without
 * checking for ordinal- or name-based persistence elsewhere.
 */
public enum BackendHealthState {
/** A health check reported the backend as usable. */
HEALTHY,
/**
 * No health verdict is available yet.
 * NOTE(review): appears to be the initial state of a backend before its
 * first completed health check — confirm against the cluster monitors.
 */
PENDING,
/** A health check reported the backend as not usable. */
UNHEALTHY
}

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ClusterStats {
private int queuedQueryCount;
private int blockedQueryCount;
private int numWorkerNodes;
private boolean healthy;
private BackendHealthState healthy = BackendHealthState.PENDING;
private String clusterId;
private String proxyTo;
private String externalUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) {
HashMap<String, Object> result = null;
result = new ObjectMapper().readValue(response, HashMap.class);

clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
clusterStats.setHealthy(clusterStats.getNumWorkerNodes() > 0);
clusterStats.setNumWorkerNodes((int) result.getOrDefault("activeWorkers", 0));
clusterStats.setQueuedQueryCount((int) result.getOrDefault("queuedQueries", 0));
clusterStats.setRunningQueryCount((int) result.getOrDefault("runningQueries", 0));
clusterStats.setBlockedQueryCount((int) result.getOrDefault("blockedQueries", 0));
clusterStats.setHealthy(clusterStats.getNumWorkerNodes() > 0 ? BackendHealthState.HEALTHY : BackendHealthState.UNHEALTHY);
clusterStats.setProxyTo(backend.getProxyTo());
clusterStats.setExternalUrl(backend.getExternalUrl());
clusterStats.setRoutingGroup(backend.getRoutingGroup());

} catch (Exception e) {
log.error("Error parsing cluster stats from [{}]", response, e);
log.error("Error parsing cluster stats from {}", response, e);
}

// Fetch User Level Stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) {
while (rs.next()) {
partialState.put(rs.getString("state"), rs.getInt("count"));
}
clusterStats.setHealthy(true);
clusterStats.setHealthy(BackendHealthState.HEALTHY);
clusterStats.setQueuedQueryCount(partialState.getOrDefault("QUEUED", 0));
clusterStats.setRunningQueryCount(partialState.getOrDefault("RUNNING", 0));
return clusterStats;
} catch (TimeoutException e) {
log.error("timed out fetching status for {} backend, {}", url, e);
log.error("timed out fetching status for {} backend, {}", jdbcUrl, e);
} catch (Exception e) {
log.error("could not fetch status for {} backend, {}", url, e);
log.error("could not fetch status for {} backend, {}", jdbcUrl, e);
}
return clusterStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ public HealthCheckObserver(RoutingManager routingManager) {
@Override
public void observe(java.util.List<ClusterStats> clustersStats) {
for (ClusterStats clusterStats : clustersStats) {
routingManager.upateBackEndHealth(clusterStats.getClusterId(), clusterStats.isHealthy());
routingManager.updateBackEndHealthDB(clusterStats);
routingManager.upateBackEndHealth(clusterStats.getClusterId(), clusterStats.getHealthy());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public HealthChecker(Notifier notifier) {
@Override
public void observe(List<ClusterStats> clustersStats) {
for (ClusterStats clusterStats : clustersStats) {
if (!clusterStats.isHealthy()) {
if (clusterStats.getHealthy() != BackendHealthState.HEALTHY) {
notifyUnhealthyCluster(clusterStats);
} else {
if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void observe(List<ClusterStats> stats) {
= new HashMap<>();

for (ClusterStats stat : stats) {
if (!stat.isHealthy()) {
if (stat.getHealthy() != BackendHealthState.HEALTHY) {
// Skip if the cluster isn't healthy
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.dropwizard.views.common.View;
import io.trino.gateway.ha.clustermonitor.BackendHealthState;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.router.BackendStateManager;
import io.trino.gateway.ha.router.GatewayBackendManager;
import io.trino.gateway.ha.router.ResourceGroupsManager;
import io.trino.gateway.ha.router.RoutingManager;
Expand Down Expand Up @@ -43,6 +45,9 @@ public class EntityEditorResource {
@Inject
private ResourceGroupsManager resourceGroupsManager;

@Inject
private BackendStateManager backendStateManager;

@Inject
private RoutingManager routingManager;

Expand Down Expand Up @@ -75,7 +80,8 @@ public Response updateEntity(@QueryParam("entityType") String entityTypeStr,
OBJECT_MAPPER.readValue(jsonPayload, ProxyBackendConfiguration.class);
gatewayBackendManager.updateBackend(backend);
log.info("Setting up the backend {} with healthy state", backend.getName());
routingManager.upateBackEndHealth(backend.getName(), true);
routingManager.upateBackEndHealth(backend.getName(), BackendHealthState.PENDING);
backendStateManager.updateHealthState(backend.getName(), BackendHealthState.PENDING);
break;
case RESOURCE_GROUP:
ResourceGroupsDetail resourceGroupDetails = OBJECT_MAPPER.readValue(jsonPayload,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.trino.gateway.ha.router;

import io.trino.gateway.ha.clustermonitor.BackendHealthState;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.config.BackendStateConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
Expand Down Expand Up @@ -29,7 +30,7 @@ public BackendState getBackendState(ProxyBackendConfiguration backend) {
Map<String, Integer> state = new HashMap<>();
state.put("QUEUED", stats.getQueuedQueryCount());
state.put("RUNNING", stats.getRunningQueryCount());
return new BackendState(name, state);
return new BackendState(name, state, stats.getHealthy());
}

public BackendStateConfiguration getBackendStateConfiguration() {
Expand All @@ -40,14 +41,22 @@ public void updateStates(String clusterId, ClusterStats stats) {
clusterStats.put(clusterId, stats);
}

/**
 * Sets the health state recorded for the given cluster.
 *
 * <p>If no stats are stored for {@code clusterId} yet, a fresh
 * {@link ClusterStats} is created, updated and stored; otherwise the existing
 * entry is updated in place and stored back under the same key.
 *
 * @param clusterId identifier of the cluster whose health is being recorded
 * @param state the health state to record
 */
public void updateHealthState(String clusterId, BackendHealthState state) {
    ClusterStats entry = clusterStats.getOrDefault(clusterId, new ClusterStats());
    entry.setHealthy(state);
    clusterStats.put(clusterId, entry);
}

@Data
public static class BackendState {
private final String name;
private final Map<String, Integer> state;
private final BackendHealthState healthy;

public BackendState(String name, Map<String, Integer> state) {
public BackendState(String name, Map<String, Integer> state, BackendHealthState healthy) {
this.name = name;
this.state = state;
this.healthy = healthy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.trino.gateway.ha.clustermonitor.BackendHealthState;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.proxyserver.ProxyServerConfiguration;
Expand Down Expand Up @@ -31,7 +32,7 @@ public abstract class RoutingManager {
private final LoadingCache<String, String> queryIdBackendCache;
private ExecutorService executorService = Executors.newFixedThreadPool(5);
private GatewayBackendManager gatewayBackendManager;
private ConcurrentHashMap<String, Boolean> backendToHealth;
private ConcurrentHashMap<String, BackendHealthState> backendToHealth;

public RoutingManager(GatewayBackendManager gatewayBackendManager) {
this.gatewayBackendManager = gatewayBackendManager;
Expand All @@ -47,7 +48,7 @@ public String load(String queryId) {
}
});

this.backendToHealth = new ConcurrentHashMap<String, Boolean>();
this.backendToHealth = new ConcurrentHashMap<>();
}

protected GatewayBackendManager getGatewayBackendManager() {
Expand Down Expand Up @@ -109,21 +110,11 @@ public String findBackendForQueryId(String queryId) {
return backendAddress;
}

public void upateBackEndHealth(String backendId, Boolean value) {
public void upateBackEndHealth(String backendId, BackendHealthState value) {
log.info("backend {} isHealthy {}", backendId, value);
backendToHealth.put(backendId, value);
}

public void updateBackEndHealthDB(ClusterStats stats) {
String name = stats.getClusterId();
if (stats.isHealthy()) {
gatewayBackendManager.activateBackend(name);
} else {
gatewayBackendManager.deactivateBackend(name);
}
}


/**
* This tries to find out which backend may have info about given query id. If not found returns
* the first healthy backend.
Expand Down Expand Up @@ -176,11 +167,11 @@ private boolean isBackendNotHealthy(String backendId) {
log.error("backends can not be empty");
return true;
}
Boolean isHealthy = backendToHealth.get(backendId);
BackendHealthState isHealthy = backendToHealth.get(backendId);
if (isHealthy == null) {
return true;
}
return !isHealthy;
return isHealthy != BackendHealthState.HEALTHY;
}

}
16 changes: 15 additions & 1 deletion gateway-ha/src/main/resources/template/gateway-view.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@
background-color: red;
}
.active_HEALTHY {
background-color: green;
}
.active_UNHEALTHY {
background-color: red;
}
.active_PENDING {
background-color: grey;
}
#availableClusters {
width: 75%;
border: 1px solid #ddd;
Expand Down Expand Up @@ -67,6 +79,7 @@
<th>Url</th>
<th>Group</th>
<th>Active</th>
<th>Healthy</th>
<#if backendStates?keys?size != 0>
<th>Queued<th>
<th>Running<th>
Expand All @@ -76,10 +89,11 @@
<tbody>
<#list backendConfigurations as bc>
<tr>
<td> ${bc.name}</td>
<td>${bc.name}</td>
<td><a href="${bc.externalUrl}/ui" target="_blank">${bc.externalUrl}</a></td>
<td> ${bc.routingGroup}</td>
<td class="active_${bc.active?c}"> ${bc.active?c} </td>
<#-- A backend may have no entry in backendStates (the map can even be empty);
     guard the lookup so the page still renders, and show PENDING until a
     health state is available for this backend. -->
<#if backendStates[bc.name]??>
<td class="active_${backendStates[bc.name].healthy}">${backendStates[bc.name].healthy}</td>
<#else>
<td class="active_PENDING">PENDING</td>
</#if>
<#if backendStates?keys?size != 0 && backendStates[bc.name]??>
<td>${backendStates[bc.name].state["QUEUED"]}<td>
<td>${backendStates[bc.name].state["RUNNING"]}<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class HaGatewayTestUtils {
private static final OkHttpClient httpClient = new OkHttpClient();
private static final Random RANDOM = new Random();

public static final int WAIT_FOR_BACKEND_IN_SECONDS = 65;

public static void seedRequiredData(TestConfig testConfig) {
String jdbcUrl = "jdbc:h2:" + testConfig.getH2DbFilePath();
DataStoreConfiguration db = new DataStoreConfiguration(jdbcUrl, "sa", "sa", "org.h2.Driver", 4);
Expand All @@ -40,16 +42,26 @@ public static void seedRequiredData(TestConfig testConfig) {
connectionManager.close();
}

public static void prepareMockBackend(
WireMockServer backend, String endPoint, String expectedResonse) {
backend.start();
public static void prepareMockPostBackend(
WireMockServer backend, String endPoint, String expectedResonse, int status) {
backend.stubFor(
WireMock.post(endPoint)
.willReturn(
WireMock.aResponse()
.withBody(expectedResonse)
.withHeader("Content-Encoding", "plain")
.withStatus(200)));
.withStatus(status)));
}

/**
 * Registers a stub on the given WireMock server so that GET requests to
 * {@code endPoint} are answered with {@code response} as the body, the given
 * HTTP {@code status}, and a {@code Content-Encoding: plain} header.
 *
 * <p>This method only registers the stub; it does not start the server.
 *
 * @param backend the WireMock server to register the stub on
 * @param endPoint the URL the stub responds to
 * @param response the response body to return
 * @param status the HTTP status code to return
 */
public static void prepareMockGetBackend(
WireMockServer backend, String endPoint, String response, int status) {
backend.stubFor(
WireMock.get(endPoint)
.willReturn(
WireMock.aResponse()
.withBody(response)
.withHeader("Content-Encoding", "plain")
.withStatus(status)));
}

public static TestConfig buildGatewayConfigAndSeedDb(int routerPort, String configFile)
Expand Down

0 comments on commit 9df6315

Please sign in to comment.