Skip to content

Commit

Permalink
introduce intermediate node to avoid ABA problem
Browse files Browse the repository at this point in the history
  • Loading branch information
robberphex committed Apr 4, 2024
1 parent 8d90fcf commit e8e293f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,16 +15,6 @@
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
Expand All @@ -37,6 +27,13 @@
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.util.TimeUtil;

import java.lang.reflect.Array;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* Rule checker for parameter flow control.
*
Expand All @@ -46,7 +43,7 @@
public final class ParamFlowChecker {

public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
Object... args) {
if (args == null) {
return true;
}
Expand Down Expand Up @@ -79,7 +76,7 @@ private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlow
Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
for (Object param : ((Collection) value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
Expand Down Expand Up @@ -117,7 +114,7 @@ static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRu
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
long threshold = (long) rule.getCount();
return ++threadCount <= threshold;
}

Expand All @@ -127,16 +124,16 @@ static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRu
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
CacheMap<Object, AtomicReference<TokenUpdateStatus>> tokenCounters = metric == null ? null : metric.getRuleStampedTokenCounter(rule);

if (tokenCounters == null || timeCounters == null) {
DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME;
if (tokenCounters == null) {
return true;
}

// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
long tokenCount = (long) rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
Expand All @@ -153,49 +150,44 @@ static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowR
while (true) {
long currentTime = TimeUtil.currentTimeMillis();

AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
AtomicReference<TokenUpdateStatus> atomicLastStatus = tokenCounters.putIfAbsent(value, new AtomicReference<>(
new TokenUpdateStatus(currentTime, maxCount - acquireCount)
));
if (atomicLastStatus == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}

// Calculate the time duration since last token was added.
long passTime = currentTime - lastAddTokenTime.get();
TokenUpdateStatus lastStatus = atomicLastStatus.get();
long passTime = currentTime - lastStatus.getLastAddTokenTime();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
long newQps;
if (passTime > rule.getDurationInSec() * 1000) {
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
long restQps = lastStatus.getRestQps();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);

if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
if (newQps < 0) {
return false;
}
TokenUpdateStatus newStatus = new TokenUpdateStatus(currentTime, newQps);
if (atomicLastStatus.compareAndSet(lastStatus, newStatus)) {
return true;
}
Thread.yield();
} else {
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
newQps = lastStatus.getRestQps() - acquireCount;
if (newQps >= 0) {
TokenUpdateStatus newStatus = new TokenUpdateStatus(lastStatus.getLastAddTokenTime(), newQps);
if (atomicLastStatus.compareAndSet(lastStatus, newStatus)) {
return true;
}
} else {
return false;
}

Thread.yield();
}
}
Expand All @@ -211,7 +203,7 @@ static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlow

// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
long tokenCount = (long) rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
Expand Down Expand Up @@ -261,7 +253,7 @@ private static ParameterMetric getParameterMetric(ResourceWrapper resourceWrappe
@SuppressWarnings("unchecked")
private static Collection<Object> toCollection(Object value) {
if (value instanceof Collection) {
return (Collection<Object>)value;
return (Collection<Object>) value;
} else if (value.getClass().isArray()) {
List<Object> params = new ArrayList<Object>();
int length = Array.getLength(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
Expand All @@ -46,25 +47,35 @@ public class ParameterMetric {
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();

/**
* Format: (rule, (value, tokenCounter))
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicReference<TokenUpdateStatus>>> ruleTokenCounter = new HashMap<>();

private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

/**
* Get the token counter for given parameter rule.
*
* @param rule valid parameter rule
* @return the associated token counter
* @since 1.6.0
* @since 1.8.8
*/
public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) {
CacheMap<Object, AtomicReference<TokenUpdateStatus>> getRuleStampedTokenCounter(ParamFlowRule rule) {
return ruleTokenCounter.get(rule);
}

public void clear() {
synchronized (lock) {
ruleTimeCounters.clear();
ruleTokenCounter.clear();
threadCountMap.clear();
}
}

/**
* Get the time record counter for given parameter rule.
*
Expand All @@ -76,14 +87,6 @@ public CacheMap<Object, AtomicLong> getRuleTimeCounter(ParamFlowRule rule) {
return ruleTimeCounters.get(rule);
}

public void clear() {
synchronized (lock) {
threadCountMap.clear();
ruleTimeCounters.clear();
ruleTokenCounter.clear();
}
}

public void clearForRule(ParamFlowRule rule) {
synchronized (lock) {
ruleTimeCounters.remove(rule);
Expand All @@ -106,7 +109,7 @@ public void initialize(ParamFlowRule rule) {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<>(size));
}
}
}
Expand Down Expand Up @@ -253,7 +256,7 @@ public long getThreadCount(int index, Object value) {
*
* @return the token counter map
*/
Map<ParamFlowRule, CacheMap<Object, AtomicLong>> getRuleTokenCounterMap() {
Map<ParamFlowRule, CacheMap<Object, AtomicReference<TokenUpdateStatus>>> getRuleTokenCounterMap() {
return ruleTokenCounter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.alibaba.csp.sentinel.slots.block.flow.param;

class TokenUpdateStatus {

private final long lastAddTokenTime;

private final long restQps;

public TokenUpdateStatus(long lastAddTokenTime, long restQps) {
this.lastAddTokenTime = lastAddTokenTime;
this.restQps = restQps;
}

public long getLastAddTokenTime() {
return lastAddTokenTime;
}

public long getRestQps() {
return restQps;
}

@Override
public String toString() {
return "TokenUpdateStatus{" +
"hash=" + System.identityHashCode(this) +
", lastAddTokenTime=" + lastAddTokenTime +
", requestCount=" + restQps +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -158,7 +159,7 @@ public void testPassLocalCheckForCollection() throws InterruptedException {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
Expand Down Expand Up @@ -215,7 +216,7 @@ public Object paramFlowKey() {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ public void testCheckQpsWithLongIntervalAndHighThreshold() {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
Expand Down Expand Up @@ -102,8 +101,7 @@ public void testParamFlowDefaultCheckSingleQps() {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
Expand Down Expand Up @@ -144,8 +142,7 @@ public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedExce
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
Expand Down Expand Up @@ -216,8 +213,7 @@ public void testParamFlowDefaultCheckQpsInDifferentDuration() throws Interrupted
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));

// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(mocked, System.currentTimeMillis());
Expand Down Expand Up @@ -268,8 +264,7 @@ public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<>(4000));
int threadCount = 40;

final CountDownLatch waitLatch = new CountDownLatch(threadCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -100,9 +100,9 @@ public void testEntryWhenParamFlowExists() throws Throwable {
ParameterMetric metric = mock(ParameterMetric.class);

CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<>(4000);
CacheMap<Object, AtomicLong> map2 = new ConcurrentLinkedHashMapWrapper<>(4000);
CacheMap<Object, AtomicReference<TokenUpdateStatus>> map2 = new ConcurrentLinkedHashMapWrapper<>(4000);
when(metric.getRuleTimeCounter(rule)).thenReturn(map);
when(metric.getRuleTokenCounter(rule)).thenReturn(map2);
when(metric.getRuleStampedTokenCounter(rule)).thenReturn(map2);
map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis()));

// Insert the mock metric to control pass or block.
Expand Down

0 comments on commit e8e293f

Please sign in to comment.