diff --git a/containerd/api/grpc/server/server.go b/containerd/api/grpc/server/server.go index ac24bdc4..c45970c8 100644 --- a/containerd/api/grpc/server/server.go +++ b/containerd/api/grpc/server/server.go @@ -154,7 +154,7 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer } t = from } - events := s.sv.Events.Events(t) + events := s.sv.Events.Events(t, r.StoredOnly, r.Id) defer s.sv.Events.Unsubscribe(events) for e := range events { tsp, err := ptypes.TimestampProto(e.Timestamp) diff --git a/containerd/containerd.go b/containerd/containerd.go index 8507fd05..29089419 100644 --- a/containerd/containerd.go +++ b/containerd/containerd.go @@ -163,7 +163,7 @@ func daemon(sv *supervisor.Supervisor, address string) error { } func namespaceShare(sv *supervisor.Supervisor, namespace, state string) { - events := sv.Events.Events(time.Time{}) + events := sv.Events.Events(time.Time{}, false, "") containerCount := 0 for e := range events { if e.Type == supervisor.EventContainerStart { diff --git a/supervisor/events.go b/supervisor/events.go index 9f505510..56faad86 100644 --- a/supervisor/events.go +++ b/supervisor/events.go @@ -34,16 +34,18 @@ type Event struct { // Events() might be deadlocked type SvEvents struct { - sync.RWMutex - subscribers map[chan Event]struct{} - eventLog []Event + subscriberLock sync.RWMutex + subscribers map[chan Event]struct{} + + eventLog []Event + eventLock sync.Mutex } func (se *SvEvents) setupEventLog(logDir string) error { if err := se.readEventLog(logDir); err != nil { return err } - events := se.Events(time.Time{}) + events := se.Events(time.Time{}, false, "") f, err := os.OpenFile(filepath.Join(logDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0755) if err != nil { return err @@ -52,7 +54,9 @@ func (se *SvEvents) setupEventLog(logDir string) error { go func() { for e := range events { glog.Infof("write event log: %v", e) + se.eventLock.Lock() se.eventLog = append(se.eventLog, e) + se.eventLock.Unlock() if err := enc.Encode(e); err != nil { glog.Infof("containerd: fail to write event to journal") } @@ -61,6 +65,7 @@ func (se *SvEvents) setupEventLog(logDir string) error { return nil } +// Note: no locking - don't call after initialization func (se *SvEvents) readEventLog(logDir string) error { f, err := os.Open(filepath.Join(logDir, "events.log")) if err != nil { @@ -86,22 +91,41 @@ func (se *SvEvents) readEventLog(logDir string) error { // Events returns an event channel that external consumers can use to receive updates // on container events -func (se *SvEvents) Events(from time.Time) chan Event { - se.Lock() - defer se.Unlock() +func (se *SvEvents) Events(from time.Time, storedOnly bool, id string) chan Event { c := make(chan Event, defaultEventsBufferSize) - se.subscribers[c] = struct{}{} + + if storedOnly { + defer se.Unsubscribe(c) + } + + // Do not allow the subscriber to unsubscript + se.subscriberLock.Lock() + defer se.subscriberLock.Unlock() + if !from.IsZero() { // replay old event - for _, e := range se.eventLog { + // note: we lock and make a copy of history to avoid blocking + se.eventLock.Lock() + past := se.eventLog[:] + se.eventLock.Unlock() + + for _, e := range past { if e.Timestamp.After(from) { - c <- e + if id == "" || e.ID == id { + c <- e + } } } - // Notify the client that from now on it's live events - c <- Event{ - Type: "live", - Timestamp: time.Now(), + + if storedOnly { + close(c) + } else { + // Notify the client that from now on it's live events + c <- Event{ + Type: "live", + Timestamp: time.Now(), + } + se.subscribers[c] = struct{}{} } } return c @@ -109,24 +133,26 @@ func (se *SvEvents) Events(from time.Time) chan Event { // Unsubscribe removes the provided channel from receiving any more events func (se *SvEvents) Unsubscribe(sub chan Event) { - se.Lock() - defer se.Unlock() - delete(se.subscribers, sub) - close(sub) + se.subscriberLock.Lock() + defer se.subscriberLock.Unlock() + if _, ok := se.subscribers[sub]; ok { + delete(se.subscribers, sub) + close(sub) + } } // notifySubscribers will send the provided event to the external subscribers // of the events channel func (se *SvEvents) notifySubscribers(e Event) { glog.Infof("notifySubscribers: %v", e) - se.RLock() - defer se.RUnlock() + se.subscriberLock.RLock() + defer se.subscriberLock.RUnlock() for sub := range se.subscribers { // do a non-blocking send for the channel select { case sub <- e: default: - glog.Infof("containerd: event not sent to subscriber") + glog.Warningf("containerd: event not sent to subscriber") } } } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index e981a794..dc9956a3 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -116,7 +116,7 @@ func (sv *Supervisor) getProcess(container, processId string) *Process { } func (sv *Supervisor) reaper() { - events := sv.Events.Events(time.Time{}) + events := sv.Events.Events(time.Time{}, false, "") for e := range events { if e.Type == EventExit { go sv.reap(e.ID, e.PID)