diff --git a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/Configuration.java b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/Configuration.java index dd78097b7d1..a677bf31805 100644 --- a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/Configuration.java +++ b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/Configuration.java @@ -74,6 +74,8 @@ public final class Configuration { public static final String FILTER_SINGLE_TEST = "singleTestTime"; + public static final String FILTER_CONTINUOUS_FAILURE_THRESHOLD = "continuousFailureThreshold"; + public static final String TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN = "cse.loadbalance.%s.transactionControl.options"; @@ -251,4 +253,21 @@ public static String getStringProperty(String defaultValue, String... keys) { return defaultValue; } } + + public int getContinuousFailureThreshold(String microservice) { + final int defaultValue = 0; + String p = getStringProperty("0", + PROP_ROOT + microservice + "." + FILTER_ISOLATION + FILTER_CONTINUOUS_FAILURE_THRESHOLD, + PROP_ROOT + FILTER_ISOLATION + FILTER_CONTINUOUS_FAILURE_THRESHOLD); + try { + int result = Integer.parseInt(p); + if (result > 0) { + return result; + } else { + return defaultValue; + } + } catch (NumberFormatException e) { + return defaultValue; + } + } } diff --git a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/CseServer.java b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/CseServer.java index 318b2c74688..faccc83af2f 100644 --- a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/CseServer.java +++ b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/CseServer.java @@ -16,6 +16,8 @@ package io.servicecomb.loadbalance; +import java.util.concurrent.atomic.AtomicInteger; + import com.netflix.loadbalancer.Server; import io.servicecomb.core.Endpoint; @@ -36,6 +38,11 @@ public class CseServer extends Server { private long lastVisitTime = System.currentTimeMillis(); + /** + * Count the continuous invocation failure. Once invocation successes, set this to zero. + */ + private AtomicInteger continuousFailureCount = new AtomicInteger(0); + public long getLastVisitTime() { return lastVisitTime; } @@ -71,6 +78,20 @@ public String getHost() { return endpoint.getEndpoint(); } + public void clearContinuousFailure() { + continuousFailureCount.set(0); + } + + public void incrementContinuousFailureCount() { + if (continuousFailureCount.get() < Integer.MAX_VALUE) { + continuousFailureCount.incrementAndGet(); + } + } + + public int getCountinuousFailureCount() { + return continuousFailureCount.get(); + } + public boolean equals(Object o) { if (o instanceof CseServer) { return this.getHost().equals(((CseServer) o).getHost()); diff --git a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/LoadbalanceHandler.java b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/LoadbalanceHandler.java index d47f31eca8c..9606cc5d782 100644 --- a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/LoadbalanceHandler.java +++ b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/LoadbalanceHandler.java @@ -77,6 +77,7 @@ public Thread newThread(Runnable r) { private final Object lock = new Object(); private String policy = null; + private String strategy = null; @@ -90,9 +91,9 @@ public LoadbalanceHandler() { public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { String p = Configuration.INSTANCE.getPolicy(invocation.getMicroserviceName()); String strategy = Configuration.INSTANCE.getRuleStrategyName(invocation.getMicroserviceName()); - - if (this.policy != null && !this.policy.equals(p) || - (this.strategy != null && !this.strategy.equals(strategy))) { + + if (this.policy != null && !this.policy.equals(p) + || (this.strategy != null && !this.strategy.equals(strategy))) { //配置变化,需要重新生成所有的lb实例 synchronized (lock) { loadBalancerMap.clear(); @@ -164,8 +165,10 @@ private void send(Invocation invocation, AsyncResponse asyncResp, final LoadBala chosenLB.getLoadBalancerStats().noteResponseTime(server, (System.currentTimeMillis() - time)); if (resp.isFailed()) { chosenLB.getLoadBalancerStats().incrementSuccessiveConnectionFailureCount(server); + server.incrementContinuousFailureCount(); } else { chosenLB.getLoadBalancerStats().incrementActiveRequestsCount(server); + server.clearContinuousFailure(); } asyncResp.handle(resp); }); @@ -263,9 +266,11 @@ public Observable call(Server s) { ((Throwable) resp.getResult()).getMessage(), s); chosenLB.getLoadBalancerStats().incrementSuccessiveConnectionFailureCount(s); + ((CseServer) s).incrementContinuousFailureCount(); f.onError(resp.getResult()); } else { chosenLB.getLoadBalancerStats().incrementActiveRequestsCount(s); + ((CseServer) s).clearContinuousFailure(); chosenLB.getLoadBalancerStats().noteResponseTime(s, (System.currentTimeMillis() - time)); f.onNext(resp); diff --git a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/filter/IsolationServerListFilter.java b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/filter/IsolationServerListFilter.java index 0d2be530a47..f899f53af73 100644 --- a/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/filter/IsolationServerListFilter.java +++ b/handlers/handler-loadbalance/src/main/java/io/servicecomb/loadbalance/filter/IsolationServerListFilter.java @@ -45,6 +45,8 @@ public final class IsolationServerListFilter implements ServerListFilterExt { private long enableRequestThreshold; + private int continuousFailureThreshold; + private Invocation invocation; private LoadBalancerStats stats; @@ -88,6 +90,7 @@ private void updateSettings() { errorThresholdPercentage = Configuration.INSTANCE.getErrorThresholdPercentage(microserviceName); singleTestTime = Configuration.INSTANCE.getSingleTestTime(microserviceName); enableRequestThreshold = Configuration.INSTANCE.getEnableRequestThreshold(microserviceName); + continuousFailureThreshold = Configuration.INSTANCE.getContinuousFailureThreshold(microserviceName); } private boolean allowVisit(Server server) { @@ -100,15 +103,25 @@ private boolean allowVisit(Server server) { return true; } - if ((failureRequest / (double) totalRequest) * PERCENT < errorThresholdPercentage) { - return true; + if (continuousFailureThreshold > 0) { + // continuousFailureThreshold has higher priority to decide the result + if (((CseServer) server).getCountinuousFailureCount() < continuousFailureThreshold) { + return true; + } + } else { + // if continuousFailureThreshold, then check error percentage + if ((failureRequest / (double) totalRequest) * PERCENT < errorThresholdPercentage) { + return true; + } } + if ((System.currentTimeMillis() - ((CseServer) server).getLastVisitTime()) > singleTestTime) { LOGGER.info("The Service {}'s instance {} has been break, will give a single test opportunity.", microserviceName, server); return true; } + LOGGER.warn("The Service {}'s instance {} has been break!", microserviceName, server); return false; } diff --git a/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestCseServer.java b/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestCseServer.java index a2ed00fc1a1..d96ad8a61ed 100644 --- a/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestCseServer.java +++ b/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestCseServer.java @@ -16,6 +16,7 @@ package io.servicecomb.loadbalance; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import org.junit.Assert; @@ -48,7 +49,7 @@ public void testGetEndpoint() { @Test public void testEqualsMethod() { - Assert.assertFalse(cs.equals((Object)"abcd")); + Assert.assertFalse(cs.equals((Object) "abcd")); CseServer other = new CseServer(transport, new CacheEndpoint("1234", null)); Assert.assertFalse(cs.equals(other)); @@ -74,4 +75,19 @@ public void testHashCodeMethod() { cs.hashCode(); assertNotNull(cs.hashCode()); } + + @Test + public void testIncrementContinuousFailureCount() { + int countBefore = cs.getCountinuousFailureCount(); + cs.incrementContinuousFailureCount(); + int countAfter = cs.getCountinuousFailureCount(); + assertEquals(countBefore + 1, countAfter); + } + + @Test + public void testClearContinuousFailure() { + cs.incrementContinuousFailureCount(); + cs.clearContinuousFailure(); + assertEquals(0, cs.getCountinuousFailureCount()); + } } diff --git a/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestLoadbalanceHandler.java b/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestLoadbalanceHandler.java index ba6f8f747aa..df596274c45 100644 --- a/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestLoadbalanceHandler.java +++ b/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/TestLoadbalanceHandler.java @@ -130,7 +130,6 @@ String getMicroserviceName() { void next(AsyncResponse asyncResp) throws Exception { asyncResp.handle(sendResponse); } - }; CseContext.getInstance().setTransportManager(transportManager); @@ -379,6 +378,7 @@ long currentTimeMillis() { result = server; } }; + int continuousFailureCount = server.getCountinuousFailureCount(); sendResponse = Response.create(Status.BAD_REQUEST, "send failed"); @@ -392,6 +392,7 @@ long currentTimeMillis() { loadBalancer.getLoadBalancerStats().getSingleServerStat(server).getSuccessiveConnectionFailureCount()); Assert.assertEquals("InvocationException: code=400;msg=send failed", result.value.getMessage()); + Assert.assertEquals(continuousFailureCount + 1, server.getCountinuousFailureCount()); } @Test @@ -410,6 +411,7 @@ long currentTimeMillis() { result = server; } }; + server.incrementContinuousFailureCount(); sendResponse = Response.ok("success"); @@ -422,15 +424,16 @@ long currentTimeMillis() { Assert.assertEquals(1, loadBalancer.getLoadBalancerStats().getSingleServerStat(server).getActiveRequestsCount()); Assert.assertEquals("success", result.value); + Assert.assertEquals(0, server.getCountinuousFailureCount()); } - + @Test public void sendWithRetry() { Holder result = new Holder<>(); Deencapsulation.invoke(handler, "sendWithRetry", invocation, (AsyncResponse) resp -> { result.value = resp.getResult(); }, loadBalancer); - + // no exception } } diff --git a/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/filter/TestIsolationServerListFilter.java b/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/filter/TestIsolationServerListFilter.java index 508b89e0619..68315bdcac9 100644 --- a/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/filter/TestIsolationServerListFilter.java +++ b/handlers/handler-loadbalance/src/test/java/io/servicecomb/loadbalance/filter/TestIsolationServerListFilter.java @@ -75,6 +75,10 @@ public void setUp() throws Exception { public void tearDown() throws Exception { IsolationServerListFilter = null; loadBalancerStats = null; + + AbstractConfiguration configuration = + (AbstractConfiguration) DynamicPropertyFactory.getBackingConfigurationSource(); + configuration.clearProperty("cse.loadbalance.isolation.continuousFailureThreshold"); } @Test @@ -112,4 +116,50 @@ public void testGetFilteredListOfServers() { returnedServerList = IsolationServerListFilter.getFilteredListOfServers(serverList); Assert.assertEquals(returnedServerList.size(), 0); } + + @Test + public void testGetFilteredListOfServersOnContinuousFailureReachesThreshold() { + ((AbstractConfiguration) DynamicPropertyFactory.getBackingConfigurationSource()) + .addProperty("cse.loadbalance.isolation.continuousFailureThreshold", + "3"); + Invocation invocation = Mockito.mock(Invocation.class); + CseServer testServer = Mockito.mock(CseServer.class); + Mockito.when(invocation.getMicroserviceName()).thenReturn("microserviceName"); + Mockito.when(testServer.getCountinuousFailureCount()).thenReturn(3); + Mockito.when(testServer.getLastVisitTime()).thenReturn(System.currentTimeMillis()); + + for (int i = 0; i < 3; ++i) { + loadBalancerStats.incrementNumRequests(testServer); + } + + List serverList = new ArrayList(); + serverList.add(testServer); + IsolationServerListFilter.setLoadBalancerStats(loadBalancerStats); + IsolationServerListFilter.setInvocation(invocation); + List returnedServerList = IsolationServerListFilter.getFilteredListOfServers(serverList); + Assert.assertEquals(0, returnedServerList.size()); + } + + @Test + public void testGetFilteredListOfServersOnContinuousFailureIsBelowThreshold() { + ((AbstractConfiguration) DynamicPropertyFactory.getBackingConfigurationSource()) + .addProperty("cse.loadbalance.isolation.continuousFailureThreshold", + "3"); + Invocation invocation = Mockito.mock(Invocation.class); + CseServer testServer = Mockito.mock(CseServer.class); + Mockito.when(invocation.getMicroserviceName()).thenReturn("microserviceName"); + Mockito.when(testServer.getCountinuousFailureCount()).thenReturn(2); + Mockito.when(testServer.getLastVisitTime()).thenReturn(System.currentTimeMillis()); + + for (int i = 0; i < 3; ++i) { + loadBalancerStats.incrementNumRequests(testServer); + } + + List serverList = new ArrayList(); + serverList.add(testServer); + IsolationServerListFilter.setLoadBalancerStats(loadBalancerStats); + IsolationServerListFilter.setInvocation(invocation); + List returnedServerList = IsolationServerListFilter.getFilteredListOfServers(serverList); + Assert.assertEquals(1, returnedServerList.size()); + } }