Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Loki modules to services #1804

Merged
merged 21 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
15 changes: 3 additions & 12 deletions cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,10 @@ func main() {

// Start Loki
t, err := loki.New(config)
if err != nil {
level.Error(util.Logger).Log("msg", "error initialising loki", "err", err)
os.Exit(1)
}
util.CheckFatal("initialising loki", err)

level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())

if err := t.Run(); err != nil {
level.Error(util.Logger).Log("msg", "error running loki", "err", err)
}

if err := t.Stop(); err != nil {
level.Error(util.Logger).Log("msg", "error stopping loki", "err", err)
os.Exit(1)
}
err = t.Run()
util.CheckFatal("running loki", err)
}
47 changes: 47 additions & 0 deletions go.sum

Large diffs are not rendered by default.

65 changes: 29 additions & 36 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"flag"
"net/http"
"os"
"sync/atomic"
"time"

Expand All @@ -16,7 +15,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/pkg/errors"

"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -34,7 +32,6 @@ const (
metricName = "logs"
)

var readinessProbeSuccess = []byte("Ready")
var (
ingesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Expand Down Expand Up @@ -75,6 +72,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// Distributor coordinates replicates and distribution of log streams.
type Distributor struct {
services.Service

cfg Config
clientCfg client.Config
ingestersRing ring.ReadRing
Expand All @@ -85,6 +84,9 @@ type Distributor struct {
// the number of healthy instances.
distributorsRing *ring.Lifecycler

subservices *services.Manager
subservicesWatcher *services.FailureWatcher

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
}
Expand All @@ -107,25 +109,16 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsRing *ring.Lifecycler

var servs []services.Service

if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
var err error
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, false)
if err != nil {
return nil, err
}

distributorsRing.AddListener(services.NewListener(nil, nil, nil, nil, func(_ services.State, failure error) {
// lifecycler used to do os.Exit(1) on its own failure, but now it just goes into Failed state.
// for now we just simulate old behaviour here. When Distributor itself becomes a service, it will enter Failed state as well.
level.Error(cortex_util.Logger).Log("msg", "lifecycler failed", "err", err)
os.Exit(1)
}))

err = services.StartAndAwaitRunning(context.Background(), distributorsRing)
if err != nil {
return nil, err
}

servs = append(servs, distributorsRing)
ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsRing)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
Expand All @@ -141,18 +134,33 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
}

if err := services.StartAndAwaitRunning(context.Background(), d.pool); err != nil {
return nil, errors.Wrap(err, "starting client pool")
servs = append(servs, d.pool)
d.subservices, err = services.NewManager(servs...)
if err != nil {
return nil, errors.Wrap(err, "services manager")
}
d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)
d.Service = services.NewBasicService(d.starting, d.running, d.stopping)

return &d, nil
}

func (d *Distributor) Stop() {
if d.distributorsRing != nil {
_ = services.StopAndAwaitTerminated(context.Background(), d.distributorsRing)
func (d *Distributor) starting(ctx context.Context) error {
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
}

func (d *Distributor) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
_ = services.StopAndAwaitTerminated(context.Background(), d.pool)
}

func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand All @@ -172,21 +180,6 @@ type pushTracker struct {
err chan error
}

// ReadinessHandler is used to indicate to k8s when the distributor is ready.
// Returns 200 when the distributor is ready, 500 otherwise.
func (d *Distributor) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
_, err := d.ingestersRing.GetAll()
if err != nil {
http.Error(w, "Not ready: "+err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
if _, err := w.Write(readinessProbeSuccess); err != nil {
level.Error(cortex_util.Logger).Log("msg", "error writing success message", "error", err)
}
}

// Push a set of streams.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
userID, err := user.ExtractOrgID(ctx)
Expand Down
5 changes: 4 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -80,6 +81,7 @@ func TestDistributor(t *testing.T) {
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)

d := prepare(t, limits, nil)
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(tc.lines, 10)

Expand Down Expand Up @@ -163,7 +165,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
distributors := make([]*Distributor, testData.distributors)
for i := 0; i < testData.distributors; i++ {
distributors[i] = prepare(t, limits, kvStore)
defer distributors[i].Stop()
defer services.StopAndAwaitTerminated(context.Background(), distributors[i]) //nolint:errcheck
}

// If the distributors ring is setup, wait until the first distributor
Expand Down Expand Up @@ -226,6 +228,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distri

d, err := New(distributorConfig, clientConfig, ingestersRing, overrides)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

return d
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
Expand Down Expand Up @@ -43,6 +44,7 @@ func TestChunkFlushingIdle(t *testing.T) {
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
testData := pushTestSamples(t, ing)

// wait beyond idle time so samples flush
Expand All @@ -53,7 +55,7 @@ func TestChunkFlushingIdle(t *testing.T) {
func TestChunkFlushingShutdown(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig(t))
testData := pushTestSamples(t, ing)
ing.Shutdown()
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
}

Expand Down Expand Up @@ -90,7 +92,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
require.NoError(t, err)

// force flush
ing.Shutdown()
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))

// verify that we get all the data back
store.checkData(t, map[string][]*logproto.Stream{userID: req.Streams})
Expand Down Expand Up @@ -154,6 +156,7 @@ func TestFlushMaxAge(t *testing.T) {
},
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
}

type testStore struct {
Expand All @@ -172,6 +175,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {

ing, err := New(cfg, client.Config{}, store, limits)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

return store, ing
}
Expand Down
Loading