Skip to content

Commit

Permalink
Replace usage of sync/atomic with uber-go/atomic (#2449)
Browse files Browse the repository at this point in the history
* distributor: Replace usage of sync/atomic with uber-go/atomic

Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>

* ingester: Replace usage of sync/atomic with uber-go/atomic

Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>

* Vendor go.uber.org/atomic

Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>

* Install faillint in the 'loki-build-image' Docker image

Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>

* faillint: Ensure we are not using sync/atomic

Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
  • Loading branch information
jvrplmlmn authored Jul 30, 2020
1 parent 7a60efc commit 6852b1c
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 13 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ publish: dist

lint:
GO111MODULE=on GOGC=10 golangci-lint run -v $(GOLANGCI_ARG)
faillint -paths "sync/atomic=go.uber.org/atomic" ./...

########
# Test #
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
go.uber.org/atomic v1.6.0
golang.org/x/net v0.0.0-20200707034311-ab3426394381
google.golang.org/grpc v1.29.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
Expand Down
9 changes: 9 additions & 0 deletions loki-build-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ RUN apk add --no-cache docker-cli
FROM golang:1.14.2 as drone
RUN GO111MODULE=on go get github.com/drone/drone-cli/drone@1fad337d74ca0ecf420993d9d2d7229a1c99f054

# Install faillint used to lint go imports in CI.
# This collisions with the version of go tools used in the base image, thus we install it in its own image and copy it over.
# Error:
# github.com/fatih/faillint@v1.5.0 requires golang.org/x/tools@v0.0.0-20200207224406-61798d64f025
# (not golang.org/x/tools@v0.0.0-20190918214920-58d531046acd from golang.org/x/tools/cmd/goyacc@58d531046acdc757f177387bc1725bfa79895d69)
FROM golang:1.14.2 as faillint
RUN GO111MODULE=on go get github.com/fatih/faillint@v1.5.0

FROM golang:1.14.2-stretch
RUN apt-get update && \
apt-get install -qy \
Expand All @@ -33,6 +41,7 @@ COPY --from=docker /usr/bin/docker /usr/bin/docker
COPY --from=helm /usr/bin/helm /usr/bin/helm
COPY --from=golangci /bin/golangci-lint /usr/local/bin
COPY --from=drone /go/bin/drone /usr/bin/drone
COPY --from=faillint /go/bin/faillint /usr/bin/faillint

# Install some necessary dependencies.
# Forcing GO111MODULE=on is required to specify dependencies at specific versions using the go mod notation.
Expand Down
20 changes: 10 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"flag"
"net/http"
"sync/atomic"
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/pkg/errors"
"go.uber.org/atomic"

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -164,14 +164,14 @@ type streamTracker struct {
stream logproto.Stream
minSuccess int
maxFailures int
succeeded int32
failed int32
succeeded atomic.Int32
failed atomic.Int32
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
type pushTracker struct {
samplesPending int32
samplesFailed int32
samplesPending atomic.Int32
samplesFailed atomic.Int32
done chan struct{}
err chan error
}
Expand Down Expand Up @@ -263,10 +263,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

tracker := pushTracker{
samplesPending: int32(len(streams)),
done: make(chan struct{}),
err: make(chan error),
}
tracker.samplesPending.Store(int32(len(streams)))
for ingester, samples := range samplesByIngester {
go func(ingester ring.IngesterDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
Expand Down Expand Up @@ -304,17 +304,17 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes
// goroutine will write to either channel.
for i := range streamTrackers {
if err != nil {
if atomic.AddInt32(&streamTrackers[i].failed, 1) <= int32(streamTrackers[i].maxFailures) {
if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
continue
}
if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
if pushTracker.samplesFailed.Inc() == 1 {
pushTracker.err <- err
}
} else {
if atomic.AddInt32(&streamTrackers[i].succeeded, 1) != int32(streamTrackers[i].minSuccess) {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
continue
}
if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 {
if pushTracker.samplesPending.Dec() == 0 {
pushTracker.done <- struct{}{}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"

"github.com/prometheus/prometheus/pkg/labels"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
)

const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping.
Expand All @@ -24,7 +24,7 @@ var separatorString = string([]byte{model.SeparatorByte})
// collisions.
type fpMapper struct {
// highestMappedFP has to be aligned for atomic operations.
highestMappedFP model.Fingerprint
highestMappedFP atomic.Uint64

mtx sync.RWMutex // Protects mappings.
// maps original fingerprints to a map of string representations of
Expand Down Expand Up @@ -163,7 +163,7 @@ func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []clien
}

func (m *fpMapper) nextMappedFP() model.Fingerprint {
mappedFP := model.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1))
mappedFP := model.Fingerprint(m.highestMappedFP.Inc())
if mappedFP > maxMappedFP {
panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
}
Expand Down
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ go.opencensus.io/trace/internal
go.opencensus.io/trace/propagation
go.opencensus.io/trace/tracestate
# go.uber.org/atomic v1.6.0
## explicit
go.uber.org/atomic
# go.uber.org/goleak v1.0.0
go.uber.org/goleak
Expand Down

0 comments on commit 6852b1c

Please sign in to comment.