Refactor Vulture for query verification #904

Merged
merged 14 commits on Sep 2, 2021

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@
* [ENHANCEMENT] Make s3 backend readError logic more robust [#905](https://github.com/grafana/tempo/pull/905) (@wei840222)
* [ENHANCEMENT] Include additional detail when searching for traces [#916](https://github.com/grafana/tempo/pull/916) (@zalegrala)
* [ENHANCEMENT] Add `gen index` and `gen bloom` commands to tempo-cli. [#903](https://github.com/grafana/tempo/pull/903) (@annanay25)
* [ENHANCEMENT] Implement trace comparison in Vulture [#904](https://github.com/grafana/tempo/pull/904) (@zalegrala)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)

## v1.1.0 / 2021-08-26
213 changes: 156 additions & 57 deletions cmd/tempo-vulture/main.go
@@ -10,15 +10,20 @@ import (
"net/http"
"net/url"
"os"
"reflect"
"time"

"github.com/go-test/deep"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
zaplogfmt "github.com/jsternberg/zap-logfmt"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/weaveworks/common/user"
jaegerTrans "go.opentelemetry.io/collector/translator/trace/jaeger"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
@@ -39,10 +44,11 @@
)

type traceMetrics struct {
requested int
requestFailed int
notFound int
missingSpans int
requested int
requestFailed int
notFound int
missingSpans int
incorrectResult int
}

func init() {
@@ -69,10 +75,11 @@ func main() {

logger.Info("Tempo Vulture starting")

startTime := time.Now().Unix()
actualStartTime := time.Now()
startTime := actualStartTime
tickerWrite := time.NewTicker(tempoWriteBackoffDuration)
tickerRead := time.NewTicker(tempoReadBackoffDuration)
interval := int64(tempoWriteBackoffDuration / time.Second)
interval := tempoWriteBackoffDuration

// Write
go func() {
@@ -81,32 +88,31 @@ func main() {
panic(err)
}

for {
<-tickerWrite.C
for now := range tickerWrite.C {
timestamp := now.Round(interval)
r := newRand(timestamp)

seed := (time.Now().Unix() / interval) * interval
rand.Seed(seed)
traceIDHigh := rand.Int63()
traceIDLow := rand.Int63()
traceIDHigh := r.Int63()
traceIDLow := r.Int63()

logger := logger.With(
log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", traceIDLow, traceIDHigh)),
zap.Int64("seed", seed),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", traceIDHigh, traceIDLow)),
zap.Int64("seed", timestamp.Unix()),
)
logger.Info("sending trace")
log.Info("sending trace")

for i := int64(0); i < generateRandomInt(1, 100); i++ {
for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
ctx := user.InjectOrgID(context.Background(), tempoOrgID)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
logger.Error("error injecting org id", zap.Error(err))
log.Error("error injecting org id", zap.Error(err))
metricErrorTotal.Inc()
continue
}
err = c.EmitBatch(ctx, makeThriftBatch(traceIDHigh, traceIDLow))
err = c.EmitBatch(ctx, makeThriftBatch(traceIDHigh, traceIDLow, r, timestamp))
if err != nil {
logger.Error("error pushing batch to Tempo", zap.Error(err))
log.Error("error pushing batch to Tempo", zap.Error(err))
metricErrorTotal.Inc()
continue
}
@@ -116,20 +122,23 @@

// Read
go func() {
for {
<-tickerRead.C

currentTime := time.Now().Unix()
for now := range tickerRead.C {
var seed time.Time
startTime, seed = selectPastTimestamp(startTime, now, interval, tempoRetentionDuration)

// Don't attempt to read on the first iteration if we can't reasonably
// expect the write loop to have fired yet.
if seed.Before(actualStartTime.Add(tempoWriteBackoffDuration)) {
continue
}

// don't query traces before retention
if (currentTime - startTime) > int64(tempoRetentionDuration/time.Second) {
startTime = currentTime - int64(tempoRetentionDuration/time.Second)
// Don't attempt to read future traces.
if seed.After(now) {
continue
}

// pick past interval and re-generate trace
seed := (generateRandomInt(startTime, currentTime) / interval) * interval
rand.Seed(seed)
hexID := fmt.Sprintf("%016x%016x", rand.Int63(), rand.Int63())
r := newRand(seed)
hexID := fmt.Sprintf("%016x%016x", r.Int63(), r.Int63())

// query the trace
metrics, err := queryTempoAndAnalyze(tempoQueryURL, seed, hexID)
@@ -141,19 +150,39 @@ func main() {
metricTracesErrors.WithLabelValues("requestfailed").Add(float64(metrics.requestFailed))
metricTracesErrors.WithLabelValues("notfound").Add(float64(metrics.notFound))
metricTracesErrors.WithLabelValues("missingspans").Add(float64(metrics.missingSpans))
metricTracesErrors.WithLabelValues("incorrectresult").Add(float64(metrics.incorrectResult))
}
}()

http.Handle(prometheusPath, promhttp.Handler())
log.Fatal(http.ListenAndServe(prometheusListenAddress, nil))
}
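
// Illustrative sketch of the seeding contract the write and read loops above
// rely on: both sides round a wall-clock time to the write interval and feed
// it to newRand, so they re-derive the same trace ID (and, through
// makeThriftBatch, the same spans) without sharing any state. The function
// name exampleSeedContract is hypothetical.
func exampleSeedContract(tick time.Time) bool {
	seed := tick.Round(tempoWriteBackoffDuration)

	w := newRand(seed) // writer side
	writtenID := fmt.Sprintf("%016x%016x", w.Int63(), w.Int63())

	r := newRand(seed) // reader side, possibly much later
	expectedID := fmt.Sprintf("%016x%016x", r.Int63(), r.Int63())

	return writtenID == expectedID // always true for the same seed
}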

func selectPastTimestamp(start, stop time.Time, interval time.Duration, retention time.Duration) (newStart, ts time.Time) {
oldest := stop.Add(-retention)

if oldest.After(start) {
newStart = oldest
} else {
newStart = start
}

ts = time.Unix(generateRandomInt(newStart.Unix(), stop.Unix(), newRand(start)), 0)

return newStart.Round(interval), ts.Round(interval)
}
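
// Worked example for selectPastTimestamp, using illustrative values: a 10s
// write interval and 1h retention. Because stop-start exceeds retention,
// newStart is clamped to stop.Add(-retention), and the returned ts is rounded
// onto an interval boundary inside [newStart, stop], i.e. onto a seed the
// write loop actually used. The function name below is hypothetical.
func exampleSelectPastTimestamp() {
	start := time.Date(2021, 9, 1, 9, 0, 0, 0, time.UTC)
	stop := time.Date(2021, 9, 1, 11, 0, 0, 0, time.UTC)

	newStart, ts := selectPastTimestamp(start, stop, 10*time.Second, time.Hour)

	fmt.Println(newStart) // 2021-09-01 10:00:00 +0000 UTC (clamped to retention)
	fmt.Println(ts)       // a 10s-aligned time between newStart and stop
}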

func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) {
// remove scheme and port
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}

logger.Info("dialing grpc",
zap.String("endpoint", fmt.Sprintf("%s:14250", u.Host)),
)

// new jaeger grpc exporter
conn, err := grpc.Dial(u.Host+":14250", grpc.WithInsecure())
if err != nil {
@@ -162,79 +191,83 @@ func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) {
return jaeger_grpc.NewReporter(conn, nil, logger), err
}

func generateRandomString() string {
func newRand(t time.Time) *rand.Rand {
return rand.New(rand.NewSource(t.Unix()))
}

func generateRandomString(r *rand.Rand) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

s := make([]rune, generateRandomInt(5, 20))
s := make([]rune, generateRandomInt(5, 20, r))
for i := range s {
s[i] = letters[rand.Intn(len(letters))]
s[i] = letters[r.Intn(len(letters))]
}
return string(s)
}

func generateRandomTags() []*thrift.Tag {
func generateRandomTags(r *rand.Rand) []*thrift.Tag {
var tags []*thrift.Tag
count := generateRandomInt(1, 5)
count := generateRandomInt(1, 5, r)
for i := int64(0); i < count; i++ {
value := generateRandomString()
value := generateRandomString(r)
tags = append(tags, &thrift.Tag{
Key: generateRandomString(),
Key: generateRandomString(r),
VStr: &value,
})
}
return tags
}

func generateRandomLogs() []*thrift.Log {
func generateRandomLogs(r *rand.Rand, now time.Time) []*thrift.Log {
var logs []*thrift.Log
count := generateRandomInt(1, 5)
count := generateRandomInt(1, 5, r)
for i := int64(0); i < count; i++ {
logs = append(logs, &thrift.Log{
Timestamp: time.Now().Unix(),
Fields: generateRandomTags(),
Timestamp: now.Unix(),
Fields: generateRandomTags(r),
})
}
return logs
}

func makeThriftBatch(TraceIDHigh int64, TraceIDLow int64) *thrift.Batch {
func makeThriftBatch(TraceIDHigh int64, TraceIDLow int64, r *rand.Rand, now time.Time) *thrift.Batch {
var spans []*thrift.Span
count := generateRandomInt(1, 5)
count := generateRandomInt(1, 5, r)
for i := int64(0); i < count; i++ {
spans = append(spans, &thrift.Span{
TraceIdLow: TraceIDLow,
TraceIdHigh: TraceIDHigh,
SpanId: rand.Int63(),
SpanId: r.Int63(),
ParentSpanId: 0,
OperationName: generateRandomString(),
OperationName: generateRandomString(r),
References: nil,
Flags: 0,
StartTime: time.Now().Unix(),
Duration: rand.Int63(),
Tags: generateRandomTags(),
Logs: generateRandomLogs(),
StartTime: now.Unix(),
Duration: generateRandomInt(0, 100, r),
Tags: generateRandomTags(r),
Logs: generateRandomLogs(r, now),
})
}

return &thrift.Batch{Spans: spans}
}

func generateRandomInt(min int64, max int64) int64 {
number := min + rand.Int63n(max-min)
if number == min {
return generateRandomInt(min, max)
}
func generateRandomInt(min int64, max int64, r *rand.Rand) int64 {
min++
number := min + r.Int63n(max-min)
return number
}
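
// Range note: r.Int63n(n) returns values in [0, n), so after the min++ above
// the result falls in [min+1, max-1] relative to the caller's original
// arguments. For example, generateRandomInt(1, 100, r) always returns a value
// between 2 and 99 inclusive.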

func queryTempoAndAnalyze(baseURL string, seed int64, traceID string) (traceMetrics, error) {
func queryTempoAndAnalyze(baseURL string, seed time.Time, traceID string) (traceMetrics, error) {
tm := traceMetrics{
requested: 1,
}

logger := logger.With(
zap.String("query_trace_id", traceID),
zap.String("tempo_query_url", baseURL+"/api/traces/"+traceID),
zap.Int64("seed", seed),
zap.Int64("seed", seed.Unix()),
zap.Duration("ago", time.Since(seed)),
)
logger.Info("querying Tempo")

@@ -252,17 +285,42 @@ func queryTempoAndAnalyze(baseURL string, seed int64, traceID string) (traceMetr
if len(trace.Batches) == 0 {
logger.Error("trace contains 0 batches")
tm.notFound++
return tm, nil
}

// iterate through
if hasMissingSpans(trace) {
logger.Error("trace has missing spans")
tm.missingSpans++
return tm, nil
}

// Get the expected trace by re-generating it from the seed
expected := constructTraceFromEpoch(seed)

match := equalTraces(expected, trace)
if !match {
tm.incorrectResult++
if diff := deep.Equal(expected, trace); diff != nil {
for _, d := range diff {
logger.Error("incorrect result",
zap.String("expected -> response", d),
)
}
}
return tm, nil
}

return tm, nil
}

func equalTraces(a, b *tempopb.Trace) bool {
model.SortTrace(a)
model.SortTrace(b)

return reflect.DeepEqual(a, b)
}
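
// equalTraces is order-insensitive by construction: model.SortTrace puts both
// the expected trace (rebuilt from the seed) and the queried trace into a
// canonical order before the deep comparison, so differences in batch or span
// ordering alone are not reported as an incorrect result. A minimal usage
// sketch; the function name exampleVerify is hypothetical:
func exampleVerify(seed time.Time, queried *tempopb.Trace) bool {
	expected := constructTraceFromEpoch(seed)
	return equalTraces(expected, queried)
}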

func hasMissingSpans(t *tempopb.Trace) bool {
// collect all parent span IDs
linkedSpanIDs := make([][]byte, 0)
@@ -299,3 +357,44 @@ func hasMissingSpans(t *tempopb.Trace) bool {

return false
}

func constructTraceFromEpoch(epoch time.Time) *tempopb.Trace {
r := newRand(epoch)
traceIDHigh := r.Int63()
traceIDLow := r.Int63()

trace := &tempopb.Trace{}

for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
batch := makeThriftBatch(traceIDHigh, traceIDLow, r, epoch)
internalTrace := jaegerTrans.ThriftBatchToInternalTraces(batch)
conv, err := internalTrace.ToOtlpProtoBytes()
if err != nil {
logger.Error(err.Error())
}

t := tempopb.Trace{}
err = t.Unmarshal(conv)
if err != nil {
logger.Error(err.Error())
}

// Due to the several transforms above, some manual mangling is required to
// get the parentSpanID to match. In the case of an empty []byte in place
// for the ParentSpanId, we set to nil here to ensure that the final result
// matches the json.Unmarshal value when tempo is queried.
for ib, b := range t.Batches {
for il, l := range b.InstrumentationLibrarySpans {
for is, s := range l.Spans {
if len(s.GetParentSpanId()) == 0 {
t.Batches[ib].InstrumentationLibrarySpans[il].Spans[is].ParentSpanId = nil
}
}
}
}

trace.Batches = append(trace.Batches, t.Batches...)
}

return trace
}
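
// Why the ParentSpanId normalization above matters: reflect.DeepEqual treats a
// nil byte slice and an empty non-nil one as different values, and the comment
// above notes that the queried trace unmarshals absent parents as nil. A
// minimal illustration; the function name exampleNilVsEmpty is hypothetical:
func exampleNilVsEmpty() {
	var fromQuery []byte      // nil, as json.Unmarshal leaves a missing field
	fromTransform := []byte{} // empty but non-nil, as the transforms above can produce

	fmt.Println(reflect.DeepEqual(fromQuery, fromTransform)) // false -- hence the normalization to nil
}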