Skip to content

Commit

Permalink
Adapt services model for flusher
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Mar 6, 2020
1 parent df38a40 commit a20527d
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 44 deletions.
22 changes: 3 additions & 19 deletions cmd/cortex/main.go
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -106,24 +105,9 @@ func main() {

level.Info(util.Logger).Log("msg", "Starting Cortex", "version", version.Info())

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := t.Run(); err != nil {
level.Error(util.Logger).Log("msg", "error running Cortex", "err", err)
}
}()

// if cfg.Target.IsJob() {
// err = t.Stop()
// }

wg.Wait()

// if !cfg.Target.IsJob() {
// err = t.Stop()
// }
if err := t.Run(); err != nil {
level.Error(util.Logger).Log("msg", "error running Cortex", "err", err)
}

runtime.KeepAlive(ballast)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/cortex/cortex.go
Expand Up @@ -86,8 +86,8 @@ type Config struct {
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
TSDB tsdb.Config `yaml:"tsdb" doc:"hidden"`
Compactor compactor.Config `yaml:"compactor,omitempty" doc:"hidden"`
TSDB tsdb.Config `yaml:"tsdb"`
Compactor compactor.Config `yaml:"compactor,omitempty"`
DataPurgerConfig purger.Config `yaml:"purger,omitempty"`

Ruler ruler.Config `yaml:"ruler,omitempty"`
Expand Down Expand Up @@ -322,7 +322,11 @@ func (t *Cortex) Run() error {
// let's find out which module failed
for m, s := range t.serviceMap {
if s == service {
level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase())
if service.FailureCase() == util.ErrStopCortex {
level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase())
} else {
level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase())
}
return
}
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/cortex/module_service_wrapper.go
Expand Up @@ -85,9 +85,8 @@ func (w *moduleServiceWrapper) stop() error {

level.Debug(util.Logger).Log("msg", "stopping", "module", w.module)

w.service.StopAsync()
err := w.service.AwaitTerminated(context.Background())
if err != nil {
err := services.StopAndAwaitTerminated(context.Background(), w.service)
if err != nil && err != util.ErrStopCortex {
level.Warn(util.Logger).Log("msg", "error stopping module", "module", w.module, "err", err)
} else {
level.Info(util.Logger).Log("msg", "module stopped", "module", w.module)
Expand Down
2 changes: 0 additions & 2 deletions pkg/cortex/modules.go
Expand Up @@ -383,8 +383,6 @@ func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) {
}

func (t *Cortex) initFlusher(cfg *Config) (serv services.Service, err error) {
// By the end of this call, the chunks should be recovered
// from the WAL and flushed.
t.flusher, err = flusher.New(
cfg.Flusher,
cfg.Ingester,
Expand Down
63 changes: 48 additions & 15 deletions pkg/flusher/flusher.go
@@ -1,17 +1,20 @@
package flusher

import (
"context"
"flag"
"fmt"
"net/http"
"time"

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

// Config for an Ingester.
Expand All @@ -30,7 +33,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// Flusher is designed to be used as a job to flush the chunks from the WAL on disk.
type Flusher struct {
ing *ingester.Ingester
services.Service

cfg Config
ingesterConfig ingester.Config
clientConfig client.Config
chunkStore ingester.ChunkStore
registerer prometheus.Registerer

ingester *ingester.Ingester
}

const (
Expand All @@ -48,33 +59,55 @@ func New(
) (*Flusher, error) {

ingesterConfig.WALConfig.Dir = cfg.WALDir

ing, err := ingester.NewForFlusher(ingesterConfig, clientConfig, chunkStore, registerer)
if err != nil {
return nil, err
ingesterConfig.ConcurrentFlushes = cfg.ConcurrentFlushes
ingesterConfig.FlushOpTimeout = cfg.FlushOpTimeout

f := &Flusher{
cfg: cfg,
ingesterConfig: ingesterConfig,
clientConfig: clientConfig,
chunkStore: chunkStore,
registerer: registerer,
}
f.Service = services.NewBasicService(f.starting, f.running, f.stopping)
return f, nil
}

return &Flusher{
ing: ing,
}, err
func (f *Flusher) starting(ctx context.Context) error {
// WAL replay happens here. We have it in the starting function and not New
// so that metrics can be collected in parallel with WAL replay.
var err error
f.ingester, err = ingester.NewForFlusher(f.ingesterConfig, f.clientConfig, f.chunkStore, f.registerer)
return err
}

func (f *Flusher) Flush() {
f.ing.Flush()
func (f *Flusher) running(ctx context.Context) error {
if err := f.ingester.StartAsync(ctx); err != nil {
return errors.Wrap(err, "start ingester")
}
if err := f.ingester.AwaitRunning(ctx); err != nil {
return errors.Wrap(err, "awaing running ingester")
}

f.ingester.Flush()

// Sleeping to give a chance to Prometheus
// to collect the metrics.
level.Info(util.Logger).Log("msg", fmt.Sprintf("sleeping for %s to give chance for collection of metrics", postFlushSleepTime.String()))
time.Sleep(postFlushSleepTime)

f.ing.Shutdown()
f.ingester.StopAsync()
if err := f.ingester.AwaitTerminated(ctx); err != nil {
return err
}
return util.ErrStopCortex
}
func (f *Flusher) stopping() error {
// Nothing to do here.
return nil
}

// ReadinessHandler returns 204 always.
func (f *Flusher) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

func (f *Flusher) Close() error {
return nil
}
4 changes: 2 additions & 2 deletions pkg/ingester/ingester.go
Expand Up @@ -252,11 +252,11 @@ func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore
return nil, err
}

i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
i.Service = services.NewBasicService(i.startingForFlusher, i.loop, i.stopping)
return i, nil
}

func (i *Ingester) startingFlusher(ctx context.Context) error {
func (i *Ingester) startingForFlusher(ctx context.Context) error {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/errors.go
@@ -0,0 +1,6 @@
package util

import "errors"

// ErrStopCortex is the error returned by a service as a hint to stop the Cortex server entirely.
var ErrStopCortex = errors.New("stop cortex")

0 comments on commit a20527d

Please sign in to comment.