Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dot/network): use sync.Pool for network message buffers #1600

Merged
merged 23 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5a38145
update README.md
noot May 13, 2021
117ebb8
update docs
noot May 13, 2021
2fca3d8
Merge branch 'development' into noot/update-readme
noot May 13, 2021
09e8ca9
Merge branch 'development' into noot/update-readme
dutterbutter May 14, 2021
328dcd6
Merge branch 'development' of github.com:ChainSafe/gossamer into deve…
noot May 18, 2021
040cdc4
use sync.Pool for msg buffers
noot May 18, 2021
3f5a7b6
cache preallocated bufs in *Service to prevent GC
noot May 19, 2021
4eceebf
cleanup and lint, turn off preallocations for tests
noot May 19, 2021
0680d7c
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot May 19, 2021
1ca19c6
remove hsBufPool as it's not needed
noot May 19, 2021
4d747f2
cleanup usage of message buffer getters/defer funcs
noot May 20, 2021
e648cf1
cleanup
noot May 20, 2021
e7abd0f
Merge branch 'development' into noot/network-pool
arijitAD May 20, 2021
19034b8
update network to use sizedBufferPool
noot May 20, 2021
0c71acd
Merge branch 'noot/network-pool' of github.com:ChainSafe/gossamer int…
noot May 20, 2021
823b5f8
pass reference to buf in pool.put
noot May 20, 2021
66ebd2b
restore file
noot May 20, 2021
5c33f4a
Merge branch 'development' into noot/network-pool
noot May 20, 2021
2955598
Merge branch 'development' into noot/network-pool
noot May 21, 2021
5634551
Merge branch 'development' into noot/network-pool
noot May 21, 2021
ecfa3c7
increase pool size to be min/max * 4
noot May 21, 2021
e666056
Merge branch 'noot/network-pool' of github.com:ChainSafe/gossamer int…
noot May 21, 2021
a4199e9
address comments
noot May 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type Config struct {

// telemetryInterval how often to send telemetry metrics
telemetryInterval time.Duration

noPreAllocate bool // internal option
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename it to preAllocate
Because this is difficult to read

if !cfg.noPreAllocate {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to pre-allocate by default so if this is changed to preAllocate then it will be false by default, but we won't know if it was set to false or if it defaulted to false, hence why it's noPreAllocate

}

// build checks the configuration, sets up the private key for the network service,
Expand Down
12 changes: 7 additions & 5 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

var errCannotValidateHandshake = errors.New("failed to validate handshake")

var maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint
const maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint

// Handshake is the interface all handshakes for notifications protocols must implement
type Handshake interface {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

hs, err := readHandshake(stream, decodeBlockAnnounceHandshake)
hs, err := s.readHandshake(stream, decodeBlockAnnounceHandshake)
if err != nil {
logger.Trace("failed to read handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Close()
Expand Down Expand Up @@ -294,9 +294,11 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
}
}

func readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) {
msgBytes := make([]byte, maxHandshakeSize)
tot, err := readStream(stream, msgBytes)
func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) {
msgBytes := s.getMessageBuffer()
defer s.cleanupMessageBuffer(msgBytes)

tot, err := readStream(stream, msgBytes[:])
if err != nil {
return nil, err
}
Expand Down
68 changes: 62 additions & 6 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"

maxMessageSize = maxBlockResponseSize
)

var (
Expand All @@ -72,6 +74,9 @@ type Service struct {
mdns *mdns
gossip *gossip
syncQueue *syncQueue
bufPool *sync.Pool
bufMap *sync.Map //map[*[maxMessageSize]byte]struct{}
bufCount int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use atomic counter instead. This might lead to race condition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also use channels instead of bufMap for the keeping the reference of the buffer.
Using channel you can easily get it's length so need of bufCount

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed in favour of sizedBufferPool


notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info
notificationsMu sync.RWMutex
Expand Down Expand Up @@ -132,6 +137,30 @@ func NewService(cfg *Config) (*Service, error) {
return nil, err
}

// pre-allocate pool of buffers used to read from streams.
// initially allocate as many buffers as minimally necessary which is the number inbound streams we will have,
// which should equal min peers times the number of notifications protocols, which is currently 3.
//
// the purpose of the bufMap is to keep the reference to the buffers inside the *Service
// at all times to prevent the buffers from being garbage collected.
// with this addition, they are only garbage collected once the *Service is no longer
// used, which is when the node shuts down.
bufPool := new(sync.Pool)
bufPool.New = func() interface{} {
return [maxMessageSize]byte{}
}
bufMap := new(sync.Map)

var bufCount int
if !cfg.noPreAllocate {
Copy link
Contributor

@arijitAD arijitAD May 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you switch if with else it will be cleaner.

if cfg.noPreAllocate {
    bufPool = newSizedBufferPool(cfg.MinPeers*3, cfg.MaxPeers*3)
} else {
    bufPool = &sizedBufferPool{
	c: make(chan *[maxMessageSize]byte, cfg.MaxPeers*3),
    }
}

for i := 0; i < cfg.MinPeers*3; i++ {
buf := [maxBlockResponseSize]byte{}
bufPool.Put(buf) //nolint
bufMap.Store(&buf, struct{}{})
}
bufCount = cfg.MinPeers * 3
}

network := &Service{
ctx: ctx,
cancel: cancel,
Expand All @@ -148,6 +177,9 @@ func NewService(cfg *Config) (*Service, error) {
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
closeCh: make(chan interface{}),
bufPool: bufPool,
bufMap: bufMap,
bufCount: bufCount,
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -550,15 +582,39 @@ func isInbound(stream libp2pnetwork.Stream) bool {
return stream.Stat().Direction == libp2pnetwork.DirInbound
}

func (s *Service) getMessageBuffer() [maxMessageSize]byte {
buf := s.bufPool.Get()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can directly do this.

return s.bufPool.Get().([maxMessageSize]byte)

Sync.Pool will never return nil since you have defined New for it and since we are using the sync pool for specifically storing [maxMessageSize]byte. Interface conversion will never panic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed in favour of sizedBufferPool

if buf == nil {
return [maxMessageSize]byte{}
}

msgBytes, ok := buf.([maxMessageSize]byte)
if !ok {
return [maxMessageSize]byte{}
}

return msgBytes
}

func (s *Service) cleanupMessageBuffer(buf [maxMessageSize]byte) {
s.bufPool.Put(buf) //nolint

logger.Trace("message buffers allocated", "count", s.bufCount)
if s.bufCount >= s.cfg.MaxPeers*3 {
return
}

s.bufMap.Store(&buf, struct{}{})
s.bufCount++
}

func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) {
var (
maxMessageSize uint64 = maxBlockResponseSize // TODO: determine actual max message size
msgBytes = make([]byte, maxMessageSize)
peer = stream.Conn().RemotePeer()
)
peer := stream.Conn().RemotePeer()
msgBytes := s.getMessageBuffer()
defer s.cleanupMessageBuffer(msgBytes)

for {
tot, err := readStream(stream, msgBytes)
tot, err := readStream(stream, msgBytes[:])
if err == io.EOF {
continue
} else if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
cfg.Syncer = newMockSyncer()
}

cfg.noPreAllocate = true

srvc, err := NewService(cfg)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er
return nil
}

var (
const (
blockRequestSize uint32 = 128
blockRequestBufferSize int = 6
blockResponseBufferSize int = 6
Expand Down