diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 37c658af657..ddf95df3884 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -153,19 +153,28 @@ ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean record } private static final class ClientTracer extends ClientStreamTracer { - - private static final AtomicLongFieldUpdater outboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount"); - private static final AtomicLongFieldUpdater inboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount"); - private static final AtomicLongFieldUpdater outboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize"); - private static final AtomicLongFieldUpdater inboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize"); - private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize"); - private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize"); + // When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in the JDK + // reflection API that triggers a NoSuchFieldException. When this occurs, fallback to a + // synchronized implementation. + private static final ClientTracerAtomicHelper atomicHelper = getAtomicHelper(); + + private static ClientTracerAtomicHelper getAtomicHelper() { + ClientTracerAtomicHelper helper; + try { + helper = + new FieldUpdaterClientTracerAtomicHelper( + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount"), + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount"), + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize"), + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize"), + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize"), + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize")); + } catch (Throwable t) { + logger.log(Level.WARNING, "FieldUpdaterClientTracerAtomicHelper failed", t); + helper = new SynchronizedClientTracerAtomicHelper(); + } + return helper; + } volatile long outboundMessageCount; volatile long inboundMessageCount; @@ -176,45 +185,179 @@ private static final class ClientTracer extends ClientStreamTracer { @Override public void outboundWireSize(long bytes) { - outboundWireSizeUpdater.getAndAdd(this, bytes); + atomicHelper.outboundWireSizeGetAndAdd(this, bytes); } @Override public void inboundWireSize(long bytes) { - inboundWireSizeUpdater.getAndAdd(this, bytes); + atomicHelper.inboundWireSizeGetAndAdd(this, bytes); } @Override public void outboundUncompressedSize(long bytes) { - outboundUncompressedSizeUpdater.getAndAdd(this, bytes); + atomicHelper.outboundUncompressedSizeGetAndAdd(this, bytes); } @Override public void inboundUncompressedSize(long bytes) { - inboundUncompressedSizeUpdater.getAndAdd(this, bytes); + atomicHelper.inboundUncompressedSizeGetAndAdd(this, bytes); } @Override public void inboundMessage(int seqNo) { - inboundMessageCountUpdater.getAndIncrement(this); + atomicHelper.inboundMessageCountGetAndIncrement(this); } @Override public void outboundMessage(int seqNo) { - outboundMessageCountUpdater.getAndIncrement(this); + atomicHelper.outboundMessageCountGetAndIncrement(this); } - } + private abstract static class ClientTracerAtomicHelper { + public abstract long outboundMessageCountGetAndIncrement(ClientTracer obj); + + public abstract long inboundMessageCountGetAndIncrement(ClientTracer obj); + + public abstract long outboundWireSizeGetAndAdd(ClientTracer obj, long delta); + + public abstract long inboundWireSizeGetAndAdd(ClientTracer obj, long delta); + + public abstract long outboundUncompressedSizeGetAndAdd(ClientTracer obj, long delta); + + public abstract long inboundUncompressedSizeGetAndAdd(ClientTracer obj, long delta); + } + + private static final class FieldUpdaterClientTracerAtomicHelper + extends ClientTracerAtomicHelper { + private final AtomicLongFieldUpdater outboundMessageCountUpdater; + private final AtomicLongFieldUpdater inboundMessageCountUpdater; + private final AtomicLongFieldUpdater outboundWireSizeUpdater; + private final AtomicLongFieldUpdater inboundWireSizeUpdater; + private final AtomicLongFieldUpdater outboundUncompressedSizeUpdater; + private final AtomicLongFieldUpdater inboundUncompressedSizeUpdater; + + private FieldUpdaterClientTracerAtomicHelper( + AtomicLongFieldUpdater outboundMessageCountUpdater, + AtomicLongFieldUpdater inboundMessageCountUpdater, + AtomicLongFieldUpdater outboundWireSizeUpdater, + AtomicLongFieldUpdater inboundWireSizeUpdater, + AtomicLongFieldUpdater outboundUncompressedSizeUpdater, + AtomicLongFieldUpdater inboundUncompressedSizeUpdater) { + this.outboundMessageCountUpdater = outboundMessageCountUpdater; + this.inboundMessageCountUpdater = inboundMessageCountUpdater; + this.outboundWireSizeUpdater = outboundWireSizeUpdater; + this.inboundWireSizeUpdater = inboundWireSizeUpdater; + this.outboundUncompressedSizeUpdater = outboundUncompressedSizeUpdater; + this.inboundUncompressedSizeUpdater = inboundUncompressedSizeUpdater; + } + + @Override + public long outboundMessageCountGetAndIncrement(ClientTracer obj) { + return outboundMessageCountUpdater.getAndIncrement(obj); + } + + @Override + public long inboundMessageCountGetAndIncrement(ClientTracer obj) { + return inboundMessageCountUpdater.getAndIncrement(obj); + } + + @Override + public long outboundWireSizeGetAndAdd(ClientTracer obj, long delta) { + return outboundWireSizeUpdater.getAndAdd(obj, delta); + } + + @Override + public long inboundWireSizeGetAndAdd(ClientTracer obj, long delta) { + return inboundWireSizeUpdater.getAndAdd(obj, delta); + } + + @Override + public long outboundUncompressedSizeGetAndAdd(ClientTracer obj, long delta) { + return outboundUncompressedSizeUpdater.getAndAdd(obj, delta); + } + + @Override + public long inboundUncompressedSizeGetAndAdd(ClientTracer obj, long delta) { + return inboundUncompressedSizeUpdater.getAndAdd(obj, delta); + } + } + + private static final class SynchronizedClientTracerAtomicHelper + extends ClientTracerAtomicHelper { + + @Override + public long outboundMessageCountGetAndIncrement(ClientTracer obj) { + synchronized (obj) { + return obj.outboundMessageCount++; + } + } + + @Override + public long inboundMessageCountGetAndIncrement(ClientTracer obj) { + synchronized (obj) { + return obj.inboundMessageCount++; + } + } + + @Override + public long outboundWireSizeGetAndAdd(ClientTracer obj, long delta) { + synchronized (obj) { + long prev = obj.outboundWireSize; + obj.outboundWireSize += delta; + return prev; + } + } + @Override + public long inboundWireSizeGetAndAdd(ClientTracer obj, long delta) { + synchronized (obj) { + long prev = obj.inboundWireSize; + obj.inboundWireSize += delta; + return prev; + } + } + + @Override + public long outboundUncompressedSizeGetAndAdd(ClientTracer obj, long delta) { + synchronized (obj) { + long prev = obj.outboundUncompressedSize; + obj.outboundUncompressedSize += delta; + return prev; + } + } + + @Override + public long inboundUncompressedSizeGetAndAdd(ClientTracer obj, long delta) { + synchronized (obj) { + long prev = obj.inboundUncompressedSize; + obj.inboundUncompressedSize += delta; + return prev; + } + } + } + } @VisibleForTesting static final class ClientCallTracer extends ClientStreamTracer.Factory { - private static final AtomicReferenceFieldUpdater - streamTracerUpdater = - AtomicReferenceFieldUpdater.newUpdater( - ClientCallTracer.class, ClientTracer.class, "streamTracer"); - private static final AtomicIntegerFieldUpdater callEndedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + // When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in the JDK + // reflection API that triggers a NoSuchFieldException. When this occurs, fallback to a + // synchronized implementation. + private static final ClientCallTracerAtomicHelper atomicHelper = getAtomicHelper(); + + private static ClientCallTracerAtomicHelper getAtomicHelper() { + ClientCallTracerAtomicHelper helper; + try { + helper = + new FieldUpdaterClientCallTracerAtomicHelper( + AtomicReferenceFieldUpdater.newUpdater( + ClientCallTracer.class, ClientTracer.class, "streamTracer"), + AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded")); + } catch (Throwable t) { + logger.log(Level.WARNING, "FieldUpdaterClientCallTracerAtomicHelper failed", t); + helper = new SynchronizedClientCallTracerAtomicHelper(); + } + return helper; + } private final CensusStatsModule module; private final String fullMethodName; @@ -251,7 +394,7 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than // one streams. We will need to update this file to support them. checkState( - streamTracerUpdater.compareAndSet(this, null, tracer), + atomicHelper.streamTracerCompareAndSet(this, null, tracer), "Are you creating multiple streams per call? This class doesn't yet support this case."); if (module.propagateTags) { headers.discardAll(module.statsHeader); @@ -269,7 +412,7 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat * is a no-op. */ void callEnded(Status status) { - if (callEndedUpdater.getAndSet(this, 1) != 0) { + if (atomicHelper.callEndedGetAndSet(this, 1) != 0) { return; } if (!recordFinishedRpcs) { @@ -305,23 +448,88 @@ void callEnded(Status status) { .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) .build()); } + + private abstract static class ClientCallTracerAtomicHelper { + public abstract boolean streamTracerCompareAndSet( + ClientCallTracer obj, ClientTracer expect, ClientTracer update); + + public abstract int callEndedGetAndSet(ClientCallTracer obj, int newValue); + } + + private static final class FieldUpdaterClientCallTracerAtomicHelper + extends ClientCallTracerAtomicHelper { + private final AtomicReferenceFieldUpdater streamTracerUpdater; + private final AtomicIntegerFieldUpdater callEndedUpdater; + + private FieldUpdaterClientCallTracerAtomicHelper( + AtomicReferenceFieldUpdater streamTracerUpdater, + AtomicIntegerFieldUpdater callEndedUpdater) { + this.streamTracerUpdater = streamTracerUpdater; + this.callEndedUpdater = callEndedUpdater; + } + + @Override + public boolean streamTracerCompareAndSet( + ClientCallTracer obj, ClientTracer expect, ClientTracer update) { + return streamTracerUpdater.compareAndSet(obj, expect, update); + } + + @Override + public int callEndedGetAndSet(ClientCallTracer obj, int newValue) { + return callEndedUpdater.getAndSet(obj, newValue); + } + } + + private static final class SynchronizedClientCallTracerAtomicHelper + extends ClientCallTracerAtomicHelper { + + @Override + public boolean streamTracerCompareAndSet( + ClientCallTracer obj, ClientTracer expect, ClientTracer update) { + synchronized (obj) { + if (obj.streamTracer == expect) { + obj.streamTracer = update; + return true; + } + return false; + } + } + + @Override + public int callEndedGetAndSet(ClientCallTracer obj, int newValue) { + synchronized (obj) { + int prev = obj.callEnded; + obj.callEnded = newValue; + return prev; + } + } + } } private static final class ServerTracer extends ServerStreamTracer { - private static final AtomicIntegerFieldUpdater streamClosedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); - private static final AtomicLongFieldUpdater outboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount"); - private static final AtomicLongFieldUpdater inboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount"); - private static final AtomicLongFieldUpdater outboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize"); - private static final AtomicLongFieldUpdater inboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize"); - private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize"); - private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize"); + // When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in the JDK + // reflection API that triggers a NoSuchFieldException. When this occurs, fallback to a + // synchronized implementation. + private static final ServerTracerAtomicHelper atomicHelper = getAtomicHelper(); + + private static ServerTracerAtomicHelper getAtomicHelper() { + ServerTracerAtomicHelper helper; + try { + helper = + new FieldUpdaterServerTracerAtomicHelper( + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"), + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount"), + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount"), + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize"), + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize"), + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize"), + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize")); + } catch (Throwable t) { + logger.log(Level.WARNING, "FieldUpdaterServerTracerAtomicHelper failed", t); + helper = new SynchronizedServerTracerAtomicHelper(); + } + return helper; + } private final CensusStatsModule module; private final String fullMethodName; @@ -359,32 +567,32 @@ private static final class ServerTracer extends ServerStreamTracer { @Override public void outboundWireSize(long bytes) { - outboundWireSizeUpdater.getAndAdd(this, bytes); + atomicHelper.outboundWireSizeGetAndAdd(this, bytes); } @Override public void inboundWireSize(long bytes) { - inboundWireSizeUpdater.getAndAdd(this, bytes); + atomicHelper.inboundWireSizeGetAndAdd(this, bytes); } @Override public void outboundUncompressedSize(long bytes) { - outboundUncompressedSizeUpdater.getAndAdd(this, bytes); + atomicHelper.outboundUncompressedSizeGetAndAdd(this, bytes); } @Override public void inboundUncompressedSize(long bytes) { - inboundUncompressedSizeUpdater.getAndAdd(this, bytes); + atomicHelper.inboundUncompressedSizeGetAndAdd(this, bytes); } @Override public void inboundMessage(int seqNo) { - inboundMessageCountUpdater.getAndIncrement(this); + atomicHelper.inboundMessageCountGetAndIncrement(this); } @Override public void outboundMessage(int seqNo) { - outboundMessageCountUpdater.getAndIncrement(this); + atomicHelper.outboundMessageCountGetAndIncrement(this); } /** @@ -395,7 +603,7 @@ public void outboundMessage(int seqNo) { */ @Override public void streamClosed(Status status) { - if (streamClosedUpdater.getAndSet(this, 1) != 0) { + if (atomicHelper.streamClosedGetAndSet(this, 1) != 0) { return; } if (!recordFinishedRpcs) { @@ -431,6 +639,148 @@ public Context filterContext(Context context) { } return context; } + + private abstract static class ServerTracerAtomicHelper { + public abstract int streamClosedGetAndSet(ServerTracer obj, int newValue); + + public abstract long outboundMessageCountGetAndIncrement(ServerTracer obj); + + public abstract long inboundMessageCountGetAndIncrement(ServerTracer obj); + + public abstract long outboundWireSizeGetAndAdd(ServerTracer obj, long delta); + + public abstract long inboundWireSizeGetAndAdd(ServerTracer obj, long delta); + + public abstract long outboundUncompressedSizeGetAndAdd(ServerTracer obj, long delta); + + public abstract long inboundUncompressedSizeGetAndAdd(ServerTracer obj, long delta); + } + + private static final class FieldUpdaterServerTracerAtomicHelper + extends ServerTracerAtomicHelper { + private final AtomicIntegerFieldUpdater streamClosedUpdater; + private final AtomicLongFieldUpdater outboundMessageCountUpdater; + private final AtomicLongFieldUpdater inboundMessageCountUpdater; + private final AtomicLongFieldUpdater outboundWireSizeUpdater; + private final AtomicLongFieldUpdater inboundWireSizeUpdater; + private final AtomicLongFieldUpdater outboundUncompressedSizeUpdater; + private final AtomicLongFieldUpdater inboundUncompressedSizeUpdater; + + private FieldUpdaterServerTracerAtomicHelper( + AtomicIntegerFieldUpdater streamClosedUpdater, + AtomicLongFieldUpdater outboundMessageCountUpdater, + AtomicLongFieldUpdater inboundMessageCountUpdater, + AtomicLongFieldUpdater outboundWireSizeUpdater, + AtomicLongFieldUpdater inboundWireSizeUpdater, + AtomicLongFieldUpdater outboundUncompressedSizeUpdater, + AtomicLongFieldUpdater inboundUncompressedSizeUpdater) { + this.streamClosedUpdater = streamClosedUpdater; + this.outboundMessageCountUpdater = outboundMessageCountUpdater; + this.inboundMessageCountUpdater = inboundMessageCountUpdater; + this.outboundWireSizeUpdater = outboundWireSizeUpdater; + this.inboundWireSizeUpdater = inboundWireSizeUpdater; + this.outboundUncompressedSizeUpdater = outboundUncompressedSizeUpdater; + this.inboundUncompressedSizeUpdater = inboundUncompressedSizeUpdater; + } + + @Override + public int streamClosedGetAndSet(ServerTracer obj, int newValue) { + return streamClosedUpdater.getAndSet(obj, newValue); + } + + @Override + public long outboundMessageCountGetAndIncrement(ServerTracer obj) { + return outboundMessageCountUpdater.getAndIncrement(obj); + } + + @Override + public long inboundMessageCountGetAndIncrement(ServerTracer obj) { + return inboundMessageCountUpdater.getAndIncrement(obj); + } + + @Override + public long outboundWireSizeGetAndAdd(ServerTracer obj, long delta) { + return outboundWireSizeUpdater.getAndAdd(obj, delta); + } + + @Override + public long inboundWireSizeGetAndAdd(ServerTracer obj, long delta) { + return inboundWireSizeUpdater.getAndAdd(obj, delta); + } + + @Override + public long outboundUncompressedSizeGetAndAdd(ServerTracer obj, long delta) { + return outboundUncompressedSizeUpdater.getAndAdd(obj, delta); + } + + @Override + public long inboundUncompressedSizeGetAndAdd(ServerTracer obj, long delta) { + return inboundUncompressedSizeUpdater.getAndAdd(obj, delta); + } + } + + private static final class SynchronizedServerTracerAtomicHelper + extends ServerTracerAtomicHelper { + + @Override + public int streamClosedGetAndSet(ServerTracer obj, int newValue) { + synchronized (obj) { + int prev = obj.streamClosed; + obj.streamClosed = newValue; + return prev; + } + } + + @Override + public long outboundMessageCountGetAndIncrement(ServerTracer obj) { + synchronized (obj) { + return obj.outboundMessageCount++; + } + } + + @Override + public long inboundMessageCountGetAndIncrement(ServerTracer obj) { + synchronized (obj) { + return obj.inboundMessageCount++; + } + } + + @Override + public long outboundWireSizeGetAndAdd(ServerTracer obj, long delta) { + synchronized (obj) { + long prev = obj.outboundWireSize; + obj.outboundWireSize += delta; + return prev; + } + } + + @Override + public long inboundWireSizeGetAndAdd(ServerTracer obj, long delta) { + synchronized (obj) { + long prev = obj.inboundWireSize; + obj.inboundWireSize += delta; + return prev; + } + } + + @Override + public long outboundUncompressedSizeGetAndAdd(ServerTracer obj, long delta) { + synchronized (obj) { + long prev = obj.outboundUncompressedSize; + obj.outboundUncompressedSize += delta; + return prev; + } + } + + @Override + public long inboundUncompressedSizeGetAndAdd(ServerTracer obj, long delta) { + synchronized (obj) { + long prev = obj.inboundUncompressedSize; + obj.inboundUncompressedSize += delta; + return prev; + } + } + } } @VisibleForTesting diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java index 626abb4aecc..26e5f199afc 100644 --- a/core/src/main/java/io/grpc/internal/CensusTracingModule.java +++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java @@ -58,10 +58,25 @@ */ final class CensusTracingModule { private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); - private static final AtomicIntegerFieldUpdater callEndedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); - private static final AtomicIntegerFieldUpdater streamClosedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); + + // When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in the JDK + // reflection API that triggers a NoSuchFieldException. When this occurs, fallback to a + // synchronized implementation. + private static final AtomicHelper atomicHelper = getAtomicHelper(); + + private static AtomicHelper getAtomicHelper() { + AtomicHelper helper; + try { + helper = + new FieldUpdaterAtomicHelper( + AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"), + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed")); + } catch (Throwable t) { + logger.log(Level.WARNING, "FieldUpdaterAtomicHelper failed", t); + helper = new SynchronizedAtomicHelper(); + } + return helper; + } private final Tracer censusTracer; @VisibleForTesting @@ -232,7 +247,7 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat * is a no-op. */ void callEnded(io.grpc.Status status) { - if (callEndedUpdater.getAndSet(this, 1) != 0) { + if (atomicHelper.callEndedGetAndSet(this, 1) != 0) { return; } span.end(createEndSpanOptions(status, isSampledToLocalTracing)); @@ -291,7 +306,7 @@ public void serverCallStarted(ServerCall call) { */ @Override public void streamClosed(io.grpc.Status status) { - if (streamClosedUpdater.getAndSet(this, 1) != 0) { + if (atomicHelper.streamClosedGetAndSet(this, 1) != 0) { return; } span.end(createEndSpanOptions(status, isSampledToLocalTracing)); @@ -376,4 +391,51 @@ static String generateTraceSpanName(boolean isServer, String fullMethodName) { return prefix + "." + fullMethodName.replace('/', '.'); } + private abstract static class AtomicHelper { + public abstract int callEndedGetAndSet(ClientCallTracer obj, int newValue); + + public abstract int streamClosedGetAndSet(ServerTracer obj, int newValue); + } + + private static final class FieldUpdaterAtomicHelper extends AtomicHelper { + private final AtomicIntegerFieldUpdater callEndedUpdater; + private final AtomicIntegerFieldUpdater streamClosedUpdater; + + private FieldUpdaterAtomicHelper( + AtomicIntegerFieldUpdater callEndedUpdater, + AtomicIntegerFieldUpdater streamClosedUpdater) { + this.callEndedUpdater = callEndedUpdater; + this.streamClosedUpdater = streamClosedUpdater; + } + + @Override + public int callEndedGetAndSet(ClientCallTracer obj, int newValue) { + return callEndedUpdater.getAndSet(obj, newValue); + } + + @Override + public int streamClosedGetAndSet(ServerTracer obj, int newValue) { + return streamClosedUpdater.getAndSet(obj, newValue); + } + } + + private static final class SynchronizedAtomicHelper extends AtomicHelper { + @Override + public int callEndedGetAndSet(ClientCallTracer obj, int newValue) { + synchronized (obj) { + int prev = obj.callEnded; + obj.callEnded = newValue; + return prev; + } + } + + @Override + public int streamClosedGetAndSet(ServerTracer obj, int newValue) { + synchronized (obj) { + int prev = obj.streamClosed; + obj.streamClosed = newValue; + return prev; + } + } + } } diff --git a/core/src/main/java/io/grpc/internal/SerializingExecutor.java b/core/src/main/java/io/grpc/internal/SerializingExecutor.java index e5957452cf9..b3a87fa8fa5 100644 --- a/core/src/main/java/io/grpc/internal/SerializingExecutor.java +++ b/core/src/main/java/io/grpc/internal/SerializingExecutor.java @@ -37,8 +37,24 @@ public final class SerializingExecutor implements Executor, Runnable { private static final Logger log = Logger.getLogger(SerializingExecutor.class.getName()); - private static final AtomicIntegerFieldUpdater runStateUpdater = - AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState"); + // When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in the JDK + // reflection API that triggers a NoSuchFieldException. When this occurs, fallback to a + // synchronized implementation. + private static final AtomicHelper atomicHelper = getAtomicHelper(); + + private static AtomicHelper getAtomicHelper() { + AtomicHelper helper; + try { + helper = + new FieldUpdaterAtomicHelper( + AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState")); + } catch (Throwable t) { + log.log(Level.WARNING, "FieldUpdaterAtomicHelper failed", t); + helper = new SynchronizedAtomicHelper(); + } + return helper; + } + private static final int STOPPED = 0; private static final int RUNNING = -1; @@ -71,7 +87,7 @@ public void execute(Runnable r) { } private void schedule(@Nullable Runnable removable) { - if (runStateUpdater.compareAndSet(this, STOPPED, RUNNING)) { + if (atomicHelper.runStateCompareAndSet(this, STOPPED, RUNNING)) { boolean success = false; try { executor.execute(this); @@ -92,7 +108,7 @@ private void schedule(@Nullable Runnable removable) { // to execute don't succeed and accidentally run a previous runnable. runQueue.remove(removable); } - runStateUpdater.set(this, STOPPED); + atomicHelper.runStateSet(this, STOPPED); } } } @@ -111,11 +127,56 @@ public void run() { } } } finally { - runStateUpdater.set(this, STOPPED); + atomicHelper.runStateSet(this, STOPPED); } if (!runQueue.isEmpty()) { // we didn't enqueue anything but someone else did. schedule(null); } } + + private abstract static class AtomicHelper { + public abstract boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update); + + public abstract void runStateSet(SerializingExecutor obj, int newValue); + } + + private static final class FieldUpdaterAtomicHelper extends AtomicHelper { + private final AtomicIntegerFieldUpdater runStateUpdater; + + private FieldUpdaterAtomicHelper( + AtomicIntegerFieldUpdater runStateUpdater) { + this.runStateUpdater = runStateUpdater; + } + + @Override + public boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update) { + return runStateUpdater.compareAndSet(obj, expect, update); + } + + @Override + public void runStateSet(SerializingExecutor obj, int newValue) { + runStateUpdater.set(obj, newValue); + } + } + + private static final class SynchronizedAtomicHelper extends AtomicHelper { + @Override + public boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update) { + synchronized (obj) { + if (obj.runState == expect) { + obj.runState = update; + return true; + } + return false; + } + } + + @Override + public void runStateSet(SerializingExecutor obj, int newValue) { + synchronized (obj) { + obj.runState = newValue; + } + } + } }