From 4272154ebe9fdd4c99b3fcc91d437d3760ff93e8 Mon Sep 17 00:00:00 2001 From: Will Rouesnel Date: Wed, 26 Oct 2016 02:51:04 +1100 Subject: [PATCH] Improve supervisor eventlog locking and API compliance. Add finer-grained locking to the supervisor eventlog based on the upstream docker containerd code. Adds full compliance for the Event() API by supporting queries for specific container IDs and stored events only, as required by newer Docker daemons in order to recover from hard shutdowns without deadlocking the docker daemon. --- containerd/api/grpc/server/server.go | 2 +- containerd/containerd.go | 2 +- supervisor/events.go | 68 +++++++++++++++++++--------- supervisor/supervisor.go | 2 +- 4 files changed, 50 insertions(+), 24 deletions(-) diff --git a/containerd/api/grpc/server/server.go b/containerd/api/grpc/server/server.go index ac24bdc4..c45970c8 100644 --- a/containerd/api/grpc/server/server.go +++ b/containerd/api/grpc/server/server.go @@ -154,7 +154,7 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer } t = from } - events := s.sv.Events.Events(t) + events := s.sv.Events.Events(t, r.StoredOnly, r.Id) defer s.sv.Events.Unsubscribe(events) for e := range events { tsp, err := ptypes.TimestampProto(e.Timestamp) diff --git a/containerd/containerd.go b/containerd/containerd.go index 8507fd05..29089419 100644 --- a/containerd/containerd.go +++ b/containerd/containerd.go @@ -163,7 +163,7 @@ func daemon(sv *supervisor.Supervisor, address string) error { } func namespaceShare(sv *supervisor.Supervisor, namespace, state string) { - events := sv.Events.Events(time.Time{}) + events := sv.Events.Events(time.Time{}, false, "") containerCount := 0 for e := range events { if e.Type == supervisor.EventContainerStart { diff --git a/supervisor/events.go b/supervisor/events.go index 9f505510..56faad86 100644 --- a/supervisor/events.go +++ b/supervisor/events.go @@ -34,16 +34,18 @@ type Event struct { // Events() might be deadlocked type SvEvents 
struct { - sync.RWMutex - subscribers map[chan Event]struct{} - eventLog []Event + subscriberLock sync.RWMutex + subscribers map[chan Event]struct{} + + eventLog []Event + eventLock sync.Mutex } func (se *SvEvents) setupEventLog(logDir string) error { if err := se.readEventLog(logDir); err != nil { return err } - events := se.Events(time.Time{}) + events := se.Events(time.Time{}, false, "") f, err := os.OpenFile(filepath.Join(logDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0755) if err != nil { return err @@ -52,7 +54,9 @@ func (se *SvEvents) setupEventLog(logDir string) error { go func() { for e := range events { glog.Infof("write event log: %v", e) + se.eventLock.Lock() se.eventLog = append(se.eventLog, e) + se.eventLock.Unlock() if err := enc.Encode(e); err != nil { glog.Infof("containerd: fail to write event to journal") } @@ -61,6 +65,7 @@ func (se *SvEvents) setupEventLog(logDir string) error { return nil } +// Note: no locking - don't call after initialization func (se *SvEvents) readEventLog(logDir string) error { f, err := os.Open(filepath.Join(logDir, "events.log")) if err != nil { @@ -86,22 +91,41 @@ func (se *SvEvents) readEventLog(logDir string) error { // Events returns an event channel that external consumers can use to receive updates // on container events -func (se *SvEvents) Events(from time.Time) chan Event { - se.Lock() - defer se.Unlock() +func (se *SvEvents) Events(from time.Time, storedOnly bool, id string) chan Event { c := make(chan Event, defaultEventsBufferSize) - se.subscribers[c] = struct{}{} + + if storedOnly { + defer se.Unsubscribe(c) + } + + // Do not allow the subscriber to unsubscribe while the channel is being set up + se.subscriberLock.Lock() + defer se.subscriberLock.Unlock() + if !from.IsZero() { // replay old event - for _, e := range se.eventLog { + // note: we hold eventLock only while taking a slice of the history (it shares the backing array; nothing is copied) so the replay loop below does not run under eventLock + se.eventLock.Lock() + past := se.eventLog[:] + se.eventLock.Unlock() + + for _, e := range past { if e.Timestamp.After(from) { - c <- e 
+ if id == "" || e.ID == id { + c <- e + } } } - // Notify the client that from now on it's live events - c <- Event{ - Type: "live", - Timestamp: time.Now(), + + if storedOnly { + close(c) + } else { + // Notify the client that from now on it's live events + c <- Event{ + Type: "live", + Timestamp: time.Now(), + } + se.subscribers[c] = struct{}{} } } return c @@ -109,24 +133,26 @@ func (se *SvEvents) Events(from time.Time) chan Event { // Unsubscribe removes the provided channel from receiving any more events func (se *SvEvents) Unsubscribe(sub chan Event) { - se.Lock() - defer se.Unlock() - delete(se.subscribers, sub) - close(sub) + se.subscriberLock.Lock() + defer se.subscriberLock.Unlock() + if _, ok := se.subscribers[sub]; ok { + delete(se.subscribers, sub) + close(sub) + } } // notifySubscribers will send the provided event to the external subscribers // of the events channel func (se *SvEvents) notifySubscribers(e Event) { glog.Infof("notifySubscribers: %v", e) - se.RLock() - defer se.RUnlock() + se.subscriberLock.RLock() + defer se.subscriberLock.RUnlock() for sub := range se.subscribers { // do a non-blocking send for the channel select { case sub <- e: default: - glog.Infof("containerd: event not sent to subscriber") + glog.Warningf("containerd: event not sent to subscriber") } } } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index e981a794..dc9956a3 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -116,7 +116,7 @@ func (sv *Supervisor) getProcess(container, processId string) *Process { } func (sv *Supervisor) reaper() { - events := sv.Events.Events(time.Time{}) + events := sv.Events.Events(time.Time{}, false, "") for e := range events { if e.Type == EventExit { go sv.reap(e.ID, e.PID)