feat(pushsync): forwarder multiplexer #4008
Conversation
Force-pushed from be6da59 to 1b90663.
```diff
-if ps.unwrap != nil {
-	go ps.unwrap(chunk)
-}
+go ps.unwrap(chunk)
```
Is the `ps.unwrap != nil` check not required?
No, because it's provided in every pushsync.New...
```go
// if the peer is closer to the chunk, AND it's a full node, we were selected for replication. Return early.
if p.FullNode {
	if closer, _ := p.Address.Closer(chunkAddress, ps.address); closer {
		store := func(ctx context.Context) error {
```
Could `store` be moved to an individual function, out of `handler`? `handler` looks quite big atm, so every reduction in its size would simplify reading. (A sketch of the extraction is below.)
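A minimal sketch of the suggested extraction, within the pushsync package. The method name, the `storer` field, and the use of `storage.ModePutSync` are assumptions for illustration; the real closure body in the PR may differ:

```go
// Hypothetical refactor: lift the inline closure out of handler into a named
// method, so the handler body shrinks and the storing step can be read (and
// tested) on its own.
func (ps *PushSync) storeForReplication(ctx context.Context, chunk swarm.Chunk) error {
	// store the chunk locally; this node was selected as a replication target
	_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
	return err
}
```

The handler would then call `ps.storeForReplication(ctx, chunk)` instead of defining the closure inline.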
pkg/pushsync/pushsync.go (Outdated)
```go
	err = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address(), peer, err)
	return
}

func (ps *PushSync) push(ctx context.Context, resultChan chan<- receiptResult, doneChan <-chan struct{}, peer swarm.Address, ch swarm.Chunk, action accounting.Action) {
```
Why is `doneChan` needed in the `push` func? It seems redundant with `ctx`, doesn't it?
The context is never canceled.
From the perspective of this function, the context should signal the end of execution. Having a context and a `doneChan` simultaneously signals that something is not right (in this case the context should be canceled, instead of closing `doneChan`). This probably has implications on upper levels, but nothing that couldn't be solved.
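A minimal sketch of the suggested shape, with hypothetical caller-side names (`peers`, `resultChan`): the parent derives a cancellable context and `push` takes only that, so one cancellation mechanism covers both signals:

```go
// The parent owns a cancellable context; closing a separate doneChan is
// replaced by cancel().
ctx, cancel := context.WithCancel(parentCtx)
defer cancel() // fires when the parent returns, ending outstanding attempts

for _, peer := range peers {
	go ps.push(ctx, resultChan, peer, ch, action) // no doneChan parameter
}
```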
pkg/pushsync/pushsync.go (Outdated)
```diff
 case result := <-resultChan:

 	inflight--

 	ps.measurePushPeer(result.pushTime, result.err, origin)

 	if ps.warmedUp() && !errors.Is(result.err, accounting.ErrOverdraft) {
 		ps.skipList.Add(ch.Address(), result.peer, sanctionWait)
 	}

 	if result.err == nil {
 		return result.receipt, nil
 	}

 	ps.metrics.TotalFailedSendAttempts.Inc()
 	logger.Debug("could not push to peer", "peer_address", result.peer, "error", result.err)

-	if result.sent {
-		sentErrorsLeft--
-	}
+	sentErrorsLeft--

 	retry()
```
This case will process results of push operations, and when the first successful push happens this function will return (line 415). When parallel forwards are also happening, it appears that there could be these issues (a simplified illustration follows the list):
- logic from this case statement will not execute, which could skew metrics and telemetry a bit
- a race condition could prevent chunks from being forwarded to other peers if `pushToClosest` returns early after getting the first successful push; this can happen because the `context` and/or `doneChan` (whichever is relevant) would get closed, therefore closing the push goroutine
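A simplified, hypothetical illustration of the second point (names like `peers` and `resultChan` are assumed): once the first success returns, the deferred cancel ends the sibling goroutines that were multiplexing the chunk to other peers:

```go
ctx, cancel := context.WithCancel(parentCtx)
defer cancel() // runs on the early return below, abandoning the other pushes

for _, peer := range peers {
	go ps.push(ctx, resultChan, peer, ch, action)
}

for i := 0; i < len(peers); i++ {
	result := <-resultChan
	if result.err == nil {
		return result.receipt, nil // early return: remaining results are never processed
	}
}
```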
You have a valid point with the early context cancellation.
I removed the done channel, and the push result return now waits on the parent ctx's done channel instead.
The push func creates its own context to allow each push attempt to run independently of the parent context.
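A minimal sketch of the described fix, within the pushsync package; `defaultTTL` and the `pushPeer` helper are assumptions. Each attempt runs under its own context so it outlives the parent's early return, while the result send is still guarded by the parent's done channel:

```go
func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk) {
	// independent context: the attempt is not cut short when the parent returns early
	ctx, cancel := context.WithTimeout(context.Background(), defaultTTL)
	defer cancel()

	receipt, err := ps.pushPeer(ctx, peer, ch) // assumed helper performing the wire exchange

	select {
	case resultChan <- receiptResult{peer: peer, err: err, receipt: receipt}:
	case <-parentCtx.Done(): // parent is no longer listening; drop the result
	}
}
```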
For 1), wouldn't it be an issue that the `TotalSendAttempts` counter is incremented 5 times (1 default forward + 4 multiplexing) while its counterpart counter measuring errors, `TotalFailedSendAttempts`, is never incremented, even when the last 4 push operations have failed? It looks like there will be no logging and no telemetry for multiplexed chunks?
```diff
 defer func() {
 	select {
-	case resultChan <- receiptResult{pushTime: now, peer: peer, err: err, sent: pushed, receipt: &receipt}:
-	case <-doneChan:
+	case resultChan <- receiptResult{pushTime: now, peer: peer, err: err, receipt: receipt}:
+	case <-parentCtx.Done():
 	}
 }()
```
`push` seems to be inverting responsibility, from its parent to itself (the child), when processing the result of a push operation. Ideally `push` shouldn't know anything about its parent (whether it's live or not, or how to send back the result). This could be achieved by returning a promise-like chan, e.g. `<-chan receiptResult`, which the parent could use to read the result whenever it's ready, or the result could be discarded if the parent has ended. A sketch of this is below.
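A sketch of the promise-style alternative, under the same assumed helpers (`defaultTTL`, `pushPeer`): `push` allocates and returns its own buffered result channel, so it needs no knowledge of the parent's lifetime:

```go
func (ps *PushSync) push(peer swarm.Address, ch swarm.Chunk) <-chan receiptResult {
	out := make(chan receiptResult, 1) // buffered: the send below can never block
	go func() {
		defer close(out)
		ctx, cancel := context.WithTimeout(context.Background(), defaultTTL)
		defer cancel()
		receipt, err := ps.pushPeer(ctx, peer, ch)
		out <- receiptResult{peer: peer, err: err, receipt: receipt}
	}()
	return out
}
```

The parent can `select` on this channel and its own context, simply abandoning the result if it has already returned.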
```go
count := 0
// Push the chunk to some peers in the neighborhood in parallel for replication.
// Any errors here should NOT impact the rest of the handler.
_ = ps.topologyDriver.EachConnectedPeer(func(peer swarm.Address, po uint8) (bool, bool, error) {
```
Would it make sense to log an error when the `EachConnectedPeer` call fails? For example:
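A sketch of the suggested change; `replicateFn` stands in for the callback from the diff above, and the logger call style is assumed from the surrounding code:

```go
// surface iteration errors instead of discarding them with `_ =`
if err := ps.topologyDriver.EachConnectedPeer(replicateFn); err != nil {
	ps.logger.Error(err, "replication: iterating connected peers failed")
}
```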
Description
See the linked issue.
Major changes are:
- the forwarder node that can reach the neighborhood fires multiple requests to different peers
- as soon as the chunk lands in the neighborhood, it is stored and replicated
- the pushsync request now has a new `replicate bool` field which tells the next peer to only store the chunk and not forward it (see the sketch after this list)
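A hedged sketch of how the handler could gate on the new field; the field and helper names (`req.Replicate`, `storeForReplication`, `forward`) are assumptions for illustration, not taken verbatim from the PR:

```go
// In the pushsync handler, after decoding the wire message:
if req.Replicate {
	// this node was chosen purely as a replication target inside the
	// neighborhood: store the chunk and return, do not forward it further
	return ps.storeForReplication(ctx, chunk)
}
// normal path: keep forwarding toward the chunk's neighborhood
return ps.forward(ctx, chunk)
```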