Changed ReapTable logic
- Changed the reap loop to operate per network. The previous
  implementation took a ReadLock to update the reapTime, but now that
  the residual reapTime is also read by the bulkSync under the same
  ReadLock, concurrent reads and updates of the value could conflict.
  The new logic fetches the list of networks and then cleans up one
  network at a time, locking the database and releasing it after each
  network. This ensures fairer locking and avoids keeping the database
  blocked for too long.

  Note: the ticker does not guarantee that the reap logic runs
  precisely every reapTimePeriod; the documentation states that ticks
  are skipped if the routine takes too long. If the process itself
  slows down, the lifetime of deleted entries may increase. This
  should still not be a big problem because the residual reapTime is
  now propagated among all the nodes: a slower node will cause the
  deleted entry to be re-propagated multiple times, but the state
  remains consistent.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani committed Sep 20, 2017
1 parent 7a0e447 commit b388729
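Regarding the ticker note in the commit message: Go's time.Ticker drops ticks for slow receivers rather than queueing them, so an overlong reap pass only delays the next pass. Below is a minimal, self-contained sketch of that behavior; the period and sleep values are illustrative and are not taken from networkdb.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only; the real reap period is a networkdb constant.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	start := time.Now()
	for i := 0; i < 3; i++ {
		<-ticker.C
		// Simulate a reap pass that takes much longer than the tick period.
		// The ticker channel buffers at most one tick, so ticks that fire
		// while the pass is still running are dropped: the loop falls
		// behind instead of bursting to catch up afterwards.
		time.Sleep(2500 * time.Millisecond)
		fmt.Printf("pass %d finished at ~%v\n", i, time.Since(start).Round(time.Second))
	}
}
```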
Showing 6 changed files with 85 additions and 57 deletions.
74 changes: 40 additions & 34 deletions networkdb/cluster.go
@@ -321,43 +321,49 @@ func (nDB *NetworkDB) reapNetworks() {
}

func (nDB *NetworkDB) reapTableEntries() {
-	var paths []string
-
+	var nodeNetworks []string
+	// This is best effort; if the list of networks changes, it will be picked up in the next cycle
	nDB.RLock()
-	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
-		entry, ok := v.(*entry)
-		if !ok {
-			return false
-		}
-
-		if !entry.deleting {
-			return false
-		}
-		if entry.reapTime > 0 {
-			entry.reapTime -= reapPeriod
-			return false
-		}
-		paths = append(paths, path)
-		return false
-	})
+	for nid := range nDB.networks[nDB.config.NodeName] {
+		nodeNetworks = append(nodeNetworks, nid)
+	}
	nDB.RUnlock()

-	nDB.Lock()
-	for _, path := range paths {
-		params := strings.Split(path[1:], "/")
-		tname := params[0]
-		nid := params[1]
-		key := params[2]
-
-		okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
-		if !okTable {
-			logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
-		}
-		if !okNetwork {
-			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
-		}
+	cycleStart := time.Now()
+	// In order to avoid blocking the database for a long time, apply the garbage collection logic by network
+	// The lock is taken at the beginning of the cycle and the deletion is inline
+	for _, nid := range nodeNetworks {
+		nDB.Lock()
+		nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
+			// timeCompensation compensates in case the lock took some time to be released
+			timeCompensation := time.Since(cycleStart)
+			entry, ok := v.(*entry)
+			if !ok || !entry.deleting {
+				return false
+			}
+
+			if entry.reapTime > reapPeriod+timeCompensation {
+				entry.reapTime -= reapPeriod + timeCompensation
+				return false
+			}
+
+			params := strings.Split(path[1:], "/")
+			nid := params[0]
+			tname := params[1]
+			key := params[2]
+
+			okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
+			if !okTable {
+				logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
+			}
+			if !okNetwork {
+				logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
+			}
+
+			return false
+		})
+		nDB.Unlock()
+	}
-	nDB.Unlock()
}
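A rough illustration of the countdown performed in the WalkPrefix callback above, with made-up durations; the actual reapPeriod and entry lifetimes are defined elsewhere in networkdb and may differ.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values, only to show the per-entry bookkeeping.
	reapPeriod := 5 * time.Second       // assumed cleanup cycle period
	reapTime := 30 * time.Second        // residual lifetime of a deleted entry
	timeCompensation := 2 * time.Second // time already spent on earlier networks this cycle

	// Same branch shape as the callback: keep decrementing while the residual
	// lifetime still exceeds the elapsed period, otherwise delete the entry.
	if reapTime > reapPeriod+timeCompensation {
		reapTime -= reapPeriod + timeCompensation
		fmt.Println("entry kept, residual reapTime:", reapTime) // 23s
	} else {
		fmt.Println("entry reaped")
	}
}
```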

func (nDB *NetworkDB) gossip() {
@@ -406,7 +412,7 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
-			logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
+			logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
8 changes: 4 additions & 4 deletions networkdb/delegate.go
@@ -238,8 +238,8 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
}

// All the entries marked for deletion should have a reapTime set greater than 0
-	// This case can happens if the cluster is running different versions of the engine where the old version does not have the
-	// field. In both cases we should raise a warning message
+	// This case can happen if the cluster is running different versions of the engine where the old version does not have the
+	// field. If that is not the case, this can be a BUG
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapInterval
@@ -251,9 +251,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {

if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we did not have a state for it, don't propagate to the application
-		// If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around
+		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
		// most likely the cluster is already aware of it; if not, whoever syncs with this node will catch the state too.
-		return e.reapTime >= reapPeriod/6
+		return e.reapTime > reapPeriod/6
}

var op opType
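To make the 1/6 threshold above concrete: with the strictly-greater-than check, an entry whose residual reap time is exactly one sixth of the total is no longer rebroadcast. A small sketch with hypothetical durations (the real constant values are not part of this diff):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical total lifetime of a deleted entry.
	total := 30 * time.Minute

	for _, residual := range []time.Duration{20 * time.Minute, 5 * time.Minute, 1 * time.Minute} {
		// Mirrors the new check: rebroadcast only while the residual
		// lifetime is strictly greater than 1/6 of the total.
		rebroadcast := residual > total/6
		fmt.Printf("residual %v -> rebroadcast=%v\n", residual, rebroadcast)
	}
}
```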
42 changes: 29 additions & 13 deletions networkdb/networkdb.go
@@ -473,7 +473,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {

entry := &entry{
ltime: oldEntry.ltime,
-		node:     node,
+		node:     oldEntry.node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
@@ -555,14 +555,26 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
-	nodeNetworks[nid] = &network{id: nid, ltime: ltime}
-	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
-		NumNodes: func() int {
-			nDB.RLock()
-			defer nDB.RUnlock()
-			return len(nDB.networkNodes[nid])
-		},
-		RetransmitMult: 4,
+	n, ok := nodeNetworks[nid]
+	if !ok {
+		n = &network{
+			id:    nid,
+			ltime: ltime,
+			tableBroadcasts: &memberlist.TransmitLimitedQueue{
+				NumNodes: func() int {
+					nDB.RLock()
+					defer nDB.RUnlock()
+					return len(nDB.networkNodes[nid])
+				},
+				RetransmitMult: 4,
+			},
+		}
+		nodeNetworks[nid] = n
+	} else {
+		// The network object is already present; it is possible that it is still waiting to be garbage collected
+		n.leaving = false
+		n.ltime = ltime
+		n.reapTime = 0
+	}
nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
@@ -684,8 +696,10 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork {
// Add only if it is an insert not an update
-		n := nDB.networks[nDB.config.NodeName][nid]
-		n.entriesNumber++
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber++
+		}
}
return okTable, okNetwork
}
@@ -697,8 +711,10 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork {
// Remove only if the delete is successful
-		n := nDB.networks[nDB.config.NodeName][nid]
-		n.entriesNumber--
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber--
+		}
}
return okTable, okNetwork
}
6 changes: 6 additions & 0 deletions networkdb/networkdb_test.go
@@ -473,6 +473,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
if len(dbs[0].networkNodes["network1"]) != 2 {
t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
}
+	if n, ok := dbs[0].networks[dbs[0].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}

// Wait for the propagation on db[1]
for i := 0; i < maxRetry; i++ {
@@ -484,6 +487,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
if len(dbs[1].networkNodes["network1"]) != 2 {
t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
}
+	if n, ok := dbs[1].networks[dbs[1].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}

// Try a quick leave/join
err = dbs[0].LeaveNetwork("network1")
2 changes: 1 addition & 1 deletion test/networkDb/README
@@ -1,7 +1,7 @@
SERVER

cd test/networkdb
-env GOOS=linux go build -v server/testMain.go && docker build -t fcrisciani/networkdb-test .
+env GOOS=linux go build -v testMain.go && docker build -t fcrisciani/networkdb-test .
(only for testkit case) docker push fcrisciani/networkdb-test

Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000
10 changes: 5 additions & 5 deletions test/networkDb/dbclient/ndbClient.go
@@ -567,7 +567,7 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
tableName := args[1]
parallelWriters, _ := strconv.Atoi(args[2])
writeTimeSec, _ := strconv.Atoi(args[3])
-	parallerlLeaver, _ := strconv.Atoi(args[4])
+	parallelLeaver, _ := strconv.Atoi(args[4])

// Start parallel writers that will create and delete unique keys
doneCh := make(chan resultTuple, parallelWriters)
@@ -586,23 +586,23 @@

keysExpected := keyMap[totalWrittenKeys]
// The Leavers will leave the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
go leaveNetwork(ips[i], servicePort, networkName, doneCh)
// Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
keysExpected -= keyMap[ips[i]]
}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)

// Give some time
time.Sleep(100 * time.Millisecond)

// The writers will join the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
go joinNetwork(ips[i], servicePort, networkName, doneCh)
}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
