Skip to content

Commit

Permalink
fix(logql-compliance-tester): wait until ingester is available
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Apr 26, 2024
1 parent b7a7e5a commit 284a6d0
Showing 1 changed file with 63 additions and 17 deletions.
80 changes: 63 additions & 17 deletions cmd/logql-compliance-tester/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"net/http"
"net/url"
"strconv"
"time"
Expand All @@ -23,21 +24,24 @@ func setup(ctx context.Context, cfg Config) (*lokicompliance.Comparer, error) {
cfg.ReferenceTarget,
cfg.TestTarget,
} {
raw := cfg.PushURL
if raw == "" {
raw = cfg.QueryURL
target := cfg.PushURL
if target == "" {
target = cfg.QueryURL
}

u, err := url.Parse(raw)
u, err := url.Parse(target)
if err != nil {
return nil, errors.Wrapf(err, "parse target %q", raw)
return nil, errors.Wrapf(err, "parse target %q", target)
}
if u.Path == "" {
u = u.JoinPath("loki", "api", "v1", "push")
}
raw = u.String()
target = u.String()

targets = append(targets, raw)
if err := waitForIngest(ctx, target, http.DefaultClient); err != nil {
return nil, errors.Wrapf(err, "wait for %q", target)
}
targets = append(targets, target)
}

log.Info("Generating logs",
Expand Down Expand Up @@ -70,20 +74,65 @@ func setup(ctx context.Context, cfg Config) (*lokicompliance.Comparer, error) {
return lokicompliance.New(refAPI, testAPI), nil
}

func waitForIngest(ctx context.Context, target string, client *http.Client) error {
check := func(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, http.NoBody)
if err != nil {
return errors.Wrap(err, "create request")
}

resp, err := client.Do(req)
if err != nil {
if cerr := ctx.Err(); cerr != nil {
return backoff.Permanent(cerr)
}
return errors.Wrap(err, "send")
}
defer func() {
_ = resp.Body.Close()
}()
return nil
}

var (
log = zctx.From(ctx).With(zap.String("target", target))
b = backoff.NewExponentialBackOff(
backoff.WithInitialInterval(5*time.Second),
backoff.WithMaxElapsedTime(time.Minute),
)
)
log.Info("Waiting for receiver")
if err := backoff.RetryNotify(
func() error {
return check(ctx)
},
b,
func(err error, d time.Duration) {
log.Debug("Retry ping request",
zap.Error(err),
)
},
); err != nil {
return err
}
log.Info("Receiver is ready")
return nil
}

func newLokiAPI(ctx context.Context, start, end time.Time, cfg lokicompliance.TargetConfig) (lokicompliance.LokiAPI, error) {
c, err := lokiapi.NewClient(cfg.QueryURL)
if err != nil {
return nil, err
}

if err := waitForLoki(ctx, c, start, end, cfg); err != nil {
if err := waitForAPI(ctx, c, start, end, cfg); err != nil {
return nil, errors.Wrap(err, "wait for loki")
}

return c, nil
}

func waitForLoki(ctx context.Context, c *lokiapi.Client, start, end time.Time, targetCfg lokicompliance.TargetConfig) error {
func waitForAPI(ctx context.Context, c *lokiapi.Client, start, end time.Time, targetCfg lokicompliance.TargetConfig) error {
check := func(ctx context.Context) error {
q := targetCfg.ReadyQuery
if q == "" {
Expand Down Expand Up @@ -118,28 +167,25 @@ func waitForLoki(ctx context.Context, c *lokiapi.Client, start, end time.Time, t
}

var (
b = backoff.NewExponentialBackOff(
log = zctx.From(ctx).With(zap.String("target", targetCfg.QueryURL))
b = backoff.NewExponentialBackOff(
backoff.WithInitialInterval(5*time.Second),
backoff.WithMaxElapsedTime(time.Minute),
)
log = zctx.From(ctx)
)
log.Info("Waiting for data and Query API")
if err := backoff.RetryNotify(
func() error {
return check(ctx)
},
b,
func(err error, d time.Duration) {
log.Debug("Retry ping request",
zap.String("target", targetCfg.QueryURL),
zap.Error(err),
)
log.Debug("Retry ping request", zap.Error(err))
},
); err != nil {
return err
}

log.Info("Target is ready", zap.String("target", targetCfg.QueryURL))
log.Info("Query API is ready")
return nil
}

Expand Down

0 comments on commit 284a6d0

Please sign in to comment.