Skip to content

Commit

Permalink
Delegate handler cancel to the pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ghettovoice committed Sep 30, 2021
1 parent c64d104 commit 214556e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .idea/runConfigurations/go_test_pkg.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions transport/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,11 @@ func (handler *connectionHandler) readConnection() (<-chan sip.Message, <-chan e

go func() {
defer func() {
handler.Cancel()
handler.cancelOnce.Do(func() {
if err := handler.Connection().Close(); err != nil {
handler.Log().Errorf("connection close failed: %s", err)
}
})
prs.Stop()

if !streamed {
Expand Down Expand Up @@ -1019,7 +1023,7 @@ func (handler *connectionHandler) pipeOutputs(msgs <-chan sip.Message, errs <-ch
handler.Log().Trace("passing up connection expiry error...")

select {
case <-handler.canceled:
case <-handler.cancel:
return
case handler.errs <- err:
handler.Log().Trace("connection expiry error passed up")
Expand Down Expand Up @@ -1084,7 +1088,7 @@ func (handler *connectionHandler) pipeOutputs(msgs <-chan sip.Message, errs <-ch

// pass up
select {
case <-handler.canceled:
case <-handler.cancel:
return
case handler.output <- msg:
logger.Trace("SIP message passed up")
Expand Down Expand Up @@ -1119,7 +1123,7 @@ func (handler *connectionHandler) pipeOutputs(msgs <-chan sip.Message, errs <-ch
handler.Log().Trace("passing up error...")

select {
case <-handler.canceled:
case <-handler.cancel:
return
case handler.errs <- err:
handler.Log().Trace("error passed up")
Expand Down
10 changes: 7 additions & 3 deletions transport/listener_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,11 @@ func (handler *listenerHandler) Serve(done func()) {

func (handler *listenerHandler) acceptConnections(wg *sync.WaitGroup, conns chan<- Connection, errs chan<- error) {
defer func() {
handler.Cancel()
handler.cancelOnce.Do(func() {
if err := handler.Listener().Close(); err != nil {
handler.Log().Errorf("close listener failed: %s", err)
}
})
close(conns)
close(errs)

Expand Down Expand Up @@ -791,7 +795,7 @@ func (handler *listenerHandler) pipeOutputs(wg *sync.WaitGroup, conns <-chan Con
logger.Trace("passing up connection...")

select {
case <-handler.canceled:
case <-handler.cancel:
return
case handler.output <- conn:
logger.Trace("connection passed up")
Expand All @@ -817,7 +821,7 @@ func (handler *listenerHandler) pipeOutputs(wg *sync.WaitGroup, conns <-chan Con
handler.Log().Trace("passing up listener error...")

select {
case <-handler.canceled:
case <-handler.cancel:
return
case handler.errs <- err:
handler.Log().Trace("listener error passed up")
Expand Down

0 comments on commit 214556e

Please sign in to comment.