[Netty Source Code Analysis] FastThreadLocal Source Code Analysis #6

TFdream opened this issue Apr 17, 2020 · 0 comments

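Netty ships its own FastThreadLocal as a replacement for the JDK's ThreadLocal. Used together with Netty's FastThreadLocalThread, it makes reading and updating thread-local variables faster in multithreaded code.

For an introduction to ThreadLocal, see "JDK Source Code Analysis - ThreadLocal".

Note: this article is based on Netty 4.1.50.Final; the source may differ slightly in other versions.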

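Introduction to FastThreadLocal

FastThreadLocal is an optimization of the JDK's ThreadLocal. ThreadLocal stores its data in a ThreadLocalMap, which is backed by an array of Entry objects. An entry is located through its threadLocalHashCode; when a hash collision occurs, the lookup moves on through the following slots one at a time (linear probing), so lookups get slower as collisions accumulate. See ThreadLocal#get and ThreadLocal.ThreadLocalMap#getEntry for details. FastThreadLocal addresses exactly this problem: each variable is bound to a fixed array index, so a lookup is always a single O(1) array access.

As an aside, the two most common ways to resolve hash collisions are open addressing (linear probing, quadratic probing) and separate chaining. ThreadLocal uses open addressing, while HashMap uses separate chaining.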
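
To make the contrast concrete, here is a minimal sketch of the two lookup styles. It is a simplified model for illustration only, not JDK or Netty code: the class `LookupSketch` and its methods are hypothetical names, the table tracks keys only, and it assumes the table never fills up.

```java
// Simplified model for illustration only; this is neither JDK nor Netty code.
final class LookupSketch {
    private final Object[] keys = new Object[8]; // null means "empty slot"

    // Open addressing: on a collision, walk to the next slot until a free one is found.
    // Assumes the table never fills up (this sketch does not resize).
    void probeInsert(Object key, int hash) {
        int i = hash & (keys.length - 1);
        while (keys[i] != null && keys[i] != key) {
            i = (i + 1) & (keys.length - 1);
        }
        keys[i] = key;
    }

    // Returns how many slots were inspected to find the key, or -1 if it is absent.
    int probeSteps(Object key, int hash) {
        int i = hash & (keys.length - 1);
        for (int steps = 1; steps <= keys.length; steps++) {
            if (keys[i] == key) {
                return steps;
            }
            if (keys[i] == null) {
                return -1;
            }
            i = (i + 1) & (keys.length - 1);
        }
        return -1;
    }

    // Index-based access: the caller already knows the slot, so no probing is needed.
    static Object indexedGet(Object[] table, int index) {
        return table[index];
    }
}
```

With three keys that all hash to the same slot, the third one is found only after inspecting three slots, whereas `indexedGet` always costs one array read regardless of how many variables exist.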

InternalThreadLocalMap

FastThreadLocal is meant to be used together with FastThreadLocalThread. Internally it stores values in an InternalThreadLocalMap, which extends the UnpaddedInternalThreadLocalMap class shown below:

class UnpaddedInternalThreadLocalMap {
    // Compatibility path for threads that are not FastThreadLocalThread: their InternalThreadLocalMap is kept here
    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    static final AtomicInteger nextIndex = new AtomicInteger();

    /** Used by {@link FastThreadLocal} */
    Object[] indexedVariables;

    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables;
    }
}

Next, let's look at the InternalThreadLocalMap class:

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {

    public static final Object UNSET = new Object();

    // Cache line padding (must be public)
    // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
    public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
    
    // Constructor
    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }

    public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return ((FastThreadLocalThread) thread).threadLocalMap();
        }
        return slowThreadLocalMap.get();
    }

    // Returns the current thread's InternalThreadLocalMap, creating it if it does not exist yet
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }

    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

    // For threads that are not FastThreadLocalThread, fetch the map from a ThreadLocal
    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
}

First, look at the InternalThreadLocalMap constructor:

    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }

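It initializes the indexedVariables array, pre-filling every slot with the UNSET sentinel. indexedVariables holds the objects passed in through FastThreadLocal#set.

As the code shows, every FastThreadLocalThread holds a direct reference to its InternalThreadLocalMap, which can be read straight from the thread object. Any other thread gets its map from UnpaddedInternalThreadLocalMap.slowThreadLocalMap, an ordinary ThreadLocal.

FastThreadLocalThread is shown below: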

/**
 * A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables.
 */
public class FastThreadLocalThread extends Thread {
    // This will be set to true if we have a chance to wrap the Runnable.
    private final boolean cleanupFastThreadLocals;

    private InternalThreadLocalMap threadLocalMap;

    public FastThreadLocalThread() {
        cleanupFastThreadLocals = false;
    }
}
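
The fast-path and slow-path selection described above can be sketched without Netty on the classpath. The classes below (`SketchMap`, `SketchFastThread`, `SketchMaps`) are hypothetical stand-ins that mirror the shape of `InternalThreadLocalMap.get()`; they are not the Netty classes.

```java
// Simplified stand-ins for illustration; these are NOT the Netty classes.
final class SketchMap {
    final Object[] slots = new Object[32];
}

// Plays the role of FastThreadLocalThread: the map is a plain field on the thread.
class SketchFastThread extends Thread {
    SketchMap map;

    SketchFastThread(Runnable task) {
        super(task);
    }
}

final class SketchMaps {
    // Fallback storage for ordinary threads, which have no map field.
    private static final ThreadLocal<SketchMap> SLOW = new ThreadLocal<>();

    // Returns the current thread's map, creating it on first use.
    static SketchMap get() {
        Thread t = Thread.currentThread();
        if (t instanceof SketchFastThread) {
            // Fast path: one field read, no hashing.
            SketchFastThread ft = (SketchFastThread) t;
            if (ft.map == null) {
                ft.map = new SketchMap();
            }
            return ft.map;
        }
        // Slow path: go through a regular ThreadLocal.
        SketchMap m = SLOW.get();
        if (m == null) {
            m = new SketchMap();
            SLOW.set(m);
        }
        return m;
    }
}
```

Either way each thread ends up with exactly one map; the only difference is how that map is located.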

FastThreadLocal

First, let's look at the FastThreadLocal constructor:

public class FastThreadLocal<V> {

    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

    private final int index;

    public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
}

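FastThreadLocal has an index field that is assigned from InternalThreadLocalMap.nextVariableIndex() when the FastThreadLocal object is created, which guarantees that every instance gets a unique index. The InternalThreadLocalMap#nextVariableIndex method is as follows: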

    public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        if (index < 0) {
            nextIndex.decrementAndGet();
            throw new IllegalStateException("too many thread-local indexed variables");
        }
        return index;
    }

index is the position in the indexedVariables array (a field of UnpaddedInternalThreadLocalMap) at which this variable's value is stored and from which it is read back.
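
As a rough illustration of how a globally unique index maps each variable to its own slot, consider the sketch below. `IndexedVar` is a hypothetical class written for this example; it borrows the counter logic from nextVariableIndex but is not Netty code, and the caller passes in the table that Netty would keep per thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model for illustration; not Netty code.
final class IndexedVar {
    // Shared by all instances, so every instance receives a different index.
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();

    final int index;

    IndexedVar() {
        int i = NEXT_INDEX.getAndIncrement();
        if (i < 0) {
            // The counter wrapped around: undo the increment and refuse.
            NEXT_INDEX.decrementAndGet();
            throw new IllegalStateException("too many indexed variables");
        }
        index = i;
    }

    // The table stands in for one thread's indexedVariables array.
    void set(Object[] table, Object value) {
        table[index] = value;
    }

    Object get(Object[] table) {
        return table[index];
    }
}
```

The index belongs to the variable and is the same in every thread, while the table belongs to the thread, so two variables never collide and two threads never share a value.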

The FastThreadLocal#set method

FastThreadLocal#set is as follows:


    /**
     * Set the value for the current thread.
     */
    public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }

    /**
     * Set the value for the specified thread local map. The specified thread local map must be for the current thread.
     */
    public final void set(InternalThreadLocalMap threadLocalMap, V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove(threadLocalMap);
        }
    }

InternalThreadLocalMap.get() was covered in the previous section: it returns the InternalThreadLocalMap bound to the current thread. The part to focus on is the setKnownNotUnset method:

    /**
     * @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}.
     */
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
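
The body of setIndexedVariable is not shown here. Judging from how its boolean result is used above, it presumably reports whether the slot was previously unset. The sketch below models that behavior under this assumption; `SlotTable` is a hypothetical class, and its growth policy (double the length, or grow to fit the index) is an assumption chosen for brevity rather than Netty's actual strategy.

```java
import java.util.Arrays;

// Simplified model for illustration; not Netty code.
final class SlotTable {
    static final Object UNSET = new Object();

    private Object[] slots = newTable(4);

    private static Object[] newTable(int size) {
        Object[] array = new Object[size];
        Arrays.fill(array, UNSET);
        return array;
    }

    // Stores the value and returns true only if the slot held no value before.
    boolean set(int index, Object value) {
        if (index >= slots.length) {
            // Assumed growth policy: at least double, and large enough for the index.
            int oldLength = slots.length;
            int newLength = Math.max(oldLength * 2, index + 1);
            slots = Arrays.copyOf(slots, newLength);
            Arrays.fill(slots, oldLength, newLength, UNSET);
        }
        Object old = slots[index];
        slots[index] = value;
        return old == UNSET;
    }

    // Returns UNSET for indexes that were never written, including out-of-range ones.
    Object get(int index) {
        return index < slots.length ? slots[index] : UNSET;
    }
}
```

Under this reading, the first write to a slot returns true, and only then is the variable registered for later cleanup through addToVariablesToRemove; overwriting an existing value returns false and skips the registration.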

The FastThreadLocal#get method

The code of FastThreadLocal#get is as follows:

    public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        return initialize(threadLocalMap);
    }

    public final V getIfExists() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
        if (threadLocalMap != null) {
            Object v = threadLocalMap.indexedVariable(index);
            if (v != InternalThreadLocalMap.UNSET) {
                return (V) v;
            }
        }
        return null;
    }

    public final V get(InternalThreadLocalMap threadLocalMap) {
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        return initialize(threadLocalMap);
    }

    private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            v = initialValue();
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }

        threadLocalMap.setIndexedVariable(index, v);
        addToVariablesToRemove(threadLocalMap, this);
        return v;
    }

    /**
     * Returns the initial value for this thread-local variable.
     */
    protected V initialValue() throws Exception {
        return null;
    }
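
The lazy initialization in get() and initialize() can be illustrated with a small self-contained model. `LazySlot` is a hypothetical class: it replaces the indexed table with a one-element array per thread, and unlike Netty's getIfExists it creates that array on first access. It is a sketch of the control flow, not the Netty implementation.

```java
// Simplified model for illustration; not Netty code.
class LazySlot<V> {
    private static final Object UNSET = new Object();

    // One single-element table per thread; stands in for indexedVariables[index].
    private final ThreadLocal<Object[]> table =
            ThreadLocal.withInitial(() -> new Object[] {UNSET});

    // Subclasses override this to supply a default value.
    protected V initialValue() {
        return null;
    }

    @SuppressWarnings("unchecked")
    public final V get() {
        Object[] slots = table.get();
        if (slots[0] != UNSET) {
            return (V) slots[0];
        }
        // First access on this thread: compute the default and cache it, even if it is null.
        V value = initialValue();
        slots[0] = value;
        return value;
    }

    // Returns the value if one is present, without triggering initialValue().
    @SuppressWarnings("unchecked")
    public final V getIfExists() {
        Object value = table.get()[0];
        return value != UNSET ? (V) value : null;
    }
}
```

Because the sentinel is UNSET rather than null, a null initial value is cached like any other, so initialValue() runs at most once per thread until the value is removed.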