Skip to content

Commit

Permalink
Stop using global logger in disttributor and other modules
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi committed Oct 27, 2023
1 parent 583bfae commit 7fa0b8e
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/querytee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
registry := prometheus.NewRegistry()
registry.MustRegister(collectors.NewGoCollector())

i := querytee.NewInstrumentationServer(cfg.ServerMetricsPort, registry)
i := querytee.NewInstrumentationServer(cfg.ServerMetricsPort, registry, util_log.Logger)
if err := i.Start(); err != nil {
level.Error(util_log.Logger).Log("msg", "Unable to start instrumentation server", "err", err.Error())
os.Exit(1)
Expand Down
12 changes: 6 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func New(
ingestersRing ring.ReadRing,
overrides Limits,
registerer prometheus.Registerer,
log log.Logger,
logger log.Logger,
) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
Expand Down Expand Up @@ -169,13 +169,13 @@ func New(

d := &Distributor{
cfg: cfg,
log: log,
log: logger,
clientCfg: clientCfg,
tenantConfigs: configs,
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
validator: validator,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, log),
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, logger),
labelCache: labelCache,
shardTracker: NewShardTracker(),
healthyInstancesCount: atomic.NewUint32(0),
Expand All @@ -200,13 +200,13 @@ func New(
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
writeFailuresManager: writefailures.NewManager(log, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
}

if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
d.rateLimitStrat = validation.GlobalIngestionRateStrategy

distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, log, registerer)
distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, logger, registerer)
if err != nil {
return nil, err
}
Expand All @@ -233,7 +233,7 @@ func New(
clientCfg.PoolConfig,
ingestersRing,
ring_client.PoolAddrFunc(internalFactory),
log,
logger,
),
overrides,
registerer,
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
return nil, err
}

q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, prometheus.DefaultRegisterer)
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1551,7 +1551,7 @@ func (t *Loki) createRulerQueryEngine(logger log.Logger) (eng *logql.Engine, err
return nil, fmt.Errorf("could not create delete requests store: %w", err)
}

q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil)
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, util_log.Logger)
if err != nil {
return nil, fmt.Errorf("could not create querier: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/lokifrontend/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func InitFrontend(cfg CombinedFrontendConfig, ring ring.ReadRing, limits v1.Limi
case cfg.FrontendV2.SchedulerAddress != "" || ring != nil:
// If query-scheduler address is configured, use Frontend.
if cfg.FrontendV2.Addr == "" {
addr, err := util.GetFirstAddressOf(cfg.FrontendV2.InfNames)
addr, err := util.GetFirstAddressOf(cfg.FrontendV2.InfNames, log)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to get frontend address")
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"time"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/pkg/storage/stores/index"
Expand Down Expand Up @@ -110,21 +111,23 @@ type SingleTenantQuerier struct {
ingesterQuerier *IngesterQuerier
deleteGetter deleteGetter
metrics *Metrics
logger log.Logger
}

type deleteGetter interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
}

// New makes a new Querier.
func New(cfg Config, store Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) {
func New(cfg Config, store Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer, logger log.Logger) (*SingleTenantQuerier, error) {
return &SingleTenantQuerier{
cfg: cfg,
store: store,
ingesterQuerier: ingesterQuerier,
limits: limits,
deleteGetter: d,
metrics: NewMetrics(r),
logger: logger,
}, nil
}

Expand Down Expand Up @@ -497,6 +500,7 @@ func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailReques
q.cfg.TailMaxDuration,
tailerWaitEntryThrottle,
q.metrics,
q.logger,
), nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/ring"
Expand Down Expand Up @@ -1290,7 +1291,7 @@ func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.P
return nil, err
}

return New(cfg, store, iq, limits, dg, nil)
return New(cfg, store, iq, limits, dg, nil, log.NewNopLogger())
}

type mockDeleteGettter struct {
Expand Down
14 changes: 9 additions & 5 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

Expand Down Expand Up @@ -60,6 +61,7 @@ type Tailer struct {
// how long do we want to wait by going into sleep
waitEntryThrottle time.Duration
metrics *Metrics
logger log.Logger
}

func (t *Tailer) readTailClients() {
Expand Down Expand Up @@ -87,11 +89,11 @@ func (t *Tailer) loop() {
case <-checkConnectionTicker.C:
// Try to reconnect dropped ingesters and connect to new ingesters
if err := t.checkIngesterConnections(); err != nil {
level.Error(util_log.Logger).Log("msg", "Error reconnecting to disconnected ingesters", "err", err)
level.Error(t.logger).Log("msg", "Error reconnecting to disconnected ingesters", "err", err)
}
case <-tailMaxDurationTicker.C:
if err := t.close(); err != nil {
level.Error(util_log.Logger).Log("msg", "Error closing Tailer", "err", err)
level.Error(t.logger).Log("msg", "Error closing Tailer", "err", err)
}
t.closeErrChan <- errors.New("reached tail max duration limit")
return
Expand Down Expand Up @@ -137,12 +139,12 @@ func (t *Tailer) loop() {
if numClients == 0 {
// All the connections to ingesters are dropped, try reconnecting or return error
if err := t.checkIngesterConnections(); err != nil {
level.Error(util_log.Logger).Log("msg", "Error reconnecting to ingesters", "err", err)
level.Error(t.logger).Log("msg", "Error reconnecting to ingesters", "err", err)
} else {
continue
}
if err := t.close(); err != nil {
level.Error(util_log.Logger).Log("msg", "Error closing Tailer", "err", err)
level.Error(t.logger).Log("msg", "Error closing Tailer", "err", err)
}
t.closeErrChan <- errors.New("all ingesters closed the connection")
return
Expand Down Expand Up @@ -209,7 +211,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
var err error
defer t.dropTailClient(addr)

logger := util_log.WithContext(querierTailClient.Context(), util_log.Logger)
logger := util_log.WithContext(querierTailClient.Context(), t.logger)
for {
if t.stopped {
if err := querierTailClient.CloseSend(); err != nil {
Expand Down Expand Up @@ -306,6 +308,7 @@ func newTailer(
tailMaxDuration time.Duration,
waitEntryThrottle time.Duration,
m *Metrics,
logger log.Logger,
) *Tailer {
t := Tailer{
openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD),
Expand All @@ -318,6 +321,7 @@ func newTailer(
tailMaxDuration: tailMaxDuration,
waitEntryThrottle: waitEntryThrottle,
metrics: m,
logger: logger,
}

t.metrics.tailsActive.Inc()
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

gokitlog "github.com/go-kit/log"
"github.com/grafana/loki/pkg/iter"
loghttp "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestTailer(t *testing.T) {
tailClients["test"] = test.tailClient
}

tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, NewMetrics(nil))
tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, NewMetrics(nil), gokitlog.NewNopLogger())
defer tailer.close()

test.tester(t, tailer, test.tailClient)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/series/index/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ type TableManager struct {

// NewTableManager makes a new TableManager
func NewTableManager(cfg TableManagerConfig, schemaCfg config.SchemaConfig, maxChunkAge time.Duration, tableClient TableClient,
objectClient BucketClient, extraTables []ExtraTables, registerer prometheus.Registerer, log log.Logger,
objectClient BucketClient, extraTables []ExtraTables, registerer prometheus.Registerer, logger log.Logger,
) (*TableManager, error) {
if cfg.RetentionPeriod != 0 {
// Assume the newest config is the one to use for validation of retention
Expand All @@ -184,7 +184,7 @@ func NewTableManager(cfg TableManagerConfig, schemaCfg config.SchemaConfig, maxC

tm := &TableManager{
cfg: cfg,
log: log,
log: logger,
schemaCfg: schemaCfg,
maxChunkAge: maxChunkAge,
client: tableClient,
Expand Down
13 changes: 6 additions & 7 deletions pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@ import (
"net"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

util_log "github.com/grafana/loki/pkg/util/log"
)

// GetFirstAddressOf returns the first IPv4 address of the supplied interface names, omitting any 169.254.x.x automatic private IPs if possible.
func GetFirstAddressOf(names []string) (string, error) {
func GetFirstAddressOf(names []string, logger log.Logger) (string, error) {
var ipAddr string
for _, name := range names {
inf, err := net.InterfaceByName(name)
if err != nil {
level.Warn(util_log.Logger).Log("msg", "error getting interface", "inf", name, "err", err)
level.Warn(logger).Log("msg", "error getting interface", "inf", name, "err", err)
continue
}
addrs, err := inf.Addrs()
if err != nil {
level.Warn(util_log.Logger).Log("msg", "error getting addresses for interface", "inf", name, "err", err)
level.Warn(logger).Log("msg", "error getting addresses for interface", "inf", name, "err", err)
continue
}
if len(addrs) <= 0 {
level.Warn(util_log.Logger).Log("msg", "no addresses found for interface", "inf", name, "err", err)
level.Warn(logger).Log("msg", "no addresses found for interface", "inf", name, "err", err)
continue
}
if ip := filterIPs(addrs); ip != "" {
Expand All @@ -40,7 +39,7 @@ func GetFirstAddressOf(names []string) (string, error) {
return "", fmt.Errorf("no address found for %s", names)
}
if strings.HasPrefix(ipAddr, `169.254.`) {
level.Warn(util_log.Logger).Log("msg", "using automatic private ip", "address", ipAddr)
level.Warn(logger).Log("msg", "using automatic private ip", "address", ipAddr)
}
return ipAddr, nil
}
Expand Down
9 changes: 5 additions & 4 deletions tools/querytee/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,26 @@ import (
"net"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

util_log "github.com/grafana/loki/pkg/util/log"
)

type InstrumentationServer struct {
port int
registry *prometheus.Registry
srv *http.Server
logger log.Logger
}

// NewInstrumentationServer returns a server exposing Prometheus metrics.
func NewInstrumentationServer(port int, registry *prometheus.Registry) *InstrumentationServer {
func NewInstrumentationServer(port int, registry *prometheus.Registry, logger log.Logger) *InstrumentationServer {
return &InstrumentationServer{
port: port,
registry: registry,
logger: logger,
}
}

Expand All @@ -44,7 +45,7 @@ func (s *InstrumentationServer) Start() error {

go func() {
if err := s.srv.Serve(listener); err != nil {
level.Error(util_log.Logger).Log("msg", "metrics server terminated", "err", err)
level.Error(s.logger).Log("msg", "metrics server terminated", "err", err)
}
}()

Expand Down
4 changes: 1 addition & 3 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"

util_log "github.com/grafana/loki/pkg/util/log"
)

type ResponsesComparator interface {
Expand Down Expand Up @@ -182,7 +180,7 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back
result := comparisonSuccess
summary, err := p.compareResponses(expectedResponse, actualResponse)
if err != nil {
level.Error(util_log.Logger).Log("msg", "response comparison failed",
level.Error(p.logger).Log("msg", "response comparison failed",
"backend-name", p.backends[i].name,
"route-name", p.routeName,
"query", r.URL.RawQuery, "err", err)
Expand Down

0 comments on commit 7fa0b8e

Please sign in to comment.