Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/ingester: check that ingester is in LEAVING state when transferring chunks and claiming tokens. Required when using memberlist client. #1300

Merged
merged 4 commits into from Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 31 additions & 0 deletions pkg/ingester/transfer.go
Expand Up @@ -79,6 +79,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
if err != nil {
return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
}
}

userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)
Expand Down Expand Up @@ -125,6 +131,31 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
return nil
}

// checkFromIngesterIsInLeavingState verifies that the ring entry for
// fromIngesterID is in the LEAVING state before a chunk transfer is accepted.
//
// Ring gossiping: the "from" ingester should be in the LEAVING state, but we
// may not see that yet when using a gossip ring. If we cannot see that the
// ingester is in the LEAVING state yet, we don't accept this transfer, as
// claiming tokens would possibly end up with this ingester owning no tokens,
// due to conflict resolution in the ring merge function. Hopefully the leaving
// ingester will retry the transfer.
//
// It returns an error when the ring cannot be read from the KV store, when the
// stored value is missing or is not a *ring.Desc, when the source ingester has
// no entry in the ring, or when its state is anything other than LEAVING.
func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIngesterID string) error {
	v, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
	if err != nil {
		return errors.Wrap(err, "get ring")
	}
	if v == nil {
		return fmt.Errorf("ring not found when checking state of source ingester")
	}
	r, ok := v.(*ring.Desc)
	if !ok || r == nil {
		return fmt.Errorf("ring not found, got %T", v)
	}

	// Single lookup; indexing a nil map is safe and simply reports ok == false.
	// A missing entry is reported explicitly instead of through the state of a
	// zero-value map element, which would make the error message misleading.
	ing, found := r.Ingesters[fromIngesterID]
	if !found {
		return fmt.Errorf("source ingester %s not found in the ring", fromIngesterID)
	}
	if ing.State != ring.LEAVING {
		return fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", ing.State)
	}

	// all fine
	return nil
}

// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {
i.shutdownMtx.Lock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -120,6 +121,7 @@ func (t *Loki) initRing() (err error) {
if err != nil {
return
}
prometheus.MustRegister(t.ring)
t.server.HTTP.Handle("/ring", t.ring)
return
}
Expand Down