Pushsync retry next peer after error #947
Conversation
Force-pushed from 68470c7 to 3f18d96
/run beekeeper
Very nice. I have some comments only about defers in the retry loop.
pkg/pushsync/pushsync.go
Outdated
}
	return nil, fmt.Errorf("closest peer: %w", err)
}
defer ps.accounting.Release(peer, receiptPrice)
This defer is inside a retry loop. I think this call should happen after the current iteration ends, not after all iterations. Maybe wrap the iteration body in an encapsulating function?
I left both defer sections intentionally. I figured it would be fine to leave them for the end, however I now see it would be a problem for accounting.
pkg/pushsync/pushsync.go
Outdated
ps.logger.Debugf("pushsync-push: %w", lastErr)
	continue
}
defer func() { go streamer.FullClose() }()
If there are errors after this defer, there may be multiple FullClose calls. I do not think that it is a problem, but it is not necessary.
Once again, I suggest handling technical debt ASAP after release.
pkg/pushsync/pushsync.go
Outdated
	err error
)

if i == 0 {
it is cleaner and easier to read if you pull this part before the for loop.
pkg/pushsync/pushsync.go
Outdated
}
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())

// if you manage to get a tag, just increment the respective counter
this should come after sending off to the first peer. L307
I moved this intentionally after the receiveReceipt section, as it seemed pointless to mark the chunk as sent if we did not receive confirmation. If the host fails after that, we might switch to another and increment the sent state again. I will move it back.
Actually, you must not increment it twice (#843); if you do, you go above the split count.
> I moved this intentionally after the receiveReceipt section, as it seemed pointless to mark it as sent if we did not receive confirmation.

You made the wrong assumption: sent is incremented when the chunk hits the network; synced is incremented when you get the receipt back (no matter how many retries it took).
pkg/pushsync/pushsync.go
Outdated
// provided address addr. This function will ignore peers with addresses
// provided in skipPeers.
func (ps *PushSync) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (swarm.Address, error) {
	closest := swarm.Address{}
Name the return arguments and automatically get the zero value.
I am aware of this, however I am not a great fan of named return arguments. I mostly use them to stay in line with some existing code.
hmm here you actually introduced one https://github.com/ethersphere/bee/pull/947/files#diff-1f5e1454bf78aa94ff061f30fb1b70e522169b317f53acdd13042a864561f41fR144
Well yes, as I commented, if there are usages in existing (surrounding) code, I try to fit new code to be similar.
pkg/pushsync/pushsync.go
Outdated
}
switch dcmp {
case 0:
	// do nothing
cannot happen as you returned on L371
pkg/pushsync/pushsync.go
Outdated
	closest = peer
	return false, false, nil
}
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
I don't think this is needed. The iterator should take care of this.
I am not sure I get this comment. I checked (before completing the PR) how ClosestPeer works. It is expected that this function always goes through ALL peers, and this compare function then decides which is the "closest".
That is true. This is needed because ClosestPeer iterates over all peers, as @zbiljic described. This part was also required in retrieval, to avoid upstream requests.
oh. my. god. apologies then. our technical debt is way bigger than i thought
pkg/pushsync/pushsync.go
Outdated
err := ps.peerSuggester.EachPeerRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
	for _, a := range skipPeers {
		if a.Equal(peer) {
			return false, false, nil
		}
	}
	if closest.IsZero() {
		closest = peer
		return false, false, nil
	}
	dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
	if err != nil {
		return false, false, fmt.Errorf("distance compare error. addr %s closest %s peer %s: %w", addr.String(), closest.String(), peer.String(), err)
	}
	switch dcmp {
	case 0:
		// do nothing
	case -1:
		// current peer is closer
		closest = peer
	case 1:
		// closest is already closer to chunk
		// do nothing
	}
	return false, false, nil
})
Suggested change:

err := ps.peerSuggester.EachPeerRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
	if contains(skipPeers, peer) {
		return false, false, nil
	}
	closest = peer
	return true, false, nil
})
As noted in another comment, the EachPeerer interface is sort-of misleading (also note that the function used is EachPeerRev, whose comment notes that it "iterates from farthest bin to closest"), as it is expected that all peers are processed.
pkg/pushsync/pushsync.go
Outdated
}

return rec, nil
return closest, nil
}
This entire function should be part of peerSuggester.
pkg/topology/topology.go
Outdated
@@ -31,6 +31,11 @@ type PeerAdder interface {
	AddPeers(ctx context.Context, addr ...swarm.Address) error
}

type Peerer interface {
I don't see the need for several suggester interfaces. The single one should be called topology.Router and should expose a Nearest function that always takes a skip array:

type Router interface {
	Nearest(addr swarm.Address, skip ...swarm.Address) (peerAddr swarm.Address, err error)
}
	return 0
}

// EachPeer iterates from closest bin to farthest
func (_ *mock) EachPeer(_ topology.EachPeerFunc) error {
	panic("not implemented") // TODO: Implement
func (d *mock) EachPeer(f topology.EachPeerFunc) (err error) {
Implement a stop condition if you expose this at all. Since the mock did not implement this, I guess it is not used, in which case only expose `ClosestPeer`, which always takes a skip list.
	return closest, nil
}

func (ps *PushSync) blocklistPeer(peer swarm.Address) {
this should be abstracted in blocklist pkg
Not sure about this; it's just a separate method because it's called in two places and has specific logging depending on whether there was an error.
If such is needed in one protocol, then in another one too.
Force-pushed from a67d90a to 8a8267c
pkg/topology/topology.go
Outdated
// ClosestPeer returns the closest connected peer we have in relation to a
// given chunk address.
// This function will ignore peers with addresses provided in skipPeers.
// Returns topology.ErrWantSelf in case base is the closest to the chunk.
closest to the address
Force-pushed from 7b63bfe to 9ea94bb
Thanks, @zbiljic!

Updates the pushsync.PushChunkToClosest function so that it will attempt multiple peers if one fails. The previous code only ever attempted to push to the same peer (the one deemed "closest"), even in the face of problems with that host. This change adds a loop which attempts the closest peer and, if there are problems with that peer, tries another closest peer (from the available ones). Depending on the error, it may blocklist the peer for some time (one minute). It will try a maximum of 5 peers; if each of them fails, it will return the last error that was encountered.

Note that the new code still uses the original ClosestPeer function, in which case it will again be possible that the selected peer will be "self". If we want to change this, it would be possible.