Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docs]ASoC 2020 中期总结(集群并发流控) #1639

Open
yunfeiyanggzq opened this issue Jul 31, 2020 · 3 comments
Open

[docs]ASoC 2020 中期总结(集群并发流控) #1639

yunfeiyanggzq opened this issue Jul 31, 2020 · 3 comments
Labels
ASoC2020 Issue or PR related to Alibaba Summer of Code 2020

Comments

@yunfeiyanggzq
Copy link
Contributor

yunfeiyanggzq commented Jul 31, 2020

阿里巴巴编程之夏中期总结

本人目前负责的是集群并发流控设计,相关资料可见 #1477 , #1629 ,本issue旨在向社区成员汇报进展、讨论完善设计方案,欢迎各位指正。

目前贡献

​ 对流控部分的代码补充了单元测试、升级了slot-spi-demo.同时集群并发流控已经完成了基本的功能,目前正着手代码优化(性能优化、排查bug、增加注释)并逐步提交。还希望社区的各位朋友帮忙review相关pr,目前比较紧急的是集群并发流控规则源部分 #1631
图片
图片

总体进展

目前已经基本实现集群并发流控,已经完成了了集群流控规则ClusterFlowRuleManager改造,可以实现动态规则源注册流控规则,实现了ConcurrentFlowRuleChecker完成token的发放和回收,实现集群并发流控。改造了FlowRuleSlot接入集群并发流控管控。可参见: #1629

总体思路

为了实现集群并发流控,我们必须存储每个调用的信息,发现并清除那些不正常的资源调用。具体的讲,当token client发起调用时会获得token Server颁发的身份标识tokenId, token server会存储这个tokenId值和相关的信息。当调用结束时,token client会携带着该次调用被分配的tokenId前往token server请求释放资源,token server会删除这个调用对应的 tokenId。通过这样的操作,我们能够实时的统计当前正在进行的调用数量,即并发量。如果出现资源调用超时或者token client掉线的情况,token server会尝试着去发现删除对应调用并清除存储的tokenId,从而获得准确的并发量nowCalls。

图片 图片

具体可见:
Sentinel.pdf

解决办法

  • tokenId如何生成

    long tokenId = UUID.randomUUID().getMostSignificantBits()
    

    目前生成全局唯一的算法有

    • 利用redis、mysql等数据库辅助生成id
    • UUID
      UUID是一个比较好的方案,但是考虑到生成的字符串比较大,占用网络资源很大。
      getMostSignificantBits() 方法用来返回此UUID的128位最显著64位值,可以牺牲一定的唯一性来节省网络传输的开销,发生冲突的概率极小(几乎不可能,如果冲突也不会对系统造成很多大的影响,只会让并发量统计稍有误差),可以生成long类型的tokenID节省网络传输资源。
    • 雪花算法及变形
  • 如何设计超时自动释放

    我们采用了netty使用的时间轮框架在client端实现高效的自动检测超时机制。

    这里写图片描述
    为了设计高效的定时释放,我们采用了netty的延时执行框架 HashWheelTimer! 一个Hash Wheel Timer是一个环形结构,可以想象成时钟,分为很多格子,一个格子代表一段时间(越短Timer精度越高),并用一个List保存在该格子上到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应List中所有到期的任务。任务通过取模决定应该放入哪个格子。
    环形结构可以根据超时时间的 hash 值(这个 hash 值实际上就是ticks & mask)将 task 分布到不同的槽位中, 当 tick 到那个槽位时, 只需要遍历那个槽位的 task 即可知道哪些任务会超时(而使用线性结构, 你每次 tick 都需要遍历所有 task), 所以, 我们任务量大的时候, 相应的增加 wheel 的 ticksPerWheel 值, 可以减少 tick 时遍历任务的个数.

  • ConcurrentLinkedHashMap 读改变顺序影响过期删除策略删除效率

    public static TokenCacheNode getTokenCacheNode(long tokenId) {
         return TOKEN_CACHE_NODE_MAP.getQuietly(tokenId);
    }
    //getQuietly()可以防止ConcurrentLinkedHashMap发生LRU操作乱序
    
  • 若超时检测在 client 端触发,客户端超时后向 server 端发出 releaseToken 请求,万一这个请求也超时怎么办?超时的情况包括最终未到达对端,和延迟到达对端

    server端执行定时任务删除相应的token,将存储时间超过3倍resourceTimeout的token删除,认为其是client端资源调用超时且未主动释放,我们可以在FlowRule中配置 resourceTimeoutStrategy让用户选择是否主动释放:

    • 0:让server端释放资源调用超时token
    • 1:让client端主动请求释放资源调用超时token

    server端定期清除异常token:

    public class RegularExpireStrategy implements ExpireStrategy {
       /**
        * The max number of token deleted each time,
        * the number of expired key-value pairs deleted each time does not exceed this number
        */
       private long executeCount;
       /**
        * Length of time for task execution
        */
       private long executeDuration;
       /**
        * Frequency of task execution
        */
       private long executeRate;
       /**
        * the local cache of tokenId
        */
       private ConcurrentLinkedHashMap<Long, TokenCacheNode> localCache;
    
       @SuppressWarnings("PMD.ThreadPoolCreationRule")
       private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
       private final long DEFAULT_EXECUTE_COUNT = 1000;
       private final long DEFAULT_EXECUTE_DURATION = 600;
       private final long DEFAULT_EXECUTE_RATE = 1000;
    
       public RegularExpireStrategy() {
           this.executeCount = DEFAULT_EXECUTE_COUNT;
           this.executeDuration = DEFAULT_EXECUTE_DURATION;
           this.executeRate = DEFAULT_EXECUTE_RATE;
       }
    
       @Override
       public void removeExpireKey(ConcurrentLinkedHashMap localCache) {
           AssertUtil.isTrue(localCache != null, " local cache can't be null");
           this.localCache = localCache;
           executor.scheduleAtFixedRate(new MyTask(), 0, executeRate, TimeUnit.MILLISECONDS);
       }
    
       private class MyTask implements Runnable {
           @Override
           public void run() {
               try {
                   clearToken();
               } catch (Throwable e) {
                   e.printStackTrace();
                   RecordLog.warn("[RegularExpireStrategy] undefined throwable<{}> during clear token", e);
               }
           }
       }
    
       private void clearToken() {
           long start = System.currentTimeMillis();
           List<Long> keyList = new ArrayList<>(localCache.keySet());
           for (int i = 0; i < executeCount && i < keyList.size(); i++) {
               // time out execution exit
               if (System.currentTimeMillis() - start > executeDuration) {
                   RecordLog.info("[RegularExpireStrategy] End the process of expired token detection because of execute time is more than executeDuration<{}>", executeDuration);
                   break;
               }
               // use ConcurrentLinkedHashMap to improve the expiration detection progress
               Long key = keyList.get(i);
               TokenCacheNode node = localCache.get(key);
               if (node == null) {
                   continue;
               }
               // 客户端掉线删除
               if (!ConnectionManager.isClientOnline(node.getClientAddress()) && node.getClientTimeout() - System.currentTimeMillis() < 0) {
                   removeToken(key, node);    
                   RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of client offline", node.getTokenId());
                   continue;
               }
               //资源调用超时删除
               // If we find that token's save time is much longer than the client's call resource timeout time, token will be determined to timeout and the client go wrong
               long resourceTimeout = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId()).getClusterConfig().getResourceTimeout();
               if (System.currentTimeMillis() - node.getResourceTimeout() > 2 * resourceTimeout) {
                   RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of resource timeout", node.getTokenId());
               }
           }
       }
    
       private void removeToken(long tokenId, TokenCacheNode node) {
           if (localCache.remove(tokenId) == null) {
               RecordLog.info("[RegularExpireStrategy] Token<{}> is already released", tokenId);
               return;
           }
           AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
           if (nowCalls == null) {
               return;
           }
           nowCalls.getAndAdd(node.getAcquireCount() * -1);
       }
    }
  • 如果 currentClusterConcurrency >= threshold 的时候,是否可以设计一种排队等待机制(类似于锁等待的机制),等待一段时间不 ok 再返回

    目前的实现是对于prioritized=true的请求我们设计了acquireRefuseStrategy可以在FlowRule中进行配置,请求block时可以在client端执行相应的策略:

    • 0 :忽略直接拒绝请求
    • 1:再次请求一次
    • 2:一直请求直到成功

    目前存在的问题是每次请求的休眠时间的计算算法尚未确定,我的思路是:

        private static void sleep(DefaultNode node, FlowRule rule) {
            try {
                int sleepTime = 0;
                ClusterNode clusterNode = node.getClusterNode();
                if (clusterNode.avgRt() != 0
                    && clusterNode.blockQps() != 0 
                    && clusterNode.passQps() != 0) {
                    sleepTime = (int) (clusterNode.avgRt() * clusterNode.blockQps() / rule.getCount());
                } else {
    
                    sleepTime = new Random().nextInt(300);
                }
                sleepTime = sleepTime == 0 ? 10 : sleepTime;
                TimeUnit.MILLISECONDS.sleep(sleepTime);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    单个线程每秒能处理的最多请求: 1000/clusterNode.avgRt() ;

    count个线程每秒最多处理的请求:maxRequest= rule.getCount()*(1000/clusterNode.avgRt()) ;

    拥挤程度用ratio表示:

    因为:clusterNode.blockQps()=block/1000
    
    所以:block=1000*clusterNode.blockQps();
    
    因此:ratio= 1000*clusterNode.blockQps()/maxRequest
    
    =clusterNode.blockQps()/( rule.getCount()/clusterNode.avgRt())
    
    =clusterNode.avgRt() * clusterNode.blockQps() / rule.getCount()
    

    表示拥挤程度,值越大越拥挤,等待时间越长。

    但是考虑到在某些情况下各个参数可能为0(突发情况下统计不及时),我们可以产生随机数设置睡眠时间。

  • 单机并发流控如何设计

    目前的单机并发流控已经在使用了,但是由于没有做同步设计,所以并发流控并不是很精确。这个具体的优化我还在思考。可参考: 高并发情况下,各种规则限流并不准确 #1620

  • 如何展示并发流控效果

    由于并发量是一个瞬间效果,所以我们很难精确的记录相关流控效果,退而求其次采用每秒采样记录到本地日志的方式进行展示。也可以通过接口进行实时查看:

    接口:

    @CommandMapping(name = "cluster/server/concurrency", desc = "get cluster concurrency")
    public class FetchClusterConcurrencyCommandHandler implements CommandHandler<String> {
        @Override
        public CommandResponse<String> handle(CommandRequest request) {
            String flowId = request.getParam("flowId");
            if (!StringUtil.isEmpty(flowId)) {
                return CommandResponse.ofSuccess(JSON.toJSONString(CurrentConcurrencyManager.get(Long.valueOf(flowId))));
            } else {
                return CommandResponse.ofSuccess(JSON.toJSONString(CurrentConcurrencyManager.getConcurrencyMap()));
            }
        }
    }

    日志:

    public class ClusterConcurrentCheckerLogListener implements Runnable {
        @Override
        public void run() {
            try {
                collectInformation();
            } catch (Exception e) {
                RecordLog.warn("[ClusterConcurrentCheckerLogListener] Failed to record concurrent flow control  regularly", e);
            }
        }
    
        private void collectInformation() {
            ConcurrentHashMap<Long, AtomicInteger> nowCallsMap = CurrentConcurrencyManager.getConcurrencyMap();
            for (long flowId : nowCallsMap.keySet()) {
                FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(flowId);
                if (rule == null || nowCallsMap.get(flowId).get() == 0) {
                    continue;
                }
                double concurrencyLevel = ConcurrentClusterFlowChecker.calcGlobalThreshold(rule);
                String resource = rule.getResource();
                ClusterServerStatLogUtil.log("concurrent|resource:" + resource + "|flowId:" + flowId + "|concurrencyLevel:" + concurrencyLevel, nowCallsMap.get(flowId).get());
            }
        }
    }
  • 如何在集群流控保证并发安全

    • 存储并发量的CurrentConcurrency中使用并发安全的ConcurrentHashMap

    • 存储token信息的TokenCacheNodeManager中使用并发安全的ConcurrentLinkedHashMap

    • 使用synchronized保证内存可见性和操作的原子性。

       public static TokenResult acquireConcurrentToken(String clientAddress,/*@Valid*/ FlowRule rule, int acquireCount) {
              long flowId = rule.getClusterConfig().getFlowId();
              AtomicInteger nowCalls = CurrentConcurrencyManager.get(flowId);
              if (nowCalls == null) {
                  RecordLog.warn("[ConcurrentClusterFlowChecker] Fail to get nowCalls by flowId<{}>", flowId);
                  return new TokenResult(TokenResultStatus.FAIL);
              }
      
              // 提前检测进行阻塞减小锁竞争,提高效率
              if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
                  ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
                  return new TokenResult(TokenResultStatus.BLOCKED);
              }
      
              // 使用锁确保原子性
              // 对不同的rule的并发量加锁可以实现类似分段锁的功能,提高锁效率
              synchronized (nowCalls) {
                  // 再次检测判断是否通过
                  if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
                      ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
                      return new TokenResult(TokenResultStatus.BLOCKED);
                  } else {
                      nowCalls.getAndAdd(acquireCount);
                  }
              }
              ClusterServerStatLogUtil.log("concurrent|pass|" + flowId, acquireCount);
              TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquireCount, clientAddress);
              TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
              TokenResult tokenResult = new TokenResult(TokenResultStatus.OK);
              tokenResult.setTokenId(node.getTokenId());
              return tokenResult;
          }

未来计划

  • 完善现有代码(添加单元测试、demo、注释),分批提交pr等待review,根据review意见修改代码。

  • 研究如何完善本地并发流控(设计同步机制)

  • 时间充足的话可以完善集群流控和dashboard的相结合的部分

@sczyh30 sczyh30 added the ASoC2020 Issue or PR related to Alibaba Summer of Code 2020 label Jul 31, 2020
@caopengan
Copy link

说实话,这个集群限流的使用上真的是不太方便,文档写的也不太清楚,研究了好久……

@yunfeiyanggzq
Copy link
Contributor Author

说实话,这个集群限流的使用上真的是不太方便,文档写的也不太清楚,研究了好久……
可能是现在社区的qps的集群流控的文档还不完善,集群并发流控这一块我会注意文档的完善,有时间一并完善一下

CST11021 pushed a commit to CST11021/Sentinel that referenced this issue Nov 3, 2021
[ISSUE alibaba#1639] use ipv4 first when choosing local IP
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ASoC2020 Issue or PR related to Alibaba Summer of Code 2020
Projects
None yet
Development

No branches or pull requests

4 participants
@sczyh30 @caopengan @yunfeiyanggzq and others