Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add local token cache for cluster #3381

Open
wants to merge 3 commits into
base: 1.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
package com.alibaba.csp.sentinel.cluster.client;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
Expand All @@ -35,6 +41,7 @@
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;

/**
* Default implementation of {@link ClusterTokenClient}.
Expand All @@ -43,11 +50,30 @@
* @since 1.4.0
*/
public class DefaultClusterTokenClient implements ClusterTokenClient {
public class CachedTokenData {
AtomicInteger count;
AtomicInteger lastStatus;
AtomicLong lastWaitUntilMs;
AtomicInteger lastWaitPrefetchCnt;
AtomicInteger lastRemaining;
public CachedTokenData() {
count = new AtomicInteger(0);
lastStatus = new AtomicInteger(TokenResultStatus.OK);
lastWaitUntilMs = new AtomicLong(0);
lastWaitPrefetchCnt = new AtomicInteger(0);
lastRemaining = new AtomicInteger(0);
}
}

private ClusterTransportClient transportClient;
private TokenServerDescriptor serverDescriptor;

private final AtomicBoolean shouldStart = new AtomicBoolean(false);
private int checkInterval = 2;
ConcurrentHashMap<Long, CachedTokenData> localPrefetchedTokens = new ConcurrentHashMap<>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private final ScheduledExecutorService prefetchScheduler = Executors.newScheduledThreadPool(2,
new NamedThreadFactory("sentinel-cluster-prefetch-scheduler", true));

public DefaultClusterTokenClient() {
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
Expand Down Expand Up @@ -146,6 +172,140 @@
return serverDescriptor;
}

public void setInterval(int val) {
checkInterval = val;
}

public void resetCache() {
localPrefetchedTokens.clear();
}

public int currentRuleCached(Long flowId) {
CachedTokenData d = localPrefetchedTokens.get(flowId);
if (d == null) {
return 0;

Check warning on line 186 in sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java

View check run for this annotation

Codecov / codecov/patch

sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java#L186

Added line #L186 was not covered by tests
}
return d.count.get();
}

private void preFetch(Long flowId, CachedTokenData value, int prefetchCnt) {
long waitUntil = value.lastWaitUntilMs.get();
if (waitUntil > 0 && System.currentTimeMillis() < waitUntil) {
return;
}
if (waitUntil > 0) {
value.count.addAndGet(value.lastWaitPrefetchCnt.get());
value.lastStatus.set(TokenResultStatus.OK);
value.lastWaitUntilMs.set(0);
value.lastWaitPrefetchCnt.set(0);
}
int current = value.count.get();
if (current >= prefetchCnt / 2) {
return;
}
if (current < -1 * prefetchCnt) {
// avoid too much prefetch
current = -1 * prefetchCnt;
}
prefetchCnt = prefetchCnt - current;
TokenResult fetched = requestToken(flowId, prefetchCnt, true);
value.lastWaitUntilMs.set(0);
value.lastStatus.set(fetched.getStatus());
value.lastRemaining.set(fetched.getRemaining());
if (fetched.getStatus() == TokenResultStatus.OK) {
value.count.addAndGet(prefetchCnt);
} else if (fetched.getStatus() == TokenResultStatus.SHOULD_WAIT) {
value.lastWaitUntilMs.set(System.currentTimeMillis() + fetched.getWaitInMs());
value.lastWaitPrefetchCnt.set(prefetchCnt);
}
}

private TokenResult tryLocalCachedToken(CachedTokenData data, int acquireCount, int prefetchCnt) {
int count = data.count.get();
TokenResult ret = new TokenResult(data.lastStatus.get());
ret.setFromCached(true);
ret.setRemaining(data.lastRemaining.get());
if (count >= acquireCount) {
// here we allow the concurrency which may cause decrease to negative count, it
// is just skipped some requests
// and it will be refilled by the bg prefetch in next round.
data.count.addAndGet(-1 * acquireCount);
ret.setStatus(TokenResultStatus.OK);
return ret;
}
if (acquireCount > prefetchCnt) {
return null;
}
if (ret.getStatus() == TokenResultStatus.SHOULD_WAIT) {
int newN = data.count.addAndGet(-1 * acquireCount);
if (newN + data.lastWaitPrefetchCnt.get() < -1 * prefetchCnt) {
data.count.addAndGet(acquireCount);
if (acquireCount <= prefetchCnt / 2) {
ret.setStatus(TokenResultStatus.BLOCKED);
return ret;
}
// for the large acquireCount, we can try remote again, since large request will
// much slower which will have less pressure to remote
return null;
}
int waitMs = (int) (data.lastWaitUntilMs.get() - System.currentTimeMillis());
if (waitMs > 0) {
ret.setWaitInMs(waitMs);
}
return ret;
} else if (ret.getStatus() == TokenResultStatus.OK) {
// last ok, but the cached count is not enough, we can preuse it to avoid remote
// request too often,
// otherwise just try remote request
int newN = data.count.addAndGet(-1 * acquireCount);
if (newN < -1 * prefetchCnt * 2) {
// preuse failed since not enough, added it back
data.count.addAndGet(acquireCount);
if (acquireCount <= prefetchCnt / 2) {
// since last is still ok, we should not block directly, make it failover to local
ret.setStatus(TokenResultStatus.FAIL);
return ret;
}
// for the large acquireCount, we can try remote again, since large request will much slower which will have less pressure to remote
return null;
}
// preuse ok
return ret;
} else {
// should fail directly
return ret;
}
}

@Override
public TokenResult requestTokenWithCache(Long flowId, int acquireCount, int prefetchCnt) {
if (notValidRequest(flowId, acquireCount)) {
return badRequest();

Check warning on line 283 in sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java

View check run for this annotation

Codecov / codecov/patch

sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java#L283

Added line #L283 was not covered by tests
}
// try local prefetched first
CachedTokenData data = localPrefetchedTokens.get(flowId);
if (data != null) {
TokenResult ret = tryLocalCachedToken(data, acquireCount, prefetchCnt);
if (ret != null) {
return ret;
}
} else {
localPrefetchedTokens.computeIfAbsent(flowId, k -> {
CachedTokenData v = new CachedTokenData();
prefetchScheduler.scheduleAtFixedRate(() -> {
try {
preFetch(flowId, v, prefetchCnt);
} catch (Throwable e) {
RecordLog.info("[DefaultClusterTokenClient] prefetch failed for flowId {}", flowId, e);

Check warning on line 299 in sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java

View check run for this annotation

Codecov / codecov/patch

sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java#L298-L299

Added lines #L298 - L299 were not covered by tests
}
}, 0, checkInterval, TimeUnit.MILLISECONDS);
return v;
});
}
// fallback to remote request
return requestToken(flowId, acquireCount, true);
}

@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
if (notValidRequest(flowId, acquireCount)) {
Expand Down