Skip to content

Commit

Permalink
[Cleanup] Avoid leaking deferred calls (#931)
Browse files Browse the repository at this point in the history
* [Cleanup] Avoid leaking deferred calls
* Adding fake clock for discrepancies in tests
  • Loading branch information
AnomalRoil committed Mar 21, 2022
1 parent c4a6e2d commit 2700ed5
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 59 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
issues:
# Let us display all issues of one type at once
max-same-issues: 0
# Excluding configuration per-path, per-linter, per-text and per-source
exclude-rules:
- path: _test\.go
Expand Down
2 changes: 1 addition & 1 deletion chain/beacon/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func newChainStore(l log.Logger, cf *Config, cl net.ProtocolClient, c *cryptoSto
ss := NewSchemeStore(as, cf.Group.Scheme)

// we write some stats about the timing when new beacon is saved
ds := newDiscrepancyStore(ss, l, c.GetGroup())
ds := newDiscrepancyStore(ss, l, c.GetGroup(), cf.Clock)

// we can register callbacks on it
cbs := NewCallbackStore(ds)
Expand Down
2 changes: 1 addition & 1 deletion chain/beacon/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func NewBeaconTest(t *testing.T, n, thr int, period time.Duration, genesisTime i

for i := 0; i < n; i++ {
bt.CreateNode(t, i)
t.Logf("Creating node %d/%d", i, n)
t.Logf("Creating node %d/%d", i+1, n)
}
return bt
}
Expand Down
8 changes: 6 additions & 2 deletions chain/beacon/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

clock "github.com/jonboulle/clockwork"

"github.com/drand/drand/common/scheme"

"github.com/drand/drand/chain"
Expand Down Expand Up @@ -95,21 +97,23 @@ type discrepancyStore struct {
chain.Store
l log.Logger
group *key.Group
clock clock.Clock
}

func newDiscrepancyStore(s chain.Store, l log.Logger, group *key.Group) chain.Store {
func newDiscrepancyStore(s chain.Store, l log.Logger, group *key.Group, cl clock.Clock) chain.Store {
return &discrepancyStore{
Store: s,
l: l,
group: group,
clock: cl,
}
}

func (d *discrepancyStore) Put(b *chain.Beacon) error {
if err := d.Store.Put(b); err != nil {
return err
}
actual := time.Now().UnixNano()
actual := d.clock.Now().UnixNano()
beaconID := d.group.ID
expected := chain.TimeOfRound(d.group.Period, d.group.GenesisTime, b.Round) * 1e9
discrepancy := float64(actual-expected) / float64(time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestClientWithWatcherCtorError(t *testing.T) {
client.WithChainInfo(fakeChainInfo()),
client.WithWatcher(watcherCtor),
)
if err != watcherErr {
if !errors.Is(err, watcherErr) {
t.Fatal(err)
}
}
Expand Down
11 changes: 4 additions & 7 deletions client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func ForURLs(urls []string, chainHash []byte) []client.Client {
func Ping(ctx context.Context, root string) error {
url := fmt.Sprintf("%s/health", root)

ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, maxTimeoutHTTPRequest)
defer cancel()

req, err := nhttp.NewRequestWithContext(ctx, "GET", url, nhttp.NoBody)
Expand Down Expand Up @@ -174,14 +174,11 @@ func instrumentClient(url string, transport nhttp.RoundTripper) *nhttp.Client {
return &hc
}

func IsServerReady(addr string) error {
func IsServerReady(addr string) (er error) {
counter := 0

for {
ctx, cancel := context.WithTimeout(context.Background(), maxTimeoutHTTPRequest)
defer cancel()

err := Ping(ctx, "http://"+addr)
// Ping is wrapping its context with a Timeout on maxTimeoutHTTPRequest anyway.
err := Ping(context.Background(), "http://"+addr)
if err == nil {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion client/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"context"
"errors"
"net/http"
"sync"
"testing"
Expand Down Expand Up @@ -172,7 +173,7 @@ func TestHTTPClientClose(t *testing.T) {
}

_, err = httpClient.Get(context.Background(), 0)
if err != errClientClosed {
if !errors.Is(err, errClientClosed) {
t.Fatal("unexpected error from closed client", err)
}

Expand Down
6 changes: 3 additions & 3 deletions client/optimizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ LOOP:
}
stats = append(stats, rr.stat)
res = rr.result
if rr.err != errEmptyClientUnsupportedGet && rr.err != nil {
if rr.err != nil && !errors.Is(rr.err, errEmptyClientUnsupportedGet) {
err = fmt.Errorf("%v - %w", err, rr.err)
} else if rr.err == nil {
err = nil
Expand All @@ -270,7 +270,7 @@ func get(ctx context.Context, client Client, round uint64) *requestResult {
var stat requestStat

// client failure, set a large RTT so it is sent to the back of the list
if err != nil && err != ctx.Err() {
if err != nil && !errors.Is(err, ctx.Err()) {
stat = requestStat{client, math.MaxInt64, start}
return &requestResult{client, res, err, &stat}
}
Expand Down Expand Up @@ -606,7 +606,7 @@ CLIENT_LOOP:
func (oc *optimizingClient) Info(ctx context.Context) (chainInfo *chain.Info, err error) {
clients := oc.fastestClients()
for _, c := range clients {
ctx, cancel := context.WithTimeout(context.Background(), oc.requestTimeout)
ctx, cancel := context.WithTimeout(ctx, oc.requestTimeout)
chainInfo, err = c.Info(ctx)
cancel()
if err == nil {
Expand Down
51 changes: 30 additions & 21 deletions cmd/drand-cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,35 +848,44 @@ func deleteBeaconCmd(c *cli.Context) error {
return err
}

var er error
for beaconID, storePath := range stores {
store, err := boltdb.NewBoltStore(path.Join(storePath, core.DefaultDBFolder), conf.BoltOptions())
if err != nil {
return fmt.Errorf("beacon id [%s] - invalid bolt store creation: %s", beaconID, err)
}
defer store.Close()

lastBeacon, err := store.Last()
if err != nil {
return fmt.Errorf("beacon id [%s] - can't fetch last beacon: %s", beaconID, err)
}
if startRound > lastBeacon.Round {
return fmt.Errorf("beacon id [%s] - given round is ahead of the chain: %d", beaconID, lastBeacon.Round)
}
if c.IsSet(verboseFlag.Name) {
fmt.Printf("beacon id [%s] - planning to delete %d beacons \n", beaconID, (lastBeacon.Round - startRound))
if er != nil {
return er
}
// Using an anonymous function to not leak the defer
er = func() error {
store, err := boltdb.NewBoltStore(path.Join(storePath, core.DefaultDBFolder), conf.BoltOptions())
if err != nil {
return fmt.Errorf("beacon id [%s] - invalid bolt store creation: %s", beaconID, err)
}
defer store.Close()

for round := startRound; round <= lastBeacon.Round; round++ {
err := store.Del(round)
lastBeacon, err := store.Last()
if err != nil {
return fmt.Errorf("beacon id [%s] - error deleting round %d: %s", beaconID, round, err)
return fmt.Errorf("beacon id [%s] - can't fetch last beacon: %s", beaconID, err)
}
if startRound > lastBeacon.Round {
return fmt.Errorf("beacon id [%s] - given round is ahead of the chain: %d", beaconID, lastBeacon.Round)
}
if c.IsSet(verboseFlag.Name) {
fmt.Printf("beacon id [%s] - deleted beacon round %d \n", beaconID, round)
fmt.Printf("beacon id [%s] - planning to delete %d beacons \n", beaconID, (lastBeacon.Round - startRound))
}
}

for round := startRound; round <= lastBeacon.Round; round++ {
err := store.Del(round)
if err != nil {
return fmt.Errorf("beacon id [%s] - error deleting round %d: %s", beaconID, round, err)
}
if c.IsSet(verboseFlag.Name) {
fmt.Printf("beacon id [%s] - deleted beacon round %d \n", beaconID, round)
}
}
return nil
}()
}
return nil

return err
}

func toArray(flags ...cli.Flag) []cli.Flag {
Expand Down
23 changes: 13 additions & 10 deletions core/drand_beacon_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,16 +827,19 @@ func (bp *BeaconProcess) Status(c context.Context, in *drand.StatusRequest) (*dr
}
// TODO check if TLS or not
p := net.CreatePeer(addr.GetAddress(), addr.GetTls())
// Simply try to ping him see if he replies
tc, cancel := context.WithTimeout(c, callMaxTimeout)
defer cancel()
_, err := bp.privGateway.Home(tc, p, &drand.HomeRequest{})
if err != nil {
bp.log.Debugw("Status asked remote", addr, " FAIL", err)
resp[addr.GetAddress()] = false
} else {
resp[addr.GetAddress()] = true
}
// we use an anonymous function to not leak the defer in the for loop
func() {
// Simply try to ping him see if he replies
tc, cancel := context.WithTimeout(c, callMaxTimeout)
defer cancel()
_, err := bp.privGateway.Home(tc, p, &drand.HomeRequest{})
if err != nil {
bp.log.Debugw("Status asked remote", addr, " FAIL", err)
resp[addr.GetAddress()] = false
} else {
resp[addr.GetAddress()] = true
}
}()
}
packet := &drand.StatusResponse{
Dkg: &dkgStatus,
Expand Down
23 changes: 15 additions & 8 deletions core/drand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"github.com/stretchr/testify/require"
)

func setFDLimit() {
fdOpen := 2000
_, max, err := unixGetLimit()
func setFDLimit(t *testing.T) {
fdOpen := uint64(3000)
curr, max, err := unixGetLimit()
if err != nil {
panic(err)
t.Fatal(err)
}
if err := unixSetLimit(uint64(fdOpen), max); err != nil {
panic(err)
if fdOpen <= curr {
t.Logf("Current limit is larger (%d) than ours (%d); not changing it.\n", curr, fdOpen)
return
} else if err := unixSetLimit(fdOpen, max); err != nil {
t.Fatal(err)
}
}

Expand Down Expand Up @@ -61,7 +64,7 @@ func TestRunDKGLarge(t *testing.T) {
t.Skip("skipping test in short mode.")
}

setFDLimit()
setFDLimit(t)

n := 22
expectedBeaconPeriod := 5 * time.Second
Expand Down Expand Up @@ -875,7 +878,11 @@ func TestDrandPublicStreamProxy(t *testing.T) {

root := dt.nodes[0]
dt.SetMockClock(t, group.GenesisTime)
dt.WaitUntilChainIsServing(t, dt.nodes[0])
err := dt.WaitUntilChainIsServing(t, dt.nodes[0])
if err != nil {
t.Log("Error waiting until chain is serving:", err)
t.Fail()
}

// do a few periods
for i := 0; i < 3; i++ {
Expand Down
7 changes: 5 additions & 2 deletions core/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,16 @@ func (d *DrandTestScenario) SetMockClock(t *testing.T, targetUnixTime int64) {

// AdvanceMockClock advances the clock of all drand by the given duration
func (d *DrandTestScenario) AdvanceMockClock(t *testing.T, p time.Duration) {
t.Log("Advancing time by", p, "from", d.clock.Now().Unix())
for _, node := range d.nodes {
node.clock.Advance(p)
}
for _, node := range d.newNodes {
node.clock.Advance(p)
}
d.clock.Advance(p)
// we sleep to make sure everyone has the time to get the new time before continuing
time.Sleep(10 * time.Millisecond)
}

// CheckBeaconLength looks if the beacon chain on the given addresses is of the
Expand Down Expand Up @@ -515,7 +518,7 @@ func (d *DrandTestScenario) WaitUntilRound(t *testing.T, node *MockNode, round u
}

t.Logf("node %s is on %d round (vs expected %d), waiting some time to ask again...", node.addr, status.ChainStore.LastRound, round)
time.Sleep(1000 * time.Millisecond)
time.Sleep(d.period)
}
}

Expand All @@ -530,7 +533,7 @@ func (d *DrandTestScenario) WaitUntilChainIsServing(t *testing.T, node *MockNode
require.NoError(t, err)

if status.Beacon.IsServing {
t.Logf("node %s has its beacon chain running", node.addr)
t.Logf("node %s has its beacon chain running on round %d", node.addr, status.ChainStore.LastRound)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion lp2p/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func NewWithPubsub(ps *pubsub.PubSub, info *chain.Info, cache client.Cache) (*Cl
type UnsubFunc func()

// Sub subscribes to notfications about new randomness.
// Client instnace owns the channel after it is passed to Sub function,
// Client instance owns the channel after it is passed to Sub function,
// thus the channel should not be closed by library user
//
// It is recommended to use a buffered channel. If the channel is full,
Expand Down
3 changes: 2 additions & 1 deletion net/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package net
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -122,7 +123,7 @@ func (g *grpcClient) PublicRandStream(
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
if errors.Is(err, io.EOF) {
close(outCh)
return
}
Expand Down
1 change: 1 addition & 0 deletions net/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (g *ControlListener) Start() {
// Stop the listener and connections
func (g *ControlListener) Stop() {
g.conns.Stop()
g.lis.Close()
}

// ControlClient is a struct that implement control.ControlClient and is used to
Expand Down

0 comments on commit 2700ed5

Please sign in to comment.