Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netty源码解析 - HashedWheelTimer #20

Open
TFdream opened this issue Apr 26, 2021 · 0 comments
Open

Netty源码解析 - HashedWheelTimer #20

TFdream opened this issue Apr 26, 2021 · 0 comments

Comments

@TFdream
Copy link
Owner

TFdream commented Apr 26, 2021

背景

由于netty动辄管理100w+的连接,每一个连接都会有很多超时任务。比如发送超时、心跳检测间隔等,如果每一个定时任务都启动一个Timer,不仅低效,而且会消耗大量的资源。

时间轮

根据George Varghese 和 Tony Lauck 1996 年的论文:Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility。提出了一种定时轮的方式来管理和维护大量的Timer调度。

时间轮原理

时间轮底层数据结构 是 环形队列,它能够让需要处理的数据(任务的抽象)集中,在 Netty 中存在大量的延迟操作,比如发送超时、心跳检测等。Netty 并没有使用 JDK 自带的 Timer 或者 DelayQueue 来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(HashedWheelTimer)。JDK 的 Timer 和 DelayQueue 插入和删除操作的平均时间复杂度为 O(nlog(n)),并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1)

时间轮其实就是一种环形的数据结构,可以想象成时钟,分成很多格子,一个格子代码一段时间(这个时间越短,Timer的精度越高)。并用一个链表报错在该格子上的到期任务,同时一个指针随着时间一格一格转动,并执行相应格子中的到期任务。任务通过取模决定放入那个格子。如下图所示:

image

针对上图的几个名词简单解释下:

  • tickMs: 时间轮由多个时间格组成,每个时间格就是 tickMs,它代表当前时间轮的基本时间跨度。
  • wheelSize: 代表每一层时间轮的格数
  • interval: 当前时间轮的总体时间跨度,interval=tickMs × wheelSize
  • startMs: 构造当层时间轮时候的当前时间。

HashedWheelTimer

netty中的HashedWheelTimer提供的是一个定时任务的一个优化实现方案,在netty中主要用于异步IO的定时规划触发(A timer optimized for approximated I/O timeout scheduling)。数据结构如下:

image

这个图基本上就涵盖了HashedWheelTimer的所有的概念要素:

  • wheel 一个时间轮,其实就是一个环形数组,数组中的每个元素代表的就是未来的某些时间片段上需要执行的定时任务的集合。
    这里需要注意的就是不是某个时间而是某些时间。因为比方说我时间轮上的大小是10,时间间隔是1s,那么我1s和11s的要执行的定时任务都会在index为1的格子上。
  • tick 工作线程当前运行的tick数,每一个tick代表worker线程当前的一次工作时间
  • hash 在时间轮上的hash函数。默认是tick%bucket的数量,即将某个时间节点映射到了时间轮上的某个唯一的格子上。
  • bucket 时间轮上的一个格子,它维护的是一个Timeout的双向链表,保存的是这个哈希到这个格子上的所有Timeout任务。
  • timeout 代表一个定时任务,其中记录了自己的deadline,运行逻辑以及在bucket中需要呆满的圈数,比方说之前1s和11s的例子,他们对应的timeout中圈数就应该是0和1。 这样当遍历一个bucket中所有的timeout的时候,只要圈数为0说明就应该被执行,而其他情况就把圈数-1就好。
  • workerThread 单线程用于处理所有的定时任务,它会在每个tick执行一个bucket中所有的定时任务,以及一些其他的操作。意味着定时任务不能有较大的阻塞和耗时,不然就会影响定时任务执行的准时性和有效性。

除此之外,netty的HashedWheelTimer实现还有两个东西值得关注,分别是pending-timeouts队列和cancelled-timeouts队列。这两个队列分别记录新添加的定时任务和要取消的定时任务,当workerThread每次循环运行时,它会先将取消掉cancelled-timeouts中所有的任务,然后将pending-timeouts队列中一定数量的任务移动到它们对应的bucket。由于添加和取消任务可以由任意线程发起,而相应的处理只会在workerThread里,所以为了进一步提高性能,这两个队列都是用了JCTools里面的MPSC(multiple-producer-single-consumer)队列。

与传统定时任务的区别

延迟操作,通常可以采用两个方案:

  • Timer:定时器维护一个优先队列,到时间点执行,Java中可以使用JDK自带的Timer或者DelayQueue来实现延迟的功能,插入和删除操作的平均时间复杂度为O(nlog(n));
  • 时间轮(timingWheel ),维护一个存放任务组的数组,每一个槽都维护一个存储 task 的双向链表。开始执行时,计时器每隔指定时间执行一个槽里面的 tasks。

时间轮(timingWheel ) 把维护 task 从 优先队列 O(nlog(n)) 降到 双向链表 O(1),而执行 task 也只要轮询一个时间点的 tasks O(N),不需要像优先队列,放入和删除元素 O(nlog(n))。

Netty时间轮核心API

下面我们就来分析时间轮涉及的核心接口和实现。

在 Netty 中,所有延时任务都要实现 TimerTask 接口。TimerTask 只定义了一个 run() 方法,入参是一个 Timeout 接口对象,如下:

public interface TimerTask {

    void run(Timeout timeout) throws Exception;
}

Timeout 对象与 TimerTask 对象一一对应,类似线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。
通过 Timeout 对象,不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务),如下:

public interface Timeout {

    Timer timer();

    TimerTask task();

    boolean isExpired();

    boolean isCancelled();

    boolean cancel();
}

Timer 接口定义了定时器的基本行为,核心是 newTimeout() :提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,类似于向线程池提交任务。如下:

public interface Timer {

    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    Set<Timeout> stop();
}

HashedWheelTimeout

HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类。HashedWheelTimeout 扮演了两个角色:

  • 时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器;
  • 定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于在时间轮外部查看和控制定时任务;
    private static final class HashedWheelTimeout implements Timeout {

        private static final int ST_INIT = 0;
        private static final int ST_CANCELLED = 1;
        private static final int ST_EXPIRED = 2;
        private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

        private final HashedWheelTimer timer;
        private final TimerTask task;
        private final long deadline;

        @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
        private volatile int state = ST_INIT;

        // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
        // HashedWheelTimeout will be added to the correct HashedWheelBucket.
        long remainingRounds;

        // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
        // As only the workerThread will act on it there is no need for synchronization / volatile.
        HashedWheelTimeout next;
        HashedWheelTimeout prev;

        // The bucket to which the timeout was added
        HashedWheelBucket bucket;

        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }

其中,deadline 是 定时任务执行的时间,是在创建 HashedWheelTimeout 时指定
计算公式:currentTime(创建 HashedWheelTimeout 的时间) + delay(任务延迟时间) - startTime(HashedWheelTimer 的启动时间),逻辑在 HashedWheelTimer#newTimeout方法中,如下:

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;

state,定时任务当前所处状态,取值如下:

  • ST_INIT 初始值
  • ST_CANCELLED 取消
  • ST_EXPIRED 过期

STATE_UPDATER 用于实现 state 状态变更的原子性。

remainingRounds 当前任务剩余的时钟周期数。时间轮所能表示的时间长度有限,在任务到期时间与当前时刻的时间差,超过时间轮单圈能表示时长,就出现套圈,需要该字段值表示剩余的时钟周期。

HashedWheelTimeout 的方法如下:

        public boolean compareAndSetState(int expected, int state) {
            return STATE_UPDATER.compareAndSet(this, expected, state);
        }

        public int state() {
            return state;
        }

        @Override
        public boolean isCancelled() {
            return state() == ST_CANCELLED;
        }

        @Override
        public boolean isExpired() {
            return state() == ST_EXPIRED;
        }

cancel方法:

        @Override
        public boolean cancel() {
            // only update the state it will be removed from HashedWheelBucket on next tick.
            if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }
            // If a task should be canceled we put this to another queue which will be processed on each tick.
            // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
            // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
            timer.cancelledTimeouts.add(this);
            return true;
        }

expire方法:

        public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

remove方法:

        void remove() {
            HashedWheelBucket bucket = this.bucket;
            if (bucket != null) {
                bucket.remove(this);
            } else {
                timer.pendingTimeouts.decrementAndGet();
            }
        }

HashedWheelBucket

HashedWheelBucket 是 时间轮中的一个槽。
时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。

HashedWheelBucket 持有双向链表的首尾两个节点 - head 和 tail,再加上每个 HashedWheelTimeout 节点均持有前驱和后继引用,即可正、逆向遍历整个链表。

相关资料

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant