Skip to content

Commit

Permalink
Fix a goroutine leak in api-server application.PodLogs and applicatio…
Browse files Browse the repository at this point in the history
…n.Watch (#1292)
  • Loading branch information
jessesuen committed Mar 19, 2019
1 parent b60067a commit ea1519d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
31 changes: 23 additions & 8 deletions server/application/application.go
Expand Up @@ -433,6 +433,11 @@ func (s *Server) Watch(q *ApplicationQuery, ws ApplicationService_WatchServer) e
if err != nil {
return err
}
defer w.Stop()
logCtx := log.NewEntry(log.New())
if q.Name != nil {
logCtx = logCtx.WithField("application", *q.Name)
}
claims := ws.Context().Value("claims")
done := make(chan bool)
go func() {
Expand All @@ -448,17 +453,17 @@ func (s *Server) Watch(q *ApplicationQuery, ws ApplicationService_WatchServer) e
Application: a,
})
if err != nil {
log.Warnf("Unable to send stream message: %v", err)
logCtx.Warnf("Unable to send stream message: %v", err)
}
}
}
done <- true
logCtx.Info("k8s application watch event channel closed")
close(done)
}()
select {
case <-ws.Context().Done():
w.Stop()
logCtx.Info("client watch grpc context closed")
case <-done:
w.Stop()
}
return nil
}
Expand Down Expand Up @@ -723,7 +728,10 @@ func (s *Server) PodLogs(q *ApplicationPodLogsQuery, ws ApplicationService_PodLo
if err != nil {
return err
}
logCtx := log.WithField("application", q.Name)
defer util.Close(stream)
done := make(chan bool)
gracefulExit := false
go func() {
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
Expand All @@ -740,18 +748,25 @@ func (s *Server) PodLogs(q *ApplicationPodLogsQuery, ws ApplicationService_PodLo
TimeStamp: metaLogTime,
})
if err != nil {
log.Warnf("Unable to send stream message: %v", err)
logCtx.Warnf("Unable to send stream message: %v", err)
}
}
}
}
}

done <- true
if gracefulExit {
logCtx.Info("k8s pod logs scanner completed due to closed grpc context")
} else if err := scanner.Err(); err != nil {
logCtx.Warnf("k8s pod logs scanner failed with error: %v", err)
} else {
logCtx.Info("k8s pod logs scanner completed with EOF")
}
close(done)
}()
select {
case <-ws.Context().Done():
util.Close(stream)
logCtx.Info("client pod logs grpc context closed")
gracefulExit = true
case <-done:
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion util/util.go
Expand Up @@ -33,7 +33,9 @@ type Closer interface {
// Close is a convenience function to close a object that has a Close() method, ignoring any errors
// Used to satisfy errcheck lint
func Close(c Closer) {
_ = c.Close()
if err := c.Close(); err != nil {
log.Warnf("failed to close %v: %v", c, err)
}
}

// DeleteFile is best effort deletion of a file
Expand Down

0 comments on commit ea1519d

Please sign in to comment.