Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public final class Configuration {

public static final String FILTER_SINGLE_TEST = "singleTestTime";

public static final String FILTER_CONTINUOUS_FAILURE_THRESHOLD = "continuousFailureThreshold";

public static final String TRANSACTIONCONTROL_OPTIONS_PREFIX_PATTERN =
"cse.loadbalance.%s.transactionControl.options";

Expand Down Expand Up @@ -251,4 +253,21 @@ public static String getStringProperty(String defaultValue, String... keys) {
return defaultValue;
}
}

public int getContinuousFailureThreshold(String microservice) {
final int defaultValue = 0;
String p = getStringProperty("0",
PROP_ROOT + microservice + "." + FILTER_ISOLATION + FILTER_CONTINUOUS_FAILURE_THRESHOLD,
PROP_ROOT + FILTER_ISOLATION + FILTER_CONTINUOUS_FAILURE_THRESHOLD);
try {
int result = Integer.parseInt(p);
if (result > 0) {
return result;
} else {
return defaultValue;
}
} catch (NumberFormatException e) {
return defaultValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.servicecomb.loadbalance;

import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.loadbalancer.Server;

import io.servicecomb.core.Endpoint;
Expand All @@ -36,6 +38,11 @@ public class CseServer extends Server {

private long lastVisitTime = System.currentTimeMillis();

/**
* Count the continuous invocation failure. Once invocation successes, set this to zero.
*/
private AtomicInteger continuousFailureCount = new AtomicInteger(0);

public long getLastVisitTime() {
return lastVisitTime;
}
Expand Down Expand Up @@ -71,6 +78,20 @@ public String getHost() {
return endpoint.getEndpoint();
}

public void clearContinuousFailure() {
continuousFailureCount.set(0);
}

public void incrementContinuousFailureCount() {
if (continuousFailureCount.get() < Integer.MAX_VALUE) {
continuousFailureCount.incrementAndGet();
}
}

public int getCountinuousFailureCount() {
return continuousFailureCount.get();
}

public boolean equals(Object o) {
if (o instanceof CseServer) {
return this.getHost().equals(((CseServer) o).getHost());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public Thread newThread(Runnable r) {
private final Object lock = new Object();

private String policy = null;

private String strategy = null;


Expand All @@ -90,9 +91,9 @@ public LoadbalanceHandler() {
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
String p = Configuration.INSTANCE.getPolicy(invocation.getMicroserviceName());
String strategy = Configuration.INSTANCE.getRuleStrategyName(invocation.getMicroserviceName());
if (this.policy != null && !this.policy.equals(p) ||
(this.strategy != null && !this.strategy.equals(strategy))) {

if (this.policy != null && !this.policy.equals(p)
|| (this.strategy != null && !this.strategy.equals(strategy))) {
//配置变化,需要重新生成所有的lb实例
synchronized (lock) {
loadBalancerMap.clear();
Expand Down Expand Up @@ -164,8 +165,10 @@ private void send(Invocation invocation, AsyncResponse asyncResp, final LoadBala
chosenLB.getLoadBalancerStats().noteResponseTime(server, (System.currentTimeMillis() - time));
if (resp.isFailed()) {
chosenLB.getLoadBalancerStats().incrementSuccessiveConnectionFailureCount(server);
server.incrementContinuousFailureCount();
} else {
chosenLB.getLoadBalancerStats().incrementActiveRequestsCount(server);
server.clearContinuousFailure();
}
asyncResp.handle(resp);
});
Expand Down Expand Up @@ -263,9 +266,11 @@ public Observable<Response> call(Server s) {
((Throwable) resp.getResult()).getMessage(),
s);
chosenLB.getLoadBalancerStats().incrementSuccessiveConnectionFailureCount(s);
((CseServer) s).incrementContinuousFailureCount();
f.onError(resp.getResult());
} else {
chosenLB.getLoadBalancerStats().incrementActiveRequestsCount(s);
((CseServer) s).clearContinuousFailure();
chosenLB.getLoadBalancerStats().noteResponseTime(s,
(System.currentTimeMillis() - time));
f.onNext(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public final class IsolationServerListFilter implements ServerListFilterExt {

private long enableRequestThreshold;

private int continuousFailureThreshold;

private Invocation invocation;

private LoadBalancerStats stats;
Expand Down Expand Up @@ -88,6 +90,7 @@ private void updateSettings() {
errorThresholdPercentage = Configuration.INSTANCE.getErrorThresholdPercentage(microserviceName);
singleTestTime = Configuration.INSTANCE.getSingleTestTime(microserviceName);
enableRequestThreshold = Configuration.INSTANCE.getEnableRequestThreshold(microserviceName);
continuousFailureThreshold = Configuration.INSTANCE.getContinuousFailureThreshold(microserviceName);
}

private boolean allowVisit(Server server) {
Expand All @@ -100,15 +103,25 @@ private boolean allowVisit(Server server) {
return true;
}

if ((failureRequest / (double) totalRequest) * PERCENT < errorThresholdPercentage) {
return true;
if (continuousFailureThreshold > 0) {
// continuousFailureThreshold has higher priority to decide the result
if (((CseServer) server).getCountinuousFailureCount() < continuousFailureThreshold) {
return true;
}
} else {
// if continuousFailureThreshold, then check error percentage
if ((failureRequest / (double) totalRequest) * PERCENT < errorThresholdPercentage) {
return true;
}
}

if ((System.currentTimeMillis() - ((CseServer) server).getLastVisitTime()) > singleTestTime) {
LOGGER.info("The Service {}'s instance {} has been break, will give a single test opportunity.",
microserviceName,
server);
return true;
}

LOGGER.warn("The Service {}'s instance {} has been break!", microserviceName, server);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.servicecomb.loadbalance;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.Assert;
Expand Down Expand Up @@ -48,7 +49,7 @@ public void testGetEndpoint() {

@Test
public void testEqualsMethod() {
Assert.assertFalse(cs.equals((Object)"abcd"));
Assert.assertFalse(cs.equals((Object) "abcd"));

CseServer other = new CseServer(transport, new CacheEndpoint("1234", null));
Assert.assertFalse(cs.equals(other));
Expand All @@ -74,4 +75,19 @@ public void testHashCodeMethod() {
cs.hashCode();
assertNotNull(cs.hashCode());
}

@Test
public void testIncrementContinuousFailureCount() {
int countBefore = cs.getCountinuousFailureCount();
cs.incrementContinuousFailureCount();
int countAfter = cs.getCountinuousFailureCount();
assertEquals(countBefore + 1, countAfter);
}

@Test
public void testClearContinuousFailure() {
cs.incrementContinuousFailureCount();
cs.clearContinuousFailure();
assertEquals(0, cs.getCountinuousFailureCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ String getMicroserviceName() {
void next(AsyncResponse asyncResp) throws Exception {
asyncResp.handle(sendResponse);
}

};

CseContext.getInstance().setTransportManager(transportManager);
Expand Down Expand Up @@ -379,6 +378,7 @@ long currentTimeMillis() {
result = server;
}
};
int continuousFailureCount = server.getCountinuousFailureCount();

sendResponse = Response.create(Status.BAD_REQUEST, "send failed");

Expand All @@ -392,6 +392,7 @@ long currentTimeMillis() {
loadBalancer.getLoadBalancerStats().getSingleServerStat(server).getSuccessiveConnectionFailureCount());
Assert.assertEquals("InvocationException: code=400;msg=send failed",
result.value.getMessage());
Assert.assertEquals(continuousFailureCount + 1, server.getCountinuousFailureCount());
}

@Test
Expand All @@ -410,6 +411,7 @@ long currentTimeMillis() {
result = server;
}
};
server.incrementContinuousFailureCount();

sendResponse = Response.ok("success");

Expand All @@ -422,15 +424,16 @@ long currentTimeMillis() {
Assert.assertEquals(1,
loadBalancer.getLoadBalancerStats().getSingleServerStat(server).getActiveRequestsCount());
Assert.assertEquals("success", result.value);
Assert.assertEquals(0, server.getCountinuousFailureCount());
}

@Test
public void sendWithRetry() {
Holder<String> result = new Holder<>();
Deencapsulation.invoke(handler, "sendWithRetry", invocation, (AsyncResponse) resp -> {
result.value = resp.getResult();
}, loadBalancer);

// no exception
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
IsolationServerListFilter = null;
loadBalancerStats = null;

AbstractConfiguration configuration =
(AbstractConfiguration) DynamicPropertyFactory.getBackingConfigurationSource();
configuration.clearProperty("cse.loadbalance.isolation.continuousFailureThreshold");
}

@Test
Expand Down Expand Up @@ -112,4 +116,50 @@ public void testGetFilteredListOfServers() {
returnedServerList = IsolationServerListFilter.getFilteredListOfServers(serverList);
Assert.assertEquals(returnedServerList.size(), 0);
}

@Test
public void testGetFilteredListOfServersOnContinuousFailureReachesThreshold() {
((AbstractConfiguration) DynamicPropertyFactory.getBackingConfigurationSource())
.addProperty("cse.loadbalance.isolation.continuousFailureThreshold",
"3");
Invocation invocation = Mockito.mock(Invocation.class);
CseServer testServer = Mockito.mock(CseServer.class);
Mockito.when(invocation.getMicroserviceName()).thenReturn("microserviceName");
Mockito.when(testServer.getCountinuousFailureCount()).thenReturn(3);
Mockito.when(testServer.getLastVisitTime()).thenReturn(System.currentTimeMillis());

for (int i = 0; i < 3; ++i) {
loadBalancerStats.incrementNumRequests(testServer);
}

List<Server> serverList = new ArrayList<Server>();
serverList.add(testServer);
IsolationServerListFilter.setLoadBalancerStats(loadBalancerStats);
IsolationServerListFilter.setInvocation(invocation);
List<Server> returnedServerList = IsolationServerListFilter.getFilteredListOfServers(serverList);
Assert.assertEquals(0, returnedServerList.size());
}

@Test
public void testGetFilteredListOfServersOnContinuousFailureIsBelowThreshold() {
((AbstractConfiguration) DynamicPropertyFactory.getBackingConfigurationSource())
.addProperty("cse.loadbalance.isolation.continuousFailureThreshold",
"3");
Invocation invocation = Mockito.mock(Invocation.class);
CseServer testServer = Mockito.mock(CseServer.class);
Mockito.when(invocation.getMicroserviceName()).thenReturn("microserviceName");
Mockito.when(testServer.getCountinuousFailureCount()).thenReturn(2);
Mockito.when(testServer.getLastVisitTime()).thenReturn(System.currentTimeMillis());

for (int i = 0; i < 3; ++i) {
loadBalancerStats.incrementNumRequests(testServer);
}

List<Server> serverList = new ArrayList<Server>();
serverList.add(testServer);
IsolationServerListFilter.setLoadBalancerStats(loadBalancerStats);
IsolationServerListFilter.setInvocation(invocation);
List<Server> returnedServerList = IsolationServerListFilter.getFilteredListOfServers(serverList);
Assert.assertEquals(1, returnedServerList.size());
}
}