diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index 27b1fcf62..11805deca 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -159,6 +159,18 @@ test + + io.dropwizard + dropwizard-testing + test + + + junit + junit + + + + com.github.tomakehurst wiremock diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/HaGatewayLauncher.java b/gateway-ha/src/main/java/io/trino/gateway/ha/HaGatewayLauncher.java index 1aeb5ce7e..ad87ab9da 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/HaGatewayLauncher.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/HaGatewayLauncher.java @@ -12,6 +12,8 @@ public HaGatewayLauncher(String... basePackages) { super(basePackages); } + public HaGatewayLauncher(){} + @Override public void initialize(Bootstrap bootstrap) { super.initialize(bootstrap); diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/BackendHealthState.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/BackendHealthState.java new file mode 100644 index 000000000..e96216ace --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/BackendHealthState.java @@ -0,0 +1,8 @@ +package io.trino.gateway.ha.clustermonitor; + +public enum BackendHealthState { + HEALTHY, + PENDING, + UNHEALTHY +} + diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java index 3176248f4..7e5bf6c9d 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStats.java @@ -12,7 +12,7 @@ public class ClusterStats { private int queuedQueryCount; private int blockedQueryCount; private int numWorkerNodes; - private boolean healthy; + private BackendHealthState healthy = BackendHealthState.PENDING; private String clusterId; private String proxyTo; private String externalUrl; diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java index b1cfc2f18..35a84d7f4 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java @@ -49,17 +49,17 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) { HashMap result = null; result = new ObjectMapper().readValue(response, HashMap.class); - clusterStats.setNumWorkerNodes((int) result.get("activeWorkers")); - clusterStats.setQueuedQueryCount((int) result.get("queuedQueries")); - clusterStats.setRunningQueryCount((int) result.get("runningQueries")); - clusterStats.setBlockedQueryCount((int) result.get("blockedQueries")); - clusterStats.setHealthy(clusterStats.getNumWorkerNodes() > 0); + clusterStats.setNumWorkerNodes((int) result.getOrDefault("activeWorkers", 0)); + clusterStats.setQueuedQueryCount((int) result.getOrDefault("queuedQueries", 0)); + clusterStats.setRunningQueryCount((int) result.getOrDefault("runningQueries", 0)); + clusterStats.setBlockedQueryCount((int) result.getOrDefault("blockedQueries", 0)); + clusterStats.setHealthy(clusterStats.getNumWorkerNodes() > 0 ? BackendHealthState.HEALTHY : BackendHealthState.UNHEALTHY); clusterStats.setProxyTo(backend.getProxyTo()); clusterStats.setExternalUrl(backend.getExternalUrl()); clusterStats.setRoutingGroup(backend.getRoutingGroup()); } catch (Exception e) { - log.error("Error parsing cluster stats from [{}]", response, e); + log.error("Error parsing cluster stats from {}", response, e); } // Fetch User Level Stats. diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java index 93e2b3362..40643d8f0 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java @@ -78,14 +78,14 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) { while (rs.next()) { partialState.put(rs.getString("state"), rs.getInt("count")); } - clusterStats.setHealthy(true); + clusterStats.setHealthy(BackendHealthState.HEALTHY); clusterStats.setQueuedQueryCount(partialState.getOrDefault("QUEUED", 0)); clusterStats.setRunningQueryCount(partialState.getOrDefault("RUNNING", 0)); return clusterStats; } catch (TimeoutException e) { - log.error("timed out fetching status for {} backend, {}", url, e); + log.error("timed out fetching status for {} backend, {}", jdbcUrl, e); } catch (Exception e) { - log.error("could not fetch status for {} backend, {}", url, e); + log.error("could not fetch status for {} backend, {}", jdbcUrl, e); } return clusterStats; } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java index cd81b037c..c38ae79cc 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java @@ -14,8 +14,7 @@ public HealthCheckObserver(RoutingManager routingManager) { @Override public void observe(java.util.List clustersStats) { for (ClusterStats clusterStats : clustersStats) { - routingManager.upateBackEndHealth(clusterStats.getClusterId(), clusterStats.isHealthy()); - routingManager.updateBackEndHealthDB(clusterStats); + routingManager.upateBackEndHealth(clusterStats.getClusterId(), clusterStats.getHealthy()); } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java index 4487c1d32..c08dfba90 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java @@ -15,7 +15,7 @@ public HealthChecker(Notifier notifier) { @Override public void observe(List clustersStats) { for (ClusterStats clusterStats : clustersStats) { - if (!clusterStats.isHealthy()) { + if (clusterStats.getHealthy() != BackendHealthState.HEALTHY) { notifyUnhealthyCluster(clusterStats); } else { if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) { diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoQueueLengthChecker.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoQueueLengthChecker.java index bfc25bdd6..28e14f8c5 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoQueueLengthChecker.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/TrinoQueueLengthChecker.java @@ -27,7 +27,7 @@ public void observe(List stats) { = new HashMap<>(); for (ClusterStats stat : stats) { - if (!stat.isHealthy()) { + if (stat.getHealthy() != BackendHealthState.HEALTHY) { // Skip if the cluster isn't healthy continue; } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java b/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java index e2230cef7..847c52b40 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/resource/EntityEditorResource.java @@ -8,7 +8,9 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.dropwizard.views.common.View; +import io.trino.gateway.ha.clustermonitor.BackendHealthState; import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import io.trino.gateway.ha.router.BackendStateManager; import io.trino.gateway.ha.router.GatewayBackendManager; import io.trino.gateway.ha.router.ResourceGroupsManager; import io.trino.gateway.ha.router.RoutingManager; @@ -43,6 +45,9 @@ public class EntityEditorResource { @Inject private ResourceGroupsManager resourceGroupsManager; + @Inject + private BackendStateManager backendStateManager; + @Inject private RoutingManager routingManager; @@ -75,7 +80,8 @@ public Response updateEntity(@QueryParam("entityType") String entityTypeStr, OBJECT_MAPPER.readValue(jsonPayload, ProxyBackendConfiguration.class); gatewayBackendManager.updateBackend(backend); log.info("Setting up the backend {} with healthy state", backend.getName()); - routingManager.upateBackEndHealth(backend.getName(), true); + routingManager.upateBackEndHealth(backend.getName(), BackendHealthState.PENDING); + backendStateManager.updateHealthState(backend.getName(), BackendHealthState.PENDING); break; case RESOURCE_GROUP: ResourceGroupsDetail resourceGroupDetails = OBJECT_MAPPER.readValue(jsonPayload, diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java index 6d3243042..552355c54 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BackendStateManager.java @@ -1,5 +1,6 @@ package io.trino.gateway.ha.router; +import io.trino.gateway.ha.clustermonitor.BackendHealthState; import io.trino.gateway.ha.clustermonitor.ClusterStats; import io.trino.gateway.ha.config.BackendStateConfiguration; import io.trino.gateway.ha.config.ProxyBackendConfiguration; @@ -29,7 +30,7 @@ public BackendState getBackendState(ProxyBackendConfiguration backend) { Map state = new HashMap<>(); state.put("QUEUED", stats.getQueuedQueryCount()); state.put("RUNNING", stats.getRunningQueryCount()); - return new BackendState(name, state); + return new BackendState(name, state, stats.getHealthy()); } public BackendStateConfiguration getBackendStateConfiguration() { @@ -40,14 +41,22 @@ public void updateStates(String clusterId, ClusterStats stats) { clusterStats.put(clusterId, stats); } + public void updateHealthState(String clusterId, BackendHealthState state) { + ClusterStats stats = clusterStats.getOrDefault(clusterId, new ClusterStats()); + stats.setHealthy(state); + clusterStats.put(clusterId, stats); + } + @Data public static class BackendState { private final String name; private final Map state; + private final BackendHealthState healthy; - public BackendState(String name, Map state) { + public BackendState(String name, Map state, BackendHealthState healthy) { this.name = name; this.state = state; + this.healthy = healthy; } } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java index 46a97f094..4013983bf 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java @@ -3,6 +3,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import io.trino.gateway.ha.clustermonitor.BackendHealthState; import io.trino.gateway.ha.clustermonitor.ClusterStats; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.proxyserver.ProxyServerConfiguration; @@ -31,7 +32,7 @@ public abstract class RoutingManager { private final LoadingCache queryIdBackendCache; private ExecutorService executorService = Executors.newFixedThreadPool(5); private GatewayBackendManager gatewayBackendManager; - private ConcurrentHashMap backendToHealth; + private ConcurrentHashMap backendToHealth; public RoutingManager(GatewayBackendManager gatewayBackendManager) { this.gatewayBackendManager = gatewayBackendManager; @@ -47,7 +48,7 @@ public String load(String queryId) { } }); - this.backendToHealth = new ConcurrentHashMap(); + this.backendToHealth = new ConcurrentHashMap<>(); } protected GatewayBackendManager getGatewayBackendManager() { @@ -109,21 +110,11 @@ public String findBackendForQueryId(String queryId) { return backendAddress; } - public void upateBackEndHealth(String backendId, Boolean value) { + public void upateBackEndHealth(String backendId, BackendHealthState value) { log.info("backend {} isHealthy {}", backendId, value); backendToHealth.put(backendId, value); } - public void updateBackEndHealthDB(ClusterStats stats) { - String name = stats.getClusterId(); - if (stats.isHealthy()) { - gatewayBackendManager.activateBackend(name); - } else { - gatewayBackendManager.deactivateBackend(name); - } - } - - /** * This tries to find out which backend may have info about given query id. If not found returns * the first healthy backend. @@ -176,11 +167,11 @@ private boolean isBackendNotHealthy(String backendId) { log.error("backends can not be empty"); return true; } - Boolean isHealthy = backendToHealth.get(backendId); + BackendHealthState isHealthy = backendToHealth.get(backendId); if (isHealthy == null) { return true; } - return !isHealthy; + return isHealthy != BackendHealthState.HEALTHY; } } diff --git a/gateway-ha/src/main/resources/template/gateway-view.ftl b/gateway-ha/src/main/resources/template/gateway-view.ftl index 2a6e46eb4..f7c0f76d7 100644 --- a/gateway-ha/src/main/resources/template/gateway-view.ftl +++ b/gateway-ha/src/main/resources/template/gateway-view.ftl @@ -24,6 +24,18 @@ background-color: red; } + .active_HEALTHY { + background-color: green; + } + + .active_UNHEALTHY { + background-color: red; + } + + .active_PENDING { + background-color: grey; + } + #availableClusters { width: 75%; border: 1px solid #ddd; @@ -67,6 +79,7 @@ Url Group Active + Healthy <#if backendStates?keys?size != 0> Queued Running @@ -76,10 +89,11 @@ <#list backendConfigurations as bc> - ${bc.name} + ${bc.name} ${bc.externalUrl} ${bc.routingGroup} ${bc.active?c} + ${backendStates[bc.name].healthy} <#if backendStates?keys?size != 0 && backendStates[bc.name]??> ${backendStates[bc.name].state["QUEUED"]} ${backendStates[bc.name].state["RUNNING"]} diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java b/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java index f8ccfd575..49bf68046 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/HaGatewayTestUtils.java @@ -31,6 +31,8 @@ public class HaGatewayTestUtils { private static final OkHttpClient httpClient = new OkHttpClient(); private static final Random RANDOM = new Random(); + public static final int WAIT_FOR_BACKEND_IN_SECONDS = 65; + public static void seedRequiredData(TestConfig testConfig) { String jdbcUrl = "jdbc:h2:" + testConfig.getH2DbFilePath(); DataStoreConfiguration db = new DataStoreConfiguration(jdbcUrl, "sa", "sa", "org.h2.Driver", 4); @@ -40,16 +42,26 @@ public static void seedRequiredData(TestConfig testConfig) { connectionManager.close(); } - public static void prepareMockBackend( - WireMockServer backend, String endPoint, String expectedResonse) { - backend.start(); + public static void prepareMockPostBackend( + WireMockServer backend, String endPoint, String expectedResonse, int status) { backend.stubFor( WireMock.post(endPoint) .willReturn( WireMock.aResponse() .withBody(expectedResonse) .withHeader("Content-Encoding", "plain") - .withStatus(200))); + .withStatus(status))); + } + + public static void prepareMockGetBackend( + WireMockServer backend, String endPoint, String response, int status) { + backend.stubFor( + WireMock.get(endPoint) + .willReturn( + WireMock.aResponse() + .withBody(response) + .withHeader("Content-Encoding", "plain") + .withStatus(status))); } public static TestConfig buildGatewayConfigAndSeedDb(int routerPort, String configFile) diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java index ad53b0fc2..dc0135aa7 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaMultipleBackend.java @@ -1,5 +1,10 @@ package io.trino.gateway.ha; +import static io.trino.gateway.ha.HaGatewayTestUtils.WAIT_FOR_BACKEND_IN_SECONDS; +import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_QUEUED_LIST_PATH; +import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_STATS_PATH; +import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_LOGIN_PATH; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -7,7 +12,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import lombok.extern.slf4j.Slf4j; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -18,11 +25,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.extension.ExtendWith; @TestInstance(Lifecycle.PER_CLASS) +@ExtendWith(DropwizardExtensionsSupport.class) +@Slf4j public class TestGatewayHaMultipleBackend { - public static final String EXPECTED_RESPONSE1 = "{\"id\":\"testId1\"}"; - public static final String EXPECTED_RESPONSE2 = "{\"id\":\"testId2\"}"; + public static final String EXPECTED_RESPONSE_1 = "{\"id\":\"testId1\"}"; + public static final String EXPECTED_RESPONSE_2 = "{\"id\":\"testId2\"}"; public static final String CUSTOM_RESPONSE = "123"; public static final String CUSTOM_PATH = "/v1/custom/extra"; @@ -43,9 +53,25 @@ public class TestGatewayHaMultipleBackend { @BeforeAll public void setup() throws Exception { - HaGatewayTestUtils.prepareMockBackend(adhocBackend, "/v1/statement", EXPECTED_RESPONSE1); - HaGatewayTestUtils.prepareMockBackend(scheduledBackend, "/v1/statement", EXPECTED_RESPONSE2); - HaGatewayTestUtils.prepareMockBackend(customBackend, CUSTOM_PATH, CUSTOM_RESPONSE); + adhocBackend.start(); + scheduledBackend.start(); + customBackend.start(); + // mock adhocBackend response + HaGatewayTestUtils.prepareMockPostBackend(adhocBackend, "/v1/statement", EXPECTED_RESPONSE_1, 200); + HaGatewayTestUtils.prepareMockPostBackend(adhocBackend, UI_LOGIN_PATH, "", 200); + HaGatewayTestUtils.prepareMockGetBackend(adhocBackend, UI_API_STATS_PATH, "{\"activeWorkers\": 1}", 200); + HaGatewayTestUtils.prepareMockGetBackend(adhocBackend, UI_API_QUEUED_LIST_PATH, null, 200); + + // mock scheduledBackend response + HaGatewayTestUtils.prepareMockPostBackend(scheduledBackend, "/v1/statement", EXPECTED_RESPONSE_2, 200); + HaGatewayTestUtils.prepareMockPostBackend(scheduledBackend, UI_LOGIN_PATH, "", 200); + HaGatewayTestUtils.prepareMockGetBackend(scheduledBackend, UI_API_STATS_PATH, "{\"activeWorkers\": 1}", 200); + HaGatewayTestUtils.prepareMockGetBackend(scheduledBackend, UI_API_QUEUED_LIST_PATH, null, 200); + + HaGatewayTestUtils.prepareMockPostBackend(customBackend, CUSTOM_PATH, CUSTOM_RESPONSE, 200); + HaGatewayTestUtils.prepareMockPostBackend(customBackend, UI_LOGIN_PATH, "", 200); + HaGatewayTestUtils.prepareMockGetBackend(customBackend, UI_API_STATS_PATH, "{\"activeWorkers\": 1}", 200); + HaGatewayTestUtils.prepareMockGetBackend(customBackend, UI_API_QUEUED_LIST_PATH, null, 200); // seed database HaGatewayTestUtils.TestConfig testConfig = @@ -64,6 +90,8 @@ public void setup() throws Exception { "custom", "http://localhost:" + customBackendPort, "externalUrl", true, "custom", routerPort); + log.info("waiting for backend to become healthy"); + SECONDS.sleep(WAIT_FOR_BACKEND_IN_SECONDS); } @Test @@ -77,7 +105,7 @@ public void testCustomPath() throws Exception { .addHeader("X-Trino-Routing-Group", "custom") .build(); Response response1 = httpClient.newCall(request1).execute(); - assertEquals(response1.body().string(), CUSTOM_RESPONSE); + assertEquals(CUSTOM_RESPONSE, response1.body().string()); Request request2 = new Request.Builder() @@ -86,7 +114,7 @@ public void testCustomPath() throws Exception { .addHeader("X-Trino-Routing-Group", "custom") .build(); Response response2 = httpClient.newCall(request2).execute(); - assertEquals(response2.code(), 404); + assertEquals(404, response2.code()); } @Test @@ -100,7 +128,7 @@ public void testQueryDeliveryToMultipleRoutingGroups() throws Exception { .post(requestBody) .build(); Response response1 = httpClient.newCall(request1).execute(); - assertEquals(EXPECTED_RESPONSE1, response1.body().string()); + assertEquals(EXPECTED_RESPONSE_1, response1.body().string()); // When X-Trino-Routing-Group is set in header, query should be routed to cluster under the // routing group Request request4 = @@ -110,7 +138,7 @@ public void testQueryDeliveryToMultipleRoutingGroups() throws Exception { .addHeader("X-Trino-Routing-Group", "scheduled") .build(); Response response4 = httpClient.newCall(request4).execute(); - assertEquals(EXPECTED_RESPONSE2, response4.body().string()); + assertEquals(EXPECTED_RESPONSE_2, response4.body().string()); } @Test diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaSingleBackend.java b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaSingleBackend.java index 6a8268518..243d1e7e2 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaSingleBackend.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/TestGatewayHaSingleBackend.java @@ -1,5 +1,13 @@ package io.trino.gateway.ha; +import static io.trino.gateway.ha.HaGatewayTestUtils.WAIT_FOR_BACKEND_IN_SECONDS; +import static io.trino.gateway.ha.HaGatewayTestUtils.prepareMockGetBackend; +import static io.trino.gateway.ha.HaGatewayTestUtils.prepareMockPostBackend; +import static io.trino.gateway.ha.HaGatewayTestUtils.setUpBackend; +import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_QUEUED_LIST_PATH; +import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_STATS_PATH; +import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_LOGIN_PATH; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -7,7 +15,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import lombok.extern.slf4j.Slf4j; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -18,8 +28,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.extension.ExtendWith; @TestInstance(Lifecycle.PER_CLASS) +@ExtendWith(DropwizardExtensionsSupport.class) +@Slf4j public class TestGatewayHaSingleBackend { public static final String EXPECTED_RESPONSE = "{\"id\":\"testId\"}"; int backendPort = 20000 + (int) (Math.random() * 1000); @@ -31,7 +44,11 @@ public class TestGatewayHaSingleBackend { @BeforeAll public void setup() throws Exception { - HaGatewayTestUtils.prepareMockBackend(backend, "/v1/statement", EXPECTED_RESPONSE); + backend.start(); + prepareMockPostBackend(backend, "/v1/statement", EXPECTED_RESPONSE, 200); + prepareMockPostBackend(backend, UI_LOGIN_PATH, "", 200); + prepareMockGetBackend(backend, UI_API_STATS_PATH, "{\"activeWorkers\": 1}", 200); + prepareMockGetBackend(backend, UI_API_QUEUED_LIST_PATH, null, 200); // seed database HaGatewayTestUtils.TestConfig testConfig = @@ -40,8 +57,10 @@ public void setup() throws Exception { String[] args = {"server", testConfig.getConfigFilePath()}; HaGatewayLauncher.main(args); // Now populate the backend - HaGatewayTestUtils.setUpBackend( - "trino1", "http://localhost:" + backendPort, "externalUrl", true, "adhoc", routerPort); + setUpBackend("trino1", "http://localhost:" + backendPort,"externalUrl",true, "adhoc", routerPort); + + log.info("waiting for backend to become healthy"); + SECONDS.sleep(WAIT_FOR_BACKEND_IN_SECONDS); } @Test diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaRoutingManager.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaRoutingManager.java index 4cd43ae02..9906f1cbe 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaRoutingManager.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaRoutingManager.java @@ -1,6 +1,7 @@ package io.trino.gateway.ha.router; import io.trino.gateway.ha.HaGatewayTestUtils; +import io.trino.gateway.ha.clustermonitor.BackendHealthState; import io.trino.gateway.ha.config.DataStoreConfiguration; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; @@ -49,8 +50,8 @@ private void addMockBackends() { proxyBackend.setProxyTo(backend + ".trino.example.com"); proxyBackend.setExternalUrl("trino.example.com"); backendManager.addBackend(proxyBackend); - //set backend as healthyti start with - haRoutingManager.upateBackEndHealth(backend, true); + //set backend as healthy to start with + haRoutingManager.upateBackEndHealth(backend, BackendHealthState.HEALTHY); } //Keep only 1st backend as healthy, mark all the others as unhealthy @@ -58,7 +59,7 @@ private void addMockBackends() { for (int i = 1; i < numBackends; i++) { backend = groupName + i; - haRoutingManager.upateBackEndHealth(backend, false); + haRoutingManager.upateBackEndHealth(backend, BackendHealthState.UNHEALTHY); } assert (haRoutingManager.provideBackendForRoutingGroup(groupName, "") diff --git a/gateway-ha/src/test/resources/auth/ldapTestConfig.yml b/gateway-ha/src/test/resources/auth/ldapTestConfig.yml index 154bcd49c..91ed6c844 100644 --- a/gateway-ha/src/test/resources/auth/ldapTestConfig.yml +++ b/gateway-ha/src/test/resources/auth/ldapTestConfig.yml @@ -1,12 +1,12 @@ -ldapHost: 'exmple.com' +ldapHost: "exmple.com" ldapPort: 389 useTls: true #ldapPort: 636 useSsl: false -ldapAdminBindDn: 'CN=AD Query,OU=accts,DC=dept1,DC=example,DC=com' -ldapUserBaseDn: 'OU=accts,DC=dept1,DC=example,DC=com' -ldapUserSearch: '(&(objectclass=user)(sAMAccountName=${USER}))' -ldapGroupMemberAttribute: 'memberOf' -ldapAdminPassword: 'passit' -ldapTrustStorePath: '/etc/ssl/certs/java/cacerts' -ldapTrustStorePassword: 'secret' +ldapAdminBindDn: "CN=AD Query,OU=accts,DC=dept1,DC=example,DC=com" +ldapUserBaseDn: "OU=accts,DC=dept1,DC=example,DC=com" +ldapUserSearch: "(&(objectclass=user)(sAMAccountName=${USER}))" +ldapGroupMemberAttribute: "memberOf" +ldapAdminPassword: "passit" +ldapTrustStorePath: "/etc/ssl/certs/java/cacerts" +ldapTrustStorePassword: "secret" diff --git a/gateway-ha/src/test/resources/test-config-template.yml b/gateway-ha/src/test/resources/test-config-template.yml index 1dae38d27..e9a977f5f 100644 --- a/gateway-ha/src/test/resources/test-config-template.yml +++ b/gateway-ha/src/test/resources/test-config-template.yml @@ -17,11 +17,21 @@ dataStore: password: sa driver: org.h2.Driver +backendState: + username: lb_query + ssl: false + +clusterStatsConfiguration: + useApi: true + modules: - io.trino.gateway.ha.module.HaGatewayProviderModule + - io.trino.gateway.ha.module.ClusterStateListenerModule + - io.trino.gateway.ha.module.ClusterStatsMonitorModule managedApps: - io.trino.gateway.ha.GatewayManagedApp + - io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor extraWhitelistPaths: - "/v1/custom"