network:ws block byte limiter #5472

Merged
merged 30 commits into from
Jun 29, 2023
Changes from 15 commits
Commits
17c532f
Base implementation, no testing
iansuvak Jun 6, 2023
80d71d4
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 13, 2023
413a2bd
Make existing tests pass
iansuvak Jun 13, 2023
c0bf235
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 16, 2023
503c439
proposl: count memory in blockServer, use callback to decrement mem c…
algonautshant Jun 16, 2023
2f43c59
Merge pull request #4 from algonautshant/shant/suggestion/ian/ws-bloc…
iansuvak Jun 17, 2023
e6c7908
rename CallWhenDone to Callback
iansuvak Jun 20, 2023
a6716ea
encoding: Update go-codec version. (#5471)
winder Jun 16, 2023
3529573
Chore: Use strings.Cut for clarity (#5474)
jannotti Jun 16, 2023
ff1cdfe
network: improve MsgOfInterest message handling (#5476)
cce Jun 16, 2023
68664af
ledger: refactor store module interfaces before kv impl merge (#5451)
icorderi Jun 16, 2023
d2d8f9d
tools: replace upload_metrics (#5470)
shiqizng Jun 16, 2023
5dcf685
Unify ws and http limits to block_service.go
iansuvak Jun 21, 2023
bd0ee61
call after succeeding or failing instead of before
iansuvak Jun 21, 2023
b1b8277
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 21, 2023
42c34fb
noop test skeleton
iansuvak Jun 22, 2023
77771ed
Almost working network test
iansuvak Jun 22, 2023
83c8bea
network side test
iansuvak Jun 23, 2023
977ce11
use locking methods to create responsechannels
iansuvak Jun 23, 2023
5aa8e33
fix blockservice test
iansuvak Jun 23, 2023
f623eec
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 26, 2023
46d9d53
add callbacks to shutdown cases
iansuvak Jun 26, 2023
72ec677
add another test to confirm draining explicitly
iansuvak Jun 26, 2023
84bb544
Change callback to OnMessageRelease()
iansuvak Jun 27, 2023
62414ed
Use a separate counter for memory
iansuvak Jun 27, 2023
fc1306b
move onRelease to OutgoingMessage
iansuvak Jun 29, 2023
6ba239e
add counters for both http and ws and don't share memory information …
iansuvak Jun 29, 2023
18f3869
remove the now unused network side metric
iansuvak Jun 29, 2023
e1b6411
fix reviewdog issue
iansuvak Jun 29, 2023
254ec5c
fix failing test
iansuvak Jun 29, 2023
4 changes: 2 additions & 2 deletions config/localTemplate.go
@@ -521,9 +521,9 @@ type Local struct {
// only relevant if TxIncomingFilteringFlags is non-zero
TxIncomingFilterMaxSize uint64 `version[28]:"500000"`

// BlockServiceHTTPMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests.
// BlockServiceMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests.
// When it exceeds this capacity, it redirects the block requests to a different node
BlockServiceHTTPMemCap uint64 `version[28]:"500000000"`
BlockServiceMemCap uint64 `version[28]:"500000000"`
}

// DNSBootstrapArray returns an array of one or more DNS Bootstrap identifiers
2 changes: 1 addition & 1 deletion config/local_defaults.go
@@ -30,7 +30,7 @@ var defaultLocal = Local{
Archival: false,
BaseLoggerDebugLevel: 4,
BlockServiceCustomFallbackEndpoints: "",
BlockServiceHTTPMemCap: 500000000,
BlockServiceMemCap: 500000000,
BroadcastConnectionsLimit: -1,
CadaverDirectory: "",
CadaverSizeTarget: 0,
2 changes: 1 addition & 1 deletion installer/config.json.example
@@ -9,7 +9,7 @@
"Archival": false,
"BaseLoggerDebugLevel": 4,
"BlockServiceCustomFallbackEndpoints": "",
"BlockServiceHTTPMemCap": 500000000,
"BlockServiceMemCap": 500000000,
"BroadcastConnectionsLimit": -1,
"CadaverDirectory": "",
"CadaverSizeTarget": 0,
4 changes: 4 additions & 0 deletions network/wsNetwork.go
@@ -142,6 +142,8 @@
var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"})
var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"})

var networkCatchupMessagesDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_ue_messages_dropped", Description: "number of (UE) block catchup request messages dropped due to being at byte limit"})

Check failure on line 145 in network/wsNetwork.go
GitHub Actions / reviewdog-warnings
[Lint Warnings] reported by reviewdog: var `networkCatchupMessagesDropped` is unused (unused)
Raw Output: network/wsNetwork.go:145:5: var `networkCatchupMessagesDropped` is unused (unused)

Contributor

If you want to add metrics, maybe we need the same one for HTTP?

Contributor Author

Right now it's unused because my change removed its only use. Should I remove it, or add both?

Contributor

I think we want both of these metrics.

Contributor

Note the lint warning about this var being unused.

Contributor Author

Indeed. I was originally using it but later removed that use. There's a conversation above asking whether we should record this for WS rejections only, for both WS and HTTP rejections, or for neither.


// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete.
const peerDisconnectionAckDuration = 5 * time.Second

@@ -250,6 +252,8 @@
// is used to ensure fairness across peers in terms of processing
// messages.
processing chan struct{}

Callback func()
}

// Tag is a short string (2 bytes) marking a type of message
2 changes: 1 addition & 1 deletion network/wsNetwork_test.go
@@ -605,7 +605,7 @@ func TestWebsocketNetworkCancel(t *testing.T) {
msgs[50].ctx = ctx

for _, peer := range peers {
peer.sendBufferHighPrio <- sendMessages{msgs}
peer.sendBufferHighPrio <- sendMessages{msgs: msgs}
}

select {
39 changes: 32 additions & 7 deletions network/wsPeer.go
@@ -173,7 +173,8 @@ type Response struct {
}

type sendMessages struct {
msgs []sendMessage
msgs []sendMessage
callback func()
}

type wsPeer struct {
@@ -410,7 +411,7 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseT
}

select {
case wp.sendBufferBulk <- sendMessages{msgs: msg}:
case wp.sendBufferBulk <- sendMessages{msgs: msg, callback: reqMsg.Callback}:
Contributor

If this select case is not picked, you may leak the counter decrement.
For now, this likely won't matter, since ctx is canceled only when the service is shutting down.
If this is the case, we are still vulnerable to future changes.
Better to handle the case here and make the behavior robust irrespective of why or who closes or cancels.

Contributor Author

Agreed, thanks!

case <-wp.closing:
wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String())
return
@@ -713,6 +714,9 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) {
}

func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason {
if msgs.callback != nil {
defer msgs.callback()
}
for _, msg := range msgs.msgs {
select {
case <-msg.ctx.Done():
@@ -730,17 +734,17 @@ func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason {
}

func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
if len(msg.data) > maxMessageLength {
wp.net.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), maxMessageLength, string(msg.data[0:2]))
// just drop it, don't break the connection
return disconnectReasonNone
}
if msg.msgTags != nil {
// when msg.msgTags is non-nil, the read loop has received a message-of-interest message that we want to apply.
// in order to avoid any locking, it sent it to this queue so that we could set it as the new outgoing message tag filter.
wp.sendMessageTag = msg.msgTags
return disconnectReasonNone
}
if len(msg.data) > maxMessageLength {
wp.net.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), maxMessageLength, string(msg.data[0:2]))
// just drop it, don't break the connection
return disconnectReasonNone
}
// the tags are always 2 char long; note that this is safe since it's only being used for messages that we have generated locally.
tag := protocol.Tag(msg.data[:2])
if !wp.sendMessageTag[tag] {
@@ -804,9 +808,15 @@ func (wp *wsPeer) writeLoop() {
}
case data := <-wp.sendBufferBulk:
if writeErr := wp.writeLoopSend(data); writeErr != disconnectReasonNone {
if data.callback != nil {
data.callback()
}
cleanupCloseError = writeErr
return
}
if data.callback != nil {
data.callback()
}
}
}
}
@@ -920,6 +930,21 @@ func (wp *wsPeer) Close(deadline time.Time) {
wp.net.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddr().String())
}
}

// We need to loop through all of the messages with callbacks still in the send queue and call them
// to ensure that state of counters such as wsBlockBytesUsed is correct.
L:
for {
select {
case msgs := <-wp.sendBufferBulk:
if msgs.callback != nil {
msgs.callback()
}
default:
break L
}

}
// now call all registered closers
for _, f := range wp.closers {
f()
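
The drain loop added to Close above can be sketched in isolation as follows. This is a simplified illustration rather than the `wsPeer` code: `batch` and `drain` are hypothetical names, the function returns instead of using a labeled break, and it also stops on a closed channel, which the original loop does not need to handle.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a queued group of messages.
type batch struct {
	callback func()
}

// drain empties queue without blocking and invokes every non-nil callback.
// It returns the number of batches removed from the queue.
func drain(queue <-chan batch) int {
	drained := 0
	for {
		select {
		case b, ok := <-queue:
			if !ok {
				// Channel closed: nothing more will ever arrive.
				return drained
			}
			drained++
			if b.callback != nil {
				b.callback()
			}
		default:
			// Queue is empty right now; stop instead of waiting.
			return drained
		}
	}
}

func main() {
	used := 30
	queue := make(chan batch, 4)
	for i := 0; i < 3; i++ {
		queue <- batch{callback: func() { used -= 10 }}
	}
	queue <- batch{} // a batch without a callback is simply discarded

	fmt.Println(drain(queue), used) // 4 0
}
```

The `default` case is what makes the loop non-blocking: once the buffered queue is empty, the select falls through and the function exits.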
23 changes: 17 additions & 6 deletions rpcs/blockService.go
@@ -130,7 +130,7 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger LedgerForB
fallbackEndpoints: makeFallbackEndpoints(log, config.BlockServiceCustomFallbackEndpoints),
enableArchiverFallback: config.EnableBlockServiceFallbackToArchiver,
log: log,
memoryCap: config.BlockServiceHTTPMemCap,
memoryCap: config.BlockServiceMemCap,
}
if service.enableService {
net.RegisterHTTPHandler(BlockServiceBlockPath, service)
@@ -301,8 +301,19 @@ const datatypeUnsupportedErrMsg = "requested data type is unsupported"
func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.IncomingMessage) {
target := reqMsg.Sender.(network.UnicastPeer)
var respTopics network.Topics
var n uint64

defer func() {
if n > 0 {
reqMsg.Callback = func() {
bs.mu.Lock()
bs.memoryUsed -= n
bs.mu.Unlock()
}
bs.mu.Lock()
bs.memoryUsed += n
bs.mu.Unlock()
}
target.Respond(ctx, reqMsg, respTopics)
}()

@@ -338,7 +349,7 @@ func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.Inc
[]byte(roundNumberParseErrMsg))}
return
}
respTopics = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType))
respTopics, n = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType))
return
}

@@ -416,7 +427,7 @@ func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) {
return data, err
}

func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) network.Topics {
func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) (network.Topics, uint64) {
blk, cert, err := dataLedger.EncodedBlockCert(round)
if err != nil {
switch err.(type) {
@@ -425,7 +436,7 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round
log.Infof("BlockService topicBlockBytes: %s", err)
}
return network.Topics{
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))}
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))}, 0
}
switch requestType {
case BlockAndCertValue:
@@ -434,10 +445,10 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round
BlockDataKey, blk),
network.MakeTopic(
CertDataKey, cert),
}
}, uint64(len(blk) + len(cert))
default:
return network.Topics{
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))}
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))}, 0
}
}

2 changes: 1 addition & 1 deletion test/testdata/configs/config-v28.json
@@ -9,7 +9,7 @@
"Archival": false,
"BaseLoggerDebugLevel": 4,
"BlockServiceCustomFallbackEndpoints": "",
"BlockServiceHTTPMemCap": 500000000,
"BlockServiceMemCap": 500000000,
"BroadcastConnectionsLimit": -1,
"CadaverDirectory": "",
"CadaverSizeTarget": 0,