Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic包之FieldUpdater深度解析 #10

Open
aCoder2013 opened this issue Sep 23, 2017 · 0 comments
Open

Atomic包之FieldUpdater深度解析 #10

aCoder2013 opened this issue Sep 23, 2017 · 0 comments

Comments

@aCoder2013
Copy link
Owner

aCoder2013 commented Sep 23, 2017

前言

Java 5 中由Doug Lea大神写的atomic classes 中引入了 Field Updater,本质上来说就是volatile 字段的包装器,下面我们看看该如何使用:

AtomicIntegerFieldUpdater

首先看看类的定义:

public abstract class AtomicIntegerFieldUpdater<T> {

    /**
    *  抽象方法,提供一个静态方法以便得到实例
    */
    @CallerSensitive
    public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                              String fieldName) {
        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }
}


private static final class AtomicIntegerFieldUpdaterImpl<T>
        extends AtomicIntegerFieldUpdater<T> {
        
    AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                                      final String fieldName,
                                      final Class<?> caller) {
            final Field field;
            final int modifiers;
            try {
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            //字段不存在会抛异常
                            return tclass.getDeclaredField(fieldName);
                        }
                    });
                //检查访问级别,
                modifiers = field.getModifiers();
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
                    sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }

            //必须是int
            if (field.getType() != int.class)
                throw new IllegalArgumentException("Must be integer type");
            //必须用volatile修饰
            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");

            this.cclass = (Modifier.isProtected(modifiers)) ? caller : tclass;
            this.tclass = tclass;
            //用Unsafe里的那一坨方法去原子更新
            this.offset = U.objectFieldOffset(field);
        }
}

那么有那些使用场景呢,我们可以看到Cassandra中有一些地方用到了,比如:

public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T>{
    private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
    = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
    
    private volatile int failures = 0;

        public void get() throws WriteTimeoutException, WriteFailureException
    {
        long timeout = currentTimeout();

        boolean success;
        try
        {
            success = condition.await(timeout, TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException ex)
        {
            throw new AssertionError(ex);
        }

        if (!success)
        {
            int blockedFor = totalBlockFor();
            int acks = ackCount();
            if (acks >= blockedFor)
                acks = blockedFor - 1;
            throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor);
        }

        //可以看到平常可以直接访问failures,而不用像AtomicInteger一样还需要调用get方法
        if (totalBlockFor() + failures > totalEndpoints())
        {
            throw new WriteFailureException(consistencyLevel, ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
        }
    }

    @Override
    public void onFailure(InetAddress from, RequestFailureReason failureReason)
    {
        logger.trace("Got failure from {}", from);

        //当需要类似AtomicInteger一样的功能时,我们知道volatile执行++是非原子的
        int n = waitingFor(from)
                ? failuresUpdater.incrementAndGet(this)
                : failures;

        failureReasonByEndpoint.put(from, failureReason);

        if (totalBlockFor() + n > totalEndpoints())
            signal();
    }

}

小结

总体来看使用场景不多,有点类似AtomicInteger,但是可以使用比较方便,比如定义了一个变量x,你可以直接正常访问x,而如果使用了AtomicInteger的话,还是需要调用AtomicInteger#get方法,另外如果你又需要原子的get-set的话也可以满足使用场景;另外比如说你有个比较大的链表,内部每个节点都需要原子的get-set,那每个节点都要定义一个AtomicInteger,这样会消耗更多的内存,那么就可以定义个static的AtomicIntegerFieldUpdater。

AtomicReferenceFieldUpdater

和Integer差不多了,只不过这个是用来更新引用的,所以就不过多解读了,Java类库中BufferedInputStream就调用了这个类:

public
class BufferedInputStream extends FilterInputStream {

    protected volatile byte buf[];

    /*
    *  原子的更新内部数组,比如扩容、关闭时,
    */
    private static final
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (BufferedInputStream.class,  byte[].class, "buf");


    public void close() throws IOException {
        byte[] buffer;
        while ( (buffer = buf) != null) {
            //放在一个循环中,如果CAS更新失败,那么就读取最新的buf引用,继续CAS更新
            if (bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if (input != null)
                    input.close();
                return;
            }
        }
    }
}

另外还有JDK1.7之前的ConcurrentLinkedQueue,也有了这个机制,但是可以1.7之后切换到了Unsafe的方案, 主要还是因为性能原因,FieldUpdater的方案需要使用反射API配合,而Unsafe不用,而且有些JVM会把Unsafe的调用内联,理论上看会快很多(Todo :待测试)

//JDK1.6
private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        private static final
            AtomicReferenceFieldUpdater<Node, Node>
            nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Node.class, "next");
        private static final
            AtomicReferenceFieldUpdater<Node, Object>
            itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Object.class, "item");
}
//JDK1.8
private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

总结

总的来说,JDK类库里面使用较多一点,大部分场景直接使用JUC下面类似ConcurrentHashMap这些就够了,真遇到性能不满足的场景,再根据Profile的结果针对性优化。

Flag Counter

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant