diff --git a/cmd/logql-compliance-tester/setup.go b/cmd/logql-compliance-tester/setup.go index 8a9dbed3..299452b3 100644 --- a/cmd/logql-compliance-tester/setup.go +++ b/cmd/logql-compliance-tester/setup.go @@ -2,6 +2,7 @@ package main import ( "context" + "net/http" "net/url" "strconv" "time" @@ -23,21 +24,24 @@ func setup(ctx context.Context, cfg Config) (*lokicompliance.Comparer, error) { cfg.ReferenceTarget, cfg.TestTarget, } { - raw := cfg.PushURL - if raw == "" { - raw = cfg.QueryURL + target := cfg.PushURL + if target == "" { + target = cfg.QueryURL } - u, err := url.Parse(raw) + u, err := url.Parse(target) if err != nil { - return nil, errors.Wrapf(err, "parse target %q", raw) + return nil, errors.Wrapf(err, "parse target %q", target) } if u.Path == "" { u = u.JoinPath("loki", "api", "v1", "push") } - raw = u.String() + target = u.String() - targets = append(targets, raw) + if err := waitForIngest(ctx, target, http.DefaultClient); err != nil { + return nil, errors.Wrapf(err, "wait for %q", target) + } + targets = append(targets, target) } log.Info("Generating logs", @@ -70,20 +74,65 @@ func setup(ctx context.Context, cfg Config) (*lokicompliance.Comparer, error) { return lokicompliance.New(refAPI, testAPI), nil } +func waitForIngest(ctx context.Context, target string, client *http.Client) error { + check := func(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, http.NoBody) + if err != nil { + return errors.Wrap(err, "create request") + } + + resp, err := client.Do(req) + if err != nil { + if cerr := ctx.Err(); cerr != nil { + return backoff.Permanent(cerr) + } + return errors.Wrap(err, "send") + } + defer func() { + _ = resp.Body.Close() + }() + return nil + } + + var ( + log = zctx.From(ctx).With(zap.String("target", target)) + b = backoff.NewExponentialBackOff( + backoff.WithInitialInterval(5*time.Second), + backoff.WithMaxElapsedTime(time.Minute), + ) + ) + log.Info("Waiting for receiver") + if err := backoff.RetryNotify( + func() error { + return check(ctx) + }, + b, + func(err error, d time.Duration) { + log.Debug("Retry ping request", + zap.Error(err), + ) + }, + ); err != nil { + return err + } + log.Info("Receiver is ready") + return nil +} + func newLokiAPI(ctx context.Context, start, end time.Time, cfg lokicompliance.TargetConfig) (lokicompliance.LokiAPI, error) { c, err := lokiapi.NewClient(cfg.QueryURL) if err != nil { return nil, err } - if err := waitForLoki(ctx, c, start, end, cfg); err != nil { + if err := waitForAPI(ctx, c, start, end, cfg); err != nil { return nil, errors.Wrap(err, "wait for loki") } return c, nil } -func waitForLoki(ctx context.Context, c *lokiapi.Client, start, end time.Time, targetCfg lokicompliance.TargetConfig) error { +func waitForAPI(ctx context.Context, c *lokiapi.Client, start, end time.Time, targetCfg lokicompliance.TargetConfig) error { check := func(ctx context.Context) error { q := targetCfg.ReadyQuery if q == "" { @@ -118,28 +167,25 @@ func waitForLoki(ctx context.Context, c *lokiapi.Client, start, end time.Time, t } var ( - b = backoff.NewExponentialBackOff( + log = zctx.From(ctx).With(zap.String("target", targetCfg.QueryURL)) + b = backoff.NewExponentialBackOff( backoff.WithInitialInterval(5*time.Second), backoff.WithMaxElapsedTime(time.Minute), ) - log = zctx.From(ctx) ) + log.Info("Waiting for data and Query API") if err := backoff.RetryNotify( func() error { return check(ctx) }, b, func(err error, d time.Duration) { - log.Debug("Retry ping request", - zap.String("target", targetCfg.QueryURL), - zap.Error(err), - ) + log.Debug("Retry ping request", zap.Error(err)) }, ); err != nil { return err } - - log.Info("Target is ready", zap.String("target", targetCfg.QueryURL)) + log.Info("Query API is ready") return nil }