Skip to content

Commit

Permalink
Update agent to send the node TLS information along with the node description,
Browse files Browse the repository at this point in the history

and to restart the session if the information has changed.

Signed-off-by: cyli <ying.li@docker.com>
  • Loading branch information
cyli committed Apr 4, 2017
1 parent 4fdfee5 commit 46b0f29
Show file tree
Hide file tree
Showing 7 changed files with 545 additions and 200 deletions.
134 changes: 83 additions & 51 deletions agent/agent.go
@@ -1,6 +1,7 @@
package agent

import (
"bytes"
"fmt"
"math/rand"
"reflect"
Expand Down Expand Up @@ -44,6 +45,8 @@ type Agent struct {
stopOnce sync.Once // only allow stop to be called once
closed chan struct{} // only closed in run
err error // read only after closed is closed

nodeUpdatePeriod time.Duration
}

// New returns a new agent, ready for task dispatch.
Expand All @@ -53,14 +56,15 @@ func New(config *Config) (*Agent, error) {
}

a := &Agent{
config: config,
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
leaving: make(chan struct{}),
left: make(chan struct{}),
stopped: make(chan struct{}),
closed: make(chan struct{}),
ready: make(chan struct{}),
config: config,
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
leaving: make(chan struct{}),
left: make(chan struct{}),
stopped: make(chan struct{}),
closed: make(chan struct{}),
ready: make(chan struct{}),
nodeUpdatePeriod: nodeUpdatePeriod,
}

a.worker = newWorker(config.DB, config.Executor, a)
Expand Down Expand Up @@ -182,13 +186,15 @@ func (a *Agent) run(ctx context.Context) {
log.G(ctx).Debug("(*Agent).run")
defer log.G(ctx).Debug("(*Agent).run exited")

nodeTLSInfo := a.config.NodeTLSInfo

// get the node description
nodeDescription, err := a.nodeDescriptionWithHostname(ctx)
nodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: node description unavailable")
}
// nodeUpdateTicker is used to periodically check for updates to node description
nodeUpdateTicker := time.NewTicker(nodeUpdatePeriod)
nodeUpdateTicker := time.NewTicker(a.nodeUpdatePeriod)
defer nodeUpdateTicker.Stop()

var (
Expand All @@ -214,6 +220,35 @@ func (a *Agent) run(ctx context.Context) {

a.worker.Listen(ctx, reporter)

updateNode := func() {
// skip updating if the registration isn't finished
if registered != nil {
return
}
// get the current node description
newNodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable")
}

// if newNodeDescription is nil, it will cause a panic when
// trying to create a session. Typically this can happen
// if the engine goes down
if newNodeDescription == nil {
return
}

// if the node description has changed, update it to the new one
// and close the session. The old session will be stopped and a
// new one will be created with the updated description
if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
nodeDescription = newNodeDescription
// close the session
log.G(ctx).Info("agent: found node update")
session.sendError(nil)
}
}

for {
select {
case operation := <-sessionq:
Expand Down Expand Up @@ -247,7 +282,7 @@ func (a *Agent) run(ctx context.Context) {
}
}
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
if err := a.handleSessionMessage(ctx, msg, nodeTLSInfo); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
case sub := <-session.subscriptions:
Expand Down Expand Up @@ -305,33 +340,17 @@ func (a *Agent) run(ctx context.Context) {
}
session = newSession(ctx, a, delay, session.sessionID, nodeDescription)
registered = session.registered
case <-nodeUpdateTicker.C:
// skip this case if the registration isn't finished
if registered != nil {
continue
}
// get the current node description
newNodeDescription, err := a.nodeDescriptionWithHostname(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable")
}

// if newNodeDescription is nil, it will cause a panic when
// trying to create a session. Typically this can happen
// if the engine goes down
if newNodeDescription == nil {
continue
}

// if the node description has changed, update it to the new one
// and close the session. The old session will be stopped and a
// new one will be created with the updated description
if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
nodeDescription = newNodeDescription
// close the session
log.G(ctx).Info("agent: found node update")
session.sendError(nil)
case ev := <-a.config.NotifyTLSChange:
// the TLS info has changed, so force a check to see if we need to restart the session
if tlsInfo, ok := ev.(*api.NodeTLSInfo); ok {
nodeTLSInfo = tlsInfo
updateNode()
nodeUpdateTicker.Stop()
nodeUpdateTicker = time.NewTicker(a.nodeUpdatePeriod)
}
case <-nodeUpdateTicker.C:
// periodically check to see whether the node information has changed, and if so, restart the session
updateNode()
case <-a.stopped:
// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
// this loop a few times.
Expand All @@ -347,7 +366,7 @@ func (a *Agent) run(ctx context.Context) {
}
}

func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage) error {
func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage, nti *api.NodeTLSInfo) error {
seen := map[api.Peer]struct{}{}
for _, manager := range message.Managers {
if manager.Peer.Addr == "" {
Expand All @@ -358,18 +377,28 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
seen[*manager.Peer] = struct{}{}
}

if message.Node != nil {
if a.node == nil || !nodesEqual(a.node, message.Node) {
if a.config.NotifyNodeChange != nil {
a.config.NotifyNodeChange <- message.Node.Copy()
}
a.node = message.Node.Copy()
if err := a.config.Executor.Configure(ctx, a.node); err != nil {
log.G(ctx).WithError(err).Error("node configure failed")
}
var changes *NodeChanges
if message.Node != nil && (a.node == nil || !nodesEqual(a.node, message.Node)) {
if a.config.NotifyNodeChange != nil {
changes = &NodeChanges{Node: message.Node.Copy()}
}
a.node = message.Node.Copy()
if err := a.config.Executor.Configure(ctx, a.node); err != nil {
log.G(ctx).WithError(err).Error("node configure failed")
}
}
if len(message.RootCA) > 0 && !bytes.Equal(message.RootCA, nti.TrustRoot) {
if changes == nil {
changes = &NodeChanges{RootCert: message.RootCA}
} else {
changes.RootCert = message.RootCA
}
}

if changes != nil {
a.config.NotifyNodeChange <- changes
}

// prune managers not in list.
for peer := range a.config.ConnBroker.Remotes().Weights() {
if _, ok := seen[peer]; !ok {
Expand Down Expand Up @@ -517,12 +546,15 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
}

// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.NodeTLSInfo) (*api.NodeDescription, error) {
desc, err := a.config.Executor.Describe(ctx)

// Override hostname
if a.config.Hostname != "" && desc != nil {
desc.Hostname = a.config.Hostname
// Override hostname and TLS info
if desc != nil {
if a.config.Hostname != "" && desc != nil {
desc.Hostname = a.config.Hostname
}
desc.TLSInfo = tlsInfo
}
return desc, err
}
Expand Down

0 comments on commit 46b0f29

Please sign in to comment.