Skip to content

Commit

Permalink
Fix creation of user states in WAL recover
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Jul 11, 2019
1 parent bc25534 commit f023ddb
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 43 deletions.
71 changes: 38 additions & 33 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,43 @@ func (us *userStates) get(userID string) (*userState, bool) {
return state.(*userState), ok
}

func (us *userStates) getOrCreate(userID string) *userState {
rawState, ok := us.states.Load(userID)
if ok {
return rawState.(*userState)
}

seriesInMetric := make([]metricCounterShard, 0, metricCounterShards)
for i := 0; i < metricCounterShards; i++ {
seriesInMetric = append(seriesInMetric, metricCounterShard{
m: map[string]int{},
})
}

// Speculatively create a userState object and try to store it
// in the map. Another goroutine may have got there before
// us, in which case this userState will be discarded
state := &userState{
userID: userID,
limits: us.limits,
fpToSeries: newSeriesMap(),
fpLocker: newFingerprintLocker(16 * 1024),
index: index.New(),
ingestedAPISamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
seriesInMetric: seriesInMetric,

memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID),
memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID),
}
state.mapper = newFPMapper(state.fpToSeries)
stored, ok := us.states.LoadOrStore(userID, state)
if !ok {
memUsers.Inc()
}
return stored.(*userState)
}

func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
Expand All @@ -135,39 +172,7 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, labels []client.Lab
return nil, 0, nil, fmt.Errorf("no user id")
}

state, ok := us.get(userID)
if !ok {

seriesInMetric := make([]metricCounterShard, 0, metricCounterShards)
for i := 0; i < metricCounterShards; i++ {
seriesInMetric = append(seriesInMetric, metricCounterShard{
m: map[string]int{},
})
}

// Speculatively create a userState object and try to store it
// in the map. Another goroutine may have got there before
// us, in which case this userState will be discarded
state = &userState{
userID: userID,
limits: us.limits,
fpToSeries: newSeriesMap(),
fpLocker: newFingerprintLocker(16 * 1024),
index: index.New(),
ingestedAPISamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
seriesInMetric: seriesInMetric,

memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID),
memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID),
}
state.mapper = newFPMapper(state.fpToSeries)
stored, ok := us.states.LoadOrStore(userID, state)
if !ok {
memUsers.Inc()
}
state = stored.(*userState)
}
state := us.getOrCreate(userID)

fp, series, err := state.getSeries(labels, record)
return state, fp, series, err
Expand Down
13 changes: 3 additions & 10 deletions pkg/ingester/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ingester

import (
"flag"
"fmt"
"path"
"sync"
"time"
Expand Down Expand Up @@ -220,7 +219,7 @@ func (w *wrapper) truncateSamples() error {
return nil
}

func (w *wrapper) recover(ctx context.Context) error {
func (w *wrapper) recover(ctx context.Context) (err error) {
// Use a local userStates, so we don't need to worry about locking.
userStates := newUserStates(w.ingester.limits, w.ingester.cfg)

Expand All @@ -233,10 +232,7 @@ func (w *wrapper) recover(ctx context.Context) error {
return err
}

state, ok := userStates.get(walSeries.UserId)
if !ok {
return fmt.Errorf("user state not found for userid=%s", walSeries.UserId)
}
state := userStates.getOrCreate(walSeries.UserId)

la = la[:0]
for _, l := range walSeries.Labels {
Expand All @@ -258,10 +254,7 @@ func (w *wrapper) recover(ctx context.Context) error {
if err := w.recoverRecords("samples", &Record{}, func(msg proto.Message) error {
record := msg.(*Record)

state, ok := userStates.get(record.UserId)
if !ok {
return fmt.Errorf("user state not found for userid=%s", record.UserId)
}
state := userStates.getOrCreate(record.UserId)

for _, labels := range record.Labels {
_, ok := state.fpToSeries.get(model.Fingerprint(labels.Fingerprint))
Expand Down

0 comments on commit f023ddb

Please sign in to comment.