-
Notifications
You must be signed in to change notification settings - Fork 451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
network:ws block byte limiter #5472
Changes from 20 commits
17c532f
80d71d4
413a2bd
c0bf235
503c439
2f43c59
e6c7908
a6716ea
3529573
ff1cdfe
68664af
d2d8f9d
5dcf685
bd0ee61
b1b8277
42c34fb
77771ed
83c8bea
977ce11
5aa8e33
f623eec
46d9d53
72ec677
84bb544
62414ed
fc1306b
6ba239e
18f3869
e1b6411
254ec5c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -142,6 +142,8 @@ | |
var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"}) | ||
var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"}) | ||
|
||
var networkCatchupMessagesDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_ue_messages_dropped", Description: "number of (UE) block catchup request messages dropped due to being at byte limit"}) | ||
Check failure on line 145 in network/wsNetwork.go GitHub Actions / reviewdog-warnings
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note the lint warning for this var being unused. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. I was originally using it but had it removed. There's a conversation above asking whether we should record this metric for just this case, for both this and HTTP rejections, or for neither. |
||
|
||
// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete. | ||
const peerDisconnectionAckDuration = 5 * time.Second | ||
|
||
|
@@ -250,6 +252,8 @@ | |
// is used to ensure fairness across peers in terms of processing | ||
// messages. | ||
processing chan struct{} | ||
|
||
Callback func() | ||
} | ||
|
||
// Tag is a short string (2 bytes) marking a type of message | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -173,7 +173,8 @@ type Response struct { | |
} | ||
|
||
type sendMessages struct { | ||
msgs []sendMessage | ||
msgs []sendMessage | ||
callback func() | ||
iansuvak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
type wsPeer struct { | ||
|
@@ -410,7 +411,7 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseT | |
} | ||
|
||
select { | ||
case wp.sendBufferBulk <- sendMessages{msgs: msg}: | ||
case wp.sendBufferBulk <- sendMessages{msgs: msg, callback: reqMsg.Callback}: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this select case is not picked, you may leak the counter decrement. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, thanks! |
||
case <-wp.closing: | ||
wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String()) | ||
return | ||
|
@@ -713,6 +714,9 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) { | |
} | ||
|
||
func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason { | ||
if msgs.callback != nil { | ||
defer msgs.callback() | ||
} | ||
for _, msg := range msgs.msgs { | ||
select { | ||
case <-msg.ctx.Done(): | ||
|
@@ -730,17 +734,17 @@ func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason { | |
} | ||
|
||
func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { | ||
if len(msg.data) > maxMessageLength { | ||
wp.net.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), maxMessageLength, string(msg.data[0:2])) | ||
// just drop it, don't break the connection | ||
return disconnectReasonNone | ||
} | ||
if msg.msgTags != nil { | ||
// when msg.msgTags is non-nil, the read loop has received a message-of-interest message that we want to apply. | ||
// in order to avoid any locking, it sent it to this queue so that we could set it as the new outgoing message tag filter. | ||
wp.sendMessageTag = msg.msgTags | ||
return disconnectReasonNone | ||
} | ||
if len(msg.data) > maxMessageLength { | ||
wp.net.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), maxMessageLength, string(msg.data[0:2])) | ||
// just drop it, don't break the connection | ||
return disconnectReasonNone | ||
} | ||
// the tags are always 2 char long; note that this is safe since it's only being used for messages that we have generated locally. | ||
tag := protocol.Tag(msg.data[:2]) | ||
if !wp.sendMessageTag[tag] { | ||
|
@@ -920,6 +924,21 @@ func (wp *wsPeer) Close(deadline time.Time) { | |
wp.net.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddr().String()) | ||
} | ||
} | ||
|
||
// We need to loop through all of the messages with callbacks still in the send queue and call them | ||
// to ensure that state of counters such as wsBlockBytesUsed is correct. | ||
L: | ||
for { | ||
select { | ||
case msgs := <-wp.sendBufferBulk: | ||
if msgs.callback != nil { | ||
msgs.callback() | ||
} | ||
default: | ||
break L | ||
} | ||
|
||
} | ||
// now call all registered closers | ||
for _, f := range wp.closers { | ||
f() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import ( | |
"strconv" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/gorilla/mux" | ||
|
||
|
@@ -130,7 +131,7 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger LedgerForB | |
fallbackEndpoints: makeFallbackEndpoints(log, config.BlockServiceCustomFallbackEndpoints), | ||
enableArchiverFallback: config.EnableBlockServiceFallbackToArchiver, | ||
log: log, | ||
memoryCap: config.BlockServiceHTTPMemCap, | ||
memoryCap: config.BlockServiceMemCap, | ||
} | ||
if service.enableService { | ||
net.RegisterHTTPHandler(BlockServiceBlockPath, service) | ||
|
@@ -301,11 +302,34 @@ const datatypeUnsupportedErrMsg = "requested data type is unsupported" | |
func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.IncomingMessage) { | ||
target := reqMsg.Sender.(network.UnicastPeer) | ||
var respTopics network.Topics | ||
var n uint64 | ||
|
||
defer func() { | ||
if n > 0 { | ||
reqMsg.Callback = func() { | ||
bs.mu.Lock() | ||
bs.memoryUsed -= n | ||
iansuvak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bs.mu.Unlock() | ||
} | ||
bs.mu.Lock() | ||
bs.memoryUsed += n | ||
bs.mu.Unlock() | ||
} | ||
target.Respond(ctx, reqMsg, respTopics) | ||
}() | ||
|
||
// If we are over-capacity, we will not process the request | ||
// respond to sender with error message | ||
memUsed := atomic.LoadUint64(&bs.memoryUsed) | ||
if memUsed > bs.memoryCap { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to just not return a response here? Or at least not give the peer memory information? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great question. In this case, the requester should request it from another peer. It cannot wait and request again, since these requests are time sensitive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is what it already does after a 4 second timeout even if you return nothing, so not returning a response should be the same as returning an error response. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For WS, 4 seconds is a very long time if we want to address this properly. If there is a system cause for not receiving the proposed block (e.g. network disruption), the agreement will need the blocks serviced as fast as possible in order to move forward. 4 seconds might be okay, since we get a 20 second round, but this may get longer very quickly, and we can easily do better. Again, this is a product question: should we add a little bit more protocol implementation to address this rare situation, or do we leave it as is? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning an error in that case should still be good enough, but 4 second timeouts do happen somewhat regularly right now. |
||
err := errMemoryAtCapacity{capacity: bs.memoryCap, used: memUsed} | ||
bs.log.Infof("BlockService handleCatchupReq: %s", err.Error()) | ||
respTopics = network.Topics{ | ||
network.MakeTopic(network.ErrorKey, []byte(err.Error())), | ||
} | ||
return | ||
} | ||
|
||
topics, err := network.UnmarshallTopics(reqMsg.Data) | ||
if err != nil { | ||
bs.log.Infof("BlockService handleCatchupReq: %s", err.Error()) | ||
|
@@ -338,7 +362,7 @@ func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.Inc | |
[]byte(roundNumberParseErrMsg))} | ||
return | ||
} | ||
respTopics = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType)) | ||
respTopics, n = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType)) | ||
return | ||
} | ||
|
||
|
@@ -416,7 +440,7 @@ func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) { | |
return data, err | ||
} | ||
|
||
func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) network.Topics { | ||
func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) (network.Topics, uint64) { | ||
blk, cert, err := dataLedger.EncodedBlockCert(round) | ||
if err != nil { | ||
switch err.(type) { | ||
|
@@ -425,7 +449,7 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round | |
log.Infof("BlockService topicBlockBytes: %s", err) | ||
} | ||
return network.Topics{ | ||
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))} | ||
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))}, 0 | ||
} | ||
switch requestType { | ||
case BlockAndCertValue: | ||
|
@@ -434,10 +458,10 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round | |
BlockDataKey, blk), | ||
network.MakeTopic( | ||
CertDataKey, cert), | ||
} | ||
}, uint64(len(blk) + len(cert)) | ||
default: | ||
return network.Topics{ | ||
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))} | ||
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))}, 0 | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to add metrics, maybe we need one/same for the HTTP?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now it's unused since my change removed it? Should I remove it or add both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want both of these metrics.