Skip to content

Commit

Permalink
Don't spam logs when RPC spans are in use.
Browse files Browse the repository at this point in the history
As mentionned in openzipkin/openzipkin.github.io#49, timestamp should be set on span originators.
In the case of a span sent from a mid-tier server (e.g. receiving a request from a instrumented service), it is not supposed to set the timestamp, leading to a log message when using cassandra as storage.
This log can be only wrote when the span is an originator. It is still trying to guess the correct timestamp to cope with non-compliant tracers.
  • Loading branch information
Pierre-Hugues Jeanneret committed Dec 28, 2016
1 parent c45b744 commit 700c23b
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Annotation;
import zipkin.Codec;
import zipkin.Constants;
import zipkin.Span;
import zipkin.internal.Nullable;
import zipkin.internal.Pair;
Expand Down Expand Up @@ -112,9 +114,12 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
Long timestamp = guessTimestamp(span);
spans.add(span);

boolean isServerRecvSpan = isServerRecvSpan(span);

futures.add(storeSpan(
span.traceId,
timestamp != null ? timestamp : 0L,
isServerRecvSpan,
String.format("%s%d_%d_%d",
span.traceIdHigh == 0 ? "" : span.traceIdHigh + "_",
span.id,
Expand All @@ -136,12 +141,24 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
return transform(Futures.allAsList(futures.build()), TO_VOID);
}

private static boolean isServerRecvSpan(Span span) {
for (int i = 0, length = span.annotations.size(); i < length; i++) {
Annotation annotation = span.annotations.get(i);
if (annotation.value.equals(Constants.SERVER_RECV)) {
return true;
}
}
return false;
}

/**
* Store the span in the underlying storage for later retrieval.
*/
ListenableFuture<?> storeSpan(long traceId, long timestamp, String key, ByteBuffer span) {
ListenableFuture<?> storeSpan(long traceId, long timestamp, boolean isServerRecvSpan, String key, ByteBuffer span) {
try {
if (0 == timestamp && metadata.compactionClass.contains("DateTieredCompactionStrategy")) {
// If we couldn't guess the timestamp, that probably means that there was a missing timestamp.
// However, tracers are supposed to put a timestamp *only* on the span originator (not on SR annotation)
if (0 == timestamp && !isServerRecvSpan && metadata.compactionClass.contains("DateTieredCompactionStrategy")) {
LOG.warn("Span {} in trace {} had no timestamp. "
+ "If this happens a lot consider switching back to SizeTieredCompactionStrategy for "
+ "{}.traces", key, traceId, session.getLoggedKeyspace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,38 @@
*/
package zipkin.storage.cassandra;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.Appender;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import java.util.stream.IntStream;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.slf4j.LoggerFactory;
import zipkin.Annotation;
import zipkin.Constants;
import zipkin.Span;
import zipkin.TestObjects;

import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static zipkin.Constants.CLIENT_RECV;
import static zipkin.Constants.CLIENT_SEND;
import static zipkin.TestObjects.APP_ENDPOINT;

public class CassandraSpanConsumerTest {

private final CassandraStorage storage;
private final Appender mockAppender = mock(Appender.class);

public CassandraSpanConsumerTest() {
this.storage = CassandraTestGraph.INSTANCE.storage.get();
Expand All @@ -37,6 +53,15 @@ public CassandraSpanConsumerTest() {
@Before
public void clear() {
storage.clear();
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
when(mockAppender.getName()).thenReturn(CassandraSpanConsumerTest.class.getName());
root.addAppender(mockAppender);
}

@After
public void tearDown() {
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
root.detachAppender(mockAppender);
}

/**
Expand All @@ -61,6 +86,31 @@ public void doesntIndexCoreOrNonStringAnnotations() {
assertThat(rowCount(Tables.ANNOTATIONS_INDEX)).isZero();
}

@Test
public void logTimestampMissingOnClientSend() {
Span span = Span.builder().traceId(1L).parentId(1L).id(2L).name("query")
.addAnnotation(Annotation.create(0L, CLIENT_SEND, APP_ENDPOINT))
.addAnnotation(Annotation.create(0L, CLIENT_RECV, APP_ENDPOINT)).build();
accept(span);
verify(mockAppender).doAppend(considerSwitchStrategyLog());
}

@Test
public void dontLogTimestampMissingOnMidTierServerSpan() {
Span span = TestObjects.TRACE.get(0);
accept(span);
verify(mockAppender, never()).doAppend(considerSwitchStrategyLog());
}

private static Object considerSwitchStrategyLog() {
return argThat(new ArgumentMatcher() {
@Override
public boolean matches(final Object argument) {
return ((LoggingEvent)argument).getFormattedMessage().contains("If this happens a lot consider switching back to SizeTieredCompactionStrategy");
}
});
}

/**
* Simulates a trace with a step pattern, where each span starts a millisecond after the prior
* one. The consumer code optimizes index inserts to only represent the interval represented by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;
import zipkin.Annotation;
import zipkin.BinaryAnnotation;
import zipkin.Constants;
import zipkin.Span;
import zipkin.storage.cassandra3.Schema.AnnotationUDT;
import zipkin.storage.cassandra3.Schema.BinaryAnnotationUDT;
Expand Down Expand Up @@ -102,7 +103,8 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
// indexing occurs by timestamp, so derive one if not present.
Long timestamp = guessTimestamp(span);
TraceIdUDT traceId = new TraceIdUDT(span.traceIdHigh, span.traceId);
futures.add(storeSpan(span, traceId, timestamp));
boolean isServerRecvSpan = isServerRecvSpan(span);
futures.add(storeSpan(span, traceId, isServerRecvSpan, timestamp));

for (String serviceName : span.serviceNames()) {
// QueryRequest.min/maxDuration
Expand All @@ -121,12 +123,23 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
return transform(Futures.allAsList(futures.build()), TO_VOID);
}

private static boolean isServerRecvSpan(Span span) {
for (int i = 0, length = span.annotations.size(); i < length; i++) {
Annotation annotation = span.annotations.get(i);
if (annotation.value.equals(Constants.SERVER_RECV)) {
return true;
}
}
return false;
}

/**
* Store the span in the underlying storage for later retrieval.
*/
ListenableFuture<?> storeSpan(Span span, TraceIdUDT traceId, Long timestamp) {
ListenableFuture<?> storeSpan(Span span, TraceIdUDT traceId, boolean isServerRecvSpan, Long timestamp) {
try {
if ((null == timestamp || 0 == timestamp)
&& !isServerRecvSpan
&& metadata.compactionClass.contains("TimeWindowCompactionStrategy")) {

LOG.warn("Span {} in trace {} had no timestamp. "
Expand All @@ -145,7 +158,7 @@ ListenableFuture<?> storeSpan(Span span, TraceIdUDT traceId, Long timestamp) {
Set<String> annotationKeys = CassandraUtil.annotationKeys(span);

if (!strictTraceId && traceId.getHigh() != 0L) {
storeSpan(span, new TraceIdUDT(0L, traceId.getLow()), timestamp);
storeSpan(span, new TraceIdUDT(0L, traceId.getLow()), isServerRecvSpan, timestamp);
}

BoundStatement bound = bindWithName(insertSpan, "insert-span")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,37 @@
*/
package zipkin.storage.cassandra3;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.Appender;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import java.util.stream.IntStream;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.slf4j.LoggerFactory;
import zipkin.Annotation;
import zipkin.Span;
import zipkin.TestObjects;

import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static zipkin.Constants.CLIENT_RECV;
import static zipkin.Constants.CLIENT_SEND;
import static zipkin.TestObjects.APP_ENDPOINT;

public class CassandraSpanConsumerTest {

private final Cassandra3Storage storage;
private final Appender mockAppender = mock(Appender.class);

public CassandraSpanConsumerTest() {
this.storage = Cassandra3TestGraph.INSTANCE.storage.get();
Expand All @@ -36,6 +52,15 @@ public CassandraSpanConsumerTest() {
@Before
public void clear() {
storage.clear();
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
when(mockAppender.getName()).thenReturn(CassandraSpanConsumerTest.class.getName());
root.addAppender(mockAppender);
}

@After
public void tearDown() {
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
root.detachAppender(mockAppender);
}

/**
Expand All @@ -51,6 +76,31 @@ public void doesntIndexSpansMissingDuration() {
assertThat(rowCount(Schema.TABLE_TRACE_BY_SERVICE_SPAN)).isZero();
}

@Test
public void logTimestampMissingOnClientSend() {
Span span = Span.builder().traceId(1L).parentId(1L).id(2L).name("query")
.addAnnotation(Annotation.create(0L, CLIENT_SEND, APP_ENDPOINT))
.addAnnotation(Annotation.create(0L, CLIENT_RECV, APP_ENDPOINT)).build();
accept(span);
verify(mockAppender).doAppend(considerSwitchStrategyLog());
}

@Test
public void dontLogTimestampMissingOnMidTierServerSpan() {
Span span = TestObjects.TRACE.get(0);
accept(span);
verify(mockAppender, never()).doAppend(considerSwitchStrategyLog());
}

private static Object considerSwitchStrategyLog() {
return argThat(new ArgumentMatcher() {
@Override
public boolean matches(final Object argument) {
return ((LoggingEvent)argument).getFormattedMessage().contains("If this happens a lot consider switching back to SizeTieredCompactionStrategy");
}
});
}

/**
* Simulates a trace with a step pattern, where each span starts a millisecond after the prior
* one. The consumer code optimizes index inserts to only represent the interval represented by
Expand Down

0 comments on commit 700c23b

Please sign in to comment.