diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java
index 1a0acf6c66..3feb6c2969 100644
--- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java
@@ -16,7 +16,13 @@ package com.alibaba.csp.sentinel.cluster.client;
 
 import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.alibaba.csp.sentinel.cluster.ClusterConstants;
 import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
@@ -35,6 +41,7 @@ import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
 import com.alibaba.csp.sentinel.log.RecordLog;
 import com.alibaba.csp.sentinel.util.StringUtil;
+import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
 
 /**
  * Default implementation of {@link ClusterTokenClient}.
  *
@@ -43,11 +50,30 @@
  * @since 1.4.0
  */
 public class DefaultClusterTokenClient implements ClusterTokenClient {
 
+    public class CachedTokenData {
+        AtomicInteger count;
+        AtomicInteger lastStatus;
+        AtomicLong lastWaitUntilMs;
+        AtomicInteger lastWaitPrefetchCnt;
+        AtomicInteger lastRemaining;
+
+        public CachedTokenData() {
+            count = new AtomicInteger(0);
+            lastStatus = new AtomicInteger(TokenResultStatus.OK);
+            lastWaitUntilMs = new AtomicLong(0);
+            lastWaitPrefetchCnt = new AtomicInteger(0);
+            lastRemaining = new AtomicInteger(0);
+        }
+    }
+
     private ClusterTransportClient transportClient;
     private TokenServerDescriptor serverDescriptor;
 
     private final AtomicBoolean shouldStart = new AtomicBoolean(false);
+    // Prefetch check interval, in milliseconds.
+    private int checkInterval = 2;
+    ConcurrentHashMap<Long, CachedTokenData> localPrefetchedTokens = new ConcurrentHashMap<>();
+    @SuppressWarnings("PMD.ThreadPoolCreationRule")
+    private final ScheduledExecutorService prefetchScheduler = Executors.newScheduledThreadPool(2,
+        new NamedThreadFactory("sentinel-cluster-prefetch-scheduler", true));
 
     public DefaultClusterTokenClient() {
         ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
@@ -146,6 +172,141 @@ public TokenServerDescriptor currentServer() {
         return serverDescriptor;
     }
 
+    public void setInterval(int val) {
+        checkInterval = val;
+    }
+
+    public void resetCache() {
+        localPrefetchedTokens.clear();
+    }
+
+    public int currentRuleCached(Long flowId) {
+        CachedTokenData d = localPrefetchedTokens.get(flowId);
+        if (d == null) {
+            return 0;
+        }
+        return d.count.get();
+    }
+
+    private void preFetch(Long flowId, CachedTokenData value, int prefetchCnt) {
+        long waitUntil = value.lastWaitUntilMs.get();
+        if (waitUntil > 0 && System.currentTimeMillis() < waitUntil) {
+            return;
+        }
+        if (waitUntil > 0) {
+            value.count.addAndGet(value.lastWaitPrefetchCnt.get());
+            value.lastStatus.set(TokenResultStatus.OK);
+            value.lastWaitUntilMs.set(0);
+            value.lastWaitPrefetchCnt.set(0);
+        }
+        int current = value.count.get();
+        if (current >= prefetchCnt / 2) {
+            return;
+        }
+        if (current < -1 * prefetchCnt) {
+            // Avoid prefetching too much at once.
+            current = -1 * prefetchCnt;
+        }
+        prefetchCnt = prefetchCnt - current;
+        TokenResult fetched = requestToken(flowId, prefetchCnt, true);
+        value.lastWaitUntilMs.set(0);
+        value.lastStatus.set(fetched.getStatus());
+        value.lastRemaining.set(fetched.getRemaining());
+        if (fetched.getStatus() == TokenResultStatus.OK) {
+            value.count.addAndGet(prefetchCnt);
+        } else if (fetched.getStatus() == TokenResultStatus.SHOULD_WAIT) {
+            value.lastWaitUntilMs.set(System.currentTimeMillis() + fetched.getWaitInMs());
+            value.lastWaitPrefetchCnt.set(prefetchCnt);
+        }
+    }
+
+    private TokenResult tryLocalCachedToken(CachedTokenData data, int acquireCount, int prefetchCnt) {
+        int count = data.count.get();
+        TokenResult ret = new TokenResult(data.lastStatus.get());
+        ret.setFromCached(true);
+        ret.setRemaining(data.lastRemaining.get());
+        if (count >= acquireCount) {
+            // Concurrent access may drive the count negative here; that only means a few
+            // requests are skipped, and the background prefetch will refill the deficit
+            // in the next round.
+            data.count.addAndGet(-1 * acquireCount);
+            ret.setStatus(TokenResultStatus.OK);
+            return ret;
+        }
+        if (acquireCount > prefetchCnt) {
+            return null;
+        }
+        if (ret.getStatus() == TokenResultStatus.SHOULD_WAIT) {
+            int newN = data.count.addAndGet(-1 * acquireCount);
+            if (newN + data.lastWaitPrefetchCnt.get() < -1 * prefetchCnt) {
+                data.count.addAndGet(acquireCount);
+                if (acquireCount <= prefetchCnt / 2) {
+                    // The last status is still SHOULD_WAIT, so we should not block directly;
+                    // fail over to local rule checking instead.
+                    ret.setStatus(TokenResultStatus.FAIL);
+                    return ret;
+                }
+                // For a large acquireCount we can retry the remote server: large requests
+                // are much less frequent, so they put little extra pressure on it.
+                return null;
+            }
+            int waitMs = (int) (data.lastWaitUntilMs.get() - System.currentTimeMillis());
+            if (waitMs > 0) {
+                ret.setWaitInMs(waitMs);
+            }
+            return ret;
+        } else if (ret.getStatus() == TokenResultStatus.OK) {
+            // The last status was OK but the cached count is not enough; pre-use tokens to
+            // avoid hitting the remote server too often, otherwise fall through to a
+            // remote request.
+            int newN = data.count.addAndGet(-1 * acquireCount);
+            if (newN < -1 * prefetchCnt * 2) {
+                // Pre-use failed because the deficit is too deep; add the tokens back.
+                data.count.addAndGet(acquireCount);
+                if (acquireCount <= prefetchCnt / 2) {
+                    // The last status is still OK, so we should not block directly;
+                    // fail over to local rule checking instead.
+                    ret.setStatus(TokenResultStatus.FAIL);
+                    return ret;
+                }
+                // For a large acquireCount we can retry the remote server: large requests
+                // are much less frequent, so they put little extra pressure on it.
+                return null;
+            }
+            // Pre-use succeeded.
+            return ret;
+        } else {
+            // Blocked or error: fail directly.
+            return ret;
+        }
+    }
+
+    @Override
+    public TokenResult requestTokenWithCache(Long flowId, int acquireCount, int prefetchCnt) {
+        if (notValidRequest(flowId, acquireCount)) {
+            return badRequest();
+        }
+        // Try locally prefetched tokens first.
+        CachedTokenData data = localPrefetchedTokens.get(flowId);
+        if (data != null) {
+            TokenResult ret = tryLocalCachedToken(data, acquireCount, prefetchCnt);
+            if (ret != null) {
+                return ret;
+            }
+        } else {
+            localPrefetchedTokens.computeIfAbsent(flowId, k -> {
+                CachedTokenData v = new CachedTokenData();
+                prefetchScheduler.scheduleAtFixedRate(() -> {
+                    try {
+                        preFetch(flowId, v, prefetchCnt);
+                    } catch (Throwable e) {
+                        RecordLog.info("[DefaultClusterTokenClient] prefetch failed for flowId {}", flowId, e);
+                    }
+                }, 0, checkInterval, TimeUnit.MILLISECONDS);
+                return v;
+            });
+        }
+        // Fall back to a remote request.
+        return requestToken(flowId, acquireCount, true);
+    }
+
     @Override
     public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
         if (notValidRequest(flowId, acquireCount)) {
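For orientation, a minimal caller-side sketch of the cached path added above (the flow ID and prefetch count are illustrative values, not part of this patch; TokenClientProvider is the existing client lookup):

    // Sketch: acquire one token for flow rule 1L with a 10-token prefetch bucket.
    ClusterTokenClient client = TokenClientProvider.getClient();
    TokenResult result = client.requestTokenWithCache(1L, 1, 10);
    if (result.isFromCached()) {
        // Served from the locally prefetched bucket: no network round trip was made.
    }
    if (result.getStatus() == TokenResultStatus.OK) {
        // Token granted; proceed with the guarded resource.
    }

In practice FlowRuleChecker (further below) derives the third argument from the rule as (int) (prefetchCntRatio * count), so callers rarely pass it by hand.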
diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/test/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClientTest.java b/sentinel-cluster/sentinel-cluster-client-default/src/test/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClientTest.java
new file mode 100644
index 0000000000..cf0cee86af
--- /dev/null
+++ b/sentinel-cluster/sentinel-cluster-client-default/src/test/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClientTest.java
@@ -0,0 +1,432 @@
+package com.alibaba.csp.sentinel.cluster.client;
+
+import org.junit.Test;
+import org.mockito.Spy;
+import org.junit.runner.RunWith;
+import org.junit.Assert;
+import org.junit.FixMethodOrder;
+import org.junit.runners.MethodSorters;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
+import com.alibaba.csp.sentinel.cluster.TokenResult;
+import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
+
+@RunWith(MockitoJUnitRunner.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class DefaultClusterTokenClientTest {
+
+    @Spy
+    DefaultClusterTokenClient client = new DefaultClusterTokenClient();
+    final int testInterval = 60;
+
+    @Test
+    public void testClientCacheWithRemoteOK() throws Exception {
+        client.setInterval(testInterval);
+        client.resetCache();
+        doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+        int prefetch = 10;
+        TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
+        // The first request should go to the remote server.
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        Thread.sleep(testInterval);
+        Assert.assertEquals(prefetch, client.currentRuleCached(1L));
+        for (int i = 0; i < prefetch * 3; i++) {
+            ret = client.requestTokenWithCache(1L, 1, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+            Assert.assertEquals(0, ret.getWaitInMs());
+        }
+        Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
+        for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
+            ret = client.requestTokenWithCache(1L, cnt, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
+        }
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        Thread.sleep(testInterval * 2);
+        Assert.assertEquals(prefetch, client.currentRuleCached(1L));
+        // Should refill 2 * prefetch at once to make sure there are at least prefetch tokens in the cache.
+        ret = client.requestTokenWithCache(1L, prefetch, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+        Assert.assertEquals(0, client.currentRuleCached(1L));
+
+        Thread.sleep(testInterval);
+        Assert.assertEquals(prefetch, client.currentRuleCached(1L));
+        ret = client.requestTokenWithCache(1L, prefetch / 2, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+        Assert.assertEquals(prefetch / 2, client.currentRuleCached(1L));
+
+        Thread.sleep(testInterval);
+        Assert.assertEquals(prefetch / 2, client.currentRuleCached(1L));
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        // Using less than half does not trigger a refill, so the cache is not enough.
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        ret = client.requestTokenWithCache(1L, 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+        Assert.assertEquals(prefetch / 2 - 1, client.currentRuleCached(1L));
+        // Refills at least prefetch at once, so the cache can hold at most 1.5 * prefetch.
+        Thread.sleep(testInterval);
+        Assert.assertEquals(prefetch, client.currentRuleCached(1L));
+        ret = client.requestTokenWithCache(1L, prefetch, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+        Assert.assertEquals(0, client.currentRuleCached(1L));
+    }
+
+    @Test
+    public void testClientCacheWithRemoteWait() throws Exception {
+        client.setInterval(testInterval);
+        client.resetCache();
+        doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+        int prefetch = 10;
+        TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
+        // The first request should go to the remote server.
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        Thread.sleep(testInterval);
+
+        TokenResult waitResult = new TokenResult(TokenResultStatus.SHOULD_WAIT);
+        waitResult.setWaitInMs(testInterval * 4);
+        doReturn(waitResult).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
+
+        for (int i = 0; i < prefetch * 3; i++) {
+            ret = client.requestTokenWithCache(1L, 1, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+            Assert.assertEquals(0, ret.getWaitInMs());
+        }
+        Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
+
+        for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
+            ret = client.requestTokenWithCache(1L, cnt, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
+        }
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
+
+        Thread.sleep(testInterval * 2);
+        // The pending prefetch count is now 2 * prefetch, and the last status has become SHOULD_WAIT.
+        Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
+        // The refill has to wait until the timeout expires.
+        ret = client.requestTokenWithCache(1L, 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 3 <= ret.getWaitInMs());
+        Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval > ret.getWaitInMs());
+
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 3 <= ret.getWaitInMs());
+        Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval > ret.getWaitInMs());
+
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
+
+        Thread.sleep(testInterval);
+        Assert.assertEquals(-1 * prefetch * 2 - 1 - prefetch / 2 - 1, client.currentRuleCached(1L));
+        // Wait out the timeout.
+        Thread.sleep(waitResult.getWaitInMs());
+        // The pending prefetch count should be credited back to the cached count.
+        Assert.assertEquals(-1 - prefetch / 2 - 1, client.currentRuleCached(1L));
+
+        doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+        Thread.sleep(waitResult.getWaitInMs());
+        Thread.sleep(testInterval);
+        Assert.assertEquals(prefetch, client.currentRuleCached(1L));
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+
+        doReturn(waitResult).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+
+        ret = client.requestTokenWithCache(1L, prefetch - 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+        Assert.assertEquals(1, client.currentRuleCached(1L));
+
+        Thread.sleep(testInterval);
+        // The refill is waiting and the last state became SHOULD_WAIT, but some local tokens remain.
+        ret = client.requestTokenWithCache(1L, 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        Assert.assertEquals(0, client.currentRuleCached(1L));
+
+        ret = client.requestTokenWithCache(1L, prefetch, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() - testInterval * 2 <= ret.getWaitInMs());
+        Assert.assertTrue(String.format("wait ms not as expected: %d", ret.getWaitInMs()), waitResult.getWaitInMs() > ret.getWaitInMs());
+        ret = client.requestTokenWithCache(1L, prefetch - 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+
+        ret = client.requestTokenWithCache(1L, 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
+
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
+
+        Assert.assertEquals(-1 * prefetch * 2 + 1, client.currentRuleCached(1L));
+        Thread.sleep(testInterval);
+        Assert.assertEquals(-1 * prefetch * 2 + 1, client.currentRuleCached(1L));
+        // The refill is waiting.
+        ret = client.requestTokenWithCache(1L, 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.SHOULD_WAIT, (long) ret.getStatus());
+        Assert.assertEquals(waitResult.getWaitInMs(), ret.getWaitInMs());
+
+        Thread.sleep(waitResult.getWaitInMs() + testInterval);
+        // The pending prefetch count should be credited back to the cached count.
+        Assert.assertEquals(-1 * prefetch, client.currentRuleCached(1L));
+        Thread.sleep(waitResult.getWaitInMs() + testInterval);
+        Assert.assertEquals(prefetch, client.currentRuleCached(1L));
+    }
+
+    @Test
+    public void testClientCacheWithRemoteBlocked() throws Exception {
+        client.setInterval(testInterval);
+        client.resetCache();
+        doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+        int prefetch = 10;
+        TokenResult ret = client.requestTokenWithCache(1L, 1, prefetch);
+        // The first request should go to the remote server.
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        Thread.sleep(testInterval);
+        // Begin testing while the remote server refuses tokens.
+        doReturn(new TokenResult(TokenResultStatus.BLOCKED)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        for (int i = 0; i < prefetch * 3; i++) {
+            ret = client.requestTokenWithCache(1L, 1, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+            Assert.assertEquals(0, ret.getWaitInMs());
+        }
+        Assert.assertEquals(-1 * prefetch * 2, client.currentRuleCached(1L));
+
+        for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
+            ret = client.requestTokenWithCache(1L, cnt, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.FAIL, (long) ret.getStatus());
+        }
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+
+        Thread.sleep(testInterval);
+        // The refill is blocked.
+        ret = client.requestTokenWithCache(1L, 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+        Thread.sleep(testInterval * 2);
+        Assert.assertEquals(prefetch, client.currentRuleCached(1L));
+
+        doReturn(new TokenResult(TokenResultStatus.BLOCKED)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+
+        for (int i = 0; i < prefetch / 2 + 1; i++) {
+            ret = client.requestTokenWithCache(1L, 1, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+            Assert.assertEquals(0, ret.getWaitInMs());
+        }
+        Thread.sleep(testInterval);
+        Assert.assertEquals(prefetch - prefetch / 2 - 1, client.currentRuleCached(1L));
+        // The refill is blocked and the last state became BLOCKED, but some local tokens remain.
+        ret = client.requestTokenWithCache(1L, prefetch - prefetch / 2 - 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+        Assert.assertEquals(0, client.currentRuleCached(1L));
+
+        for (int cnt = 1; cnt <= prefetch / 2; cnt++) {
+            ret = client.requestTokenWithCache(1L, cnt, prefetch);
+            Assert.assertTrue(ret.isFromCached());
+            Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        }
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+
+        Thread.sleep(testInterval);
+        // The refill is blocked.
+        ret = client.requestTokenWithCache(1L, 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        ret = client.requestTokenWithCache(1L, prefetch / 2 + 1, prefetch);
+        Assert.assertTrue(ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        ret = client.requestTokenWithCache(1L, prefetch + 1, prefetch);
+        Assert.assertTrue(!ret.isFromCached());
+        Assert.assertEquals((long) TokenResultStatus.BLOCKED, (long) ret.getStatus());
+        Assert.assertEquals(0, ret.getWaitInMs());
+    }
+
+    @Test
+    public void testConcurrencyRequestClientCache() throws Exception {
+        client.setInterval(1);
+        client.resetCache();
+        doReturn(new TokenResult(TokenResultStatus.OK)).when(client).requestToken(anyLong(), anyInt(), anyBoolean());
+        int prefetch = 200;
+
+        ScheduledExecutorService testScheduler = Executors.newScheduledThreadPool(16,
+                new NamedThreadFactory("test-scheduler", true));
+
+        AtomicInteger blocked = new AtomicInteger();
+        AtomicInteger failed = new AtomicInteger();
+        AtomicInteger ok = new AtomicInteger();
+        AtomicInteger cached = new AtomicInteger();
+        AtomicInteger notCached = new AtomicInteger();
+        AtomicBoolean stopped = new AtomicBoolean(false);
+        for (int concurrency = 0; concurrency < 8; concurrency++) {
+            testScheduler.submit(() -> {
+                System.out.println("running begin");
+                for (int loop = 0; loop < 200; loop++) {
+                    for (int cnt = 1; cnt < prefetch * 2; cnt++) {
+                        TokenResult ret = client.requestTokenWithCache(1L, cnt, prefetch);
+                        if (cnt > prefetch * 1.5) {
+                            Assert.assertTrue(!ret.isFromCached());
+                            Assert.assertEquals((long) TokenResultStatus.OK, (long) ret.getStatus());
+                            notCached.incrementAndGet();
+                        } else {
+                            if (ret.getStatus() == TokenResultStatus.BLOCKED) {
+                                Assert.assertTrue(ret.isFromCached());
+                                blocked.incrementAndGet();
+                                cached.incrementAndGet();
+                            } else if (ret.getStatus() == TokenResultStatus.FAIL) {
+                                Assert.assertTrue(ret.isFromCached());
+                                failed.incrementAndGet();
+                                cached.incrementAndGet();
+                            } else {
+                                ok.incrementAndGet();
+                                if (ret.isFromCached()) {
+                                    cached.incrementAndGet();
+                                } else {
+                                    notCached.incrementAndGet();
+                                }
+                            }
+                        }
+                        Assert.assertEquals(0, ret.getWaitInMs());
+                        if (cnt % 50 == 0) {
+                            try {
+                                Thread.sleep(1);
+                            } catch (InterruptedException e) {
+                            }
+                        }
+                    }
+                    if (stopped.get()) {
+                        break;
+                    }
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                    }
+                }
+                System.out.println("running done");
+            });
+        }
+
+        testScheduler.submit(() -> {
+            while (!stopped.get()) {
+                System.out.println("current rule cached: " + client.currentRuleCached(1L));
+                System.out.println("current failed: " + failed.get() + ", passed: " + ok.get()
+                        + ", cached: " + cached.get() + ", not cached: " + notCached.get());
+                try {
+                    Thread.sleep(3);
+                } catch (InterruptedException e) {
+                }
+            }
+        });
+        Thread.sleep(2000);
+
+        stopped.set(true);
+        testScheduler.shutdown();
+        testScheduler.awaitTermination(1, TimeUnit.SECONDS);
+
+        System.out.println("current rule cached: " + client.currentRuleCached(1L));
+        System.out.println("current failed: " + failed.get() + ", passed: " + ok.get() + ", cached: " + cached.get()
+                + ", not cached: " + notCached.get());
+        Assert.assertTrue(blocked.get() + failed.get() < cached.get());
+        Assert.assertTrue(cached.get() + notCached.get() > blocked.get() + ok.get() + failed.get());
+        Assert.assertTrue(failed.get() > 0);
+        Assert.assertTrue(ok.get() > 0);
+        Assert.assertTrue(client.currentRuleCached(1L) >= prefetch / 2);
+    }
+}
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java
index df69bef9dc..963d421a47 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java
@@ -37,13 +37,14 @@ final class ClusterFlowChecker {
 
     private static double calcGlobalThreshold(FlowRule rule) {
         double count = rule.getCount();
+        double preCnt = rule.getClusterConfig().getPrefetchCntRatio() * count;
         switch (rule.getClusterConfig().getThresholdType()) {
             case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
-                return count;
+                return count + preCnt;
             case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
             default:
                 int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
-                return count * connectedCount;
+                return count * connectedCount + preCnt;
         }
     }
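To make the server-side headroom concrete, a small worked example (the numbers are illustrative, not defaults):

    // With count = 1000 and prefetchCntRatio = 0.04:
    double count = 1000;
    double ratio = 0.04;
    double preCnt = ratio * count;                 // 40.0 tokens of headroom
    double globalThreshold = count + preCnt;       // 1040.0 for FLOW_THRESHOLD_GLOBAL
    int clientPrefetchCnt = (int) (ratio * count); // 40, the batch each client prefetches

The extra preCnt tokens compensate for tokens parked in client-side caches, so enabling prefetch does not effectively lower the usable global limit.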
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
index 21d82acdf7..18a9030d5b 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
@@ -35,6 +35,11 @@
 @Spi(isDefault = true)
 public class DefaultTokenService implements TokenService {
 
+    @Override
+    public TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt) {
+        return requestToken(ruleId, acquireCount, true);
+    }
+
     @Override
     public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
         if (notValidRequest(ruleId, acquireCount)) {
diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java
index 9fb21bdde5..8f238fe618 100644
--- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java
+++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java
@@ -43,6 +43,11 @@ public void stop() throws Exception {
         server.stop();
     }
 
+    @Override
+    public TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt) {
+        return requestToken(ruleId, acquireCount, true);
+    }
+
     @Override
     public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
         if (tokenService != null) {
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java
index b16b76f0e8..20badfa487 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResult.java
@@ -34,6 +34,8 @@ public class TokenResult {
 
     private Map<String, Object> attachments;
 
+    private boolean isFromCached;
+
     public TokenResult() {
     }
 
@@ -85,6 +87,15 @@ public TokenResult setAttachments(Map<String, Object> attachments) {
         return this;
     }
 
+    public boolean isFromCached() {
+        return isFromCached;
+    }
+
+    public TokenResult setFromCached(boolean fromCached) {
+        isFromCached = fromCached;
+        return this;
+    }
+
     @Override
     public String toString() {
         return "TokenResult{" +
@@ -93,6 +104,7 @@ public String toString() {
             ", waitInMs=" + waitInMs +
             ", attachments=" + attachments +
             ", tokenId=" + tokenId +
+            ", isFromCached=" + isFromCached +
             '}';
     }
 }
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java
index 9e8c7a49fe..f079b82ddb 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenService.java
@@ -35,6 +35,15 @@ public interface TokenService {
      */
     TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized);
 
+    /**
+     * Request tokens from the local prefetch cache first, then from the remote token server.
+     *
+     * @param ruleId       the unique rule ID
+     * @param acquireCount token count to acquire
+     * @param prefetchCnt  token count to prefetch into the local cache
+     * @return result of the token request
+     */
+    TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt);
+
     /**
      * Request tokens for a specific parameter from remote token server.
     *
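Since requestTokenWithCache is added to the TokenService interface without a default body, any third-party implementation must now provide it; the two delegating overrides above are the minimal form. A sketch for an existing implementor (the enclosing class is hypothetical):

    // Hypothetical existing implementor of TokenService: delegate the cached
    // variant to the plain path, mirroring DefaultTokenService above.
    @Override
    public TokenResult requestTokenWithCache(Long ruleId, int acquireCount, int prefetchCnt) {
        return requestToken(ruleId, acquireCount, true);
    }

On the server and embedded-server side there is no remote hop to save, so ignoring prefetchCnt and delegating is the sensible choice.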
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/DefaultNode.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/DefaultNode.java
index f878637520..975b02752a 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/DefaultNode.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/DefaultNode.java
@@ -118,6 +118,12 @@ public void increaseExceptionQps(int count) {
         this.clusterNode.increaseExceptionQps(count);
     }
 
+    @Override
+    public void increaseFallbackQps(int count) {
+        super.increaseFallbackQps(count);
+        this.clusterNode.increaseFallbackQps(count);
+    }
+
     @Override
     public void addRtAndSuccess(long rt, int successCount) {
         super.addRtAndSuccess(rt, successCount);
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java
index 0927d8e43c..5a51ce5f47 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java
@@ -67,6 +67,8 @@ public interface Node extends OccupySupport, DebugSupport {
      * @return total business exception count per minute
      */
     long totalException();
+
+    /**
+     * Get total count of requests that fell back from cluster mode to local rule checking per minute.
+     *
+     * @return total fallback count per minute
+     */
+    long totalFallback();
 
     /**
      * Get pass request per second.
@@ -109,6 +111,8 @@ public interface Node extends OccupySupport, DebugSupport {
      * @return QPS of exception occurs
     */
     double exceptionQps();
+
+    /**
+     * Get QPS of requests that fell back from cluster mode to local rule checking.
+     *
+     * @return QPS of fallback
+     */
+    double fallbackQps();
 
     /**
      * Get average rt per second.
@@ -186,6 +190,8 @@
      */
     void increaseExceptionQps(int count);
 
+    /**
+     * Increase the fallback (cluster-to-local) count.
+     *
+     * @param count count to add
+     */
+    void increaseFallbackQps(int count);
+
     /**
      * Increase current thread count.
     */
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
index 82777a4404..a7cb6aad77 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
@@ -149,6 +149,7 @@ private boolean isValidMetricNode(MetricNode node) {
     @Override
     public void reset() {
         rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
+        rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
     }
 
     @Override
@@ -196,6 +197,16 @@ public long totalException() {
         return rollingCounterInMinute.exception();
     }
 
+    @Override
+    public double fallbackQps() {
+        return rollingCounterInSecond.fallback() / rollingCounterInSecond.getWindowIntervalInSec();
+    }
+
+    @Override
+    public long totalFallback() {
+        return rollingCounterInMinute.fallback();
+    }
+
     @Override
     public double passQps() {
         return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
@@ -269,6 +280,12 @@ public void increaseExceptionQps(int count) {
         rollingCounterInMinute.addException(count);
     }
 
+    @Override
+    public void increaseFallbackQps(int count) {
+        rollingCounterInSecond.addFallback(count);
+        rollingCounterInMinute.addFallback(count);
+    }
+
     @Override
     public void increaseThreadNum() {
         curThreadNum.increment();
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java
index 0ce97548ff..6683483e0a 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java
@@ -73,6 +73,27 @@ public class ClusterFlowConfig {
      */
     private long clientOfflineTime = 2000;
 
+    /**
+     * Ratio used to calculate the prefetch count (ratio * count) for each request. Prefetching
+     * reduces the number of requests sent to the token server as well as the per-request latency.
+     * 0 disables prefetch; otherwise the value should be in the range [0.001, 0.1].
+     * We suggest disabling prefetch when the limit count is less than 100; if it is enabled anyway,
+     * the suggested ratios are:
+     * count < 100: 0.1; count 100~1000: 0.06; count 1000~10000: 0.04;
+     * count 10000~100000: 0.01; count 100000~1M: 0.01~0.002.
+     * Note that the prefetch count should stay below 50% of the average count per client
+     * (cluster total count / number of clients).
+     */
+    private double prefetchCntRatio = 0;
+
+    public double getPrefetchCntRatio() {
+        return prefetchCntRatio;
+    }
+
+    public void setPrefetchCntRatio(double prefetchCntRatio) {
+        this.prefetchCntRatio = prefetchCntRatio;
+    }
+
     public long getResourceTimeout() {
         return resourceTimeout;
     }
@@ -197,6 +218,9 @@ public boolean equals(Object o) {
         if (acquireRefuseStrategy != that.acquireRefuseStrategy) {
             return false;
         }
+        if (Double.compare(prefetchCntRatio, that.prefetchCntRatio) != 0) {
+            return false;
+        }
         return Objects.equals(flowId, that.flowId);
     }
 
@@ -212,6 +236,7 @@ public int hashCode() {
         result = (int) (31 * result + clientOfflineTime);
         result = 31 * result + resourceTimeoutStrategy;
         result = 31 * result + acquireRefuseStrategy;
+        result = 31 * result + Double.hashCode(prefetchCntRatio);
         return result;
     }
 
@@ -228,6 +253,7 @@ public String toString() {
             ", resourceTimeoutStrategy=" + resourceTimeoutStrategy +
             ", acquireRefuseStrategy=" + acquireRefuseStrategy +
             ", clientOfflineTime=" + clientOfflineTime +
+            ", prefetchCntRatio=" + prefetchCntRatio +
             '}';
     }
 }
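A hedged configuration sketch tying the ratio to a rule (resource name and numbers are illustrative; everything except setPrefetchCntRatio is the existing FlowRule/ClusterFlowConfig API):

    // Cluster flow rule with count = 1000; ratio 0.04 yields 40-token prefetch batches.
    ClusterFlowConfig clusterConfig = new ClusterFlowConfig();
    clusterConfig.setFlowId(1001L);
    clusterConfig.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
    clusterConfig.setPrefetchCntRatio(0.04);

    FlowRule rule = new FlowRule("demo-resource");
    rule.setCount(1000);
    rule.setClusterMode(true);
    rule.setClusterConfig(clusterConfig);
    FlowRuleManager.loadRules(Collections.singletonList(rule));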
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
index 26d34d39b4..cef3262597 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
@@ -152,8 +152,15 @@ private static boolean passClusterCheck(FlowRule rule, Context context, DefaultN
             return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
         }
         long flowId = rule.getClusterConfig().getFlowId();
-        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
-        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
+        double ratio = rule.getClusterConfig().getPrefetchCntRatio();
+        int prefetchCnt = (int) (ratio * rule.getCount());
+        // Use the cached path only when a meaningful prefetch batch (> 1 token) is configured.
+        TokenResult result = prefetchCnt > 1
+            ? clusterService.requestTokenWithCache(flowId, acquireCount, prefetchCnt)
+            : clusterService.requestToken(flowId, acquireCount, prioritized);
+        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
         // If client is absent, then fallback to local mode.
     } catch (Throwable ex) {
         RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
@@ -166,6 +173,7 @@ private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, De
                                                  boolean prioritized) {
         if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
+            node.increaseFallbackQps(acquireCount);
             return passLocalCheck(rule, context, node, acquireCount, prioritized);
         } else {
             // The rule won't be activated, just pass.
@@ -192,7 +200,11 @@ private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRul
             case TokenResultStatus.SHOULD_WAIT:
                 // Wait for next tick.
                 try {
-                    Thread.sleep(result.getWaitInMs());
+                    // Cap the wait at the rule's max queueing time to avoid unbounded blocking.
+                    int waitMs = result.getWaitInMs();
+                    if (waitMs > rule.getMaxQueueingTimeMs()) {
+                        waitMs = rule.getMaxQueueingTimeMs();
+                    }
+                    Thread.sleep(waitMs);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
@@ -207,4 +219,4 @@ private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRul
         return false;
     }
-}
\ No newline at end of file
+}
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java
index 73dbd2721d..8fbd684cbd 100644
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java
@@ -35,5 +35,7 @@ public enum MetricEvent {
     /**
      * Passed in future quota (pre-occupied, since 1.5.0).
     */
-    OCCUPIED_PASS
+    OCCUPIED_PASS,
+    /**
+     * Passed after falling back from cluster mode to local rule checking.
+     */
+    FALLBACK
 }
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java
index aaeb87e9d9..0dcb5e1003 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java
@@ -91,6 +91,10 @@ public long exception() {
         return get(MetricEvent.EXCEPTION);
     }
 
+    public long fallback() {
+        return get(MetricEvent.FALLBACK);
+    }
+
     public long rt() {
         return get(MetricEvent.RT);
     }
@@ -115,6 +119,10 @@ public void addException(int n) {
         add(MetricEvent.EXCEPTION, n);
     }
 
+    public void addFallback(int n) {
+        add(MetricEvent.FALLBACK, n);
+    }
+
     public void addBlock(int n) {
         add(MetricEvent.BLOCK, n);
     }
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java
index c21223a78c..b529875261 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java
@@ -93,6 +93,17 @@ public long exception() {
         return exception;
     }
 
+    @Override
+    public long fallback() {
+        data.currentWindow();
+        long fallback = 0;
+        List<MetricBucket> list = data.values();
+        for (MetricBucket window : list) {
+            fallback += window.fallback();
+        }
+        return fallback;
+    }
+
     @Override
     public long block() {
         data.currentWindow();
@@ -216,6 +227,12 @@ public void addException(int count) {
         wrap.value().addException(count);
     }
 
+    @Override
+    public void addFallback(int count) {
+        WindowWrap<MetricBucket> wrap = data.currentWindow();
+        wrap.value().addFallback(count);
+    }
+
     @Override
     public void addBlock(int count) {
         WindowWrap<MetricBucket> wrap = data.currentWindow();
diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java
index b79cd881bf..a42f3298bb 100755
--- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java
+++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java
@@ -50,6 +50,13 @@ public interface Metric extends DebugSupport {
      */
     long exception();
 
+    /**
+     * Get total fallback count.
+     *
+     * @return fallback count
+     */
+    long fallback();
+
     /**
      * Get total block count.
      *
@@ -108,6 +115,13 @@ public interface Metric extends DebugSupport {
      */
     void addException(int n);
 
+    /**
+     * Add current fallback count.
+     *
+     * @param n count to add
+     */
+    void addFallback(int n);
+
     /**
      * Add current block count.
     *
diff --git a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/vo/NodeVo.java b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/vo/NodeVo.java
index d3e0ebbd55..50c9093cbb 100755
--- a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/vo/NodeVo.java
+++ b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/vo/NodeVo.java
@@ -39,9 +39,11 @@ public class NodeVo {
     private Long averageRt;
     private Long successQps;
     private Long exceptionQps;
+    private Long fallbackQps;
     private Long oneMinutePass;
     private Long oneMinuteBlock;
     private Long oneMinuteException;
+    private Long oneMinuteFallback;
     private Long oneMinuteTotal;
 
     private Long timestamp;
@@ -69,7 +71,9 @@ public static NodeVo fromDefaultNode(DefaultNode node, String parentId) {
         vo.averageRt = (long) node.avgRt();
         vo.successQps = (long) node.successQps();
         vo.exceptionQps = (long) node.exceptionQps();
+        vo.fallbackQps = (long) node.fallbackQps();
         vo.oneMinuteException = node.totalException();
+        vo.oneMinuteFallback = node.totalFallback();
         vo.oneMinutePass = node.totalRequest() - node.blockRequest();
         vo.oneMinuteBlock = node.blockRequest();
         vo.oneMinuteTotal = node.totalRequest();
@@ -108,7 +112,9 @@ public static NodeVo fromClusterNode(String name, ClusterNode node) {
         vo.averageRt = (long) node.avgRt();
         vo.successQps = (long) node.successQps();
         vo.exceptionQps = (long) node.exceptionQps();
+        vo.fallbackQps = (long) node.fallbackQps();
         vo.oneMinuteException = node.totalException();
+        vo.oneMinuteFallback = node.totalFallback();
         vo.oneMinutePass = node.totalRequest() - node.blockRequest();
         vo.oneMinuteBlock = node.blockRequest();
         vo.oneMinuteTotal = node.totalRequest();
@@ -204,6 +210,22 @@ public void setOneMinuteException(Long oneMinuteException) {
         this.oneMinuteException = oneMinuteException;
     }
 
+    public Long getFallbackQps() {
+        return fallbackQps;
+    }
+
+    public void setFallbackQps(Long fallbackQps) {
+        this.fallbackQps = fallbackQps;
+    }
+
+    public Long getOneMinuteFallback() {
+        return oneMinuteFallback;
+    }
+
+    public void setOneMinuteFallback(Long oneMinuteFallback) {
+        this.oneMinuteFallback = oneMinuteFallback;
+    }
+
     public Long getOneMinutePass() {
         return oneMinutePass;
     }
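Finally, a sketch of how the new fields surface to consumers of NodeVo (the ClusterNode instance is assumed to be obtained from the usual statistics lookup, not shown here):

    NodeVo vo = NodeVo.fromClusterNode("demo-resource", clusterNode);
    System.out.println("fallback qps: " + vo.getFallbackQps()
            + ", last-minute fallback: " + vo.getOneMinuteFallback());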