Skip to content

Commit

Permalink
Add metrics for checkpoint and name changes
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Aug 13, 2019
1 parent 787fbb8 commit 9845a5e
Showing 1 changed file with 59 additions and 15 deletions.
74 changes: 59 additions & 15 deletions pkg/ingester/wal.go
Expand Up @@ -55,58 +55,89 @@ func (noop) Log(*Record) error {
// Stop is a no-op: the noop WAL has an empty Stop body, so there are no
// background WAL processes for it to stop.
func (noop) Stop() {}

// walWrapper holds the state of the enabled WAL: its configuration, the
// owning ingester, the underlying WAL handle, the shutdown plumbing for the
// background goroutine, and the checkpoint counters.
//
// NOTE(review): this span is taken from a rendered diff. The two consecutive
// `type` lines look like the removed (wrapper) and added (walWrapper) headers
// of the same struct — confirm against the committed file.
type wrapper struct {
type walWrapper struct {
// cfg is the WAL configuration handed to newWAL.
cfg WALConfig
// ingester is the ingester handed to newWAL.
ingester *Ingester
// quit is closed by Stop; isStopped reports true once it is closed.
quit chan struct{}
// wait tracks the run goroutine: newWAL adds one before starting it, run
// marks it done on exit, and Stop waits on it.
wait sync.WaitGroup

// NOTE(review): no use of lastWalSegment is visible in this view — its
// exact meaning should be confirmed against the rest of the file.
lastWalSegment int
// wal is the underlying WAL that Log appends to and Stop closes.
wal *wal.WAL

// Checkpoint metrics.
// The *Total counters are incremented on entry to checkpoint and
// deleteCheckpoints; the *Fail counters are incremented when those
// functions return a non-nil error.
checkpointDeleteFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
}

// newWAL returns the WAL implementation selected by cfg.
//
// When cfg.enabled is false it returns a noop WAL and no error. Otherwise it
// opens a WAL in a subdirectory of cfg.dir, creates the four checkpoint
// counters, registers them when cfg.metricsRegisterer is non-nil, and starts
// the background run goroutine before returning the wrapper. The only error
// path is a failure of wal.New.
//
// NOTE(review): this span is taken from a rendered diff. The paired lines
// (samplesRegistry/walRegistry, samples/tsdbWAL, wrapper/walWrapper) look
// like the removed and added versions of the same statement — confirm
// against the committed file.
func newWAL(cfg WALConfig, ingester *Ingester) (WAL, error) {
if !cfg.enabled {
return &noop{}, nil
}

// The registerer passed to wal.New stays nil unless a metrics registerer is
// configured; when set, it is wrapped so the WAL's own metrics carry a
// constant "kind" label.
var samplesRegistry prometheus.Registerer
var walRegistry prometheus.Registerer
if cfg.metricsRegisterer != nil {
samplesRegistry = prometheus.WrapRegistererWith(prometheus.Labels{"kind": "samples"}, cfg.metricsRegisterer)
walRegistry = prometheus.WrapRegistererWith(prometheus.Labels{"kind": "wal"}, cfg.metricsRegisterer)
}
samples, err := wal.New(util.Logger, samplesRegistry, path.Join(cfg.dir, "samples"), true)
tsdbWAL, err := wal.New(util.Logger, walRegistry, path.Join(cfg.dir, "wal"), true)
if err != nil {
return nil, err
}

w := &wrapper{
w := &walWrapper{
cfg: cfg,
ingester: ingester,
quit: make(chan struct{}),
wal: samples,
wal: tsdbWAL,
}

// The counters are created unconditionally; only their registration below
// depends on a registerer being configured.
w.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "ingester_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
w.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "ingester_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
})
w.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "ingester_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
})
w.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "ingester_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
})
// These counters are registered on the unwrapped registerer, so they do not
// get the "kind" label. MustRegister panics if registration fails.
if cfg.metricsRegisterer != nil {
cfg.metricsRegisterer.MustRegister(
w.checkpointDeleteFail,
w.checkpointDeleteTotal,
w.checkpointCreationFail,
w.checkpointCreationTotal,
)
}

// Account for the run goroutine before starting it; run marks the
// WaitGroup done on exit and Stop waits on it.
w.wait.Add(1)
go w.run()
return w, nil
}

// Stop shuts the WAL down: it closes quit to signal the background
// goroutine, waits for that goroutine to finish, and then closes the
// underlying WAL.
//
// Closing an already-closed channel panics, so calling Stop a second time
// would panic.
//
// NOTE(review): this span is taken from a rendered diff; the two consecutive
// `func` lines look like the removed and added receiver names.
func (w *wrapper) Stop() {
func (w *walWrapper) Stop() {
close(w.quit)
w.wait.Wait()

// NOTE(review): any value returned by Close is discarded here — confirm
// whether a close error should be surfaced or logged.
w.wal.Close()
}

// Log serializes record with proto.Marshal and appends the resulting bytes
// to the underlying WAL. It returns the marshalling error if serialization
// fails, otherwise whatever the WAL's Log call returns.
//
// NOTE(review): this span is taken from a rendered diff; the two consecutive
// `func` lines look like the removed and added receiver names.
func (w *wrapper) Log(record *Record) error {
func (w *walWrapper) Log(record *Record) error {
buf, err := proto.Marshal(record)
if err != nil {
return err
}
return w.wal.Log(buf)
}

func (w *wrapper) run() {
func (w *walWrapper) run() {
defer w.wait.Done()

for !w.isStopped() {
Expand All @@ -123,7 +154,7 @@ func (w *wrapper) run() {
}
}

func (w *wrapper) isStopped() bool {
func (w *walWrapper) isStopped() bool {
select {
case <-w.quit:
return true
Expand All @@ -134,7 +165,13 @@ func (w *wrapper) isStopped() bool {

// checkpointPrefix is presumably the name prefix of checkpoint entries in the
// WAL directory — its uses are not visible in this view; confirm against
// lastCheckpoint and deleteCheckpoints.
const checkpointPrefix = "checkpoint."

func (w *wrapper) checkpoint() error {
func (w *walWrapper) checkpoint() (err error) {
w.checkpointCreationTotal.Inc()
defer func() {
if err != nil {
w.checkpointCreationFail.Inc()
}
}()
_, last, err := w.lastCheckpoint()
if err != nil {
return err
Expand Down Expand Up @@ -201,7 +238,7 @@ func (w *wrapper) checkpoint() error {

// lastCheckpoint returns the directory name and index of the most recent checkpoint.
// If dir does not contain any checkpoints, -1 is returned as index.
func (w *wrapper) lastCheckpoint() (string, int, error) {
func (w *walWrapper) lastCheckpoint() (string, int, error) {
files, err := ioutil.ReadDir(w.wal.Dir())
if err != nil {
return "", -1, err
Expand All @@ -226,7 +263,14 @@ func (w *wrapper) lastCheckpoint() (string, int, error) {
}

// deleteCheckpoints deletes all checkpoints in a directory below a given index.
func (w *wrapper) deleteCheckpoints(maxIndex int) error {
func (w *walWrapper) deleteCheckpoints(maxIndex int) (err error) {
w.checkpointDeleteTotal.Inc()
defer func() {
if err != nil {
w.checkpointDeleteFail.Inc()
}
}()

var errs tsdb_errors.MultiError

files, err := ioutil.ReadDir(w.wal.Dir())
Expand All @@ -248,7 +292,7 @@ func (w *wrapper) deleteCheckpoints(maxIndex int) error {
return errs.Err()
}

func (w *wrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerprint, series *memorySeries) error {
func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerprint, series *memorySeries) error {
wireChunks, err := toWireChunks(series.chunkDescs)
if err != nil {
return err
Expand All @@ -268,7 +312,7 @@ func (w *wrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerpr
}

// truncateSamples removes the WAL from before the checkpoint.
func (w *wrapper) truncateSamples() error {
func (w *walWrapper) truncateSamples() error {
_, last, err := w.wal.Segments()
if err != nil {
return err
Expand Down

0 comments on commit 9845a5e

Please sign in to comment.