Stop using global logger in modules (#11051)
**What this PR does / why we need it**:
Refactors Loki to inject the logger through constructors instead of relying on a global logger variable, which is generally considered good practice.
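
As a rough sketch of the pattern (illustrative names only, not the exact Loki types): each component now receives a `log.Logger` in its constructor and stores it on the struct, rather than reaching for the package-level `util_log.Logger`.

```go
package example

import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// Component keeps the injected logger instead of using a global one.
type Component struct {
	logger log.Logger
}

// New takes the logger as an explicit dependency.
func New(logger log.Logger) *Component {
	return &Component{logger: logger}
}

func (c *Component) Run() {
	// All log lines go through the injected logger.
	level.Info(c.logger).Log("msg", "component started")
}
```

Tests can then pass `log.NewNopLogger()` (as the updated tests in this diff do) rather than depending on global state.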

**Which issue(s) this PR fixes**:
Fixes grafana/loki-private#810

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory. <!--
TODO(salvacorts): Add example PR -->
shantanualsi committed Nov 1, 2023
1 parent 303e353 commit fb6f6d2
Showing 28 changed files with 180 additions and 155 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -33,7 +33,7 @@
* [10785](https://github.com/grafana/loki/pull/10785) **ashwanthgoli** Config: Removes `querier.worker-parallelism` and updates default value of `querier.max-concurrent` to 4.
* [10733](https://github.com/grafana/loki/pull/10733) **shantanualsi** Add support for case-insensitive logql funtions
* [10727](https://github.com/grafana/loki/pull/10727) **sandeepsukhani** Native otlp ingestion support

* [11051](https://github.com/grafana/loki/pull/11051) Refactor to not use global logger in modules
##### Fixes

##### Changes
6 changes: 2 additions & 4 deletions clients/pkg/promtail/targets/windows/target.go
@@ -22,8 +22,6 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"

util_log "github.com/grafana/loki/pkg/util/log"
)

var fs = afero.NewOsFs()
@@ -115,7 +113,7 @@ func (t *Target) loop() {
if err != nil {
if err != win_eventlog.ERROR_NO_MORE_ITEMS {
t.err = err
level.Error(util_log.Logger).Log("msg", "error fetching events", "err", err)
level.Error(t.logger).Log("msg", "error fetching events", "err", err)
}
break loop
}
@@ -125,7 +123,7 @@
t.handler.Chan() <- entry
if err := t.bm.save(handles[i]); err != nil {
t.err = err
level.Error(util_log.Logger).Log("msg", "error saving bookmark", "err", err)
level.Error(t.logger).Log("msg", "error saving bookmark", "err", err)
}
}
win_eventlog.Close(handles)
2 changes: 1 addition & 1 deletion cmd/querytee/main.go
@@ -36,7 +36,7 @@ func main() {
registry := prometheus.NewRegistry()
registry.MustRegister(collectors.NewGoCollector())

i := querytee.NewInstrumentationServer(cfg.ServerMetricsPort, registry)
i := querytee.NewInstrumentationServer(cfg.ServerMetricsPort, registry, util_log.Logger)
if err := i.Start(); err != nil {
level.Error(util_log.Logger).Log("msg", "Unable to start instrumentation server", "err", err.Error())
os.Exit(1)
23 changes: 13 additions & 10 deletions pkg/distributor/distributor.go
@@ -92,6 +92,7 @@ type Distributor struct {
services.Service

cfg Config
logger log.Logger
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
tenantsRetention *retention.TenantsRetention
@@ -135,6 +136,7 @@ func New(
overrides Limits,
registerer prometheus.Registerer,
metricsNamespace string,
logger log.Logger,
) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
@@ -169,12 +171,13 @@

d := &Distributor{
cfg: cfg,
logger: logger,
clientCfg: clientCfg,
tenantConfigs: configs,
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
validator: validator,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger),
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, logger),
labelCache: labelCache,
shardTracker: NewShardTracker(),
healthyInstancesCount: atomic.NewUint32(0),
@@ -199,13 +202,13 @@ func New(
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
writeFailuresManager: writefailures.NewManager(util_log.Logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
}

if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
d.rateLimitStrat = validation.GlobalIngestionRateStrategy

distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, util_log.Logger, registerer, metricsNamespace)
distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, logger, registerer, metricsNamespace)
if err != nil {
return nil, err
}
@@ -232,7 +235,7 @@ func New(
clientCfg.PoolConfig,
ingestersRing,
ring_client.PoolAddrFunc(internalFactory),
util_log.Logger,
logger,
),
overrides,
registerer,
@@ -473,7 +476,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// The number of shards is limited by the number of entries.
func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string) ([]uint32, []streamTracker) {
shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
logger := log.With(util_log.WithUserID(tenantID, util_log.Logger), "stream", stream.Labels)
logger := log.With(util_log.WithUserID(tenantID, d.logger), "stream", stream.Labels)
shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg)

if shardCount <= 1 {
@@ -502,7 +505,7 @@ func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards in

func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg *shardstreams.Config) ([]uint32, []streamTracker) {
var (
streamLabels = labelTemplate(stream.Labels)
streamLabels = labelTemplate(stream.Labels, d.logger)
streamPattern = streamLabels.String()
derivedKeys = make([]uint32, 0, totalShards)
derivedStreams = make([]streamTracker, 0, totalShards)
@@ -511,7 +514,7 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
)

if totalShards <= 0 {
level.Error(util_log.Logger).Log("msg", "attempt to create shard with zeroed total shards", "org_id", tenantID, "stream", stream.Labels, "entries_len", len(stream.Entries))
level.Error(d.logger).Log("msg", "attempt to create shard with zeroed total shards", "org_id", tenantID, "stream", stream.Labels, "entries_len", len(stream.Entries))
return derivedKeys, derivedStreams
}

@@ -525,7 +528,7 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
derivedStreams = append(derivedStreams, streamTracker{stream: shard})

if shardStreamsCfg.LoggingEnabled {
level.Info(util_log.Logger).Log("msg", "stream derived from sharding", "src-stream", stream.Labels, "derived-stream", shard.Labels)
level.Info(d.logger).Log("msg", "stream derived from sharding", "src-stream", stream.Labels, "derived-stream", shard.Labels)
}
}
d.shardTracker.SetLastShardNum(tenantID, stream.Hash, startShard+streamCount)
@@ -542,10 +545,10 @@ func streamCount(totalShards int, stream logproto.Stream) int {

// labelTemplate returns a label set that includes the dummy label to be replaced
// To avoid allocations, this slice is reused when we know the stream value
func labelTemplate(lbls string) labels.Labels {
func labelTemplate(lbls string, logger log.Logger) labels.Labels {
baseLbls, err := syntax.ParseLabels(lbls)
if err != nil {
level.Error(util_log.Logger).Log("msg", "couldn't extract labels from stream", "stream", lbls)
level.Error(logger).Log("msg", "couldn't extract labels from stream", "stream", lbls)
return nil
}

2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -1160,7 +1160,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)

d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki)
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
23 changes: 12 additions & 11 deletions pkg/ingester/checkpoint_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"

gokit_log "github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
@@ -69,7 +70,7 @@ func TestIngesterWAL(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -112,7 +113,7 @@ func TestIngesterWAL(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -126,7 +127,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -149,7 +150,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -195,7 +196,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -252,7 +253,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -273,7 +274,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -294,7 +295,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -315,7 +316,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -590,7 +591,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -662,7 +663,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
10 changes: 5 additions & 5 deletions pkg/ingester/flush.go
@@ -71,7 +71,7 @@ func (i *Ingester) flush(mayRemoveStreams bool) {
}

i.flushQueuesDone.Wait()
level.Debug(util_log.Logger).Log("msg", "flush queues have drained")
level.Debug(i.logger).Log("msg", "flush queues have drained")
}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
@@ -136,7 +136,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo

func (i *Ingester) flushLoop(j int) {
defer func() {
level.Debug(util_log.Logger).Log("msg", "Ingester.flushLoop() exited")
level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
}()

@@ -149,7 +149,7 @@

err := i.flushUserSeries(op.userID, op.fp, op.immediate)
if err != nil {
level.Error(util_log.WithUserID(op.userID, util_log.Logger)).Log("msg", "failed to flush", "err", err)
level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
}

// If we're exiting & we failed to flush, put the failed operation
@@ -173,7 +173,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
}

lbs := labels.String()
level.Info(util_log.Logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)
level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
@@ -382,7 +382,7 @@ func (i *Ingester) flushChunk(ctx context.Context, ch *chunk.Chunk) error {
func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
byt, err := ch.Encoded()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
level.Error(i.logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
return
}

2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -277,7 +277,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki)
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
