Skip to content
Permalink
Browse files

add compare flags func to compare flags between prometheus and sidecar (

#838)

Original message:

* update documentation for a max/min block duration

add compare flags func to compare flags between prom and sidecar

* fix some nits


Functional change: now we check the configured flags (if possible) and error out if MinTime != MaxTime. We need to check this always since if that is not true then we will get overlapping blocks. Additionally, an error message is printed out if it is not equal to 2h (the recommended value).
  • Loading branch information...
yeya24 authored and GiedriusS committed Apr 24, 2019
1 parent a18ca1e commit eddaf1138a107ed5304d7cfb66a43de31a8e56da
@@ -11,6 +11,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
@@ -25,6 +26,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
@@ -111,6 +113,17 @@ func runSidecar(
maxt: math.MaxInt64,
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return errors.Wrap(err, "getting object store config")
}

var uploads = true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
uploads = false
}

// Setup all the concurrent groups.
{
promUp := prometheus.NewGauge(prometheus.GaugeOpts{
@@ -125,6 +138,29 @@ func runSidecar(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Retry infinitely until we get Prometheus version.
if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if m.version, err = promclient.PromVersion(logger, m.promURL); err != nil {
level.Warn(logger).Log(
"msg", "failed to get Prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return errors.Wrapf(err, "fetch Prometheus version")
}

return nil
}); err != nil {
return errors.Wrap(err, "fetch Prometheus version")
}

// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
@@ -229,17 +265,6 @@ func runSidecar(
})
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

var uploads = true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
uploads = false
}

if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
@@ -265,10 +290,7 @@ func runSidecar(

var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, m.promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
s = shipper.NewWithCompacted(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
}
@@ -298,13 +320,45 @@ func runSidecar(
return nil
}

func validatePrometheus(ctx context.Context, logger log.Logger, m *promMetadata) error {
if m.version == nil {
level.Warn(logger).Log("msg", "fetched version is nil or invalid. Unable to know whether Prometheus supports /version endpoint, skip validation")
return nil
}

if m.version.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original())
return nil
}

flags, err := promclient.ConfiguredFlags(ctx, logger, m.promURL)
if err != nil {
return errors.Wrap(err, "failed to check flags")
}

// Check if compaction is disabled.
if flags.TSDBMinTime != flags.TSDBMaxTime {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration)", flags.TSDBMaxTime, flags.TSDBMinTime)
}

// Check if block time is 2h.
if flags.TSDBMinTime != model.Duration(2*time.Hour) {
level.Warn(logger).Log("msg", "found that TSDB block time is not 2h. Only 2h block time is recommended.", "block-time", flags.TSDBMinTime)
}

return nil
}

type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
version *version.Version
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
@@ -14,7 +14,8 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t
* The minimum Prometheus version is 2.2.1
* The `external_labels` section of the configuration implements is in line with the desired label scheme (will be used by query layer to filter out store APIs to query).
* The `--web.enable-lifecycle` flag is enabled if you want to use `reload.*` flags.
* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction. The default of `2h` is recommended.
* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload. Leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended.
Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filestem via internal compaction mechanis, but if you followed recommendations - data won't be modified by Prometheus before sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838).

The retention is recommended to not be lower than three times the block duration. This achieves resilience in the face of connectivity issues
to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage.
1 go.mod
@@ -17,6 +17,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117
github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86
github.com/hashicorp/go-version v1.1.0
github.com/hashicorp/golang-lru v0.5.1
github.com/hashicorp/memberlist v0.1.0
github.com/julienschmidt/httprouter v1.1.0 // indirect
3 go.sum
@@ -59,6 +59,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-ini/ini v1.21.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
@@ -122,6 +123,8 @@ github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86 h1:7YOlAIO2Y
github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.1.0 h1:bPIoEKD27tNdebFGGxxYwcL4nepeY4j1QP23PFRGzg0=
github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
@@ -23,6 +23,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/tracing"
@@ -35,6 +36,8 @@ import (
yaml "gopkg.in/yaml.v2"
)

var FlagsVersion = version.Must(version.NewVersion("2.2.0"))

// IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell
// if we have access to Prometheus TSDB directory.
func IsWALDirAccesible(dir string) error {
@@ -70,7 +73,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("failed to read body")
return nil, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
@@ -186,7 +189,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return Flags{}, errors.Errorf("failed to read body")
return Flags{}, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
@@ -230,7 +233,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", errors.Errorf("failed to read body")
return "", errors.New("failed to read body")
}

if resp.StatusCode != 200 {
@@ -376,6 +379,32 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q
return vec, warnings, nil
}

// PromVersion will return the version of Prometheus by querying /version Prometheus endpoint.
func PromVersion(logger log.Logger, base *url.URL) (*version.Version, error) {
if logger == nil {
logger = log.NewNopLogger()
}

u := *base
u.Path = path.Join(u.Path, "/version")
resp, err := http.Get(u.String())
if err != nil {
return nil, errors.Wrapf(err, "request version against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

return parseVersion(b)
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separately.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
@@ -478,3 +507,25 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr
}
}
}

// parseVersion converts string to version.Version.
func parseVersion(data []byte) (*version.Version, error) {
var m struct {
Version string `json:"version"`
}
if err := json.Unmarshal(data, &m); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(data))
}

// Prometheus is built with nil version.
if strings.TrimSpace(m.Version) == "" {
return nil, nil
}

ver, err := version.NewVersion(m.Version)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse version %s", m.Version)
}

return ver, nil
}
@@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
@@ -147,3 +148,36 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) {
vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestParseVersion(t *testing.T) {
promVersions := map[string]string{
"": promVersionResp(""),
"2.2.0": promVersionResp("2.2.0"),
"2.3.0": promVersionResp("2.3.0"),
"2.3.0-rc.0": promVersionResp("2.3.0-rc.0"),
}

promMalformedVersions := map[string]string{
"foo": promVersionResp("foo"),
"bar": promVersionResp("bar"),
}

for v, resp := range promVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.Ok(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}

for v, resp := range promMalformedVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.NotOk(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}
}

// promVersionResp returns the response of Prometheus /version endpoint.
func promVersionResp(ver string) string {
return fmt.Sprintf(`{"version":"%s","revision":"","branch":"","buildUser":"","buildDate":"","goVersion":""}`, ver)
}

0 comments on commit eddaf11

Please sign in to comment.
You can’t perform that action at this time.