Skip to content

Loading…

support listeners for events #72

Merged
merged 4 commits into from

6 participants

@jhulten

Change adds two methods to client:

  • AddEventListener adds a channel to receive *APIEvents If this is the first listener, event monitoring is started.
  • RemoveEventListener causes a channel to no longer receive *APIEvents If this is the last listener, event monitoring is stopped.
Jeffrey Hulten support listeners for events
Change adds two methods to client:

- AddEventListener adds a channel to receive *APIEvents
  If this is the first listener, event monitoring is started.
- RemoveEventListener causes a channel to no longer receive *APIEvents
  If this is the last listener, event monitoring is stopped.
479c4e2
@jhulten

Addresses #66

/cc @fsouza

@cheneydeng

@jhulten hey man, are you still working on this feature?The CI still failed.
this is the feature which i need, please fix this :-) thank you very much,if you have no time recently,i think you can cancel this pr and i'm glad to contribute on it.

@jhulten

I will work on it today.

Jeffrey Hulten added some commits
@jhulten

There we go. I think I got it all. Quite the learning experience.

/cc @fsouza Ready to merge.

@cheneydeng

@fsouza
hey man ,how is the progress about this usefull feature? Please give a review on it,thanks :-)

@jhulten

Ping @fsouza... Is something blocking accepting this PR?

@andrewsmedina
Collaborator

LGTM

@benmccann benmccann referenced this pull request in flynn-archive/flynn-host
Closed

Use upstream go-dockerclient #4

@benmccann

It will be great to have have event support. Flynn forked go-dockerclient and added an event stream:
flynn-archive@eb04377

I'd like to get Flynn using the upstream go/dockerclient again, so hopefully that would be possible after getting event support here.

@fsouza
Owner

Thanks @benmccann.

O liked flynn implementation. What about using it, @jhulten? We would need some tests, of course, but it looks good and simpler.

What do you think?

@andrewsmedina
Collaborator

I liked the flynn implementation but it needs some tests.

@jhulten

Either way.

@fsouza
Owner

@jhulten do you want to do this?

@jhulten

I can take a look at submitting a new PR with tests.

The only thing I see I cover in my code that is not covered here is reconnect logic on failure. @benmccann do you have thoughts on this?

@benmccann

No thoughts from me. I just noticed code in both repos that looked similar, but I didn't write any of it

@cheneydeng

@fsouza @jhulten
what's wrong with this pr?I'm waiting for using that in my project,will this be merged or closed?

@srid
@fsouza fsouza merged commit 7de6875 into fsouza:master

1 check passed

Details default The Travis CI build passed
@fsouza
Owner

It was missing docs and the proper code style, but I've fixed these issues and merged it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 21, 2014
  1. support listeners for events

    Jeffrey Hulten committed
    Change adds two methods to client:
    
    - AddEventListener adds a channel to receive *APIEvents
      If this is the first listener, event monitoring is started.
    - RemoveEventListener causes a channel to no longer receive *APIEvents
      If this is the last listener, event monitoring is stopped.
Commits on Feb 22, 2014
  1. correct logic error on disabling monitoring

    Jeffrey Hulten committed
Commits on Feb 24, 2014
  1. fixed some race conditions

    Jeffrey Hulten committed
  2. caught closed chanel

    Jeffrey Hulten committed
Showing with 504 additions and 4 deletions.
  1. +1 −0 AUTHORS
  2. +326 −0 event.go
  3. +92 −0 event_test.go
  4. +37 −1 example_test.go
  5. +48 −3 testing/server.go
View
1 AUTHORS
@@ -11,3 +11,4 @@ Philippe Lafoucrière <philippe.lafoucriere@tech-angels.com>
Sridhar Ratnakumar <sridharr@activestate.com>
Tim Schindler <tim@catalyst-zero.com>
Wiliam Souza <wiliamsouza83@gmail.com>
+Jeffrey Hulten <jhulten@gmail.com>
View
326 event.go
@@ -0,0 +1,326 @@
+// Copyright 2013 go-dockerclient authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package docker
+
+import (
+ "bufio"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "net"
+ "net/http"
+ "net/http/httputil"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type APIEvents struct {
+ Status string
+ ID string
+ From string
+ Time int64
+}
+
+type eventMonitoringState struct {
+ sync.RWMutex
+ sync.WaitGroup
+ enabled bool
+ lastSeen *int64
+ C chan *APIEvents
+ errC chan error
+ listeners []chan *APIEvents
+}
+
+// event monitoring state is singleton
+var eventMonitor eventMonitoringState
+
+var maxMonitorConnRetries = 5
+var retryInitialWaitTime = float64(10)
+
+var ErrNoListeners = errors.New("No listeners present to recieve event")
+var ErrListenerExists = errors.New("Listener already exists for docker events")
+
+func (c *Client) AddEventListener(listener chan *APIEvents) error {
+ var err error
+ if !eventMonitor.isEnabled() {
+ err = eventMonitor.enableEventMonitoring(c)
+ if err != nil {
+ return err
+ }
+ }
+ err = eventMonitor.addListener(listener)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
+ err := eventMonitor.removeListener(listener)
+ if err != nil {
+ return err
+ }
+
+ if len(eventMonitor.listeners) == 0 {
+ err = eventMonitor.disableEventMonitoring()
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (eventState *eventMonitoringState) addListener(listener chan *APIEvents) error {
+
+ // lock to mutate internal state
+ eventState.Lock()
+ defer eventState.Unlock()
+
+ // return error if listener is in pool
+ if listenerExists(listener, &eventState.listeners) {
+ return ErrListenerExists
+ }
+
+ // add to waitgroup for listeners
+ eventState.Add(1)
+ // add listener to list
+ eventState.listeners = append(eventState.listeners, listener)
+ return nil
+}
+
+func (eventState *eventMonitoringState) removeListener(listener chan *APIEvents) error {
+
+ // lock to mutate internal state
+ eventState.Lock()
+ defer eventState.Unlock()
+
+ if listenerExists(listener, &eventState.listeners) {
+ // placeholder for new listener list
+ var newListeners []chan *APIEvents
+
+ // iterate on existing listeners, only adding non-matching listeners to new list
+ for _, l := range eventState.listeners {
+ if l != listener {
+ newListeners = append(newListeners, l)
+ }
+ }
+
+ // update listener list
+ eventState.listeners = newListeners
+
+ // release listener from waitgroup
+ eventState.Add(-1)
+ }
+ return nil
+}
+
+func listenerExists(a chan *APIEvents, list *[]chan *APIEvents) bool {
+ for _, b := range *list {
+ if b == a {
+ return true
+ }
+ }
+ return false
+}
+
+func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
+ // lock to mutate internal state
+ eventState.Lock()
+ defer eventState.Unlock()
+
+ // if event monitoring is disabled, initialize it and start monitoring
+ if !eventState.enabled {
+ eventState.enabled = true
+ var lastSeenDefault = int64(0)
+ eventState.lastSeen = &lastSeenDefault
+ eventState.C = make(chan *APIEvents, 100)
+ eventState.errC = make(chan error, 1)
+ go eventState.monitorEvents(c)
+ }
+ return nil
+}
+
+func (eventState *eventMonitoringState) disableEventMonitoring() error {
+
+ // Wait until all sendEvents are finished
+ eventState.Wait()
+
+ // lock to mutate internal state
+ eventState.Lock()
+ defer eventState.Unlock()
+
+ // if event monitoring is enables, close the channels
+ if eventState.enabled {
+ eventState.enabled = false
+ close(eventState.C)
+ close(eventState.errC)
+ }
+ return nil
+}
+
+func (eventState *eventMonitoringState) monitorEvents(c *Client) {
+
+ var err error
+
+ // wait for first listener
+ for eventState.noListeners() {
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ if err = eventState.connectWithRetry(c); err != nil {
+ eventState.terminate(err)
+ }
+
+ for eventState.isEnabled() {
+ timeout := time.After(100 * time.Millisecond)
+ select {
+ case ev, ok := <-eventState.C:
+ if !ok {
+ // channel has been closed, exiting
+ return
+ }
+ // send the event
+ go eventState.sendEvent(ev)
+
+ // update lastSeen
+ go eventState.updateLastSeen(ev)
+
+ case err = <-eventState.errC:
+ if err == ErrNoListeners {
+ // if there are no listeners, exit normally
+ eventState.terminate(nil)
+ return
+ } else if err != nil {
+ // otherwise, trigger a restart via the error channel
+ defer func() { go eventState.monitorEvents(c) }()
+ return
+ }
+ case <-timeout:
+ continue
+ }
+ }
+}
+
+func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
+ var retries int
+ var err error
+ for err = c.eventHijack(atomic.LoadInt64(eventState.lastSeen), eventState.C, eventState.errC); err != nil && retries < maxMonitorConnRetries; retries++ {
+ waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
+ time.Sleep(time.Duration(waitTime) * time.Millisecond)
+ err = c.eventHijack(atomic.LoadInt64(eventState.lastSeen), eventState.C, eventState.errC)
+ }
+ return err
+}
+
+func (eventState *eventMonitoringState) noListeners() bool {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ return len(eventState.listeners) == 0
+}
+
+func (eventState *eventMonitoringState) isEnabled() bool {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ return eventState.enabled
+}
+
+func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
+
+ // ensure the listener list doesn't change out from under us
+ eventState.RLock()
+ defer eventState.RUnlock()
+
+ // add to waitgroup to make sure we don't close prematurely
+ eventState.Add(1)
+ defer eventState.Done()
+
+ if eventState.isEnabled() {
+ if eventState.noListeners() {
+ eventState.errC <- ErrNoListeners
+ }
+ for _, listener := range eventState.listeners {
+ listener <- event
+ }
+ }
+}
+
+func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if atomic.LoadInt64(eventState.lastSeen) < e.Time {
+ atomic.StoreInt64(eventState.lastSeen, e.Time)
+ }
+}
+
+func (eventState *eventMonitoringState) terminate(err error) {
+ eventState.disableEventMonitoring()
+}
+
+func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
+
+ uri := "/events"
+
+ if startTime != 0 {
+ uri += fmt.Sprintf("?since=%d", startTime)
+ }
+
+ req, err := http.NewRequest("GET", c.getURL(uri), nil)
+ if err != nil {
+ return err
+ }
+
+ req.Header.Set("Content-Type", "plain/text")
+ protocol := c.endpointURL.Scheme
+ address := c.endpointURL.Path
+ if protocol != "unix" {
+ protocol = "tcp"
+ address = c.endpointURL.Host
+ }
+
+ dial, err := net.Dial(protocol, address)
+ if err != nil {
+ return err
+ }
+
+ clientconn := httputil.NewClientConn(dial, nil)
+ clientconn.Do(req)
+
+ conn, rwc := clientconn.Hijack()
+ if err != nil {
+ return err
+ }
+
+ go func(rwc io.Reader) {
+
+ defer clientconn.Close()
+ defer conn.Close()
+
+ scanner := bufio.NewScanner(rwc)
+ for scanner.Scan() {
+ line := scanner.Text()
+
+ // Only pay attention to lines that start as json objects
+ if strings.HasPrefix(line, "{") {
+ var e APIEvents
+ err = json.Unmarshal([]byte(line), &e)
+ if err != nil {
+ errChan <- err
+ }
+ eventChan <- &e
+ }
+
+ }
+ if err := scanner.Err(); err != nil {
+ errChan <- err
+ }
+ }(rwc)
+
+ return nil
+}
View
92 event_test.go
@@ -0,0 +1,92 @@
+// Copyright 2013 go-dockerclient authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package docker
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+)
+
+func TestEventListeners(t *testing.T) {
+ response := `{"status":"create","id":"dfdf82bd3881","from":"base:latest","time":1374067924}
+{"status":"start","id":"dfdf82bd3881","from":"base:latest","time":1374067924}
+{"status":"stop","id":"dfdf82bd3881","from":"base:latest","time":1374067966}
+{"status":"destroy","id":"dfdf82bd3881","from":"base:latest","time":1374067970}
+`
+
+ var req http.Request
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ rsc := bufio.NewScanner(strings.NewReader(response))
+ for rsc.Scan() {
+ w.Write([]byte(rsc.Text()))
+ w.(http.Flusher).Flush()
+ time.Sleep(10 * time.Millisecond)
+ }
+ req = *r
+ }))
+ defer server.Close()
+
+ client, err := NewClient(server.URL)
+ if err != nil {
+ t.Errorf("Failed to create client: %s", err)
+ }
+
+ listener := make(chan *APIEvents, 10)
+ defer func() { time.Sleep(10 * time.Millisecond); client.RemoveEventListener(listener) }()
+
+ err = client.AddEventListener(listener)
+ if err != nil {
+ t.Errorf("Failed to add event listener: %s", err)
+ }
+
+ timeout := time.After(1 * time.Second)
+ var count int
+
+ for {
+ select {
+ case msg := <-listener:
+ t.Logf("Recieved: %s", *msg)
+ count++
+ err = checkEvent(count, msg)
+ if err != nil {
+ t.Fatalf("Check event failed: %s", err)
+ }
+ if count == 4 {
+ return
+ }
+ case <-timeout:
+ t.Fatal("TestAddEventListener timed out waiting on events")
+ }
+ }
+}
+
+func checkEvent(index int, event *APIEvents) error {
+ if event.ID != "dfdf82bd3881" {
+ return fmt.Errorf("event ID did not match. Expected dfdf82bd3881 got %s", event.ID)
+ }
+ if event.From != "base:latest" {
+ return fmt.Errorf("event from did not match. Expected base:latest got %s", event.From)
+ }
+ var status string
+ switch index {
+ case 1:
+ status = "create"
+ case 2:
+ status = "start"
+ case 3:
+ status = "stop"
+ case 4:
+ status = "destroy"
+ }
+ if event.Status != status {
+ return fmt.Errorf("event status did not match. Expected %s got %s", status, event.Status)
+ }
+ return nil
+}
View
38 example_test.go
@@ -7,10 +7,11 @@ package docker_test
import (
"archive/tar"
"bytes"
- "github.com/fsouza/go-dockerclient"
"io"
"log"
"time"
+
+ "github.com/fsouza/go-dockerclient"
)
func ExampleClient_AttachToContainer() {
@@ -80,6 +81,7 @@ func ExampleClient_BuildImage() {
if err != nil {
log.Fatal(err)
}
+
t := time.Now()
inputbuf, outputbuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
tr := tar.NewWriter(inputbuf)
@@ -97,3 +99,37 @@ func ExampleClient_BuildImage() {
log.Println("build image success, imageid:", imageid)
}
}
+
+func ExampleClient_ListenEvents() {
+ client, err := docker.NewClient("http://localhost:4243")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ listener := make(chan *docker.APIEvents)
+ err = client.AddEventListener(listener)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ defer func() {
+
+ err = client.RemoveEventListener(listener)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ }()
+
+ timeout := time.After(1 * time.Second)
+
+ for {
+ select {
+ case msg := <-listener:
+ log.Println(msg)
+ case <-timeout:
+ break
+ }
+ }
+
+}
View
51 testing/server.go
@@ -12,9 +12,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "github.com/fsouza/go-dockerclient"
- "github.com/fsouza/go-dockerclient/utils"
- "github.com/gorilla/mux"
mathrand "math/rand"
"net"
"net/http"
@@ -22,6 +19,10 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/fsouza/go-dockerclient"
+ "github.com/fsouza/go-dockerclient/utils"
+ "github.com/gorilla/mux"
)
// DockerServer represents a programmable, concurrent (not much), HTTP server
@@ -73,6 +74,7 @@ func (s *DockerServer) buildMuxer() {
s.mux.Path("/images/json").Methods("GET").HandlerFunc(s.listImages)
s.mux.Path("/images/{id:.*}").Methods("DELETE").HandlerFunc(s.removeImage)
s.mux.Path("/images/{name:.*}/push").Methods("POST").HandlerFunc(s.pushImage)
+ s.mux.Path("/events").Methods("GET").HandlerFunc(s.listEvents)
}
// Stop stops the server.
@@ -442,3 +444,46 @@ func (s *DockerServer) removeImage(w http.ResponseWriter, r *http.Request) {
s.images[index] = s.images[len(s.images)-1]
s.images = s.images[:len(s.images)-1]
}
+
+func (s *DockerServer) listEvents(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+
+ var events [][]byte
+ count := mathrand.Intn(20)
+ for i := 0; i < count; i++ {
+ data, err := json.Marshal(s.generateEvent())
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ events = append(events, data)
+ }
+
+ w.WriteHeader(http.StatusOK)
+
+ for _, d := range events {
+ fmt.Fprintln(w, d)
+ time.Sleep(time.Duration(mathrand.Intn(200)) * time.Millisecond)
+ }
+}
+
+func (s *DockerServer) generateEvent() *docker.APIEvents {
+ var eventType string
+
+ switch mathrand.Intn(4) {
+ case 0:
+ eventType = "create"
+ case 1:
+ eventType = "start"
+ case 2:
+ eventType = "stop"
+ case 3:
+ eventType = "destroy"
+ }
+
+ return &docker.APIEvents{
+ ID: s.generateID(),
+ Status: eventType,
+ From: "mybase:latest",
+ Time: time.Now().Unix()}
+}
Something went wrong with that request. Please try again.