Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dot/network): use sync.Pool for network message buffers #1600

Merged
merged 23 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5a38145
update README.md
noot May 13, 2021
117ebb8
update docs
noot May 13, 2021
2fca3d8
Merge branch 'development' into noot/update-readme
noot May 13, 2021
09e8ca9
Merge branch 'development' into noot/update-readme
dutterbutter May 14, 2021
328dcd6
Merge branch 'development' of github.com:ChainSafe/gossamer into deve…
noot May 18, 2021
040cdc4
use sync.Pool for msg buffers
noot May 18, 2021
3f5a7b6
cache preallocated bufs in *Service to prevent GC
noot May 19, 2021
4eceebf
cleanup and lint, turn off preallocations for tests
noot May 19, 2021
0680d7c
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot May 19, 2021
1ca19c6
remove hsBufPool as it's not needed
noot May 19, 2021
4d747f2
cleanup usage of message buffer getters/defer funcs
noot May 20, 2021
e648cf1
cleanup
noot May 20, 2021
e7abd0f
Merge branch 'development' into noot/network-pool
arijitAD May 20, 2021
19034b8
update network to use sizedBufferPool
noot May 20, 2021
0c71acd
Merge branch 'noot/network-pool' of github.com:ChainSafe/gossamer int…
noot May 20, 2021
823b5f8
pass reference to buf in pool.put
noot May 20, 2021
66ebd2b
restore file
noot May 20, 2021
5c33f4a
Merge branch 'development' into noot/network-pool
noot May 20, 2021
2955598
Merge branch 'development' into noot/network-pool
noot May 21, 2021
5634551
Merge branch 'development' into noot/network-pool
noot May 21, 2021
ecfa3c7
increase pool size to be min/max * 4
noot May 21, 2021
e666056
Merge branch 'noot/network-pool' of github.com:ChainSafe/gossamer int…
noot May 21, 2021
a4199e9
address comments
noot May 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type Config struct {

// telemetryInterval how often to send telemetry metrics
telemetryInterval time.Duration

noPreAllocate bool // internal option
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename it to preAllocate
Because this is difficult to read

if !cfg.noPreAllocate {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to pre-allocate by default so if this is changed to preAllocate then it will be false by default, but we won't know if it was set to false or if it defaulted to false, hence why it's noPreAllocate

}

// build checks the configuration, sets up the private key for the network service,
Expand Down
12 changes: 7 additions & 5 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

var errCannotValidateHandshake = errors.New("failed to validate handshake")

var maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint
const maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint

// Handshake is the interface all handshakes for notifications protocols must implement
type Handshake interface {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

hs, err := readHandshake(stream, decodeBlockAnnounceHandshake)
hs, err := s.readHandshake(stream, decodeBlockAnnounceHandshake)
if err != nil {
logger.Trace("failed to read handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Close()
Expand Down Expand Up @@ -294,9 +294,11 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
}
}

func readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) {
msgBytes := make([]byte, maxHandshakeSize)
tot, err := readStream(stream, msgBytes)
func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDecoder) (Handshake, error) {
msgBytes := s.bufPool.get()
defer s.bufPool.put(&msgBytes)

tot, err := readStream(stream, msgBytes[:])
if err != nil {
return nil, err
}
Expand Down
57 changes: 57 additions & 0 deletions dot/network/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2019 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.

package network

// sizedBufferPool is a pool of buffers used for reading from streams
type sizedBufferPool struct {
c chan *[maxMessageSize]byte
}

func newSizedBufferPool(min, max int) (bp *sizedBufferPool) {
bufferCh := make(chan *[maxMessageSize]byte, max)

for i := 0; i < min; i++ {
buf := [maxMessageSize]byte{}
bufferCh <- &buf
}

return &sizedBufferPool{
c: bufferCh,
}
}

// get gets a buffer from the sizedBufferPool, or creates a new one if none are
// available in the pool. Buffers have a pre-allocated capacity.
func (bp *sizedBufferPool) get() [maxMessageSize]byte {
var buff *[maxMessageSize]byte
select {
case buff = <-bp.c:
// reuse existing buffer
default:
// create new buffer
buff = &[maxMessageSize]byte{}
}
return *buff
}

// put returns the given buffer to the sizedBufferPool.
func (bp *sizedBufferPool) put(b *[maxMessageSize]byte) {
select {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we clear the buffer when it comes back to the pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no it doesn't make a difference since the buffer contents will be overwritten when we read into it

case bp.c <- b:
default: // Discard the buffer if the pool is full.
}
}
26 changes: 20 additions & 6 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"

maxMessageSize = 1024 * 1024 // 1mb for now
)

var (
Expand All @@ -70,6 +72,7 @@ type Service struct {
mdns *mdns
gossip *gossip
syncQueue *syncQueue
bufPool *sizedBufferPool

notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info
notificationsMu sync.RWMutex
Expand Down Expand Up @@ -130,6 +133,18 @@ func NewService(cfg *Config) (*Service, error) {
return nil, err
}

// pre-allocate pool of buffers used to read from streams.
// initially allocate as many buffers as liekly necessary which is the number inbound streams we will have,
// which should equal average number of peers times the number of notifications protocols, which is currently 3.
var bufPool *sizedBufferPool
if cfg.noPreAllocate {
bufPool = &sizedBufferPool{
c: make(chan *[maxMessageSize]byte, cfg.MaxPeers*3),
}
} else {
bufPool = newSizedBufferPool((cfg.MaxPeers-cfg.MinPeers)*3/2, (cfg.MaxPeers+1)*3)
}

network := &Service{
ctx: ctx,
cancel: cancel,
Expand All @@ -146,6 +161,7 @@ func NewService(cfg *Config) (*Service, error) {
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
closeCh: make(chan interface{}),
bufPool: bufPool,
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -509,14 +525,12 @@ func isInbound(stream libp2pnetwork.Stream) bool {
}

func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) {
var (
maxMessageSize uint64 = maxBlockResponseSize // TODO: determine actual max message size
msgBytes = make([]byte, maxMessageSize)
peer = stream.Conn().RemotePeer()
)
peer := stream.Conn().RemotePeer()
msgBytes := s.bufPool.get()
defer s.bufPool.put(&msgBytes)

for {
tot, err := readStream(stream, msgBytes)
tot, err := readStream(stream, msgBytes[:])
if err == io.EOF {
continue
} else if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
cfg.Syncer = newMockSyncer()
}

cfg.noPreAllocate = true

srvc, err := NewService(cfg)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er
return nil
}

var (
const (
blockRequestSize uint32 = 128
blockRequestBufferSize int = 6
blockResponseBufferSize int = 6
Expand Down