Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Event SPI #424

Merged
merged 25 commits into from Mar 6, 2017
Merged

Event SPI #424

merged 25 commits into from Mar 6, 2017

Conversation

chungers
Copy link
Contributor

@chungers chungers commented Mar 5, 2017

This PR we introduce:

Features

  • A new SPI for events. This allows plugins to act as publisher of events and for clients to subscribes to receiving a continuous stream of events.
  • Event producers produce events at a given topic, eg. compute/instance/create or compute/instance/unhealthy. Subscribers can subscribe to topics at exactly matching path (e.g. compute/instance/unhealthy) or at coarser granularity (e.g. compute/instance) to receive events from the sub-topics.
  • This will be useful for
    • visualizing / monitoring what infrakit plugins are doing in real-time
    • health-checks
    • allow external systems to plug into infrakit by being producers of infrastructure events that infrakit can react to.
  • RPC plugin implemmentations
  • Testing mocks for the event plugin
  • End-to-end tests using event producers and consumers over unix socket

New CLI / Binary

A new subcommand event has been added. The syntax is the same as infrakit metadata. You can ls or tail:

  • ls lists all the event topics from all the plugins that export the Event interface.
  • tail followed by a path will let you watch the event stream in real-time.
    A new binary in examples/event/time demonstrates how to incorporate the Event SPI into your plugin implementation. The example implements
  • Event SPI - where multiple topics are available for subscription -- timers where events are triggered at different time intervals.
  • Metadata SPI - where paths can be read to get current time in unix time seconds or nanoseconds.

Running Examples

  1. Start up the examples/event/time plugin:
~/projects/src/github.com/docker/infrakit$ build/infrakit-event-time 
INFO[0000] Timer starting publish on channel: 0xc420019020 
INFO[0000] Listening at: /Users/davidchung/.infrakit/plugins/event-time 
INFO[0000] PID file at /Users/davidchung/.infrakit/plugins/event-time.pid 

Now run the commands below in another console:

  1. Use the command line:
~/projects/src/github.com/docker/infrakit$ # first list the plugins
~/projects/src/github.com/docker/infrakit$ infrakit plugin ls
NAME                	LISTEN
event-time          	/Users/davidchung/.infrakit/plugins/event-time
~/projects/src/github.com/docker/infrakit$ # now look at the metadata
~/projects/src/github.com/docker/infrakit$ infrakit metadata ls -a
total 6:
event-time/implements/Name
event-time/implements/Version
event-time/revision
event-time/time/now/nano
event-time/time/now/sec
event-time/version
~/projects/src/github.com/docker/infrakit$ # check the current time in nanos
~/projects/src/github.com/docker/infrakit$ infrakit metadata cat event-time/time/now/nano
1488789866632784717
~/projects/src/github.com/docker/infrakit$ # list all the topics exported by the plugins
~/projects/src/github.com/docker/infrakit$ infrakit event ls -a
total 9:
event-time/timer/hr/1
event-time/timer/min/1
event-time/timer/min/30
event-time/timer/min/5
event-time/timer/msec/100
event-time/timer/msec/500
event-time/timer/sec/1
event-time/timer/sec/30
event-time/timer/sec/5
~/projects/src/github.com/docker/infrakit$ # subscribe to 1 sec timers
~/projects/src/github.com/docker/infrakit$ infrakit event tail event-time/timer/sec/1
INFO[0000] Using str://{{.}} for rendering view.        
INFO[0000] Connecting to broker url= unix://event-time topic= timer/sec/1 opts= {/Users/davidchung/.infrakit/plugins /events} 
{timer/sec/1 timer 1sec 2017-03-06 00:45:37.719132916 -0800 PST 2017-03-06 00:45:37.721930887 -0800 PST 1000000000 <nil>}
{timer/sec/1 timer 1sec 2017-03-06 00:45:38.721185759 -0800 PST 2017-03-06 00:45:38.721453717 -0800 PST 1000000000 <nil>}
{timer/sec/1 timer 1sec 2017-03-06 00:45:39.717421175 -0800 PST 2017-03-06 00:45:39.717675721 -0800 PST 1000000000 <nil>}
{timer/sec/1 timer 1sec 2017-03-06 00:45:40.717192456 -0800 PST 2017-03-06 00:45:40.717379058 -0800 PST 1000000000 <nil>}
^C
~/projects/src/github.com/docker/infrakit$ # subscribe all the timers
~/projects/src/github.com/docker/infrakit$ infrakit event tail event-time/timer
INFO[0000] Using str://{{.}} for rendering view.        
INFO[0000] Connecting to broker url= unix://event-time topic= timer opts= {/Users/davidchung/.infrakit/plugins /events} 
{timer/msec/100 timer 100ms 2017-03-06 00:46:11.417528508 -0800 PST 2017-03-06 00:46:11.418699077 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:11.517331452 -0800 PST 2017-03-06 00:46:11.51756402 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:11.619800739 -0800 PST 2017-03-06 00:46:11.620470301 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:11.719664467 -0800 PST 2017-03-06 00:46:11.720045353 -0800 PST 100000000 <nil>}
{timer/sec/5 timer 5s 2017-03-06 00:46:11.719723881 -0800 PST 2017-03-06 00:46:11.720073479 -0800 PST 5000000000 <nil>}
{timer/sec/1 timer 1sec 2017-03-06 00:46:11.719745555 -0800 PST 2017-03-06 00:46:11.720116311 -0800 PST 1000000000 <nil>}
{timer/msec/500 timer 500ms 2017-03-06 00:46:11.719820651 -0800 PST 2017-03-06 00:46:11.720208091 -0800 PST 500000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:11.817113488 -0800 PST 2017-03-06 00:46:11.817318189 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:11.920967283 -0800 PST 2017-03-06 00:46:11.921197552 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.020699438 -0800 PST 2017-03-06 00:46:12.020915343 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.118330906 -0800 PST 2017-03-06 00:46:12.118554275 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.221672931 -0800 PST 2017-03-06 00:46:12.221856288 -0800 PST 100000000 <nil>}
{timer/msec/500 timer 500ms 2017-03-06 00:46:12.221700576 -0800 PST 2017-03-06 00:46:12.221874173 -0800 PST 500000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.321611026 -0800 PST 2017-03-06 00:46:12.321861276 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.420192906 -0800 PST 2017-03-06 00:46:12.42043839 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.520165201 -0800 PST 2017-03-06 00:46:12.520325714 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.617592594 -0800 PST 2017-03-06 00:46:12.617813329 -0800 PST 100000000 <nil>}
{timer/msec/100 timer 100ms 2017-03-06 00:46:12.71699105 -0800 PST 2017-03-06 00:46:12.717208461 -0800 PST 100000000 <nil>}
{timer/msec/500 timer 500ms 2017-03-06 00:46:12.717011135 -0800 PST 2017-03-06 00:46:12.717223299 -0800 PST 500000000 <nil>}
{timer/sec/1 timer 1sec 2017-03-06 00:46:12.717020755 -0800 PST 2017-03-06 00:46:12.717258628 -0800 PST 1000000000 <nil>}
^C

The presentation can be customized by the --url flag where template url can be used, or use str:// scheme for the url to inline template text:

~/projects/src/github.com/docker/infrakit$ infrakit event tail event-time/timer/sec/1 --url 'str://{{.Timestamp}},{{.Topic}},{{.ID}}'
INFO[0000] Using str://{{.Timestamp}},{{.Topic}},{{.ID}} for rendering view. 
INFO[0000] Connecting to broker url= unix://event-time topic= timer/sec/1 opts= {/Users/davidchung/.infrakit/plugins /events} 
2017-03-06 00:48:09.719806027 -0800 PST,timer/sec/1,1sec
2017-03-06 00:48:10.719944745 -0800 PST,timer/sec/1,1sec
2017-03-06 00:48:11.721211131 -0800 PST,timer/sec/1,1sec
2017-03-06 00:48:12.72119823 -0800 PST,timer/sec/1,1sec
^C
~/projects/src/github.com/docker/infrakit$ infrakit event tail event-time/timer/sec/1 --url 'str://{{.|jsonEncode}}'
INFO[0000] Using str://{{.|jsonEncode}} for rendering view. 
INFO[0000] Connecting to broker url= unix://event-time topic= timer/sec/1 opts= {/Users/davidchung/.infrakit/plugins /events} 
{
  "Topic": [
    "timer",
    "sec",
    "1"
  ],
  "Type": "timer",
  "ID": "1sec",
  "Timestamp": "2017-03-06T00:48:27.719566664-08:00",
  "Received": "2017-03-06T00:48:27.723088886-08:00",
  "Data": 1000000000
}
{
  "Topic": [
    "timer",
    "sec",
    "1"
  ],
  "Type": "timer",
  "ID": "1sec",
  "Timestamp": "2017-03-06T00:48:28.719978028-08:00",
  "Received": "2017-03-06T00:48:28.720249738-08:00",
  "Data": 1000000000
}
^C

David Chung added 13 commits February 2, 2017 19:23
…e#382)

Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
…mentations

Signed-off-by: David Chung <david.chung@docker.com>
@GordonTheTurtle
Copy link

Please sign your commits following these rules:
https://github.com/docker/docker/blob/master/CONTRIBUTING.md#sign-your-work
The easiest way to do this is to amend the last commit:

$ git clone -b "group2-fsm" git@github.com:chungers/infrakit.git somewhere
$ cd somewhere
$ git rebase -i HEAD~842354411592
editor opens
change each 'pick' to 'edit'
save the file and quit
$ git commit --amend -s --no-edit
$ git rebase --continue # and repeat the amend for each commit
$ git push -f

Amending updates the existing PR. You DO NOT need to open a new one.

@chungers chungers added this to the v0.5 milestone Mar 5, 2017
Signed-off-by: David Chung <david.chung@docker.com>
David Chung added 8 commits March 5, 2017 00:44
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
@@ -140,6 +162,9 @@ func (b *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
func (b *Broker) run() {
for {
select {
case <-b.finish:
log.Infoln("Broker finished")
return

case subscription := <-b.newClients:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested this PR with the code below.

func main() {
        client, _ := rpc.NewClient("/root/.infrakit/plugins/event")
        topics, _ := client.Topics()
        fmt.Printf("%v \n", topics)
        sub, _ := client.(spi.Subscriber)
        typed, err := sub.SubscribeOn(spi.NewTopic("timer/sec/1"))
        if err != nil {
                fmt.Errorf("%v \n", err)
        }
        for {
                select {
                case ret := <-typed:
                        fmt.Printf("%v \n", ret)
                }
        }
}

Then I found a problem that only one client can subscribe in one topic.
When a new client starts subscribing to a topic that another client already subscribes to, the old client will not receive any messages.
The cause of this problem is thought to be because Broker uses radix's insert, so if you try to add a new client with the same topic, old client will be overwritten.

Is this a specification?
If so, the old client's session should be closed. If not, we need to fix.
Perhaps it would be better to discuss this separately from this PR. In that case open a new issue and consider a patch.
What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I will fix this before merging it in. Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this and a test case now has two clients subscribing to the same topic.

Short: "Timer event plugin",
}

name := cmd.Flags().String("name", "time", "Plugin name to advertise for discovery")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a trivial thing, I think that it is better to name it event-time in order to keep it in balance with other examples.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks!

David Chung added 2 commits March 6, 2017 00:16
Signed-off-by: David Chung <david.chung@docker.com>
Signed-off-by: David Chung <david.chung@docker.com>
…in events

Signed-off-by: David Chung <david.chung@docker.com>
@chungers chungers merged commit 83ffc26 into docker-archive:master Mar 6, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants