Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion containerd/api/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
}
t = from
}
events := s.sv.Events.Events(t)
events := s.sv.Events.Events(t, r.StoredOnly, r.Id)
defer s.sv.Events.Unsubscribe(events)
for e := range events {
tsp, err := ptypes.TimestampProto(e.Timestamp)
Expand Down
2 changes: 1 addition & 1 deletion containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func daemon(sv *supervisor.Supervisor, address string) error {
}

func namespaceShare(sv *supervisor.Supervisor, namespace, state string) {
events := sv.Events.Events(time.Time{})
events := sv.Events.Events(time.Time{}, false, "")
containerCount := 0
for e := range events {
if e.Type == supervisor.EventContainerStart {
Expand Down
68 changes: 47 additions & 21 deletions supervisor/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ type Event struct {
// Events() might be deadlocked

type SvEvents struct {
sync.RWMutex
subscribers map[chan Event]struct{}
eventLog []Event
subscriberLock sync.RWMutex
subscribers map[chan Event]struct{}

eventLog []Event
eventLock sync.Mutex
}

func (se *SvEvents) setupEventLog(logDir string) error {
if err := se.readEventLog(logDir); err != nil {
return err
}
events := se.Events(time.Time{})
events := se.Events(time.Time{}, false, "")
f, err := os.OpenFile(filepath.Join(logDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0755)
if err != nil {
return err
Expand All @@ -52,7 +54,9 @@ func (se *SvEvents) setupEventLog(logDir string) error {
go func() {
for e := range events {
glog.Infof("write event log: %v", e)
se.eventLock.Lock()
se.eventLog = append(se.eventLog, e)
se.eventLock.Unlock()
if err := enc.Encode(e); err != nil {
glog.Infof("containerd: fail to write event to journal")
}
Expand All @@ -61,6 +65,7 @@ func (se *SvEvents) setupEventLog(logDir string) error {
return nil
}

// Note: no locking - don't call after initialization
func (se *SvEvents) readEventLog(logDir string) error {
f, err := os.Open(filepath.Join(logDir, "events.log"))
if err != nil {
Expand All @@ -86,47 +91,68 @@ func (se *SvEvents) readEventLog(logDir string) error {

// Events returns an event channel that external consumers can use to receive updates
// on container events
func (se *SvEvents) Events(from time.Time) chan Event {
se.Lock()
defer se.Unlock()
func (se *SvEvents) Events(from time.Time, storedOnly bool, id string) chan Event {
c := make(chan Event, defaultEventsBufferSize)
se.subscribers[c] = struct{}{}

if storedOnly {
defer se.Unsubscribe(c)
}

// Do not allow the subscriber to unsubscript
se.subscriberLock.Lock()
defer se.subscriberLock.Unlock()

if !from.IsZero() {
// replay old event
for _, e := range se.eventLog {
// note: we lock and make a copy of history to avoid blocking
se.eventLock.Lock()
past := se.eventLog[:]
se.eventLock.Unlock()

for _, e := range past {
if e.Timestamp.After(from) {
c <- e
if id == "" || e.ID == id {
c <- e
}
}
}
// Notify the client that from now on it's live events
c <- Event{
Type: "live",
Timestamp: time.Now(),

if storedOnly {
close(c)
} else {
// Notify the client that from now on it's live events
c <- Event{
Type: "live",
Timestamp: time.Now(),
}
se.subscribers[c] = struct{}{}
}
}
return c
}

// Unsubscribe removes the provided channel from receiving any more events
func (se *SvEvents) Unsubscribe(sub chan Event) {
se.Lock()
defer se.Unlock()
delete(se.subscribers, sub)
close(sub)
se.subscriberLock.Lock()
defer se.subscriberLock.Unlock()
if _, ok := se.subscribers[sub]; ok {
delete(se.subscribers, sub)
close(sub)
}
}

// notifySubscribers will send the provided event to the external subscribers
// of the events channel
func (se *SvEvents) notifySubscribers(e Event) {
glog.Infof("notifySubscribers: %v", e)
se.RLock()
defer se.RUnlock()
se.subscriberLock.RLock()
defer se.subscriberLock.RUnlock()
for sub := range se.subscribers {
// do a non-blocking send for the channel
select {
case sub <- e:
default:
glog.Infof("containerd: event not sent to subscriber")
glog.Warningf("containerd: event not sent to subscriber")
}
}
}
2 changes: 1 addition & 1 deletion supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (sv *Supervisor) getProcess(container, processId string) *Process {
}

func (sv *Supervisor) reaper() {
events := sv.Events.Events(time.Time{})
events := sv.Events.Events(time.Time{}, false, "")
for e := range events {
if e.Type == EventExit {
go sv.reap(e.ID, e.PID)
Expand Down