Skip to content

Commit

Permalink
[nspcc-dev#1770] node: Support runtime re-configuration of the logger
Browse files Browse the repository at this point in the history
There is a need to re-configure storage node's logging on the
fly.

Provide `SubscribeConfigChanges` function to listen to changes of the
storage node config. Create dynamically configured `logger.Logger` in
`neofs-node` app and use it to log notifications about new Sidechain
blocks. Update `logger.Level` parameter on each `syscall.SIGHUP` signal.

Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
  • Loading branch information
cthulhu-rider committed Oct 6, 2022
1 parent 541bf3d commit dfe3908
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 74 deletions.
41 changes: 28 additions & 13 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []shardCfg
}

logger configLogger
}

type shardCfg struct {
Expand Down Expand Up @@ -171,6 +173,11 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a._read = true
}

err := a.logger.read(c)
if err != nil {
return fmt.Errorf("logger section: %w", err)
}

a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)

Expand Down Expand Up @@ -278,7 +285,7 @@ type internals struct {

appCfg *config.Config

log *zap.Logger
log logger.Logger

wg *sync.WaitGroup
workers []worker
Expand Down Expand Up @@ -335,8 +342,13 @@ type shared struct {
metricsCollector *metrics.NodeMetrics
}

type runtimeConfiguration struct {
logger runtimeConfigLogger
}

type cfg struct {
applicationConfiguration
runtimeConfiguration
internals
shared

Expand Down Expand Up @@ -481,9 +493,6 @@ func initCfg(appCfg *config.Config) *cfg {
)
fatalOnErr(err)

log, err := logger.NewLogger(logPrm)
fatalOnErr(err)

var netAddr network.AddressGroup

relayOnly := nodeconfig.Relay(appCfg)
Expand Down Expand Up @@ -513,7 +522,6 @@ func initCfg(appCfg *config.Config) *cfg {
ctx: context.Background(),
appCfg: appCfg,
internalErr: make(chan error),
log: log,
wg: new(sync.WaitGroup),
apiVersion: version.Current(),
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
Expand Down Expand Up @@ -561,6 +569,9 @@ func initCfg(appCfg *config.Config) *cfg {
},
}

err = bindRuntimeConfigLogger(&c.internals.log, &c.runtimeConfiguration.logger)
fatalOnErr(err)

// returned err must be nil during first time read
err = c.readConfig(appCfg)
if err != nil {
Expand All @@ -587,7 +598,7 @@ func (c *cfg) engineOpts() []engine.Option {
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),

engine.WithLogger(c.log),
engine.WithLogger(&c.log),
)

if c.metricsCollector != nil {
Expand Down Expand Up @@ -617,7 +628,7 @@ func (c *cfg) shardOpts() []shardOptsWithMetaPath {
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
writecache.WithMaxCacheSize(wcRead.sizeLimit),

writecache.WithLogger(c.log),
writecache.WithLogger(&c.log),
)
}

Expand Down Expand Up @@ -645,7 +656,7 @@ func (c *cfg) shardOpts() []shardOptsWithMetaPath {
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),

blobovniczatree.WithLogger(c.log)),
blobovniczatree.WithLogger(&c.log)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
Expand All @@ -669,15 +680,15 @@ func (c *cfg) shardOpts() []shardOptsWithMetaPath {
var sh shardOptsWithMetaPath
sh.metaPath = shCfg.metaCfg.path
sh.shOpts = []shard.Option{
shard.WithLogger(c.log),
shard.WithLogger(&c.log),
shard.WithRefillMetabase(shCfg.refillMetabase),
shard.WithMode(shCfg.mode),
shard.WithBlobStorOptions(
blobstor.WithCompressObjects(shCfg.compress),
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
blobstor.WithStorages(ss),

blobstor.WithLogger(c.log),
blobstor.WithLogger(&c.log),
),
shard.WithMetaBaseOptions(
meta.WithPath(shCfg.metaCfg.path),
Expand All @@ -688,7 +699,7 @@ func (c *cfg) shardOpts() []shardOptsWithMetaPath {
Timeout: 100 * time.Millisecond,
}),

meta.WithLogger(c.log),
meta.WithLogger(&c.log),
meta.WithEpochState(c.cfgNetmap.state),
),
shard.WithPiloramaOptions(piloramaOpts...),
Expand Down Expand Up @@ -730,7 +741,7 @@ func initLocalStorage(c *cfg) {
tombstoneSrc := tsourse.NewSource(tssPrm)

tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithLogger(&c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)

Expand Down Expand Up @@ -827,12 +838,16 @@ func (c *cfg) configWatcher(ctx context.Context) {
case <-ch:
c.log.Info("SIGHUP has been received, rereading configuration...")

err := c.readConfig(c.appCfg)
err := c.applicationConfiguration.readConfig(c.appCfg)
if err != nil {
c.log.Error("configuration reading", zap.Error(err))
continue
}

// TODO: use function over runtimeConfiguration and applicationConfiguration
// it can't be done until shardOpts
c.runtimeConfiguration.logger.write(c.applicationConfiguration.logger)

var rcfg engine.ReConfiguration
for _, optsWithMeta := range c.shardOpts() {
rcfg.AddShard(optsWithMeta.metaPath, optsWithMeta.shOpts)
Expand Down
12 changes: 6 additions & 6 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,22 @@ func initContainerService(c *cfg) {
}

localMetrics := &localStorageLoad{
log: c.log,
log: &c.log,
engine: c.cfgObject.cfgLocalStorage.localStorage,
}

pubKey := c.key.PublicKey().Bytes()

resultWriter := &morphLoadWriter{
log: c.log,
log: &c.log,
cnrMorphClient: wrapperNoNotary,
key: pubKey,
}

loadAccumulator := loadstorage.New(loadstorage.Prm{})

loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
log: &c.log,
nmSrc: c.netMapSource,
cnrSrc: cnrSrc,
}
Expand All @@ -165,7 +165,7 @@ func initContainerService(c *cfg) {
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
loadroute.WithLogger(&c.log),
)

ctrl := loadcontroller.New(
Expand All @@ -175,7 +175,7 @@ func initContainerService(c *cfg) {
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
loadcontroller.WithLogger(&c.log),
)

setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
Expand Down Expand Up @@ -232,7 +232,7 @@ func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler)
event.WorkerPoolHandler(
c.cfgContainer.workerPool,
h,
c.log,
&c.log,
),
)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func initControlService(c *cfg) {
c.cfgControlService.server = grpc.NewServer()

c.onShutdown(func() {
stopGRPC("NeoFS Control API", c.cfgControlService.server, c.log)
stopGRPC("NeoFS Control API", c.cfgControlService.server, &c.log)
})

control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func initGRPC(c *cfg) {
srv := grpc.NewServer(serverOpts...)

c.onShutdown(func() {
stopGRPC("NeoFS Public API", srv, c.log)
stopGRPC("NeoFS Public API", srv, &c.log)
})

c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv)
Expand Down
47 changes: 47 additions & 0 deletions cmd/neofs-node/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"fmt"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
)

type configLogger struct {
level logger.Level
}

func (x *configLogger) read(c *config.Config) error {
switch strLvl := loggerconfig.Level(c); strLvl {
default:
return fmt.Errorf("unsupported level %s", strLvl)
case "debug":
x.level = logger.LevelDebug
case "info":
x.level = logger.LevelInfo
case "warn":
x.level = logger.LevelWarn
case "error":
x.level = logger.LevelError
}

return nil
}

type runtimeConfigLogger struct {
c logger.Config
}

func (x *runtimeConfigLogger) write(c configLogger) {
x.c.SetLevel(c.level)
}

func bindRuntimeConfigLogger(l *logger.Logger, c *runtimeConfigLogger) error {
err := logger.InitConfigurable(l, &c.c)
if err != nil {
return fmt.Errorf("init logger configurable at runtime: %w", err)
}

return nil
}
6 changes: 3 additions & 3 deletions cmd/neofs-node/morph.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func initMorphComponents(c *cfg) {

cli, err := client.New(c.key,
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
client.WithLogger(c.log),
client.WithLogger(&c.log),
client.WithEndpoints(addresses...),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
Expand Down Expand Up @@ -193,14 +193,14 @@ func listenMorphNotifications(c *cfg) {
}

subs, err = subscriber.New(c.ctx, &subscriber.Params{
Log: c.log,
Log: &c.log,
StartFromBlock: fromSideChainBlock,
Client: c.cfgMorph.client,
})
fatalOnErr(err)

lis, err := event.NewListener(event.ListenerParams{
Logger: c.log,
Logger: &c.log,
Subscriber: subs,
WorkerPoolCapacity: listenerPoolCap,
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) {
event.WorkerPoolHandler(
c.cfgNetmap.workerPool,
h,
c.log,
&c.log,
),
)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/neofs-node/notificator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,24 @@ func initNotifications(c *cfg) {
nodeconfig.Notification(c.appCfg).KeyPath(),
),
nats.WithRootCA(nodeconfig.Notification(c.appCfg).CAPath()),
nats.WithLogger(c.log),
nats.WithLogger(&c.log),
)

c.cfgNotifications = cfgNotifications{
enabled: true,
nw: notificationWriter{
l: c.log,
l: &c.log,
w: natsSvc,
},
defaultTopic: topic,
}

n := notificator.New(new(notificator.Prm).
SetLogger(c.log).
SetLogger(&c.log).
SetNotificationSource(
&notificationSource{
e: c.cfgObject.cfgLocalStorage.localStorage,
l: c.log,
l: &c.log,
defaultTopic: topic,
}).
SetWriter(c.cfgNotifications.nw),
Expand Down
Loading

0 comments on commit dfe3908

Please sign in to comment.