Skip to content

Commit

Permalink
chore: Buffer remote candidates like local (#77)
Browse files Browse the repository at this point in the history
* chore: Buffer remote candidates like local

This was added for local candidates, and is required for remote
to prevent a race where they are added before a negotiation is
complete.

I removed the mutex earlier, because it would cause a different race.
I didn't realize the remote candidates wouldn't be buffered,
but with this change they are!

* Use local description instead

* Add logging for candidate flush

* Fix race with atomic bool

* Simplify locks

* Add mutex to flush

* Reset buffer

* Remove leak dependency to limit confusion

* Fix ordering

* Revert channel close

* Flush candidates after remote session description is set

* Bump up count to ensure race is fixed

* Use custom ICE dependency

* Fix data race

* Lower timeout to make for fast CI

* Add back mutex to prevent race

* Improve debug logging

* Lock on local description

* Flush local candidates uniquely

* Fix race

* Move mutex to prevent candidate send race

* Move lock to handshake so no race can occur

* Reduce timeout to improve test times

* Move unlock to defer

* Use flushed bool instead of checking remote
  • Loading branch information
kylecarbs committed Jan 27, 2022
1 parent 9329a50 commit 30dae97
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
1 change: 1 addition & 0 deletions peer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (c *Channel) closeWithError(err error) error {
c.conn.dcDisconnectListeners.Sub(1)
c.conn.dcFailedListeners.Sub(1)
c.conn.dcClosedWaitGroup.Done()

return err
}

Expand Down
32 changes: 25 additions & 7 deletions peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
dcDisconnectChannel: make(chan struct{}),
dcFailedChannel: make(chan struct{}),
localCandidateChannel: make(chan webrtc.ICECandidateInit),
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
pendingCandidates: make([]webrtc.ICECandidateInit, 0),
localSessionDescriptionChannel: make(chan webrtc.SessionDescription),
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
}
if client {
Expand Down Expand Up @@ -120,8 +120,9 @@ type Conn struct {
localSessionDescriptionChannel chan webrtc.SessionDescription
remoteSessionDescriptionChannel chan webrtc.SessionDescription

pendingCandidates []webrtc.ICECandidateInit
pendingCandidatesMutex sync.Mutex
pendingCandidates []webrtc.ICECandidateInit
pendingCandidatesMutex sync.Mutex
pendingCandidatesFlushed bool

pingChannelID uint16
pingEchoChannelID uint16
Expand All @@ -141,15 +142,15 @@ func (c *Conn) init() error {
if iceCandidate == nil {
return
}
// ICE Candidates on a remote peer are reset when an offer
// is received. We must wait until the offer<->answer has
// been negotiated to flush candidates.
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
if c.rtc.RemoteDescription() == nil {

if !c.pendingCandidatesFlushed {
c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer")
c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON())
return
}
c.opts.Logger.Debug(context.Background(), "adding local candidate")
select {
case <-c.closed:
break
Expand Down Expand Up @@ -282,10 +283,17 @@ func (c *Conn) negotiate() {

err := c.rtc.SetRemoteDescription(remoteDescription)
if err != nil {
c.pendingCandidatesMutex.Unlock()
_ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
return
}

if c.offerrer {
// ICE candidates reset when an offer/answer is set for the first
// time. If candidates flush before this point, a connection could fail.
c.flushPendingCandidates()
}

if !c.offerrer {
answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
if err != nil {
Expand All @@ -305,18 +313,27 @@ func (c *Conn) negotiate() {
return
case c.localSessionDescriptionChannel <- answer:
}

// Wait until the local description is set to flush candidates.
c.flushPendingCandidates()
}
}

// flushPendingCandidates writes all local candidates to the candidate send channel.
// The localCandidateChannel is expected to be serviced, otherwise this could block.
func (c *Conn) flushPendingCandidates() {
c.pendingCandidatesMutex.Lock()
defer c.pendingCandidatesMutex.Unlock()
for _, pendingCandidate := range c.pendingCandidates {
c.opts.Logger.Debug(context.Background(), "flushing local candidate")
select {
case <-c.closed:
return
case c.localCandidateChannel <- pendingCandidate:
}
}
c.pendingCandidates = make([]webrtc.ICECandidateInit, 0)
c.pendingCandidatesFlushed = true
c.opts.Logger.Debug(context.Background(), "flushed candidates")
}

Expand All @@ -328,6 +345,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {

// AddRemoteCandidate adds a remote candidate to the RTC connection.
func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
c.opts.Logger.Debug(context.Background(), "adding remote candidate")
return c.rtc.AddICECandidate(i)
}

Expand Down
4 changes: 2 additions & 2 deletions peer/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ var (
// In CI resources are frequently contended, so increasing this value
// results in less flakes.
if os.Getenv("CI") == "true" {
return 4 * time.Second
return 3 * time.Second
}
return 100 * time.Millisecond
}()
failedTimeout = disconnectedTimeout * 4
failedTimeout = disconnectedTimeout * 3
keepAliveInterval = time.Millisecond * 2

// There's a global race in the vnet library allocation code.
Expand Down

0 comments on commit 30dae97

Please sign in to comment.