Skip to content

Commit

Permalink
event: fix connection leak
Browse files Browse the repository at this point in the history
Make sure we close the connection when disabling event monitoring.

Closes #911.
  • Loading branch information
fsouza committed Aug 19, 2022
1 parent 5c8cf93 commit 61e51a2
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions event.go
Expand Up @@ -93,6 +93,7 @@ type eventMonitoringState struct {
C chan *APIEvents
errC chan error
listeners []chan<- *APIEvents
closeConn func()
}

const (
Expand Down Expand Up @@ -229,6 +230,11 @@ func (eventState *eventMonitoringState) disableEventMonitoring() {
eventState.enabled = false
close(eventState.C)
close(eventState.errC)

if eventState.closeConn != nil {
eventState.closeConn()
eventState.closeConn = nil
}
}
}

Expand Down Expand Up @@ -290,16 +296,19 @@ func (eventState *eventMonitoringState) connectWithRetry(c *Client, opts EventsO
eventChan := eventState.C
errChan := eventState.errC
eventState.RUnlock()
err := c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
closeConn, err := c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
for ; err != nil && retries < maxMonitorConnRetries; retries++ {
waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
time.Sleep(time.Duration(waitTime) * time.Millisecond)
eventState.RLock()
eventChan = eventState.C
errChan = eventState.errC
eventState.RUnlock()
err = c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
closeConn, err = c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
}
eventState.Lock()
defer eventState.Unlock()
eventState.closeConn = closeConn
return err
}

Expand Down Expand Up @@ -343,7 +352,7 @@ func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
}
}

func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan *APIEvents, errChan chan error) error {
func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan *APIEvents, errChan chan error) (closeConn func(), err error) {
// on reconnect override initial Since with last event seen time
if startTime != 0 {
opts.Since = strconv.FormatInt(startTime, 10)
Expand All @@ -356,37 +365,38 @@ func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan
address = c.endpointURL.Host
}
var dial net.Conn
var err error
if c.TLSConfig == nil {
dial, err = c.Dialer.Dial(protocol, address)
} else {
netDialer, ok := c.Dialer.(*net.Dialer)
if !ok {
return ErrTLSNotSupported
return nil, ErrTLSNotSupported
}
dial, err = tlsDialWithDialer(netDialer, protocol, address, c.TLSConfig)
}
if err != nil {
return err
return nil, err
}
//lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
conn := httputil.NewClientConn(dial, nil)
req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil {
return err
return nil, err
}
res, err := conn.Do(req)
if err != nil {
return err
return nil, err
}

keepRunning := int32(1)
//lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
go func(res *http.Response, conn *httputil.ClientConn) {
defer conn.Close()
defer res.Body.Close()
decoder := json.NewDecoder(res.Body)
for {
for atomic.LoadInt32(&keepRunning) == 1 {
var event APIEvents
if err = decoder.Decode(&event); err != nil {
if err := decoder.Decode(&event); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
c.eventMonitor.RLock()
if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
Expand All @@ -409,7 +419,9 @@ func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan
c.eventMonitor.RUnlock()
}
}(res, conn)
return nil
return func() {
atomic.StoreInt32(&keepRunning, 0)
}, nil
}

// transformEvent takes an event and determines what version it is from
Expand Down

0 comments on commit 61e51a2

Please sign in to comment.