Use of zap log package (#823)
* add new log methods to Logger interface
* apply changes to log functions
* remove useless logs
emmanuelm41 committed Oct 7, 2021
1 parent 5a9f689 commit ea4ad2f
Showing 32 changed files with 278 additions and 352 deletions.
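
This commit moves drand's logging onto zap's sugared API: positional calls such as `l.Error("cache", "miss", ...)` become `l.Errorw(msg, keysAndValues...)`, where the first argument is a message string and the rest are alternating key/value pairs. Throughout this diff the message slot is passed as `""`, so the structured fields carry all the information. A minimal sketch of the first bullet ("add new log methods to Logger interface"): the method signatures match `zap.SugaredLogger`, but the interface shape and the `NewLogger` constructor here are illustrative assumptions, not drand's exact code:

```go
package log

import "go.uber.org/zap"

// Logger sketches the interface this commit extends with zap-style
// "w" methods: a leading message, then alternating key/value pairs.
// The real drand interface may differ.
type Logger interface {
	Debugw(msg string, keysAndValues ...interface{})
	Infow(msg string, keysAndValues ...interface{})
	Errorw(msg string, keysAndValues ...interface{})
	Fatalw(msg string, keysAndValues ...interface{})
}

// zapLogger satisfies Logger by embedding *zap.SugaredLogger, which
// already provides Debugw/Infow/Errorw/Fatalw with these signatures.
type zapLogger struct {
	*zap.SugaredLogger
}

// NewLogger is a hypothetical constructor for a zap-backed Logger.
func NewLogger() (Logger, error) {
	l, err := zap.NewProduction() // structured JSON output, Info level and up
	if err != nil {
		return nil, err
	}
	return &zapLogger{l.Sugar()}, nil
}
```

Because the `...w` methods come from zap itself, the call sites below only need a new method name and a leading message argument, which this commit leaves empty.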
2 changes: 1 addition & 1 deletion chain/beacon/cache.go
@@ -94,7 +94,7 @@ func (c *partialCache) getCache(id string, p *drand.PartialBeaconPacket) *roundC
toEvict := c.rcvd[idx][0]
round, ok := c.rounds[toEvict]
if !ok {
c.l.Error("cache", "miss", "node", idx, "not_present_for", p.GetRound())
c.l.Errorw("", "cache", "miss", "node", idx, "not_present_for", p.GetRound())
return nil
}
round.flushIndex(idx)
22 changes: 11 additions & 11 deletions chain/beacon/chain.go
@@ -91,7 +91,7 @@ var partialCacheStoreLimit = 3
func (c *chainStore) runAggregator() {
lastBeacon, err := c.Last()
if err != nil {
-c.l.Fatal("chain_aggregator", "loading", "last_beacon", err)
+c.l.Fatalw("", "chain_aggregator", "loading", "last_beacon", err)
}

var cache = newPartialCache(c.l)
@@ -111,7 +111,7 @@ func (c *chainStore) runAggregator() {
shouldStore := isNotInPast && isNotTooFar
// check if we can reconstruct
if !shouldStore {
c.l.Debug("ignoring_partial", partial.p.GetRound(), "last_beacon_stored", lastBeacon.Round)
c.l.Debugw("", "ignoring_partial", partial.p.GetRound(), "last_beacon_stored", lastBeacon.Round)
break
}
// NOTE: This line means we can only verify partial signatures of
@@ -124,23 +124,23 @@ func (c *chainStore) runAggregator() {
cache.Append(partial.p)
roundCache := cache.GetRoundCache(partial.p.GetRound(), partial.p.GetPreviousSig())
if roundCache == nil {
c.l.Error("store_partial", partial.addr, "no_round_cache", partial.p.GetRound())
c.l.Errorw("", "store_partial", partial.addr, "no_round_cache", partial.p.GetRound())
break
}

c.l.Debug("store_partial", partial.addr, "round", roundCache.round, "len_partials", fmt.Sprintf("%d/%d", roundCache.Len(), thr))
c.l.Debugw("", "store_partial", partial.addr, "round", roundCache.round, "len_partials", fmt.Sprintf("%d/%d", roundCache.Len(), thr))
if roundCache.Len() < thr {
break
}

msg := roundCache.Msg()
finalSig, err := key.Scheme.Recover(c.crypto.GetPub(), msg, roundCache.Partials(), thr, n)
if err != nil {
c.l.Debug("invalid_recovery", err, "round", pRound, "got", fmt.Sprintf("%d/%d", roundCache.Len(), n))
c.l.Debugw("", "invalid_recovery", err, "round", pRound, "got", fmt.Sprintf("%d/%d", roundCache.Len(), n))
break
}
if err := key.Scheme.VerifyRecovered(c.crypto.GetPub().Commit(), msg, finalSig); err != nil {
c.l.Error("invalid_sig", err, "round", pRound)
c.l.Errorw("", "invalid_sig", err, "round", pRound)
break
}
cache.FlushRounds(partial.p.GetRound())
@@ -149,21 +149,21 @@ func (c *chainStore) runAggregator() {
PreviousSig: roundCache.prev,
Signature: finalSig,
}
c.l.Info("aggregated_beacon", newBeacon.Round)
c.l.Infow("", "aggregated_beacon", newBeacon.Round)
if c.tryAppend(lastBeacon, newBeacon) {
lastBeacon = newBeacon
break
}
// XXX store them for future usage if it's a later round than what
// we have
c.l.Debug("new_aggregated", "not_appendable", "last", lastBeacon.String(), "new", newBeacon.String())
c.l.Debugw("", "new_aggregated", "not_appendable", "last", lastBeacon.String(), "new", newBeacon.String())
if c.shouldSync(lastBeacon, newBeacon) {
peers := toPeers(c.crypto.GetGroup().Nodes)
go func() {
// XXX Could do something smarter with context and cancellation
// if we got to the right round
if err := c.sync.Follow(context.Background(), newBeacon.Round, peers); err != nil {
c.l.Debug("chain_store", "unable to follow", "err", err)
c.l.Debugw("", "chain_store", "unable to follow", "err", err)
}
}()
}
@@ -179,7 +179,7 @@ func (c *chainStore) tryAppend(last, newB *chain.Beacon) bool {
}
if err := c.CallbackStore.Put(newB); err != nil {
// if round is ok but bytes are different, error will be raised
c.l.Error("chain_store", "error storing beacon", "err", err)
c.l.Errorw("", "chain_store", "error storing beacon", "err", err)
return false
}
select {
@@ -207,7 +207,7 @@ func (c *chainStore) RunSync(ctx context.Context, upTo uint64, peers []net.Peer)
peers = toPeers(c.crypto.GetGroup().Nodes)
}
if err := c.sync.Follow(ctx, upTo, peers); err != nil {
c.l.Debug("chain_store", "follow_finished", "err", err)
c.l.Debugw("", "chain_store", "follow_finished", "err", err)
}
}

44 changes: 22 additions & 22 deletions chain/beacon/node.go
@@ -97,7 +97,7 @@ var errOutOfRound = "out-of-round beacon request"
// forwards it to the round manager if it is a valid beacon.
func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeaconPacket) (*proto.Empty, error) {
addr := net.RemoteAddress(c)
h.l.Debug("received", "request", "from", addr, "round", p.GetRound())
h.l.Debugw("", "received", "request", "from", addr, "round", p.GetRound())

nextRound, _ := chain.NextRound(h.conf.Clock.Now().Unix(), h.conf.Group.Period, h.conf.Group.GenesisTime)
currentRound := nextRound - 1
@@ -106,7 +106,7 @@ func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeacon
// possible, if a node receives a packet very fast just before his local
// clock passed to the next round
if p.GetRound() > nextRound {
h.l.Error("process_partial", addr, "invalid_future_round", p.GetRound(), "current_round", currentRound)
h.l.Errorw("", "process_partial", addr, "invalid_future_round", p.GetRound(), "current_round", currentRound)
return nil, fmt.Errorf("invalid round: %d instead of %d", p.GetRound(), currentRound)
}

@@ -116,21 +116,21 @@ func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeacon
shortPub := h.crypto.GetPub().Eval(1).V.String()[14:19]
// verify if request is valid
if err := key.Scheme.VerifyPartial(h.crypto.GetPub(), msg, p.GetPartialSig()); err != nil {
h.l.Error("process_partial", addr, "err", err,
h.l.Errorw("", "process_partial", addr, "err", err,
"prev_sig", shortSigStr(p.GetPreviousSig()),
"curr_round", currentRound,
"msg_sign", shortSigStr(msg),
"short_pub", shortPub)
return nil, err
}
h.l.Debug("process_partial", addr,
h.l.Debugw("", "process_partial", addr,
"prev_sig", shortSigStr(p.GetPreviousSig()),
"curr_round", currentRound, "msg_sign",
shortSigStr(msg), "short_pub", shortPub,
"status", "OK")
idx, _ := key.Scheme.IndexOf(p.GetPartialSig())
if idx == h.crypto.Index() {
h.l.Error("process_partial", addr,
h.l.Errorw("", "process_partial", addr,
"index_got", idx,
"index_our", h.crypto.Index(),
"advance_packet", p.GetRound(),
@@ -156,7 +156,7 @@ func (h *Handler) Store() chain.Store {
// Round 1 starts at genesis time, and is signing over the genesis seed
func (h *Handler) Start() error {
if h.conf.Clock.Now().Unix() > h.conf.Group.GenesisTime {
h.l.Error("genesis_time", "past", "call", "catchup")
h.l.Errorw("", "genesis_time", "past", "call", "catchup")
return errors.New("beacon: genesis time already passed. Call Catchup()")
}

@@ -165,7 +165,7 @@ func (h *Handler) Start() error {
h.Unlock()

_, tTime := chain.NextRound(h.conf.Clock.Now().Unix(), h.conf.Group.Period, h.conf.Group.GenesisTime)
h.l.Info("beacon", "start")
h.l.Infow("", "beacon", "start")
go h.run(tTime)

return nil
@@ -195,7 +195,7 @@ func (h *Handler) Transition(prevGroup *key.Group) error {
tRound := chain.CurrentRound(targetTime, h.conf.Group.Period, h.conf.Group.GenesisTime)
tTime := chain.TimeOfRound(h.conf.Group.Period, h.conf.Group.GenesisTime, tRound)
if tTime != targetTime {
h.l.Fatal("transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
h.l.Fatalw("", "transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
return nil
}

@@ -206,7 +206,7 @@ func (h *Handler) Transition(prevGroup *key.Group) error {
go h.run(targetTime)

// we run the sync up until (inclusive) one round before the transition
h.l.Debug("new_node", "following chain", "to_round", tRound-1)
h.l.Debugw("", "new_node", "following chain", "to_round", tRound-1)
h.chain.RunSync(context.Background(), tRound-1, toPeers(prevGroup.Nodes))

return nil
@@ -218,10 +218,10 @@ func (h *Handler) TransitionNewGroup(newShare *key.Share, newGroup *key.Group) {
tRound := chain.CurrentRound(targetTime, h.conf.Group.Period, h.conf.Group.GenesisTime)
tTime := chain.TimeOfRound(h.conf.Group.Period, h.conf.Group.GenesisTime, tRound)
if tTime != targetTime {
h.l.Fatal("transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
h.l.Fatalw("", "transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
return
}
h.l.Debug("transition", "new_group", "at_round", tRound)
h.l.Debugw("", "transition", "new_group", "at_round", tRound)
// register a callback such that when the round happening just before the
// transition is stored, then it switches the current share to the new one
targetRound := tRound - 1
@@ -275,7 +275,7 @@ func (h *Handler) Reset() {
// run will wait until it is supposed to start
func (h *Handler) run(startTime int64) {
chanTick := h.ticker.ChannelAt(startTime)
h.l.Debug("run_round", "wait", "until", startTime)
h.l.Debugw("", "run_round", "wait", "until", startTime)

var current roundInfo
setRunnig := sync.Once{}
@@ -296,10 +296,10 @@

lastBeacon, err := h.chain.Last()
if err != nil {
h.l.Error("beacon_loop", "loading_last", "err", err)
h.l.Errorw("", "beacon_loop", "loading_last", "err", err)
break
}
h.l.Debug("beacon_loop", "new_round", "round", current.round, "lastbeacon", lastBeacon.Round)
h.l.Debugw("", "beacon_loop", "new_round", "round", current.round, "lastbeacon", lastBeacon.Round)
h.broadcastNextPartial(current, lastBeacon)
// if the next round of the last beacon we generated is not the round we
// are now, that means there is a gap between the two rounds. In other
@@ -312,7 +312,7 @@
// network conditions allow for it.
// XXX find a way to start the catchup as soon as the runsync is
// done. Not critical but leads to faster network recovery.
h.l.Debug("beacon_loop", "run_sync_catchup", "last_is", lastBeacon, "should_be", current.round)
h.l.Debugw("", "beacon_loop", "run_sync_catchup", "last_is", lastBeacon, "should_be", current.round)
go h.chain.RunSync(context.Background(), current.round, nil)
}
case b := <-h.chain.AppendedBeaconNoSync():
@@ -332,7 +332,7 @@
}(current, b)
}
case <-h.close:
h.l.Debug("beacon_loop", "finished")
h.l.Debugw("", "beacon_loop", "finished")
return
}
}
@@ -357,7 +357,7 @@ func (h *Handler) broadcastNextPartial(current roundInfo, upon *chain.Beacon) {
h.l.Fatal("beacon_round", "err creating signature", "err", err, "round", round)
return
}
h.l.Debug("broadcast_partial", round, "from_prev_sig", shortSigStr(previousSig), "msg_sign", shortSigStr(msg))
h.l.Debugw("", "broadcast_partial", round, "from_prev_sig", shortSigStr(previousSig), "msg_sign", shortSigStr(msg))
metadata := common.NewMetadata(h.version.ToProto())
packet := &proto.PartialBeaconPacket{
Round: round,
@@ -371,12 +371,12 @@
continue
}
go func(i *key.Identity) {
h.l.Debug("beacon_round", round, "send_to", i.Address())
h.l.Debugw("", "beacon_round", round, "send_to", i.Address())
err := h.client.PartialBeacon(ctx, i, packet)
if err != nil {
h.l.Error("beacon_round", round, "err_request", err, "from", i.Address())
h.l.Errorw("", "beacon_round", round, "err_request", err, "from", i.Address())
if strings.Contains(err.Error(), errOutOfRound) {
h.l.Error("beacon_round", round, "node", i.Addr, "reply", "out-of-round")
h.l.Errorw("", "beacon_round", round, "node", i.Addr, "reply", "out-of-round")
}
return
}
@@ -399,7 +399,7 @@ func (h *Handler) Stop() {

h.stopped = true
h.running = false
h.l.Info("beacon", "stop")
h.l.Infow("", "beacon", "stop")
}

// StopAt will stop the handler at the given time. It is useful when
@@ -411,7 +411,7 @@ func (h *Handler) StopAt(stopTime int64) error {
return errors.New("can't stop in the past or present")
}
duration := time.Duration(stopTime-now) * time.Second
h.l.Debug("stop_at", stopTime, "sleep_for", duration.Seconds())
h.l.Debugw("", "stop_at", stopTime, "sleep_for", duration.Seconds())
h.conf.Clock.Sleep(duration)
h.Stop()
return nil
2 changes: 1 addition & 1 deletion chain/beacon/store.go
@@ -79,7 +79,7 @@ func (d *discrepancyStore) Put(b *chain.Beacon) error {
metrics.LastBeaconRound.Set(float64(b.GetRound()))
metrics.GroupSize.Set(float64(d.group.Len()))
metrics.GroupThreshold.Set(float64(d.group.Threshold))
d.l.Info("NEW_BEACON_STORED", b.String(), "time_discrepancy_ms", discrepancy)
d.l.Infow("", "NEW_BEACON_STORED", b.String(), "time_discrepancy_ms", discrepancy)
return nil
}

22 changes: 11 additions & 11 deletions chain/beacon/sync.go
@@ -68,7 +68,7 @@ func (s *syncer) Follow(c context.Context, upTo uint64, nodes []net.Peer) error
s.Unlock()
}()

s.l.Debug("syncer", "starting", "up_to", upTo, "nodes", peersToString(nodes))
s.l.Debugw("", "syncer", "starting", "up_to", upTo, "nodes", peersToString(nodes))

// shuffle through the nodes
for _, n := range rand.Perm(len(nodes)) {
@@ -91,36 +91,36 @@ func (s *syncer) tryNode(global context.Context, upTo uint64, n net.Peer) bool {
FromRound: last.Round + 1,
})
if err != nil {
s.l.Debug("syncer", "unable_to_sync", "with_peer", n.Address(), "err", err)
s.l.Debugw("", "syncer", "unable_to_sync", "with_peer", n.Address(), "err", err)
return false
}

s.l.Debug("syncer", "start_follow", "with_peer", n.Address(), "from_round", last.Round+1)
s.l.Debugw("", "syncer", "start_follow", "with_peer", n.Address(), "from_round", last.Round+1)

for beaconPacket := range beaconCh {
s.l.Debug("syncer", "new_beacon_fetched", "with_peer", n.Address(), "from_round", last.Round+1, "got_round", beaconPacket.GetRound())
s.l.Debugw("", "syncer", "new_beacon_fetched", "with_peer", n.Address(), "from_round", last.Round+1, "got_round", beaconPacket.GetRound())
beacon := protoToBeacon(beaconPacket)

// verify the signature validity
if err := chain.VerifyBeacon(s.info.PublicKey, beacon); err != nil {
s.l.Debug("syncer", "invalid_beacon", "with_peer", n.Address(), "round", beacon.Round, "err", err, fmt.Sprintf("%+v", beacon))
s.l.Debugw("", "syncer", "invalid_beacon", "with_peer", n.Address(), "round", beacon.Round, "err", err, fmt.Sprintf("%+v", beacon))
return false
}

if err := s.store.Put(beacon); err != nil {
s.l.Debug("syncer", "unable to save", "with_peer", n.Address(), "err", err)
s.l.Debugw("", "syncer", "unable to save", "with_peer", n.Address(), "err", err)
return false
}
last = beacon
if last.Round == upTo {
s.l.Debug("syncer", "syncing finished to", "round", upTo)
s.l.Debugw("", "syncer", "syncing finished to", "round", upTo)
return true
}
}
// see if this was a cancellation from the call itself
select {
case <-global.Done():
s.l.Debug("syncer", "follow canceled", "err?", global.Err())
s.l.Debugw("", "syncer", "follow canceled", "err?", global.Err())
if global.Err() == nil {
return true
}
@@ -133,7 +133,7 @@ func (s *syncer) tryNode(global context.Context, upTo uint64, n net.Peer) bool {
func (s *syncer) SyncChain(req *proto.SyncRequest, stream proto.Protocol_SyncChainServer) error {
fromRound := req.GetFromRound()
addr := net.RemoteAddress(stream.Context())
s.l.Debug("syncer", "sync_request", "from", addr, "from_round", fromRound)
s.l.Debugw("", "syncer", "sync_request", "from", addr, "from_round", fromRound)

last, err := s.store.Last()
if err != nil {
@@ -149,7 +149,7 @@ func (s *syncer) SyncChain(req *proto.SyncRequest, stream proto.Protocol_SyncCha
s.store.Cursor(func(c chain.Cursor) {
for bb := c.Seek(fromRound); bb != nil; bb = c.Next() {
if err = stream.Send(beaconToProto(bb)); err != nil {
s.l.Debug("syncer", "streaming_send", "err", err)
s.l.Debugw("", "syncer", "streaming_send", "err", err)
return
}
}
@@ -163,7 +163,7 @@ func (s *syncer) SyncChain(req *proto.SyncRequest, stream proto.Protocol_SyncCha
s.store.AddCallback(addr, func(b *chain.Beacon) {
err := stream.Send(beaconToProto(b))
if err != nil {
s.l.Debug("syncer", "streaming_send", "err", err)
s.l.Debugw("", "syncer", "streaming_send", "err", err)
done <- nil
}
})
2 changes: 1 addition & 1 deletion chain/boltdb/store.go
@@ -62,7 +62,7 @@ func (b *boltStore) Len() int {

func (b *boltStore) Close() {
if err := b.db.Close(); err != nil {
-log.DefaultLogger().Debug("boltdb", "close", "err", err)
+log.DefaultLogger().Debugw("", "boltdb", "close", "err", err)
}
}

4 changes: 2 additions & 2 deletions client/aggregator.go
@@ -92,7 +92,7 @@ func (c *watchAggregator) startAutoWatch(full bool) {
select {
case _, ok := <-results:
if !ok {
c.log.Info("watch_aggregator", "auto watch ended")
c.log.Infow("", "watch_aggregator", "auto watch ended")
break LOOP
}
case <-ctx.Done():
@@ -108,7 +108,7 @@ func (c *watchAggregator) startAutoWatch(full bool) {
case <-ctx.Done():
t.Stop()
}
c.log.Info("watch_aggregator", "retrying auto watch")
c.log.Infow("", "watch_aggregator", "retrying auto watch")
}
}()
}
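
For reference, the resulting call convention, shown against zap's sugared logger directly rather than drand's wrapper (the field names are taken from the cache.go hunk above; the setup code is illustrative):

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment() // human-readable console output
	defer logger.Sync()
	sugar := logger.Sugar()

	// Style used throughout this commit: empty message, structured fields only.
	sugar.Errorw("", "cache", "miss", "node", 3, "not_present_for", 42)

	// The message slot can also carry a human-readable summary.
	sugar.Errorw("partial cache miss", "node", 3, "round", 42)
}
```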