Skip to content

Commit

Permalink
Loki-Canary: Backoff retries on query failures, add histograms for qu…
Browse files Browse the repository at this point in the history
…ery performance. (#2413)

* The canary needs to wait a reasonable amount of time before spot-checking entries; otherwise the async thread doing the testing can ask Loki for them only a few ms after they are written, when they have no chance of being present yet.
Adding two histograms for tracking the performance of the spot check and metric test queries

* Adding backoff to queries if they fail, specifically waiting 5 minutes when a 429 is received from Loki

* fix order of parameters
  • Loading branch information
slim-bean authored Jul 24, 2020
1 parent 43f2ef3 commit 204e61a
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 16 deletions.
3 changes: 2 additions & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func main() {
"e.g. 15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached")
spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it")
spotCheckQueryRate := flag.Duration("spot-check-query-rate", 1*time.Minute, "Interval that the canary will query Loki for the current list of all spot check entries")
spotCheckWait := flag.Duration("spot-check-initial-wait", 10*time.Second, "How long should the spot check query wait before starting to check for entries")

printVersion := flag.Bool("version", false, "Print this builds version information")

Expand Down Expand Up @@ -84,7 +85,7 @@ func main() {

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

startCanary()
Expand Down
1 change: 1 addition & 0 deletions docs/sources/operations/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ loki_canary_missing_entries -> loki_canary_missing_entries_tota
loki_canary_unexpected_entries -> loki_canary_unexpected_entries_total
loki_canary_duplicate_entries -> loki_canary_duplicate_entries_total
loki_canary_ws_reconnects -> loki_canary_ws_reconnects_total
loki_canary_response_latency -> loki_canary_response_latency_seconds
```

## 1.5.0
Expand Down
29 changes: 26 additions & 3 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"

"github.com/grafana/loki/pkg/canary/reader"
)
Expand Down Expand Up @@ -75,7 +76,19 @@ var (
Name: "metric_test_actual",
Help: "How many counts were actually received by the metric test query",
})
responseLatency prometheus.Histogram
responseLatency prometheus.Histogram
metricTestLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "metric_test_request_duration_seconds",
Help: "how long the metric test query execution took in seconds.",
Buckets: instrument.DefBuckets,
})
spotTestLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "spot_check_request_duration_seconds",
Help: "how long the spot check test query execution took in seconds.",
Buckets: instrument.DefBuckets,
})
)

type Comparator struct {
Expand All @@ -97,6 +110,7 @@ type Comparator struct {
spotCheckInterval time.Duration
spotCheckMax time.Duration
spotCheckQueryRate time.Duration
spotCheckWait time.Duration
spotCheckRunning bool
metricTestInterval time.Duration
metricTestRange time.Duration
Expand All @@ -115,7 +129,7 @@ func NewComparator(writer io.Writer,
wait time.Duration,
maxWait time.Duration,
pruneInterval time.Duration,
spotCheckInterval, spotCheckMax, spotCheckQueryRate time.Duration,
spotCheckInterval, spotCheckMax, spotCheckQueryRate, spotCheckWait time.Duration,
metricTestInterval time.Duration,
metricTestRange time.Duration,
writeInterval time.Duration,
Expand All @@ -135,6 +149,7 @@ func NewComparator(writer io.Writer,
spotCheckInterval: spotCheckInterval,
spotCheckMax: spotCheckMax,
spotCheckQueryRate: spotCheckQueryRate,
spotCheckWait: spotCheckWait,
spotCheckRunning: false,
metricTestInterval: metricTestInterval,
metricTestRange: metricTestRange,
Expand All @@ -152,7 +167,7 @@ func NewComparator(writer io.Writer,
if responseLatency == nil {
responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "response_latency",
Name: "response_latency_seconds",
Help: "is how long it takes for log lines to be returned from Loki in seconds.",
Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets),
})
Expand Down Expand Up @@ -292,7 +307,9 @@ func (c *Comparator) metricTest(currTime time.Time) {
if currTime.Add(-c.metricTestRange).Before(c.startTime) {
adjustedRange = currTime.Sub(c.startTime)
}
begin := time.Now()
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()))
metricTestLatency.Observe(time.Since(begin).Seconds())
if err != nil {
fmt.Fprintf(c.w, "error running metric query test: %s\n", err.Error())
return
Expand Down Expand Up @@ -326,13 +343,19 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
c.spotEntMtx.Unlock()

for _, sce := range cpy {
// Make sure enough time has passed to start checking for this entry
if currTime.Sub(*sce) < c.spotCheckWait {
continue
}
spotCheckEntries.Inc()
// Because we are querying loki timestamps vs the timestamp in the log,
// make the range +/- 10 seconds to allow for clock inaccuracies
start := *sce
adjustedStart := start.Add(-10 * time.Second)
adjustedEnd := start.Add(10 * time.Second)
begin := time.Now()
recvd, err := c.rdr.Query(adjustedStart, adjustedEnd)
spotTestLatency.Observe(time.Since(begin).Seconds())
if err != nil {
fmt.Fprintf(c.w, "error querying loki: %s\n", err)
return
Expand Down
33 changes: 21 additions & 12 deletions pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestEntryNeverReceived(t *testing.T) {
wait := 60 * time.Second
maxWait := 300 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

c.entrySent(t1)
c.entrySent(t2)
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestPruneAckdEntires(t *testing.T) {
wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Millisecond)
Expand Down Expand Up @@ -278,25 +278,34 @@ func TestSpotCheck(t *testing.T) {

mr := &mockReader{resp: found}
spotCheck := 10 * time.Millisecond
spotCheckMax := 10 * time.Millisecond
spotCheckMax := 20 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

// Send all the entries
for i := range entries {
c.entrySent(entries[i])
}

// Should include the following entries
assert.Equal(t, 3, len(c.spotCheck))
assert.Equal(t, time.Unix(0, 0), *c.spotCheck[0])
assert.Equal(t, time.Unix(0, 10*time.Millisecond.Nanoseconds()), *c.spotCheck[1])
assert.Equal(t, time.Unix(0, 20*time.Millisecond.Nanoseconds()), *c.spotCheck[2])

// Run with "current time" 11ms after start which will prune the first entry which is no "before" the 10ms spot check max
c.spotCheckEntries(time.Unix(0, 11*time.Millisecond.Nanoseconds()))
// Run with "current time" 2ms after start which is less than spotCheckWait so nothing should be checked
c.spotCheckEntries(time.Unix(0, 2*time.Millisecond.Nanoseconds()))
assert.Equal(t, 3, len(c.spotCheck))
assert.Equal(t, 0, spotCheckEntries.(*mockCounter).count)

// Run with "current time" at 25ms, the first entry should be pruned, the second entry should be found, and the last entry should come back as missing
c.spotCheckEntries(time.Unix(0, 25*time.Millisecond.Nanoseconds()))

// First entry should have been pruned
// First entry should have been pruned, second and third entries have not expired yet
assert.Equal(t, 2, len(c.spotCheck))

expected := fmt.Sprintf(ErrSpotCheckEntryNotReceived, // List entry not received from Loki
entries[20].UnixNano(), "-9ms")
entries[20].UnixNano(), "5ms")

// We didn't send the last entry and our initial counter did not start at 0 so we should get back entries 1-19
for i := 1; i < 20; i++ {
Expand All @@ -322,7 +331,7 @@ func TestMetricTest(t *testing.T) {
mr := &mockReader{}
metricTestRange := 30 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
// Force the start time to a known value
c.startTime = time.Unix(10, 0)

Expand Down
69 changes: 69 additions & 0 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/gorilla/websocket"
json "github.com/json-iterator/go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -56,6 +58,9 @@ type Reader struct {
sValue string
lName string
lVal string
backoff *util.Backoff
nextQuery time.Time
backoffMtx sync.RWMutex
interval time.Duration
conn *websocket.Conn
w io.Writer
Expand All @@ -82,6 +87,14 @@ func NewReader(writer io.Writer,
h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
}

next := time.Now()
bkcfg := util.BackoffConfig{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Minute,
MaxRetries: 0,
}
bkoff := util.NewBackoff(context.Background(), bkcfg)

rd := Reader{
header: h,
tls: tls,
Expand All @@ -93,6 +106,8 @@ func NewReader(writer io.Writer,
sValue: streamValue,
lName: labelName,
lVal: labelVal,
nextQuery: next,
backoff: bkoff,
interval: interval,
w: writer,
recv: receivedChan,
Expand Down Expand Up @@ -123,7 +138,20 @@ func (r *Reader) Stop() {
}
}

// QueryCountOverTime will ask Loki for a count of logs over the provided range e.g. 5m
// QueryCountOverTime blocks if a previous query has failed until the appropriate backoff time has been reached.
func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
r.backoffMtx.RLock()
next := r.nextQuery
r.backoffMtx.RUnlock()
for time.Now().Before(next) {
time.Sleep(50 * time.Millisecond)
// Update next in case other queries have tried and failed
r.backoffMtx.RLock()
next = r.nextQuery
r.backoffMtx.RUnlock()
}

scheme := "http"
if r.tls {
scheme = "https"
Expand Down Expand Up @@ -159,9 +187,17 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
}()

if resp.StatusCode/100 != 2 {
r.backoffMtx.Lock()
r.nextQuery = nextBackoff(r.w, resp.StatusCode, r.backoff)
r.backoffMtx.Unlock()
buf, _ := ioutil.ReadAll(resp.Body)
return 0, fmt.Errorf("error response from server: %s (%v)", string(buf), err)
}
// No Errors, reset backoff
r.backoffMtx.Lock()
r.backoff.Reset()
r.backoffMtx.Unlock()

var decoded loghttp.QueryResponse
err = json.NewDecoder(resp.Body).Decode(&decoded)
if err != nil {
Expand All @@ -187,7 +223,20 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
return ret, nil
}

// Query will ask Loki for all canary timestamps in the requested timerange.
// Query blocks if a previous query has failed until the appropriate backoff time has been reached.
func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
r.backoffMtx.RLock()
next := r.nextQuery
r.backoffMtx.RUnlock()
for time.Now().Before(next) {
time.Sleep(50 * time.Millisecond)
// Update next in case other queries have tried and failed moving it even farther in the future
r.backoffMtx.RLock()
next = r.nextQuery
r.backoffMtx.RUnlock()
}

scheme := "http"
if r.tls {
scheme = "https"
Expand Down Expand Up @@ -224,9 +273,17 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}()

if resp.StatusCode/100 != 2 {
r.backoffMtx.Lock()
r.nextQuery = nextBackoff(r.w, resp.StatusCode, r.backoff)
r.backoffMtx.Unlock()
buf, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("error response from server: %s (%v)", string(buf), err)
}
// No Errors, reset backoff
r.backoffMtx.Lock()
r.backoff.Reset()
r.backoffMtx.Unlock()

var decoded loghttp.QueryResponse
err = json.NewDecoder(resp.Body).Decode(&decoded)
if err != nil {
Expand Down Expand Up @@ -368,3 +425,15 @@ func parseResponse(entry *loghttp.Entry) (*time.Time, error) {
t := time.Unix(0, ts)
return &t, nil
}

// nextBackoff returns the earliest time at which the next query may be sent
// after Loki answered with a non-2xx status code, and logs the chosen delay
// to w.
//
// An HTTP 429 (Too Many Requests) always results in a fixed 5 minute wait; any
// other status code uses the delay returned by backoff.NextDelay().
func nextBackoff(w io.Writer, statusCode int, backoff *util.Backoff) time.Time {
	// Be way more conservative with an http 429 and wait 5 minutes before trying again.
	delay := 5 * time.Minute
	if statusCode != http.StatusTooManyRequests {
		delay = backoff.NextDelay()
	}
	// Log the delay itself instead of re-deriving it from a second clock read,
	// which would report a slightly shorter wait than the one actually chosen.
	// The trailing newline keeps consecutive messages on separate lines.
	fmt.Fprintf(w, "Loki returned an error code: %v, waiting %v before next query.\n", statusCode, delay)
	return time.Now().Add(delay)
}

0 comments on commit 204e61a

Please sign in to comment.