Permalink
Browse files

Integrate NATS with event subsystem

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
  • Loading branch information...
1 parent 934940a commit aa5ff88bbc669ef04f7dec160d579acb122f59bd @mlaventure mlaventure committed Dec 12, 2016
Showing with 165 additions and 68 deletions.
  1. +67 −24 cmd/containerd/main.go
  2. +1 −1 cmd/ctr/main.go
  3. +31 −0 events/nats.go
  4. +11 −7 events/poster.go
  5. +7 −5 events/transaction.go
  6. +7 −4 execution/events.go
  7. +3 −1 execution/executors/oci/oci.go
  8. +19 −0 execution/log.go
  9. +18 −25 execution/service.go
  10. +1 −1 log/context.go
@@ -13,14 +13,17 @@ import (
"strings"
"syscall"
+ gocontext "golang.org/x/net/context"
"google.golang.org/grpc"
- "github.com/Sirupsen/logrus"
"github.com/docker/containerd"
api "github.com/docker/containerd/api/execution"
+ "github.com/docker/containerd/events"
"github.com/docker/containerd/execution"
"github.com/docker/containerd/execution/executors/oci"
+ "github.com/docker/containerd/log"
metrics "github.com/docker/go-metrics"
+ "github.com/sirupsen/logrus"
"github.com/urfave/cli"
"github.com/nats-io/go-nats"
@@ -85,22 +88,10 @@ high performance container runtime
go serveMetrics(address)
}
- eventsURL, err := url.Parse(context.GlobalString("events-address"))
+ s, err := startNATSServer(context)
if err != nil {
- return err
- }
-
- no := stand.DefaultNatsServerOptions
- nOpts := &no
- nOpts.NoSigs = true
- parts := strings.Split(eventsURL.Host, ":")
- nOpts.Host = parts[0]
- if len(parts) == 2 {
- nOpts.Port, err = strconv.Atoi(parts[1])
- } else {
- nOpts.Port = nats.DefaultPort
+ return nil
}
- s := stand.RunServerWithOpts(nil, nOpts)
defer s.Shutdown()
path := context.GlobalString("socket")
@@ -121,24 +112,31 @@ high performance container runtime
}
}
- // Start events listener
- nc, err := nats.Connect(context.GlobalString("events-address"))
+ // Get events publisher
+ nec, err := getNATSPublisher(context)
if err != nil {
return err
}
- nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
- if err != nil {
- nc.Close()
- return err
- }
defer nec.Close()
- execService, err := execution.New(executor, nec)
+ execService, err := execution.New(executor)
if err != nil {
return err
}
- server := grpc.NewServer()
+ // Intercept the GRPC call in order to populate the correct module path
+ interceptor := func(ctx gocontext.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ ctx = log.WithModule(ctx, "containerd")
+ switch info.Server.(type) {
+ case api.ExecutionServiceServer:
+ ctx = log.WithModule(ctx, "execution")
+ ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
+ default:
+ fmt.Println("Unknown type: %#v", info.Server)
+ }
+ return handler(ctx, req)
+ }
+ server := grpc.NewServer(grpc.UnaryInterceptor(interceptor))
api.RegisterExecutionServiceServer(server, execService)
go serveGRPC(server, l)
@@ -201,3 +199,48 @@ func dumpStacks() {
buf = buf[:stackSize]
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}
+
+func startNATSServer(context *cli.Context) (e *stand.StanServer, err error) {
+ eventsURL, err := url.Parse(context.GlobalString("events-address"))
+ if err != nil {
+ return nil, err
+ }
+
+ no := stand.DefaultNatsServerOptions
+ nOpts := &no
+ nOpts.NoSigs = true
+ parts := strings.Split(eventsURL.Host, ":")
+ nOpts.Host = parts[0]
+ if len(parts) == 2 {
+ nOpts.Port, err = strconv.Atoi(parts[1])
+ } else {
+ nOpts.Port = nats.DefaultPort
+ }
+ defer func() {
+ if r := recover(); r != nil {
+ e = nil
+ if _, ok := r.(error); !ok {
+ err = fmt.Errorf("failed to start NATS server: %v", r)
+ } else {
+ err = r.(error)
+ }
+ }
+ }()
+ s := stand.RunServerWithOpts(nil, nOpts)
+
+ return s, nil
+}
+
+func getNATSPublisher(context *cli.Context) (*nats.EncodedConn, error) {
+ nc, err := nats.Connect(context.GlobalString("events-address"))
+ if err != nil {
+ return nil, err
+ }
+ nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
+ if err != nil {
+ nc.Close()
+ return nil, err
+ }
+
+ return nec, nil
+}
View
@@ -4,8 +4,8 @@ import (
"fmt"
"os"
- "github.com/Sirupsen/logrus"
"github.com/docker/containerd"
+ "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
View
@@ -0,0 +1,31 @@
+package events
+
+import (
+ "context"
+ "strings"
+
+ "github.com/docker/containerd/log"
+ nats "github.com/nats-io/go-nats"
+)
+
+type natsPoster struct {
+ nec *nats.EncodedConn
+}
+
+func GetNATSPoster(nec *nats.EncodedConn) Poster {
+ return &natsPoster{nec}
+}
+
+func (p *natsPoster) Post(ctx context.Context, e Event) {
+ subject := strings.Replace(log.GetModulePath(ctx), "/", ".", -1)
+ topic := getTopic(ctx)
+ if topic != "" {
+ subject = strings.Join([]string{subject, topic}, ".")
+ }
+
+ if subject == "" {
+ log.GetLogger(ctx).WithField("event", e).Warn("unable to post event, subject is empty")
+ }
+
+ p.nec.Publish(subject, e)
+}
View
@@ -3,8 +3,8 @@ package events
import (
"context"
- "github.com/Sirupsen/logrus"
"github.com/docker/containerd/log"
+ "github.com/sirupsen/logrus"
)
var (
@@ -13,21 +13,25 @@ var (
// Poster posts the event.
type Poster interface {
- Post(event Event)
+ Post(ctx context.Context, event Event)
}
type posterKey struct{}
+func WithPoster(ctx context.Context, poster Poster) context.Context {
+ return context.WithValue(ctx, posterKey{}, poster)
+}
+
func GetPoster(ctx context.Context) Poster {
- poster := ctx.Value(ctx)
+ poster := ctx.Value(posterKey{})
if poster == nil {
logger := log.G(ctx)
tx, _ := getTx(ctx)
topic := getTopic(ctx)
// likely means we don't have a configured event system. Just return
// the default poster, which merely logs events.
- return posterFunc(func(event Event) {
+ return posterFunc(func(ctx context.Context, event Event) {
fields := logrus.Fields{"event": event}
if topic != "" {
@@ -48,8 +52,8 @@ func GetPoster(ctx context.Context) Poster {
return poster.(Poster)
}
-type posterFunc func(event Event)
+type posterFunc func(ctx context.Context, event Event)
-func (fn posterFunc) Post(event Event) {
- fn(event)
+func (fn posterFunc) Post(ctx context.Context, event Event) {
+ fn(ctx, event)
}
View
@@ -18,24 +18,26 @@ func nexttxID() int64 {
}
type transaction struct {
+ ctx context.Context
id int64
parent *transaction // if nil, no parent transaction
finish sync.Once
start, end time.Time // informational
}
// begin creates a sub-transaction.
-func (tx *transaction) begin(poster Poster) *transaction {
+func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction {
id := nexttxID()
child := &transaction{
+ ctx: ctx,
id: id,
parent: tx,
start: time.Now(),
}
// post the transaction started event
- poster.Post(child.makeTransactionEvent("begin")) // tranactions are really just events
+ poster.Post(ctx, child.makeTransactionEvent("begin")) // tranactions are really just events
return child
}
@@ -44,7 +46,7 @@ func (tx *transaction) begin(poster Poster) *transaction {
func (tx *transaction) commit(poster Poster) {
tx.finish.Do(func() {
tx.end = time.Now()
- poster.Post(tx.makeTransactionEvent("commit"))
+ poster.Post(tx.ctx, tx.makeTransactionEvent("commit"))
})
}
@@ -53,7 +55,7 @@ func (tx *transaction) rollback(poster Poster, cause error) {
tx.end = time.Now()
event := tx.makeTransactionEvent("rollback")
event = fmt.Sprintf("%s error=%q", event, cause.Error())
- poster.Post(event)
+ poster.Post(tx.ctx, event)
})
}
@@ -84,7 +86,7 @@ func getTx(ctx context.Context) (*transaction, bool) {
func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) {
poster := G(pctx)
parent, _ := getTx(pctx)
- tx := parent.begin(poster)
+ tx := parent.begin(pctx, poster)
return context.WithValue(pctx, txKey{}, tx), func() {
tx.commit(poster)
View
@@ -1,8 +1,11 @@
package execution
+import "time"
+
type ContainerEvent struct {
- ID string
- Action string
+ Timestamp time.Time
+ ID string
+ Action string
}
type ContainerExitEvent struct {
@@ -16,6 +19,6 @@ const (
)
const (
- containerEventsSubjectFormat = "containerd.execution.container.%s"
- containerProcessEventsSubjectFormat = "containerd.execution.container.%s.%s"
+ containerEventsTopicFormat = "container.%s"
+ containerProcessEventsTopicFormat = "container.%s.%s"
)
@@ -12,7 +12,9 @@ import (
"github.com/docker/containerd/execution"
)
-var ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string")
+var (
+ ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string")
+)
func New(root string) (*OCIRuntime, error) {
err := SetSubreaper(1)
View
@@ -0,0 +1,19 @@
+package execution
+
+import (
+ "context"
+
+ "github.com/docker/containerd/log"
+ "github.com/sirupsen/logrus"
+)
+
+var ctx context.Context
+
+func GetLogger(module string) *logrus.Entry {
+ if ctx == nil {
+ ctx = log.WithModule(context.Background(), "execution")
+ }
+
+ subCtx := log.WithModule(ctx, module)
+ return log.GetLogger(subCtx)
+}
Oops, something went wrong.

0 comments on commit aa5ff88

Please sign in to comment.