Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertstore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/runutil"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/concurrency"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertstore/objectclient/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"sync"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/runutil"
"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util/concurrency"
)

// Object Alert Storage Schema
Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/template"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/services"
Expand All @@ -38,6 +37,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/alertmanager/multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/services"
Expand All @@ -47,6 +46,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/test"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand All @@ -21,6 +20,7 @@ import (
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/services"
Expand All @@ -39,6 +38,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/concurrency"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/concurrency"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand All @@ -40,6 +39,7 @@ import (
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/extract"
logutil "github.com/cortexproject/cortex/pkg/util/log"
util_math "github.com/cortexproject/cortex/pkg/util/math"
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sort"
"strings"

"github.com/grafana/dskit/concurrency"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/ring/basic_lifecycler_delegates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util/concurrency"
)

func TestLeaveOnStoppingDelegate(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/services"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_index_metadata_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util/concurrency"
)

func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"sync"

"github.com/grafana/dskit/multierror"
"golang.org/x/sync/errgroup"

"github.com/grafana/dskit/internal/math"
"github.com/grafana/dskit/multierror"
"github.com/cortexproject/cortex/pkg/util/math"
)

// ForEachUser runs the provided userFunc for each userIDs up to concurrency concurrent workers.
Expand Down
168 changes: 168 additions & 0 deletions pkg/util/concurrency/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package concurrency

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestForEachUser(t *testing.T) {
var (
ctx = context.Background()

// Keep track of processed users.
processedMx sync.Mutex
processed []string
)

input := []string{"a", "b", "c"}

err := ForEachUser(ctx, input, 2, func(ctx context.Context, user string) error {
processedMx.Lock()
defer processedMx.Unlock()
processed = append(processed, user)
return nil
})

require.NoError(t, err)
assert.ElementsMatch(t, input, processed)
}

func TestForEachUser_ShouldContinueOnErrorButReturnIt(t *testing.T) {
var (
ctx = context.Background()

// Keep the processed users count.
processed atomic.Int32
)

input := []string{"a", "b", "c"}

err := ForEachUser(ctx, input, 2, func(ctx context.Context, user string) error {
if processed.CAS(0, 1) {
return errors.New("the first request is failing")
}

// Wait 1s and increase the number of processed jobs, unless the context get canceled earlier.
select {
case <-time.After(time.Second):
processed.Add(1)
case <-ctx.Done():
return ctx.Err()
}

return nil
})

require.EqualError(t, err, "the first request is failing")

// Since we expect it continues on error, the number of processed users should be equal to the input length.
assert.Equal(t, int32(len(input)), processed.Load())
}

func TestForEachUser_ShouldReturnImmediatelyOnNoUsersProvided(t *testing.T) {
require.NoError(t, ForEachUser(context.Background(), nil, 2, func(ctx context.Context, user string) error {
return nil
}))
}

func TestForEach(t *testing.T) {
var (
ctx = context.Background()

// Keep track of processed jobs.
processedMx sync.Mutex
processed []string
)

jobs := []string{"a", "b", "c"}

err := ForEach(ctx, CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error {
processedMx.Lock()
defer processedMx.Unlock()
processed = append(processed, job.(string))
return nil
})

require.NoError(t, err)
assert.ElementsMatch(t, jobs, processed)
}

func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) {
var (
ctx = context.Background()

// Keep the processed jobs count.
processed atomic.Int32
)

err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error {
if processed.CAS(0, 1) {
return errors.New("the first request is failing")
}

// Wait 1s and increase the number of processed jobs, unless the context get canceled earlier.
select {
case <-time.After(time.Second):
processed.Add(1)
case <-ctx.Done():
return ctx.Err()
}

return nil
})

require.EqualError(t, err, "the first request is failing")

// Since we expect the first error interrupts the workers, we should only see
// 1 job processed (the one which immediately returned error).
assert.Equal(t, int32(1), processed.Load())
}

func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) {
var (
ctx = context.Background()

// Keep the processed jobs count.
processed atomic.Int32
)

// waitGroup to await the start of the first two jobs
var wg sync.WaitGroup
wg.Add(2)

err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error {
wg.Done()

if processed.CAS(0, 1) {
// wait till two jobs have been started
wg.Wait()
return errors.New("the first request is failing")
}

// Wait till context is cancelled to add processed jobs.
<-ctx.Done()
processed.Add(1)

return nil
})

require.EqualError(t, err, "the first request is failing")

// Since we expect the first error interrupts the workers, we should only
// see 2 job processed (the one which immediately returned error and the
// job with "b").
assert.Equal(t, int32(2), processed.Load())
}

func TestForEach_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) {
require.NoError(t, ForEach(context.Background(), nil, 2, func(ctx context.Context, job interface{}) error {
return nil
}))
}
9 changes: 0 additions & 9 deletions vendor/github.com/grafana/dskit/internal/math/math.go

This file was deleted.

Loading