Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 236 additions & 48 deletions core/src/main/java/io/grpc/internal/CensusStatsModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Provides factories for {@link StreamTracer} that records stats to Census.
Expand Down Expand Up @@ -154,18 +155,58 @@ ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean record

private static final class ClientTracer extends ClientStreamTracer {

private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount");
private static final AtomicLongFieldUpdater<ClientTracer> inboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount");
private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
private static final AtomicLongFieldUpdater<ClientTracer> outboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize");
private static final AtomicLongFieldUpdater<ClientTracer> inboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize");
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;

@Nullable
private static final AtomicLongFieldUpdater<ClientTracer> outboundUncompressedSizeUpdater;

@Nullable
private static final AtomicLongFieldUpdater<ClientTracer> inboundUncompressedSizeUpdater;

/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicLongFieldUpdater<ClientTracer> tmpOutboundMessageCountUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpInboundMessageCountUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpOutboundUncompressedSizeUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpInboundUncompressedSizeUpdater;
try {
tmpOutboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount");
tmpInboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount");
tmpOutboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
tmpInboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
tmpOutboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize");
tmpInboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpOutboundMessageCountUpdater = null;
tmpInboundMessageCountUpdater = null;
tmpOutboundWireSizeUpdater = null;
tmpInboundWireSizeUpdater = null;
tmpOutboundUncompressedSizeUpdater = null;
tmpInboundUncompressedSizeUpdater = null;
}
outboundMessageCountUpdater = tmpOutboundMessageCountUpdater;
inboundMessageCountUpdater = tmpInboundMessageCountUpdater;
outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
outboundUncompressedSizeUpdater = tmpOutboundUncompressedSizeUpdater;
inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater;
}

volatile long outboundMessageCount;
volatile long inboundMessageCount;
Expand All @@ -175,46 +216,98 @@ private static final class ClientTracer extends ClientStreamTracer {
volatile long inboundUncompressedSize;

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundWireSize(long bytes) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
if (outboundWireSizeUpdater != null) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
outboundWireSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundWireSize(long bytes) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
if (inboundWireSizeUpdater != null) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
inboundWireSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundUncompressedSize(long bytes) {
outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
if (outboundUncompressedSizeUpdater != null) {
outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
outboundUncompressedSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundUncompressedSize(long bytes) {
inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
if (inboundUncompressedSizeUpdater != null) {
inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
inboundUncompressedSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundMessage(int seqNo) {
inboundMessageCountUpdater.getAndIncrement(this);
if (inboundMessageCountUpdater != null) {
inboundMessageCountUpdater.getAndIncrement(this);
} else {
inboundMessageCount++;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundMessage(int seqNo) {
outboundMessageCountUpdater.getAndIncrement(this);
if (outboundMessageCountUpdater != null) {
outboundMessageCountUpdater.getAndIncrement(this);
} else {
outboundMessageCount++;
}
}
}



@VisibleForTesting
static final class ClientCallTracer extends ClientStreamTracer.Factory {
@Nullable
private static final AtomicReferenceFieldUpdater<ClientCallTracer, ClientTracer>
streamTracerUpdater =
streamTracerUpdater;

@Nullable private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater;

/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicReferenceFieldUpdater<ClientCallTracer, ClientTracer> tmpStreamTracerUpdater;
AtomicIntegerFieldUpdater<ClientCallTracer> tmpCallEndedUpdater;
try {
tmpStreamTracerUpdater =
AtomicReferenceFieldUpdater.newUpdater(
ClientCallTracer.class, ClientTracer.class, "streamTracer");
private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpStreamTracerUpdater = null;
tmpCallEndedUpdater = null;
}
streamTracerUpdater = tmpStreamTracerUpdater;
callEndedUpdater = tmpCallEndedUpdater;
}

private final CensusStatsModule module;
private final String fullMethodName;
Expand Down Expand Up @@ -250,9 +343,16 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat
ClientTracer tracer = new ClientTracer();
// TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
// one streams. We will need to update this file to support them.
checkState(
streamTracerUpdater.compareAndSet(this, null, tracer),
"Are you creating multiple streams per call? This class doesn't yet support this case.");
if (streamTracerUpdater != null) {
checkState(
streamTracerUpdater.compareAndSet(this, null, tracer),
"Are you creating multiple streams per call? This class doesn't yet support this case");
} else {
checkState(
streamTracer == null,
"Are you creating multiple streams per call? This class doesn't yet support this case");
streamTracer = tracer;
}
if (module.propagateTags) {
headers.discardAll(module.statsHeader);
if (!module.tagger.empty().equals(parentCtx)) {
Expand All @@ -269,8 +369,15 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat
* is a no-op.
*/
void callEnded(Status status) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (callEnded != 0) {
return;
}
callEnded = 1;
}
if (!recordFinishedRpcs) {
return;
Expand Down Expand Up @@ -308,20 +415,64 @@ void callEnded(Status status) {
}

private static final class ServerTracer extends ServerStreamTracer {
private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
private static final AtomicLongFieldUpdater<ServerTracer> outboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount");
private static final AtomicLongFieldUpdater<ServerTracer> inboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount");
private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
private static final AtomicLongFieldUpdater<ServerTracer> outboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize");
private static final AtomicLongFieldUpdater<ServerTracer> inboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize");
@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;

@Nullable
private static final AtomicLongFieldUpdater<ServerTracer> outboundUncompressedSizeUpdater;

@Nullable
private static final AtomicLongFieldUpdater<ServerTracer> inboundUncompressedSizeUpdater;

/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpOutboundMessageCountUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpInboundMessageCountUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpOutboundUncompressedSizeUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpInboundUncompressedSizeUpdater;
try {
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
tmpOutboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount");
tmpInboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount");
tmpOutboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
tmpInboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
tmpOutboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize");
tmpInboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpStreamClosedUpdater = null;
tmpOutboundMessageCountUpdater = null;
tmpInboundMessageCountUpdater = null;
tmpOutboundWireSizeUpdater = null;
tmpInboundWireSizeUpdater = null;
tmpOutboundUncompressedSizeUpdater = null;
tmpInboundUncompressedSizeUpdater = null;
}
streamClosedUpdater = tmpStreamClosedUpdater;
outboundMessageCountUpdater = tmpOutboundMessageCountUpdater;
inboundMessageCountUpdater = tmpInboundMessageCountUpdater;
outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
outboundUncompressedSizeUpdater = tmpOutboundUncompressedSizeUpdater;
inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater;
}

private final CensusStatsModule module;
private final String fullMethodName;
Expand Down Expand Up @@ -358,33 +509,63 @@ private static final class ServerTracer extends ServerStreamTracer {
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundWireSize(long bytes) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
if (outboundWireSizeUpdater != null) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
outboundWireSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundWireSize(long bytes) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
if (inboundWireSizeUpdater != null) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
inboundWireSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundUncompressedSize(long bytes) {
outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
if (outboundUncompressedSizeUpdater != null) {
outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
outboundUncompressedSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundUncompressedSize(long bytes) {
inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
if (inboundUncompressedSizeUpdater != null) {
inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
inboundUncompressedSize += bytes;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundMessage(int seqNo) {
inboundMessageCountUpdater.getAndIncrement(this);
if (inboundMessageCountUpdater != null) {
inboundMessageCountUpdater.getAndIncrement(this);
} else {
inboundMessageCount++;
}
}

@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundMessage(int seqNo) {
outboundMessageCountUpdater.getAndIncrement(this);
if (outboundMessageCountUpdater != null) {
outboundMessageCountUpdater.getAndIncrement(this);
} else {
outboundMessageCount++;
}
}

/**
Expand All @@ -395,8 +576,15 @@ public void outboundMessage(int seqNo) {
*/
@Override
public void streamClosed(Status status) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (streamClosed != 0) {
return;
}
streamClosed = 1;
}
if (!recordFinishedRpcs) {
return;
Expand Down
Loading