Skip to content

Commit

Permalink
Add missing sample pending
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Jun 17, 2022
1 parent 5b9a3a5 commit 5f135d8
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/distributor/distributor.go
Expand Up @@ -97,6 +97,7 @@ func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.Push
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendProfiles() only sends once on each
err: make(chan error, 1),
}
tracker.samplesPending.Store(int32(len(profiles)))
for ingester, samples := range samplesByIngester {
go func(ingester ring.InstanceDesc, samples []*profileTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
Expand Down
8 changes: 7 additions & 1 deletion pkg/distributor/distributor_test.go
Expand Up @@ -26,7 +26,7 @@ import (
func Test_ConnectPush(t *testing.T) {
mux := http.NewServeMux()
d, err := New(Config{}, &mockRing{
replicationFactor: 3,
replicationFactor: 1,
ingesters: []ring.InstanceDesc{
{Addr: "foo"},
},
Expand Down Expand Up @@ -104,6 +104,12 @@ func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.InstanceDesc, _
MaxErrors: 1,
Instances: buf[:0],
}
if r.replicationFactor == 1 && len(r.ingesters) == 1 {
result.MaxErrors = 0
result.Instances = append(result.Instances, r.ingesters[0])
return result, nil
}

for i := uint32(0); i < r.replicationFactor; i++ {
n := (key + i) % uint32(len(r.ingesters))
result.Instances = append(result.Instances, r.ingesters[n])
Expand Down

0 comments on commit 5f135d8

Please sign in to comment.