Skip to content

Commit

Permalink
Add missing locks in agent and service code
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Boch <aboch@docker.com>
  • Loading branch information
aboch committed Nov 29, 2016
1 parent dd0ddde commit 8dcf996
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 64 deletions.
137 changes: 87 additions & 50 deletions agent.go
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"os"
"sort"
"sync"

"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
Expand Down Expand Up @@ -39,6 +40,7 @@ type agent struct {
advertiseAddr string
epTblCancel func()
driverCancelFuncs map[string][]func()
sync.Mutex
}

func getBindAddr(ifaceName string) (string, error) {
Expand Down Expand Up @@ -86,9 +88,16 @@ func resolveAddr(addrOrInterface string) (string, error) {
func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
drvEnc := discoverapi.DriverEncryptionUpdate{}

a := c.agent
a := c.getAgent()
if a == nil {
logrus.Debug("Skipping key change as agent is nil")
return nil
}

// Find the deleted key. If the deleted key was the primary key,
// a new primary key should be set before removing if from keyring.
c.Lock()
added := []byte{}
deleted := []byte{}
j := len(c.keys)
for i := 0; i < j; {
Expand Down Expand Up @@ -127,7 +136,7 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
if !same {
c.keys = append(c.keys, key)
if key.Subsystem == subsysGossip {
a.networkDB.SetKey(key.Key)
added = key.Key
}

if key.Subsystem == subsysIPSec {
Expand All @@ -136,6 +145,11 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
}
}
}
c.Unlock()

if len(added) > 0 {
a.networkDB.SetKey(added)
}

key, tag, err := c.getPrimaryKeyTag(subsysGossip)
if err != nil {
Expand Down Expand Up @@ -166,8 +180,10 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
}

func (c *controller) agentSetup() error {
c.Lock()
clusterProvider := c.cfg.Daemon.ClusterProvider

agent := c.agent
c.Unlock()
bindAddr := clusterProvider.GetLocalAddress()
advAddr := clusterProvider.GetAdvertiseAddress()
remote := clusterProvider.GetRemoteAddress()
Expand All @@ -176,7 +192,7 @@ func (c *controller) agentSetup() error {
listenAddr, _, _ := net.SplitHostPort(listen)

logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr)
if advAddr != "" && c.agent == nil {
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err)
} else {
Expand Down Expand Up @@ -208,6 +224,9 @@ func (c *controller) agentSetup() error {
// For a given subsystem getKeys sorts the keys by lamport time and returns
// slice of keys and lamport time which can used as a unique tag for the keys
func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
c.Lock()
defer c.Unlock()

sort.Sort(ByTime(c.keys))

keys := [][]byte{}
Expand All @@ -227,6 +246,8 @@ func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
// getPrimaryKeyTag returns the primary key for a given subsystem from the
// list of sorted key and the associated tag
func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
c.Lock()
defer c.Unlock()
sort.Sort(ByTime(c.keys))
keys := []*types.EncryptionKey{}
for _, key := range c.keys {
Expand Down Expand Up @@ -265,13 +286,15 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st

ch, cancel := nDB.Watch("endpoint_table", "", "")

c.Lock()
c.agent = &agent{
networkDB: nDB,
bindAddr: bindAddr,
advertiseAddr: advertiseAddr,
epTblCancel: cancel,
driverCancelFuncs: make(map[string][]func()),
}
c.Unlock()

go c.handleTableEvents(ch, c.handleEpTableEvent)

Expand All @@ -294,21 +317,22 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
}

func (c *controller) agentJoin(remote string) error {
if c.agent == nil {
agent := c.getAgent()
if agent == nil {
return nil
}

return c.agent.networkDB.Join([]string{remote})
return agent.networkDB.Join([]string{remote})
}

func (c *controller) agentDriverNotify(d driverapi.Driver) {
if c.agent == nil {
agent := c.getAgent()
if agent == nil {
return
}

d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
Address: c.agent.advertiseAddr,
BindAddress: c.agent.bindAddr,
Address: agent.advertiseAddr,
BindAddress: agent.bindAddr,
Self: true,
})

Expand Down Expand Up @@ -339,11 +363,19 @@ func (c *controller) agentClose() {
return
}

var cancelList []func()

agent.Lock()
for _, cancelFuncs := range agent.driverCancelFuncs {
for _, cancel := range cancelFuncs {
cancel()
cancelList = append(cancelList, cancel)
}
}
agent.Unlock()

for _, cancel := range cancelList {
cancel()
}

agent.epTblCancel()

Expand All @@ -354,31 +386,33 @@ func (n *network) isClusterEligible() bool {
if n.driverScope() != datastore.GlobalScope {
return false
}

c := n.getController()
if c.agent == nil {
return false
}

return true
return n.getController().getAgent() != nil
}

func (n *network) joinCluster() error {
if !n.isClusterEligible() {
return nil
}

c := n.getController()
return c.agent.networkDB.JoinNetwork(n.ID())
agent := n.getController().getAgent()
if agent == nil {
return nil
}

return agent.networkDB.JoinNetwork(n.ID())
}

func (n *network) leaveCluster() error {
if !n.isClusterEligible() {
return nil
}

c := n.getController()
return c.agent.networkDB.LeaveNetwork(n.ID())
agent := n.getController().getAgent()
if agent == nil {
return nil
}

return agent.networkDB.LeaveNetwork(n.ID())
}

func (ep *endpoint) addDriverInfoToCluster() error {
Expand All @@ -390,10 +424,7 @@ func (ep *endpoint) addDriverInfoToCluster() error {
return nil
}

ctrlr := n.ctrlr
ctrlr.Lock()
agent := ctrlr.agent
ctrlr.Unlock()
agent := n.getController().getAgent()
if agent == nil {
return nil
}
Expand All @@ -415,10 +446,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
return nil
}

ctrlr := n.ctrlr
ctrlr.Lock()
agent := ctrlr.agent
ctrlr.Unlock()
agent := n.getController().getAgent()
if agent == nil {
return nil
}
Expand All @@ -438,6 +466,7 @@ func (ep *endpoint) addServiceInfoToCluster() error {
}

c := n.getController()
agent := c.getAgent()
if !ep.isAnonymous() && ep.Iface().Address() != nil {
var ingressPorts []*PortConfig
if ep.svcID != "" {
Expand Down Expand Up @@ -466,8 +495,10 @@ func (ep *endpoint) addServiceInfoToCluster() error {
return err
}

if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
return err
if agent != nil {
if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
return err
}
}
}

Expand All @@ -481,6 +512,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
}

c := n.getController()
agent := c.getAgent()

if !ep.isAnonymous() {
if ep.svcID != "" && ep.Iface().Address() != nil {
var ingressPorts []*PortConfig
Expand All @@ -492,9 +525,10 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
return err
}
}

if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
return err
if agent != nil {
if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
return err
}
}
}
return nil
Expand All @@ -506,24 +540,23 @@ func (n *network) addDriverWatches() {
}

c := n.getController()
agent := c.getAgent()
if agent == nil {
return
}
for _, tableName := range n.driverTables {
c.Lock()
if c.agent == nil {
c.Unlock()
return
}
ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
c.Unlock()

ch, cancel := agent.networkDB.Watch(tableName, n.ID(), "")
agent.Lock()
agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
agent.Unlock()
go c.handleTableEvents(ch, n.handleDriverTableEvent)
d, err := n.driver(false)
if err != nil {
logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
return
}

c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
if nid == n.ID() {
d.EventNotify(driverapi.Create, nid, tableName, key, value)
}
Expand All @@ -538,11 +571,15 @@ func (n *network) cancelDriverWatches() {
return
}

c := n.getController()
c.Lock()
cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
delete(c.agent.driverCancelFuncs, n.ID())
c.Unlock()
agent := n.getController().getAgent()
if agent == nil {
return
}

agent.Lock()
cancelFuncs := agent.driverCancelFuncs[n.ID()]
delete(agent.driverCancelFuncs, n.ID())
agent.Unlock()

for _, cancel := range cancelFuncs {
cancel()
Expand Down
11 changes: 9 additions & 2 deletions controller.go
Expand Up @@ -237,12 +237,13 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {

func (c *controller) SetClusterProvider(provider cluster.Provider) {
c.Lock()
defer c.Unlock()
c.cfg.Daemon.ClusterProvider = provider
disableProviderCh := c.cfg.Daemon.DisableProvider
c.Unlock()
if provider != nil {
go c.clusterAgentInit()
} else {
c.cfg.Daemon.DisableProvider <- struct{}{}
disableProviderCh <- struct{}{}
}
}

Expand Down Expand Up @@ -295,6 +296,12 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
return c.handleKeyChange(keys)
}

func (c *controller) getAgent() *agent {
c.Lock()
defer c.Unlock()
return c.agent
}

func (c *controller) clusterAgentInit() {
clusterProvider := c.cfg.Daemon.ClusterProvider
for {
Expand Down
13 changes: 4 additions & 9 deletions network.go
Expand Up @@ -1485,17 +1485,12 @@ func (n *network) Peers() []networkdb.PeerInfo {
return []networkdb.PeerInfo{}
}

var nDB *networkdb.NetworkDB
n.ctrlr.Lock()
if n.ctrlr.agentInitDone == nil && n.ctrlr.agent != nil {
nDB = n.ctrlr.agent.networkDB
agent := n.getController().getAgent()
if agent == nil {
return []networkdb.PeerInfo{}
}
n.ctrlr.Unlock()

if nDB != nil {
return n.ctrlr.agent.networkDB.Peers(n.id)
}
return []networkdb.PeerInfo{}
return agent.networkDB.Peers(n.ID())
}

func (n *network) DriverOptions() map[string]string {
Expand Down

0 comments on commit 8dcf996

Please sign in to comment.