
network:ws block byte limiter #5472

Merged: 30 commits into algorand:master on Jun 29, 2023

Conversation

iansuvak (Contributor)

Summary

This is the websocket counterpart of the HTTP block server PR (#5428).

The goal is to limit the total number of bytes of block responses that we can have in flight at any given time. Bytes are counted as used from the moment a response lands in the send queue channel and subtracted once the message is sent or the connection is terminated.

The tricky implementation choice is where to track the byte count. Ideally we would do it in blockService.go, as the HTTP PR does, but the bytes aren't freed until the network package is done with them, and network doesn't have access to the block service. We would have to change the interface, perhaps allowing messages to communicate back to the block service via a channel. All thoughts and opinions are much appreciated.

Test Plan

I don't have tests written yet, since I wanted to get feedback on the approach first.

I will write new tests focused on ensuring that the send queue is drained properly in different cases.


codecov bot commented Jun 16, 2023

Codecov Report

Merging #5472 (254ec5c) into master (68a4aba) will increase coverage by 0.02%.
The diff coverage is 85.00%.

@@            Coverage Diff             @@
##           master    #5472      +/-   ##
==========================================
+ Coverage   55.78%   55.81%   +0.02%     
==========================================
  Files         446      446              
  Lines       63253    63282      +29     
==========================================
+ Hits        35288    35322      +34     
+ Misses      25593    25584       -9     
- Partials     2372     2376       +4     
Impacted Files Coverage Δ
config/localTemplate.go 70.76% <ø> (ø)
rpcs/blockService.go 74.33% <83.33%> (+5.38%) ⬆️
network/wsPeer.go 70.53% <86.66%> (+0.29%) ⬆️
network/wsNetwork.go 72.87% <100.00%> (ø)

... and 7 files with indirect coverage changes


@@ -524,6 +524,9 @@ type Local struct {
// BlockServiceHTTPMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests.
// When it exceeds this capacity, it redirects the block requests to a different node
BlockServiceHTTPMemCap uint64 `version[28]:"500000000"`

// BlockServiceWSMemCap is the memory capacity in bytes which is allowed for the block service to use for websocket block requests.
BlockServiceWSMemCap int64 `version[28]:"500000000"`
Contributor:

I think we can make a single parameter and use it for both HTTP and WS. We already have too many config params...

@@ -142,6 +142,8 @@ var networkPrioBatchesPPWithoutCompression = metrics.MakeCounter(metrics.MetricN
var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"})
var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"})

var networkCatchupMessagesDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_ue_messages_dropped", Description: "number of (UE) block catchup request messages dropped due to being at byte limit"})
Contributor:

If you want to add metrics, maybe we need the same one for HTTP?

Author:

Right now it's unused, since my change removed its use. Should I remove it or add both?

Contributor:

I think we want both of these metrics.

@@ -470,6 +472,13 @@ type WebsocketNetwork struct {

// resolveSRVRecords is a function that resolves SRV records for a given service, protocol and name
resolveSRVRecords func(service string, protocol string, name string, fallbackDNSResolverAddress string, secure bool) (addrs []string, err error)

Contributor:

I think this is too much complication in the network layer.
I have a prototype suggestion here:
iansuvak#4

Keep the accounting similar to #5428 and use a callback as in iansuvak#4

@iansuvak iansuvak changed the title WIP: network:ws block byte limiter network:ws block byte limiter Jun 23, 2023
@iansuvak iansuvak marked this pull request as ready for review June 23, 2023 18:15
// If we are over-capacity, we will not process the request
// respond to sender with error message
memUsed := atomic.LoadUint64(&bs.memoryUsed)
if memUsed > bs.memoryCap {
Author:

Do we want to just not return a response here? Or at least not give the peer memory information?

Contributor:

Great question.
We need to do something similar to the HTTP request. There we got it for free, using the protocol retry message.
Here, we need to implement on the requester side, how to react to this error.

In this case, the requester should request it from another peer. It cannot wait and request again, since these requests are time sensitive.

Author:

This is what it already does after a 4-second timeout, even if you return nothing, so not returning a response should be the same as returning an error response.

Contributor:

For WS, 4 seconds is a very long time if we want to address this properly. The request will come from the agreement service, so it needs the block in milliseconds, not seconds :-)

If there is a systemic cause for not receiving the proposed block (i.e. network disruption), then for agreement to move forward it will need the blocks served as fast as possible. 4 seconds might be okay and we get a 20-second round, but this may get longer very quickly, and we can easily do better.

Again, this is a product question: should we add a little more protocol implementation to address this rare situation, or leave it as is?

Author:

Returning an error in that case should still be good enough, but 4-second timeouts do happen somewhat regularly right now.
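The over-capacity path under discussion can be sketched as follows. This is a hypothetical illustration of the idea, not the actual algod API: the topic struct and key names are stand-ins, and the real code uses `network.Topics` and the block service's own error topics.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type topic struct{ key, data string }

type blockServer struct {
	memoryUsed uint64 // bytes of block responses currently in flight
	memoryCap  uint64 // reject requests once usage exceeds this
}

func (bs *blockServer) handleRequest(round uint64) []topic {
	if atomic.LoadUint64(&bs.memoryUsed) > bs.memoryCap {
		// Returning an explicit error lets the requester fail over to
		// another peer immediately instead of waiting out its timeout.
		return []topic{{key: "Error", data: "over capacity, retry elsewhere"}}
	}
	blk := fmt.Sprintf("block-%d", round) // stand-in for the encoded block
	atomic.AddUint64(&bs.memoryUsed, uint64(len(blk)))
	return []topic{{key: "blockData", data: blk}}
}

func main() {
	bs := &blockServer{memoryCap: 16}
	fmt.Println(bs.handleRequest(1)[0].key) // blockData
	bs.memoryUsed = 32                      // simulate being over capacity
	fmt.Println(bs.handleRequest(2)[0].key) // Error
}
```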

@@ -410,7 +411,7 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseT
}

select {
case wp.sendBufferBulk <- sendMessages{msgs: msg}:
case wp.sendBufferBulk <- sendMessages{msgs: msg, callback: reqMsg.Callback}:
Contributor:

If this select case is not picked, you may leak the counter decrement.
For now, this likely won't matter, since ctx is canceled only when the service is shutting down.
If this is the case, we are still vulnerable to future changes.
Better to handle the case here and make the behavior robust irrespective of why or who closes or cancels.

Author:

Agreed, thanks!

@iansuvak iansuvak requested review from cce and AlgoAxel June 28, 2023 20:10
@@ -142,6 +142,8 @@ var networkPrioBatchesPPWithoutCompression = metrics.MakeCounter(metrics.MetricN
var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"})
var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"})

var networkCatchupMessagesDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_ue_messages_dropped", Description: "number of (UE) block catchup request messages dropped due to being at byte limit"})
Contributor:

Note the Lint warning for this var being unused

Author:

Indeed. I was originally using it but removed that use. There's a conversation above asking whether we should record this metric for just WS rejections, for both WS and HTTP rejections, or for neither.

@algorandskiy (Contributor) left a comment:

I guess having a callback for when a response leaves this node is fine, but it only works for topics, since Respond takes IncomingMessage as an argument; it is not a generic implementation, so it should not be part of IncomingMessage.

The Respond handler is used only in blockService (the usage in wsNetwork appears to be unreachable, since the Respond action is not used).

I think the Respond handler should be refactored to accept OutgoingMessage instead of responseTopics Topics; OutgoingMessage already has a Topics field, so no functionality would be lost. Adding OnRelease (or OnSent) to OutgoingMessage would make it more generic/usable.

Maybe call it "OnResponseSent" instead of "OnRelease"?
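The refactor proposed above can be sketched roughly as follows. The names are the reviewer's proposal, not the final API, and the types are simplified stand-ins for the network package's:

```go
package main

import "fmt"

// Topics is a simplified stand-in for the network package's topic list.
type Topics map[string][]byte

// OutgoingMessage carries both the response payload and a release hook,
// so Respond no longer needs a bare Topics argument.
type OutgoingMessage struct {
	Topics    Topics
	OnRelease func() // invoked once the response leaves this node
}

// respond sends the topics and then fires the hook, guarding against a nil
// callback for responses that carry no byte reservation.
func respond(out OutgoingMessage, send func(Topics)) {
	send(out.Topics)
	if out.OnRelease != nil {
		out.OnRelease()
	}
}

func main() {
	released := false
	out := OutgoingMessage{
		Topics:    Topics{"blockData": []byte("...")},
		OnRelease: func() { released = true },
	}
	respond(out, func(Topics) {}) // pretend the bytes hit the wire
	fmt.Println(released)         // true
}
```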

algorandskiy previously approved these changes Jun 29, 2023
@algorandskiy (Contributor) left a comment:

Looks good, please fix the reviewdog and a failing test

}
atomic.AddUint64(&bs.wsMemoryUsed, (n))
}
target.Respond(ctx, reqMsg, outMsg)
Contributor:

maybe log an error here?

@algorandskiy algorandskiy merged commit 0bc522d into algorand:master Jun 29, 2023
17 checks passed
case <-wp.closing:
outMsg.OnRelease()
Contributor:

don't you need to check if OnRelease is nil or not?

wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String())
return
case <-ctx.Done():
outMsg.OnRelease()
Contributor:

similar here
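The nil guard requested in the two comments above matters because OnRelease is only set for block responses; other messages leave it nil, so calling it unconditionally on the closing and ctx.Done() paths would panic. A minimal illustration (the `outMsg` type here is a stand-in, not the actual wsPeer struct):

```go
package main

import "fmt"

type outMsg struct {
	OnRelease func() // set only for messages that reserved bytes
}

// releaseIfSet is the guarded call both exit paths should use.
func releaseIfSet(m outMsg) {
	if m.OnRelease != nil {
		m.OnRelease() // free the reserved bytes
	}
}

func main() {
	releaseIfSet(outMsg{}) // nil callback: no panic
	n := 0
	releaseIfSet(outMsg{OnRelease: func() { n++ }})
	fmt.Println(n) // 1
}
```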


9 participants