Skip to content

Commit

Permalink
sampling: fix pubsub implementation (#5126)
Browse files Browse the repository at this point in the history
* sampling: fix pubsub implementation

The initial implementation was written as a ~quick hack, with the
expectation that it would be replaced by the Changes API. It was
broken due to its ignorance of data streams, and multi-shard indices.
Sequence numbers are only comparable within a single shard.

Given that there is no known delivery date for the Changes API,
we propose to instead revise the pubsub implementation to address
the problems by:

 - enforcing single-shard indices for sampled trace data streams
 - searching (now single-shard) backing indices individually

In addition, we now use global checkpoints to bound searches, and
use PIT (point in time) for paging through results. Querying underlying
indices and global checkpoints requires an additional "monitor" index
privilege.

* sampling/pubsub: remove PIT again

Simplify by just using direct searches with a range on _seq_no,
using the most recently observed _seq_no value as the lower bound.
We can do this within the loop as well (i.e. until there are no
more results, or we've observed the global checkpoint.)

* sampling/pubsub: only query get metric from _stats

* pubsub: force-refresh indices

Refresh indices after observing an updated global checkpoint
to ensure document visibility is correct up to the observed
global checkpoint.

* Update changelog

* systemtest: fix spurious test failure

(cherry picked from commit 94e3201)

# Conflicts:
#	apmpackage/apm/0.2.0/data_stream/sampled_traces/manifest.yml
#	changelogs/head.asciidoc
#	x-pack/apm-server/sampling/pubsub/pubsub.go
#	x-pack/apm-server/sampling/pubsub/pubsub_test.go
#	x-pack/apm-server/sampling/pubsub/pubsubtest/client.go
  • Loading branch information
axw authored and mergify-bot committed Jul 8, 2021
1 parent e369a0e commit fea9e29
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 0 deletions.
11 changes: 11 additions & 0 deletions apmpackage/apm/0.2.0/data_stream/sampled_traces/manifest.yml
@@ -0,0 +1,11 @@
title: APM tail-sampled traces
type: traces
dataset: sampled
ilm_policy: traces-apm.sampled-default_policy
elasticsearch:
  index_template:
    settings:
      # Create a single shard per index, so we can use
      # global checkpoints as a way of limiting search
      # results.
      number_of_shards: 1
28 changes: 28 additions & 0 deletions changelogs/head.asciidoc
@@ -0,0 +1,28 @@
[[release-notes-head]]
== APM Server version HEAD

https://github.com/elastic/apm-server/compare/7.13\...master[View commits]

[float]
==== Breaking Changes

[float]
==== Bug fixes
* Don't auto-disable ILM due to a failure to communicate with Elasticsearch {pull}5264[5264]
* Fix panic due to misaligned 64-bit access on 32-bit architectures {pull}5277[5277]
* Fixed tail-based sampling pubsub to use _seq_no correctly {pull}5126[5126]
* Fix document grouping of translated OpenTelemetry Java metrics {pull}5309[5309]
* OpenTelemetry: record array attributes as labels {pull}5286[5286]
* model/modeldecoder: fix 32-bit timestamp decoding {pull}5308[5308]

[float]
==== Intake API Changes

[float]
==== Added
* Support setting agent configuration from apm-server.yml {pull}5177[5177]
* Add metric_type and unit to field metadata of system metrics {pull}5230[5230]
* Upgrade Go to 1.15.12 {pull}[]

[float]
==== Deprecated
119 changes: 119 additions & 0 deletions x-pack/apm-server/sampling/pubsub/pubsub.go
Expand Up @@ -27,9 +27,12 @@ import (
logs "github.com/elastic/apm-server/log"
)

<<<<<<< HEAD
// ErrClosed may be returned by Pubsub methods after the Close method is called.
var ErrClosed = errors.New("pubsub closed")

=======
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))
var errIndexNotFound = errors.New("index not found")

// Pubsub provides a means of publishing and subscribing to sampled trace IDs,
Expand Down Expand Up @@ -71,6 +74,14 @@ func (p *Pubsub) PublishSampledTraceIDs(ctx context.Context, traceIDs <-chan str
if err != nil {
return err
}
<<<<<<< HEAD
=======
return &Pubsub{
config: config,
indexer: indexer,
}, nil
}
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))

var closeIndexerOnce sync.Once
var closeIndexerErr error
Expand Down Expand Up @@ -121,12 +132,16 @@ func (p *Pubsub) SubscribeSampledTraceIDs(
ticker := time.NewTicker(p.config.SearchInterval)
defer ticker.Stop()

<<<<<<< HEAD
// Only send positions on change.
var positionsOut chan<- SubscriberPosition
positionsOut = positions

// Copy pos because it may be mutated by p.searchTraceIDs.
pos = copyPosition(pos)
=======
observedSeqnos := make(map[string]int64)
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))
for {
select {
case <-ctx.Done():
Expand All @@ -136,6 +151,7 @@ func (p *Pubsub) SubscribeSampledTraceIDs(
pos = copyPosition(pos)
positionsOut = nil
case <-ticker.C:
<<<<<<< HEAD
changed, err := p.searchTraceIDs(ctx, traceIDs, pos.observedSeqnos)
if err != nil {
// Errors may occur due to rate limiting, or while the index is
Expand All @@ -146,6 +162,13 @@ func (p *Pubsub) SubscribeSampledTraceIDs(
if changed {
positionsOut = positions
}
=======
}
if err := p.searchTraceIDs(ctx, traceIDs, observedSeqnos); err != nil {
// Errors may occur due to rate limiting, or while the index is
// still being created, so just log and continue.
p.config.Logger.With(logp.Error(err)).Debug("error searching for trace IDs")
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))
}
}
}
Expand All @@ -159,17 +182,25 @@ func (p *Pubsub) SubscribeSampledTraceIDs(
//
// Immediately after observing an updated global checkpoint we will force-refresh indices to ensure all documents
// up to the global checkpoint are visible in proceeding searches.
<<<<<<< HEAD
func (p *Pubsub) searchTraceIDs(ctx context.Context, out chan<- string, observedSeqnos map[string]int64) (bool, error) {
globalCheckpoints, err := getGlobalCheckpoints(ctx, p.config.Client, p.config.DataStream.String())
if err != nil {
return false, err
=======
func (p *Pubsub) searchTraceIDs(ctx context.Context, out chan<- string, observedSeqnos map[string]int64) error {
globalCheckpoints, err := getGlobalCheckpoints(ctx, p.config.Client, p.config.DataStream.String())
if err != nil {
return err
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))
}

// Remove old indices from the observed _seq_no map.
for index := range observedSeqnos {
if _, ok := globalCheckpoints[index]; !ok {
delete(observedSeqnos, index)
}
<<<<<<< HEAD
}

// Force-refresh the indices with updated global checkpoints.
Expand Down Expand Up @@ -210,14 +241,60 @@ func (p *Pubsub) searchTraceIDs(ctx context.Context, out chan<- string, observed
}
return nil
})
=======
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))
}
return changed, g.Wait()
}

<<<<<<< HEAD
func (p *Pubsub) refreshIndices(ctx context.Context, indices []string) error {
if len(indices) == 0 {
return nil
}
=======
// Force-refresh the indices with updated global checkpoints.
indices := make([]string, 0, len(globalCheckpoints))
for index, globalCheckpoint := range globalCheckpoints {
observedSeqno, ok := observedSeqnos[index]
if ok && globalCheckpoint <= observedSeqno {
delete(globalCheckpoints, index)
continue
}
indices = append(indices, index)
}
if err := p.refreshIndices(ctx, indices); err != nil {
return err
}

g, ctx := errgroup.WithContext(ctx)
for _, index := range indices {
globalCheckpoint := globalCheckpoints[index]
observedSeqno, ok := observedSeqnos[index]
if !ok {
observedSeqno = -1
}
index := index // copy for closure
g.Go(func() error {
maxSeqno, err := p.searchIndexTraceIDs(ctx, out, index, observedSeqno, globalCheckpoint)
if err != nil {
return err
}
if maxSeqno > observedSeqno {
observedSeqno = maxSeqno
}
observedSeqnos[index] = observedSeqno
return nil
})
}
return g.Wait()
}

func (p *Pubsub) refreshIndices(ctx context.Context, indices []string) error {
if len(indices) == 0 {
return nil
}
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))
ignoreUnavailable := true
resp, err := esapi.IndicesRefreshRequest{
Index: indices,
Expand Down Expand Up @@ -287,6 +364,47 @@ func (p *Pubsub) searchIndexTraceIDs(ctx context.Context, out chan<- string, ind
Sort []interface{} `json:"sort"`
}
}
<<<<<<< HEAD
}
if err := p.doSearchRequest(ctx, index, esutil.NewJSONReader(searchBody), &result); err != nil {
if err == errIndexNotFound {
// Index was deleted.
break
}
return -1, err
}
if len(result.Hits.Hits) == 0 {
break
}
for _, hit := range result.Hits.Hits {
select {
case <-ctx.Done():
return -1, ctx.Err()
case out <- hit.Source.Trace.ID:
}
}
maxObservedSeqno = result.Hits.Hits[len(result.Hits.Hits)-1].Seqno
}
return maxObservedSeqno, nil
}

func (p *Pubsub) doSearchRequest(ctx context.Context, index string, body io.Reader, out interface{}) error {
resp, err := esapi.SearchRequest{
Index: []string{index},
Body: body,
}.Do(ctx, p.config.Client)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
if resp.StatusCode == http.StatusNotFound {
return errIndexNotFound
}
message, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("search request failed: %s", message)
}
=======
}
if err := p.doSearchRequest(ctx, index, esutil.NewJSONReader(searchBody), &result); err != nil {
if err == errIndexNotFound {
Expand Down Expand Up @@ -326,6 +444,7 @@ func (p *Pubsub) doSearchRequest(ctx context.Context, index string, body io.Read
message, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("search request failed: %s", message)
}
>>>>>>> 94e3201a (sampling: fix pubsub implementation (#5126))
return json.NewDecoder(resp.Body).Decode(out)
}

Expand Down

0 comments on commit fea9e29

Please sign in to comment.